• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moeyensj / difi / 18207934935

02 Oct 2025 11:06PM UTC coverage: 85.832% (-10.5%) from 96.321%
18207934935

Pull #54

github

web-flow
Merge 450ada65e into 1252b17f0
Pull Request #54: Quivr-ize difi

730 of 874 new or added lines in 15 files covered. (83.52%)

10 existing lines in 1 file now uncovered.

1042 of 1214 relevant lines covered (85.83%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/difi/tests/create_test_data.py
NEW
1
if __name__ == "__main__":
×
2

NEW
3
    import argparse
×
NEW
4
    import uuid
×
NEW
5
    from pathlib import Path
×
6

NEW
7
    import numpy as np
×
NEW
8
    import pyarrow as pa
×
NEW
9
    import pyarrow.compute as pc
×
NEW
10
    from adam_assist import ASSISTPropagator
×
NEW
11
    from adam_core.observers import Observers
×
NEW
12
    from adam_core.observers.utils import calculate_observing_night
×
NEW
13
    from adam_core.orbits import Orbits
×
NEW
14
    from adam_core.time import Timestamp
×
NEW
15
    from adam_core.utils.helpers import make_real_orbits
×
16

NEW
17
    from difi.difi import LinkageMembers
×
NEW
18
    from difi.observations import Observations
×
19

NEW
20
    def create_observations(
        orbits: Orbits,
        start_night: int = 61000,
        nights: int = 10,
        obs_times_in_night: np.ndarray | None = None,
        observatory_codes: list[str] | None = None,
        seed: int | None = None,
    ) -> Observations:
        """
        Create simulated observations of the given orbits.

        An observing schedule is built first: for each night one observatory code is
        drawn at random from ``observatory_codes`` and used for every observation time
        in that night. Ephemerides of ``orbits`` are then generated for that schedule
        with ASSIST and converted into an ``Observations`` table.

        Parameters
        ----------
        orbits : Orbits
            The orbits to generate ephemerides (and therefore observations) for.
        start_night : int
            The MJD (UTC) of the first night.
        nights : int
            The number of consecutive nights to observe.
        obs_times_in_night : np.ndarray | None, optional
            Offsets, in minutes, added to the MJD of each night to obtain the
            observation times. Defaults to ``np.arange(0, 90, 30)``.
        observatory_codes : list[str] | None, optional
            The observatory codes to choose from. Defaults to ``["X05", "W84"]``.
        seed : int | None, optional
            The seed for the random number generator that picks the observatory
            of each night.

        Returns
        -------
        Observations
            The observations, sorted by time. Each observation gets a random
            (uuid4) identifier and the orbit ID as its object ID.
        """
        # Defaults are resolved here rather than in the signature so that no
        # mutable object is shared between calls.
        if obs_times_in_night is None:
            obs_times_in_night = np.arange(0, 90, 30)
        if observatory_codes is None:
            observatory_codes = ["X05", "W84"]

        # Minutes -> fraction of a day
        offsets_days = np.asarray(obs_times_in_night, dtype=float) / (24 * 60)
        obs_per_night = len(offsets_days)
        num_obs = obs_per_night * nights

        observation_times = np.empty(num_obs, dtype=float)
        observatory_codes_observations = np.empty(num_obs, dtype=object)

        rng = np.random.default_rng(seed=seed)
        for i in range(nights):
            # All observations of a night share one randomly chosen observatory
            observatory_code = rng.choice(observatory_codes)
            night_slice = slice(i * obs_per_night, (i + 1) * obs_per_night)
            observation_times[night_slice] = start_night + offsets_days + i
            observatory_codes_observations[night_slice] = observatory_code

        times = Timestamp.from_mjd(observation_times, scale="utc")
        observers = Observers.from_codes(pa.array(observatory_codes_observations), times)

        propagator = ASSISTPropagator()
        ephemeris = propagator.generate_ephemeris(orbits, observers, max_processes=1)
        ephemeris = ephemeris.sort_by(["orbit_id", "coordinates.time.days", "coordinates.time.nanos"])

        observations = Observations.from_kwargs(
            id=[uuid.uuid4().hex for _ in range(len(ephemeris))],
            time=ephemeris.coordinates.time,
            ra=ephemeris.coordinates.lon,
            dec=ephemeris.coordinates.lat,
            observatory_code=ephemeris.coordinates.origin.code,
            object_id=ephemeris.orbit_id,
            night=calculate_observing_night(ephemeris.coordinates.origin.code, ephemeris.coordinates.time),
        )
        return observations.sort_by(["time.days", "time.nanos"])
×
87

NEW
88
    def create_linkages(
        observations: Observations,
        num_mixed: int = 5,
        partial_contamination_percents: list[float] | None = None,
        seed: int | None = None,
    ) -> LinkageMembers:
        """
        Create linkages from observations.

        For each unique object_id in the observations, this function creates:
        - one pure linkage that includes all observations for the object
        - one pure-incomplete linkage that includes a strict subset of the object's
          observations (only when the object has more than one observation)
        - one partial (contaminated) linkage with a specific contamination percentage

        It also creates a configurable number of mixed linkages that contain
        observations from multiple different objects.

        Parameters
        ----------
        observations : Observations
            Table of observations to build linkages from.
        num_mixed : int
            Number of mixed linkages to create. Each mixed linkage targets a
            random number of observations between 6 and 10 (inclusive), split as
            evenly as possible across up to 3 distinct objects (fewer if fewer
            objects exist). An object with fewer observations than its share
            contributes all of them, so the linkage may end up smaller.
        partial_contamination_percents : list[float] | None
            If provided, must have length equal to number of unique objects. Values are percentages [0-100]
            specifying contamination for each object's partial linkage. If None, values will be spaced
            evenly between 5 and 50% across objects (25% when there is a single object).
        seed : int | None
            Seed for reproducible random selection.

        Returns
        -------
        LinkageMembers
            Table of linkage members with columns linkage_id and obs_id.

        Raises
        ------
        ValueError
            If partial_contamination_percents is given and its length differs from
            the number of unique objects.
        """
        rng = np.random.default_rng(seed=seed)

        linkage_ids: list[str] = []
        obs_ids: list[str] = []

        unique_object_ids = observations.object_id.unique().to_pylist()
        num_objects = len(unique_object_ids)

        # Resolve the contamination target of each object's partial linkage
        if partial_contamination_percents is not None:
            if len(partial_contamination_percents) != num_objects:
                raise ValueError("partial_contamination_percents length must match number of unique objects")
            contamination_percents = partial_contamination_percents
        elif num_objects == 1:
            contamination_percents = [25.0]
        else:
            # Evenly space contamination percentages from 5% to 50% (inclusive) across objects
            contamination_percents = np.linspace(5.0, 50.0, num=num_objects).tolist()

        # Observation IDs per object, filled in the loop below and reused when
        # building mixed linkages so the table is filtered only once per object.
        obs_ids_by_object: dict[object, list[str]] = {}

        # Create pure, pure-incomplete, and partial (contaminated) linkages per object
        for object_id, contamination_percent in zip(unique_object_ids, contamination_percents):
            object_obs = observations.apply_mask(pc.equal(observations.object_id, object_id))
            object_obs_ids = object_obs.id.to_pylist()
            obs_ids_by_object[object_id] = object_obs_ids
            total_obs = len(object_obs_ids)

            # Pure linkage: all observations for this object
            linkage_ids.extend([f"linkage_pure_{object_id}"] * total_obs)
            obs_ids.extend(object_obs_ids)

            # Pure-incomplete linkage: a strict subset (never all observations)
            if total_obs > 1:
                desired_k = int(rng.integers(low=6, high=11))  # 6..10 inclusive
                k = min(desired_k, total_obs - 1)
                subset_obs_ids = rng.choice(object_obs_ids, size=k, replace=False).tolist()
                linkage_ids.extend([f"linkage_pure_incomplete_{object_id}"] * len(subset_obs_ids))
                obs_ids.extend(subset_obs_ids)

            # Partial (contaminated) linkage: at most 12 members in total
            total_linkage_size = min(12, total_obs)
            # Always at least one contaminant, and (when the size allows) at least one correct member
            contaminated_count = int(round((contamination_percent / 100.0) * total_linkage_size))
            contaminated_count = max(1, min(contaminated_count, total_linkage_size - 1))
            correct_count = total_linkage_size - contaminated_count

            # Sample correct obs from this object
            partial_correct = rng.choice(object_obs_ids, size=correct_count, replace=False).tolist()

            # Sample contaminant obs from other objects
            other_obs_ids = observations.apply_mask(
                pc.not_equal(observations.object_id, object_id)
            ).id.to_pylist()
            if other_obs_ids:
                # Sampling without replacement cannot draw more than is available
                contaminated_count = min(contaminated_count, len(other_obs_ids))
                partial_contaminants = rng.choice(
                    other_obs_ids, size=contaminated_count, replace=False
                ).tolist()
            else:
                # If there are no other objects, fall back to pure-incomplete
                # by moving all contaminated slots into correct slots
                partial_contaminants = []
                partial_correct = rng.choice(object_obs_ids, size=total_linkage_size, replace=False).tolist()

            partial_obs = partial_correct + partial_contaminants
            linkage_ids.extend([f"linkage_partial_{object_id}"] * len(partial_obs))
            obs_ids.extend(partial_obs)

        # Create mixed linkages: draw a random size [6..10] spread over up to 3 distinct objects
        if num_objects > 0:
            for i in range(num_mixed):
                desired_k = int(rng.integers(low=6, high=11))  # 6..10 inclusive
                num_distinct = min(3, num_objects)
                chosen_objects = rng.choice(unique_object_ids, size=num_distinct, replace=False).tolist()

                # Distribute counts as evenly as possible so that, with 3 objects,
                # no single object contributes more than half of the linkage
                base, remainder = divmod(desired_k, num_distinct)
                counts = np.full(num_distinct, base, dtype=int)
                counts[:remainder] += 1
                rng.shuffle(counts)

                mixed_obs: list[str] = []
                for oid, cnt in zip(chosen_objects, counts.tolist()):
                    obj_obs_ids = obs_ids_by_object[oid]
                    # Sample without replacement per object
                    sampled = rng.choice(obj_obs_ids, size=min(cnt, len(obj_obs_ids)), replace=False).tolist()
                    mixed_obs.extend(sampled)

                linkage_ids.extend([f"linkage_mixed_{i:05d}"] * len(mixed_obs))
                obs_ids.extend(mixed_obs)

        return LinkageMembers.from_kwargs(
            linkage_id=linkage_ids,
            obs_id=obs_ids,
        )
234

NEW
235
    # Command-line interface: the only option is the seed shared by both generators
    cli = argparse.ArgumentParser(description="Generate test observations and linkages parquet files.")
    cli.add_argument("--seed", type=int, default=42, help="Random seed for reproducibility")
    args = cli.parse_args()

    # Output goes to a "testdata" directory next to this script (created if missing)
    testdata_dir = Path(__file__).parent / "testdata"
    testdata_dir.mkdir(parents=True, exist_ok=True)

    # Generate observations for 5 real orbits, then build linkages from them
    orbits = make_real_orbits(5)
    observations = create_observations(orbits, seed=args.seed)
    linkage_members = create_linkages(observations, seed=args.seed)

    # Persist both tables as parquet files
    observations.to_parquet(testdata_dir / "observations.parquet")
    linkage_members.to_parquet(testdata_dir / "linkage_members.parquet")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc