• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moeyensj / thor / 11268133582

10 Oct 2024 05:40AM UTC coverage: 73.78% (-0.2%) from 73.99%
11268133582

Pull #166

github

web-flow
Merge 50fcd5fc6 into 3172ac203
Pull Request #166: Swap to pdm

211 of 286 new or added lines in 38 files covered. (73.78%)

2766 of 3749 relevant lines covered (73.78%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.74
/src/thor/tests/memory/test_memory.py
1
"""
2
A series of unit tests to run with memray for memory profiling.
3

4
# These tests should be run with the following command:
5
pytest -v --memray --native --memray-bin-path=.cache/bench/memory . -m memory
6

7
# You can then generate flamegraphs and other reports with something like this:
8
memray flamegraph .cache/bench/memory/path_to_output.bin
9

10
# Open in a browser
11
open .cache/bench/memory/path_to_output.html
12

13
# You can also view a graph of the _system_ memory usage
14
# for the duration of the test
15
open .cache/bench/memory/[branch_or_revision]/[orbit_id]-[test_name].png
16

17
"""
18

19
import os
1✔
20
import subprocess
1✔
21
import threading
1✔
22
import time
1✔
23
from pathlib import Path
1✔
24

25
import matplotlib.pyplot as plt
1✔
26
import psutil
1✔
27
import pytest
1✔
28

29
# Orbit id used to filter the test orbits and to locate per-orbit fixture files.
TEST_ORBIT_ID = "896831"
# Fixture data lives in a "fixtures" directory next to this test module.
FIXTURES_DIR = Path(__file__).parent.joinpath("fixtures")
# Values handed to the ``memory_config`` fixture as ``max_processes``.
CONFIG_PROCESSES = [1, 4]
1✔
32

33

34
def get_git_branch_or_revision():
    """
    Get the current Git branch name or revision hash.

    Returns the branch name reported by ``git rev-parse --abbrev-ref HEAD``.
    When HEAD is detached (git reports the literal "HEAD"), the full revision
    hash is returned instead. If git exits with an error or cannot be
    launched at all (e.g. it is not installed), "unknown" is returned.
    """

    def _rev_parse(*args):
        # Run ``git rev-parse`` and return its output as stripped text.
        return subprocess.check_output(["git", "rev-parse", *args]).strip().decode()

    try:
        branch = _rev_parse("--abbrev-ref", "HEAD")
        if branch != "HEAD":
            return branch
        # If HEAD is detached, get the revision hash
        return _rev_parse("HEAD")
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: git ran but failed (e.g. not inside a repository).
        # OSError: the git executable could not be started (e.g. not on PATH).
        return "unknown"
×
47

48

49
@pytest.fixture(scope="session")
def git_branch_or_revision():
    """
    Session-scoped fixture exposing the value of get_git_branch_or_revision().
    """
    label = get_git_branch_or_revision()
    return label
×
52

53

54
@pytest.fixture
def memory_snapshot(request, git_branch_or_revision):
    """
    Sample system memory usage while a test runs and save the trace as a PNG.

    A background thread records ``psutil.virtual_memory().used`` (in MB) about
    every 0.1 seconds from fixture setup until teardown. On teardown the
    samples are plotted against elapsed time and written to
    ``{root}/{git_branch_or_revision}/{TEST_ORBIT_ID}-{test name}.png``, where
    ``root`` is the ``memory_graph_paths`` option (default
    ``.cache/bench/memory``).
    """
    root_path = request.config.getoption("memory_graph_paths", ".cache/bench/memory")
    timestamps = []
    mem_usage = []
    stop_event = threading.Event()  # Event to signal the thread to stop

    def snapshot():
        start_time = time.time()  # Record the start time
        # Sample first, then check for the stop signal, so at least one sample
        # always exists even when the test finishes almost immediately.
        while True:
            mem_info = psutil.virtual_memory()
            mem_usage.append(mem_info.used / (1024**2))  # Convert to MB
            timestamps.append(time.time() - start_time)  # Timestamp relative to start
            # wait() returns True as soon as the event is set, so teardown is
            # not held up by a full sleep interval.
            if stop_event.wait(0.1):
                break

    snapshot_thread = threading.Thread(target=snapshot)
    snapshot_thread.start()
    try:
        yield
    finally:
        stop_event.set()  # Signal the thread to stop
        snapshot_thread.join()  # Wait for the thread to finish

    max_mem_usage = max(mem_usage)
    min_mem_usage = min(mem_usage)

    # Apply the plot style only while drawing so the global matplotlib style
    # is left unchanged for anything else running in this process.
    with plt.style.context("seaborn-v0_8-notebook"):
        fig = plt.figure(figsize=(12, 6))
        try:
            plt.plot(timestamps, mem_usage, color="magenta", linestyle="-", linewidth=2)
            plt.title(
                f"Memory Usage [{git_branch_or_revision}, {TEST_ORBIT_ID}] {request.node.name}",
                fontsize=12,
            )
            plt.xlabel("Time (seconds)", fontsize=12)
            plt.ylabel("Memory Usage (MB)", fontsize=12)
            plt.grid(True, which="both", linestyle="--", linewidth=0.5)
            plt.xlim(left=0)
            # Leave 20% below the minimum and 10% above the maximum as breathing room
            plt.ylim(bottom=min_mem_usage * 0.8, top=max_mem_usage + (max_mem_usage * 0.1))

            # Save the plot to a folder based on Git branch/revision in a file based on the test name
            os.makedirs(f"{root_path}/{git_branch_or_revision}", exist_ok=True)
            plt.savefig(f"{root_path}/{git_branch_or_revision}/{TEST_ORBIT_ID}-{request.node.name}.png")
        finally:
            # Release the figure; otherwise one figure per test stays alive in
            # pyplot's registry for the rest of the session.
            plt.close(fig)
×
95

96

97
@pytest.fixture
def memory_input_observations():
    """
    Load the InputObservations stored in ``input_observations.parquet`` under FIXTURES_DIR.
    """
    from thor.observations import InputObservations

    parquet_path = FIXTURES_DIR / "input_observations.parquet"
    return InputObservations.from_parquet(parquet_path)
×
102

103

104
# We are going to test all the major stages used in link_test_orbit
105
@pytest.fixture
def memory_observations():
    """
    Load the Observations stored in ``inputs/observations.feather`` under FIXTURES_DIR.
    """
    from thor.observations import Observations

    feather_path = FIXTURES_DIR / "inputs/observations.feather"
    return Observations.from_feather(feather_path)
×
110

111

112
@pytest.fixture
def memory_test_orbit():
    """
    Load TestOrbits from ``inputs/test_orbits.parquet``, keeping only rows whose
    ``orbit_id`` equals TEST_ORBIT_ID.
    """
    from thor.orbit import TestOrbits

    parquet_path = FIXTURES_DIR / "inputs/test_orbits.parquet"
    only_test_orbit = [("orbit_id", "=", TEST_ORBIT_ID)]
    return TestOrbits.from_parquet(parquet_path, filters=only_test_orbit)
120

121

122
@pytest.fixture
def memory_config(request):
    """
    Build a Config whose ``max_processes`` is the indirect parametrize value.

    Falls back to 1 when the requesting test supplies no parameter.
    """
    from thor.config import Config

    return Config(max_processes=getattr(request, "param", 1))
×
128

129

130
@pytest.fixture
def memory_filtered_observations():
    """
    Load the Observations stored in ``filtered_observations.parquet`` for TEST_ORBIT_ID.
    """
    from thor.observations import Observations

    parquet_path = FIXTURES_DIR / f"{TEST_ORBIT_ID}/filtered_observations.parquet"
    return Observations.from_parquet(parquet_path)
×
135

136

137
@pytest.fixture
def memory_transformed_detections():
    """
    Load the TransformedDetections stored in ``transformed_detections.parquet`` for TEST_ORBIT_ID.
    """
    from thor.range_and_transform import TransformedDetections

    parquet_path = FIXTURES_DIR / f"{TEST_ORBIT_ID}/transformed_detections.parquet"
    return TransformedDetections.from_parquet(parquet_path)
144

145

146
@pytest.fixture
def memory_clusters():
    """
    Load the cluster fixtures for TEST_ORBIT_ID.

    Returns a ``(clusters, cluster_members)`` tuple read from
    ``clusters.parquet`` and ``cluster_members.parquet`` respectively.
    """
    from thor.clusters import ClusterMembers, Clusters

    clusters_path = FIXTURES_DIR / f"{TEST_ORBIT_ID}/clusters.parquet"
    loaded_clusters = Clusters.from_parquet(clusters_path)
    members_path = FIXTURES_DIR / f"{TEST_ORBIT_ID}/cluster_members.parquet"
    loaded_members = ClusterMembers.from_parquet(members_path)
    return loaded_clusters, loaded_members
×
153

154

155
@pytest.fixture
def memory_iod_orbits():
    """
    Load the initial-orbit-determination fixtures for TEST_ORBIT_ID.

    Returns an ``(orbits, orbit_members)`` tuple read from
    ``iod_orbits.parquet`` and ``iod_orbit_members.parquet`` respectively.
    """
    from thor.orbit_determination import FittedOrbitMembers, FittedOrbits

    orbits_path = FIXTURES_DIR / f"{TEST_ORBIT_ID}/iod_orbits.parquet"
    fitted = FittedOrbits.from_parquet(orbits_path)
    members_path = FIXTURES_DIR / f"{TEST_ORBIT_ID}/iod_orbit_members.parquet"
    fitted_members = FittedOrbitMembers.from_parquet(members_path)
    return fitted, fitted_members
×
164

165

166
@pytest.fixture
def memory_od_orbits():
    """
    Load the orbit-determination fixtures for TEST_ORBIT_ID.

    Returns an ``(orbits, orbit_members)`` tuple read from
    ``od_orbits.parquet`` and ``od_orbit_members.parquet`` respectively.
    """
    from thor.orbit_determination import FittedOrbitMembers, FittedOrbits

    orbits_path = FIXTURES_DIR / f"{TEST_ORBIT_ID}/od_orbits.parquet"
    fitted = FittedOrbits.from_parquet(orbits_path)
    members_path = FIXTURES_DIR / f"{TEST_ORBIT_ID}/od_orbit_members.parquet"
    fitted_members = FittedOrbitMembers.from_parquet(members_path)
    return fitted, fitted_members
×
175

176

177
@pytest.fixture
def ray_cluster(memory_config):
    """
    Start ray for multi-process configurations and shut it down after the test.

    When ``memory_config.max_processes`` is not greater than 1, ray is neither
    initialized nor shut down.
    """
    import ray
    from adam_core.ray_cluster import initialize_use_ray

    # Single-process run: hand control to the test without touching ray.
    if not memory_config.max_processes > 1:
        yield
        return

    initialize_use_ray(num_cpus=memory_config.max_processes, object_store_bytes=4000000000)
    yield
    ray.shutdown()
×
188

189

190
@pytest.mark.memory
@pytest.mark.parametrize("memory_config", CONFIG_PROCESSES, indirect=True)
def test_load_input_observations(memory_snapshot, memory_config, ray_cluster, memory_input_observations):
    """
    Exercise sorting, time rescaling and conversion of input observations to Observations.
    """
    from thor.observations import Observations

    # It's always necessary to sort ahead of time, so we include it in our test
    sort_keys = ["time.days", "time.nanos", "observatory_code"]
    # A single name is rebound at each step so intermediate tables are not
    # kept alive longer than necessary while memory is being sampled.
    memory_input_observations = memory_input_observations.sort_by(sort_keys)
    utc_time = memory_input_observations.time.rescale("utc")
    memory_input_observations = memory_input_observations.set_column("time", utc_time)
    del utc_time
    Observations.from_input_observations(memory_input_observations)
×
203

204

205
@pytest.mark.memory
@pytest.mark.parametrize("memory_config", CONFIG_PROCESSES, indirect=True)
def test_filter_observations(memory_snapshot, memory_config, ray_cluster, memory_observations, memory_test_orbit):
    """
    Exercise filter_observations with the observations, test orbit and config fixtures.
    """
    from thor.main import filter_observations as run_filter

    run_filter(memory_observations, memory_test_orbit, memory_config)
×
213

214

215
@pytest.mark.memory
@pytest.mark.parametrize("memory_config", CONFIG_PROCESSES, indirect=True)
def test_range_and_transform(
    memory_snapshot,
    memory_test_orbit,
    memory_filtered_observations,
    memory_config,
    ray_cluster,
):
    """
    Exercise range_and_transform on the test orbit and the filtered observations.
    """
    from thor.range_and_transform import range_and_transform

    process_count = memory_config.max_processes
    range_and_transform(memory_test_orbit, memory_filtered_observations, max_processes=process_count)
231

232

233
@pytest.mark.memory
@pytest.mark.parametrize("memory_config", CONFIG_PROCESSES, indirect=True)
def test_cluster_and_link(memory_transformed_detections, memory_config, ray_cluster, memory_snapshot):
    """
    Exercise cluster_and_link on the transformed detections, using the
    velocity-grid and clustering settings carried by ``memory_config``.
    """
    from thor.main import cluster_and_link

    cfg = memory_config
    clustering_options = {
        "vx_range": [cfg.vx_min, cfg.vx_max],
        "vy_range": [cfg.vy_min, cfg.vy_max],
        "vx_bins": cfg.vx_bins,
        "vy_bins": cfg.vy_bins,
        "radius": cfg.cluster_radius,
        "min_obs": cfg.cluster_min_obs,
        "min_arc_length": cfg.cluster_min_arc_length,
        "alg": cfg.cluster_algorithm,
        "chunk_size": cfg.cluster_chunk_size,
        "max_processes": cfg.max_processes,
    }
    cluster_and_link(memory_transformed_detections, **clustering_options)
251

252

253
@pytest.mark.memory
@pytest.mark.parametrize("memory_config", CONFIG_PROCESSES, indirect=True)
def test_initial_orbit_determination(
    memory_config,
    memory_filtered_observations,
    memory_clusters,
    ray_cluster,
    memory_snapshot,
):
    """
    Exercise initial_orbit_determination on the filtered observations and the
    cluster members, using the ``iod_*`` settings carried by ``memory_config``.
    """
    from thor.orbits.iod import initial_orbit_determination

    # Only the members half of the (clusters, cluster_members) pair is needed.
    _unused_clusters, members = memory_clusters
    cfg = memory_config
    iod_options = {
        "min_obs": cfg.iod_min_obs,
        "min_arc_length": cfg.iod_min_arc_length,
        "contamination_percentage": cfg.iod_contamination_percentage,
        "rchi2_threshold": cfg.iod_rchi2_threshold,
        "observation_selection_method": cfg.iod_observation_selection_method,
        # propagator=memory_config.propagator,
        "propagator_kwargs": {},
        "chunk_size": cfg.iod_chunk_size,
        "max_processes": cfg.max_processes,
    }
    initial_orbit_determination(memory_filtered_observations, members, **iod_options)
278

279

280
@pytest.mark.memory
@pytest.mark.parametrize("memory_config", CONFIG_PROCESSES, indirect=True)
def test_differential_correction(
    memory_iod_orbits,
    memory_filtered_observations,
    memory_config,
    ray_cluster,
    memory_snapshot,
):
    """
    Exercise differential_correction on the IOD orbit fixtures and the filtered
    observations, using the ``od_*`` settings carried by ``memory_config``.
    """
    from thor.orbits.od import differential_correction

    iod_orbits, iod_members = memory_iod_orbits
    cfg = memory_config
    od_options = {
        "min_obs": cfg.od_min_obs,
        "min_arc_length": cfg.od_min_arc_length,
        "contamination_percentage": cfg.od_contamination_percentage,
        "rchi2_threshold": cfg.od_rchi2_threshold,
        "delta": cfg.od_delta,
        "max_iter": cfg.od_max_iter,
        "chunk_size": cfg.od_chunk_size,
        "max_processes": cfg.max_processes,
    }
    differential_correction(iod_orbits, iod_members, memory_filtered_observations, **od_options)
305

306

307
@pytest.mark.memory
@pytest.mark.parametrize("memory_config", CONFIG_PROCESSES, indirect=True)
def test_merge_and_extend(
    memory_od_orbits,
    memory_filtered_observations,
    memory_config,
    ray_cluster,
    memory_snapshot,
):
    """
    Exercise merge_and_extend_orbits on the OD orbit fixtures and the filtered
    observations, using the ``arc_extension_*`` settings carried by ``memory_config``.
    """
    from thor.orbits.attribution import merge_and_extend_orbits

    od_orbits, od_members = memory_od_orbits
    cfg = memory_config
    extension_options = {
        "min_obs": cfg.arc_extension_min_obs,
        "min_arc_length": cfg.arc_extension_min_arc_length,
        "contamination_percentage": cfg.arc_extension_contamination_percentage,
        "rchi2_threshold": cfg.arc_extension_rchi2_threshold,
        "radius": cfg.arc_extension_radius,
        "orbits_chunk_size": cfg.arc_extension_chunk_size,
        "max_processes": cfg.max_processes,
    }
    merge_and_extend_orbits(od_orbits, od_members, memory_filtered_observations, **extension_options)
331

332

333
# Disabling until smaller dataset is available
# NOTE(review): no skip/xfail marker is applied below, so this test is still
# collected under the ``memory`` marker — confirm whether it should be skipped.
@pytest.mark.memory
@pytest.mark.parametrize("memory_config", CONFIG_PROCESSES, indirect=True)
def test_link_test_orbit(memory_snapshot, memory_config, memory_observations, memory_test_orbit, ray_cluster):
    """
    Exercise link_test_orbit end to end by consuming everything it yields.
    """
    from thor.main import link_test_orbit

    stage_results = link_test_orbit(memory_test_orbit, memory_observations, config=memory_config)
    # The yielded values are not inspected; iterating only drives the run.
    for _stage in stage_results:
        pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc