• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nz-gravity / LogPSplinePSD / 19383920888

15 Nov 2025 03:43AM UTC coverage: 81.033% (+1.7%) from 79.377%
19383920888

Pull #17

github

web-flow
Merge 136aa51ca into 6c0003f36
Pull Request #17: Add testing with LISA

739 of 862 branches covered (85.73%)

Branch coverage included in aggregate %.

289 of 344 new or added lines in 9 files covered. (84.01%)

1 existing line in 1 file now uncovered.

4738 of 5897 relevant lines covered (80.35%)

1.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.41
/src/log_psplines/example_datasets/lisa_data.py
1
#!/usr/bin/env python3
2
"""
3
Produce analytic LISA PSDs/CSDs using the LDC noise realization and compare them
4
against periodograms estimated from the provided tdi.h5 file.  The script always:
5

6
1. Loads X2/Y2/Z2 time series from data/tdi.h5
7
2. Estimates the auto- and cross-periodograms
8
3. Builds the TDI2 analytic PSD/CSD using the LDC discrete-noise filters
9
4. Saves the PSD/CSD table + covariance cube
10
5. Generates a triangle plot with PSDs on the diagonal and |CSD|s off-diagonal
11

12
See https://arxiv.org/abs/2211.02539
13
"""
14

15
from __future__ import annotations
2✔
16

17
import time
2✔
18
from dataclasses import dataclass
2✔
19
from pathlib import Path
2✔
20
from typing import Dict, Optional, Tuple, Union
2✔
21

22
import h5py
2✔
23
import matplotlib.pyplot as plt
2✔
24
import numpy as np
2✔
25
import requests
2✔
26

27
# --- Constants and default paths -------------------------------------------------
28
C_LIGHT = 299_792_458.0  # m / s
2✔
29
L_ARM = 2.5e9  # m
2✔
30
LIGHT_TRAVEL_TIME = L_ARM / C_LIGHT  # ≈ 8.33 s
2✔
31

32
OMS_ASD = 1.5e-11
2✔
33
OMS_FKNEE = 2e-3
2✔
34
PM_ASD = 3e-15
2✔
35
PM_LOW_FKNEE = 4e-4
2✔
36
PM_HIGH_FKNEE = 8e-3
2✔
37
LASER_FREQ = 2.81e14  # Hz
2✔
38

39
DATA_PATH = Path("data/tdi.h5")
2✔
40
PSD_PNG = Path("lisa_psd_plot.png")
2✔
41
TRIANGLE_PNG = Path("spectra_triangle.png")
2✔
42
TEN_DAYS = "https://raw.githubusercontent.com/nz-gravity/test_data/main/lisa_noise/noise_4a_truncated/data/tdi.h5"
2✔
43

44

45
def ensure_lisa_tdi_data(
2✔
46
    data_url: str = TEN_DAYS, destination: Path = DATA_PATH
47
) -> float:
48
    """Download the default LISA TDI dataset if it is not present locally."""
49
    start_download = time.perf_counter()
2✔
50
    if destination.exists():
2✔
NEW
51
        print("Data file exists; download skipped.")
×
NEW
52
        return 0.0
×
53

54
    destination.parent.mkdir(parents=True, exist_ok=True)
2✔
55
    print("Downloading data...")
2✔
56
    response = requests.get(data_url, stream=True)
2✔
57
    if not response.ok:
2✔
NEW
58
        raise RuntimeError(f"Download failed ({response.status_code})")
×
59

60
    with destination.open("wb") as f:
2✔
61
        for chunk in response.iter_content(8192):
2✔
62
            f.write(chunk)
2✔
63

64
    download_time = time.perf_counter() - start_download
2✔
65
    print(f"Download complete: {download_time:.3f} s")
2✔
66
    return download_time
2✔
67

68

69
# --- Noise and transfer helpers --------------------------------------------------
70
def lisa_link_noises_ldc(
2✔
71
    freq: np.ndarray,
72
    fs: float,
73
    fmin: float,
74
) -> Tuple[np.ndarray, np.ndarray]:
75
    """
76
    Reproduce the single-link proof-mass (Spm) and optical-path (Sop) PSDs
77
    used in the LDC noise realizations.
78

79
    See https://zenodo.org/doi/10.5281/zenodo.15698080
80
    """
81
    exp_term = np.exp(-2.0 * np.pi * fmin / fs) * np.exp(
2✔
82
        -2j * np.pi * freq / fs
83
    )
84
    denom_mag2 = np.abs(1.0 - exp_term) ** 2
2✔
85

86
    psd_tm_high = (
2✔
87
        (2.0 * PM_ASD * LASER_FREQ / (2.0 * np.pi * C_LIGHT)) ** 2
88
        * (2.0 * np.pi * fmin) ** 2
89
        / denom_mag2
90
        / (fs * fmin) ** 2
91
    )
92
    psd_tm_low = (
2✔
93
        (2.0 * PM_ASD * LASER_FREQ * PM_LOW_FKNEE / (2.0 * np.pi * C_LIGHT))
94
        ** 2
95
        * (2.0 * np.pi * fmin) ** 2
96
        / denom_mag2
97
        / (fs * fmin) ** 2
98
        * np.abs(1.0 / (1.0 - np.exp(-2j * np.pi * freq / fs))) ** 2
99
        * (2.0 * np.pi / fs) ** 2
100
    )
101
    Spm = psd_tm_high + psd_tm_low
2✔
102

103
    psd_oms_high = (OMS_ASD * fs * LASER_FREQ / C_LIGHT) ** 2 * np.sin(
2✔
104
        2.0 * np.pi * freq / fs
105
    ) ** 2
106
    psd_oms_low = (
2✔
107
        (2.0 * np.pi * OMS_ASD * LASER_FREQ * OMS_FKNEE**2 / C_LIGHT) ** 2
108
        * (2.0 * np.pi * fmin) ** 2
109
        / denom_mag2
110
        / (fs * fmin) ** 2
111
    )
112
    Sop = psd_oms_high + psd_oms_low
2✔
113
    return Spm, Sop
2✔
114

115

116
def tdi2_psd_and_csd(
2✔
117
    freq: np.ndarray, Spm: np.ndarray, Sop: np.ndarray
118
) -> Tuple[np.ndarray, np.ndarray]:
119
    """
120
    Compute diagonal PSD (X2) and cross-term CSD (XY) for TDI2 combinations.
121

122
    Because of the symmetries of the equal-arm constellation:
123
        S_X2 = S_Y2 = S_Z2
124
        S_XY = S_YZ = S_ZX
125

126
    See e.g. Eq. (54-55) of https://arxiv.org/pdf/2211.02539
127

128
    Also see
129
    https://github.com/mikekatz04/LISAanalysistools/blob/bf2ea28e2c62e9f0be0b8f74884b601b9bf2097d/src/lisatools/sensitivity.py#L347C10-L347C39
130
    """
131
    x = 2.0 * np.pi * LIGHT_TRAVEL_TIME * freq
2✔
132
    sinx = np.sin(x)
2✔
133
    sin2x = np.sin(2.0 * x)
2✔
134
    cosx = np.cos(x)
2✔
135
    cos2x = np.cos(2.0 * x)
2✔
136

137
    diag = 64.0 * sinx**2 * sin2x**2 * Sop
2✔
138
    diag += 256.0 * (3.0 + cos2x) * cosx**2 * sinx**4 * Spm
2✔
139

140
    csd = -16.0 * sinx * (sin2x**3) * (4.0 * Spm + Sop)
2✔
141
    return diag, csd
2✔
142

143

144
def covariance_matrix(diag: np.ndarray, csd: np.ndarray) -> np.ndarray:
2✔
145
    """Assemble the 3×3 covariance matrix Σ(f) for each frequency."""
146
    nf = diag.size
2✔
147
    cov = np.zeros((nf, 3, 3), dtype=np.complex128)
2✔
148
    cov[:, 0, 0] = cov[:, 1, 1] = cov[:, 2, 2] = diag
2✔
149
    cov[:, 0, 1] = cov[:, 1, 0] = csd
2✔
150
    cov[:, 1, 2] = cov[:, 2, 1] = csd
2✔
151
    cov[:, 0, 2] = cov[:, 2, 0] = csd
2✔
152
    return cov
2✔
153

154

155
def periodogram_covariance(
2✔
156
    auto_psd: Dict[str, np.ndarray], cross_csd: Dict[str, np.ndarray]
157
) -> np.ndarray:
158
    """Build the empirical 3×3 spectral matrix from auto/cross periodograms."""
159
    nf = len(next(iter(auto_psd.values())))
2✔
160
    cov = np.zeros((nf, 3, 3), dtype=np.complex128)
2✔
161
    cov[:, 0, 0] = auto_psd["X"]
2✔
162
    cov[:, 1, 1] = auto_psd["Y"]
2✔
163
    cov[:, 2, 2] = auto_psd["Z"]
2✔
164

165
    cov[:, 0, 1] = cross_csd["XY"]
2✔
166
    cov[:, 1, 0] = np.conj(cov[:, 0, 1])
2✔
167

168
    cov[:, 1, 2] = cross_csd["YZ"]
2✔
169
    cov[:, 2, 1] = np.conj(cov[:, 1, 2])
2✔
170

171
    cov[:, 2, 0] = cross_csd["ZX"]
2✔
172
    cov[:, 0, 2] = np.conj(cov[:, 2, 0])
2✔
173
    return cov
2✔
174

175

176
def make_plot(
2✔
177
    freq: np.ndarray, diag: np.ndarray, csd: np.ndarray, out_path: Path
178
) -> None:
179
    """Produce a quick-look log-log plot of PSD and |CSD|."""
180
    fig, ax = plt.subplots(figsize=(6.0, 4.0))
2✔
181
    ax.loglog(freq, diag, label="PSD (X2=Y2=Z2)")
2✔
182
    ax.loglog(freq, np.abs(csd), label="|CSD| (XY=YZ=ZX)")
2✔
183
    ax.set_xlabel("Frequency [Hz]")
2✔
184
    ax.set_ylabel("Spectral Density [1/Hz]")
2✔
185
    ax.legend()
2✔
186
    ax.set_title("LISA TDI2 analytic noise")
2✔
187
    fig.tight_layout()
2✔
188
    fig.savefig(out_path, dpi=200)
2✔
189
    plt.close(fig)
2✔
190

191

192
# --- Data handling + spectral estimates -----------------------------------------
193
def load_tdi_timeseries(
2✔
194
    h5_path: Path,
195
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
196
    """Read t, X2, Y2, Z2 arrays from the HDF5 file."""
197
    with h5py.File(h5_path, "r") as f:
2✔
198
        t = np.array(f["t"])
2✔
199
        X2 = np.array(f["X2"])
2✔
200
        Y2 = np.array(f["Y2"])
2✔
201
        Z2 = np.array(f["Z2"])
2✔
202
    return t, X2, Y2, Z2
2✔
203

204

205
def compute_periodograms(
2✔
206
    t: np.ndarray,
207
    X2: np.ndarray,
208
    Y2: np.ndarray,
209
    Z2: np.ndarray,
210
) -> Tuple[
211
    np.ndarray, Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, float]
212
]:
213
    """Return (freq, auto-PSD dict, cross-CSD dict, metadata) from the time-domain data."""
214
    dt = t[1] - t[0]
2✔
215
    n = len(t)
2✔
216
    freq_full = np.fft.rfftfreq(n, dt)
2✔
217
    fft_x = np.fft.rfft(X2)
2✔
218
    fft_y = np.fft.rfft(Y2)
2✔
219
    fft_z = np.fft.rfft(Z2)
2✔
220

221
    scale = dt / n
2✔
222
    auto = {
2✔
223
        "X": scale * np.abs(fft_x) ** 2,
224
        "Y": scale * np.abs(fft_y) ** 2,
225
        "Z": scale * np.abs(fft_z) ** 2,
226
    }
227
    cross = {
2✔
228
        "XY": scale * fft_x * np.conj(fft_y),
229
        "YZ": scale * fft_y * np.conj(fft_z),
230
        "ZX": scale * fft_z * np.conj(fft_x),
231
    }
232

233
    # Double the positive-frequency interior to account for two-sided FFT.
234
    for arr in list(auto.values()) + list(cross.values()):
2✔
235
        arr[1:-1] *= 2.0
2✔
236

237
    # Drop the DC bin for plotting (log axis incompatible with zero frequency).
238
    freq = freq_full[1:]
2✔
239
    for key in auto:
2✔
240
        auto[key] = auto[key][1:]
2✔
241
    for key in cross:
2✔
242
        cross[key] = cross[key][1:]
2✔
243

244
    meta = {"dt": dt, "fs": 1.0 / dt, "n": n, "fmin": 1.0 / (n * dt)}
2✔
245
    return freq, auto, cross, meta
2✔
246

247

248
# --- Plotting --------------------------------------------------------------------
249
def plot_triangle_spectra(
2✔
250
    freq: np.ndarray,
251
    diag: np.ndarray,
252
    csd: np.ndarray,
253
    auto_psd: Dict[str, np.ndarray],
254
    cross_csd: Dict[str, np.ndarray],
255
    out_path: Path,
256
) -> None:
257
    """
258
    Plot a lower-triangular matrix with PSDs on the diagonal and |CSD|s
259
    on the off-diagonal entries:
260

261
        X
262
        |XY|   Y
263
        |XZ|  |YZ|   Z
264
    """
265

266
    channels = ["X", "Y", "Z"]
2✔
267
    combo_map = {
2✔
268
        (1, 0): "XY",
269
        (2, 0): "ZX",  # equals |XZ|
270
        (2, 1): "YZ",
271
    }
272

273
    fig, axes = plt.subplots(
2✔
274
        3, 3, figsize=(8.0, 8.0), sharex=True, sharey=False
275
    )
276

277
    for i in range(3):
2✔
278
        for j in range(3):
2✔
279
            ax = axes[i, j]
2✔
280
            if j > i:
2✔
281
                ax.axis("off")
2✔
282
                continue
2✔
283

284
            if i == j:
2✔
285
                chan = channels[i]
2✔
286
                ax.loglog(
2✔
287
                    freq,
288
                    auto_psd.get(chan, diag),
289
                    label=f"{chan} periodogram",
290
                    color=f"C{i}",
291
                    alpha=0.7,
292
                )
293
                ax.loglog(freq, diag, "k-", lw=1.2, label="Analytic PSD")
2✔
294
                ax.set_title(f"{chan} PSD")
2✔
295
            else:
296
                combo = combo_map[(i, j)]
2✔
297
                label = f"|{combo}| periodogram"
2✔
298
                ax.loglog(
2✔
299
                    freq,
300
                    np.abs(cross_csd.get(combo, csd)),
301
                    label=label,
302
                    color="tab:blue",
303
                    alpha=0.7,
304
                )
305
                ax.loglog(
2✔
306
                    freq, np.abs(csd), "k-", lw=1.2, label="|Analytic CSD|"
307
                )
308
                ax.set_title(f"|{combo}| CSD")
2✔
309

310
            ax.set_yscale("log")
2✔
311
            ax.set_xscale("log")
2✔
312
            if i == 2:
2✔
313
                ax.set_xlabel("Frequency [Hz]")
2✔
314
            if j == 0:
2✔
315
                ax.set_ylabel("Spectral density [1/Hz]")
2✔
316
            ax.legend(fontsize="small")
2✔
317

318
    fig.tight_layout()
2✔
319
    fig.savefig(out_path, dpi=200)
2✔
320
    plt.close(fig)
2✔
321

322

323
@dataclass
2✔
324
class LISAData:
2✔
325
    """Container for frequency- and time-domain LISA spectra and helpers."""
326

327
    time: np.ndarray
2✔
328
    data: np.ndarray
2✔
329
    freq: np.ndarray
2✔
330
    matrix: np.ndarray
2✔
331
    true_matrix: np.ndarray
2✔
332

333
    @classmethod
2✔
334
    def load(
2✔
335
        cls, data_path: Path = DATA_PATH, data_url: str = TEN_DAYS
336
    ) -> "LISAData":
337
        """Fetch the LISA noise sample (if needed) and assemble spectra."""
338
        ensure_lisa_tdi_data(data_url=data_url, destination=data_path)
2✔
339

340
        t, X2, Y2, Z2 = load_tdi_timeseries(data_path)
2✔
341
        data = np.vstack((X2, Y2, Z2)).T  # shape (n_times, 3)
2✔
342
        freq, auto_psd, cross_csd, meta = compute_periodograms(t, X2, Y2, Z2)
2✔
343
        print(
2✔
344
            f"Loaded {len(t)} samples from {data_path} (fs={meta['fs']:.6f} Hz)."
345
        )
346

347
        Spm, Sop = lisa_link_noises_ldc(freq, fs=meta["fs"], fmin=meta["fmin"])
2✔
348
        diag, csd = tdi2_psd_and_csd(freq, Spm, Sop)
2✔
349
        true_cov = covariance_matrix(diag, csd)
2✔
350
        empirical_cov = periodogram_covariance(auto_psd, cross_csd)
2✔
351
        return cls(
2✔
352
            time=t,
353
            freq=freq,
354
            matrix=empirical_cov,
355
            true_matrix=true_cov,
356
            data=data,
357
        )
358

359
    def plot(
2✔
360
        self,
361
        fname: Union[Path, str] = TRIANGLE_PNG,
362
        diag_path: Optional[Union[Path, str]] = PSD_PNG,
363
    ) -> None:
364
        """Produce the diagnostic PSD/CSD plots for the stored spectra."""
365
        triangle_path = Path(fname)
2✔
366
        diag = self.true_matrix[:, 0, 0].real
2✔
367
        csd = self.true_matrix[:, 0, 1]
2✔
368

369
        auto_psd = {
2✔
370
            "X": self.matrix[:, 0, 0].real,
371
            "Y": self.matrix[:, 1, 1].real,
372
            "Z": self.matrix[:, 2, 2].real,
373
        }
374
        cross_csd = {
2✔
375
            "XY": self.matrix[:, 0, 1],
376
            "YZ": self.matrix[:, 1, 2],
377
            "ZX": self.matrix[:, 2, 0],
378
        }
379

380
        if diag_path is not None:
2✔
381
            diag_path = Path(diag_path)
2✔
382
            make_plot(self.freq, diag, csd, diag_path)
2✔
383
            print(f"Wrote analytic PSD plot to {diag_path.resolve()}")
2✔
384

385
        plot_triangle_spectra(
2✔
386
            self.freq, diag, csd, auto_psd, cross_csd, triangle_path
387
        )
388
        print(f"Wrote triangle plot to {triangle_path.resolve()}")
2✔
389

390

391
# --- Main entry point ------------------------------------------------------------
392
def main() -> None:
2✔
NEW
393
    lisa_data = LISAData.load()
×
NEW
394
    lisa_data.plot(TRIANGLE_PNG)
×
395

396

397
if __name__ == "__main__":
398
    main()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc