• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nz-gravity / LogPSplinePSD / 19928520537

04 Dec 2025 12:11PM UTC coverage: 79.909% (-0.02%) from 79.93%
19928520537

push

github

avivajpeyi
refactor: streamline imports and improve frequency handling in VARMAData; update tests for frequency validation

847 of 1004 branches covered (84.36%)

Branch coverage included in aggregate %.

16 of 19 new or added lines in 4 files covered. (84.21%)

1 existing line in 1 file now uncovered.

5135 of 6482 relevant lines covered (79.22%)

1.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.1
/src/log_psplines/example_datasets/varma_data.py
1
from typing import Optional
2✔
2

3
import matplotlib.pyplot as plt
2✔
4
import numpy as np
2✔
5
from numpy.fft import rfft
2✔
6

7

8
class VARMAData:
2✔
9
    """
10
    Simulate Vector Autoregressive Moving Average (VARMA) processes and compute related spectral properties.
11
    """
12

13
    def __init__(
2✔
14
        self,
15
        n_samples: int = 1024,
16
        sigma: np.ndarray = np.array([[1.0, 0.9], [0.9, 1.0]]),
17
        var_coeffs: np.ndarray = np.array(
18
            [[[0.5, 0.0], [0.0, -0.3]], [[0.0, 0.0], [0.0, -0.5]]]
19
        ),
20
        vma_coeffs: np.ndarray = np.array([[[1.0, 0.0], [0.0, 1.0]]]),
21
        seed: int = None,
22
        fs: float = 1.0,
23
    ):
24
        """
25
        Initialize the SimVARMA class.
26

27
        Args:
28
            n_samples (int): Number of samples to generate.
29
            var_coeffs (np.ndarray): VAR coefficient array.
30
            vma_coeffs (np.ndarray): VMA coefficient array.
31
            sigma (np.ndarray): Covariance matrix or scalar variance.
32
        """
33
        self.n_samples = n_samples
2✔
34
        self.var_coeffs = var_coeffs
2✔
35
        self.vma_coeffs = vma_coeffs
2✔
36
        self.sigma = sigma
2✔
37
        self.dim = vma_coeffs.shape[1]
2✔
38
        self.psd_scaling = 1.0
2✔
39
        self.n_freq_samples = n_samples // 2
2✔
40

41
        self.fs = float(fs)
2✔
42
        # Frequency grid in Hz (drop the DC bin to match PSD plotting expectations)
43
        self.freq = np.fft.rfftfreq(n_samples, d=1.0 / self.fs)[1:]
2✔
44
        self.time = np.arange(n_samples) / self.fs
2✔
45
        self.data = None  # set in "resimulate"
2✔
46
        self.periodogram = None  # set in "resimulate"
2✔
47
        self.welch_psd = None  # set in "resimulate"
2✔
48
        self.welch_f = None  # set in "resimulate"
2✔
49
        self.resimulate(seed=seed)
2✔
50

51
        self.channel_stds = np.std(self.data, axis=0)
2✔
52
        self.psd_scaling = float(np.std(self.data) ** 2)
2✔
53
        self.psd = _calculate_true_varma_psd(
2✔
54
            self.freq,
55
            self.dim,
56
            self.var_coeffs,
57
            self.vma_coeffs,
58
            self.sigma,
59
            self.fs,
60
            self.channel_stds,
61
            self.psd_scaling,
62
        )
63

64
    def resimulate(self, seed=None):
2✔
65
        """
66
        Simulate VARMA process.
67

68
        Args:
69
            seed (int, optional): Random seed for reproducibility.
70

71
        Returns:
72
            np.ndarray: Simulated VARMA process data.
73
        """
74
        if seed is not None:
2✔
75
            np.random.seed(seed)
2✔
76

77
        lag_ma = self.vma_coeffs.shape[0]
2✔
78
        lag_ar = self.var_coeffs.shape[0]
2✔
79

80
        if self.sigma.shape[0] == 1:
2✔
81
            cov_matrix = np.identity(self.dim) * self.sigma
×
82
        else:
83
            cov_matrix = self.sigma
2✔
84

85
        x_init = np.zeros((lag_ar + 1, self.dim))
2✔
86
        x = np.empty((self.n_samples + 101, self.dim))
2✔
87
        x[:] = np.nan
2✔
88
        x[: lag_ar + 1] = x_init
2✔
89
        epsilon = np.random.multivariate_normal(
2✔
90
            np.zeros(self.dim), cov_matrix, size=[lag_ma]
91
        )
92

93
        for i in range(lag_ar + 1, x.shape[0]):
2✔
94
            epsilon = np.concatenate(
2✔
95
                [
96
                    np.random.multivariate_normal(
97
                        np.zeros(self.dim), cov_matrix, size=[1]
98
                    ),
99
                    epsilon[:-1],
100
                ]
101
            )
102
            x[i] = np.sum(
2✔
103
                np.matmul(
104
                    self.var_coeffs,
105
                    x[i - 1 : i - lag_ar - 1 : -1][..., np.newaxis],
106
                ),
107
                axis=(0, -1),
108
            ) + np.sum(
109
                np.matmul(self.vma_coeffs, epsilon[..., np.newaxis]),
110
                axis=(0, -1),
111
            )
112

113
        self.data = x[101:]
2✔
114

115
    def get_periodogram(self):
2✔
116
        """
117
        Return a one-sided PSD matrix estimate for the simulated data.
118
        """
119
        n_freq = self.n_freq_samples
2✔
120
        dim = self.dim
2✔
121
        data = self.data
2✔
122
        N = data.shape[0]
2✔
123
        data = data - np.mean(data, axis=0)
2✔
124
        fft_vals = rfft(data, axis=0)[1 : n_freq + 1]
2✔
125
        if fft_vals.shape[0] != self.freq.shape[0]:
2✔
126
            raise ValueError("FFT output and frequency grid mismatch.")
×
127
        scale = np.full(self.freq.shape, 2.0 / (N * self.fs), dtype=np.float64)
2✔
128
        if N % 2 == 0 and scale.size > 0:
2✔
129
            scale[-1] = 1.0 / (N * self.fs)
2✔
130
        periodogram = scale[:, None, None] * (
2✔
131
            fft_vals[:, :, None] * np.conj(fft_vals[:, None, :])
132
        )
133
        # Add epsilon to avoid log(0) and extremely small values
134
        eps = 1e-12
2✔
135
        periodogram = np.where(np.abs(periodogram) < eps, eps, periodogram)
2✔
136
        return periodogram
2✔
137

138
    def get_true_psd(self):
2✔
139
        """Return the theoretical one-sided PSD matrix."""
140
        eps = 1e-12
2✔
141
        true_psd = np.where(np.abs(self.psd) < eps, eps, self.psd)
2✔
142
        return true_psd
2✔
143

144
    def plot(self, axs=None, fname: Optional[str] = None):
2✔
145
        """
146
        Matrix plot: diagonal is the PSD, below diagonal is real CSD, above diagonal is imag CSD.
147
        Plots both the true PSD and the periodogram using consistent scaling.
148
        """
149
        if self.data is None:
2✔
150
            raise ValueError("No data to plot. Run resimulate first.")
×
151
        dim = self.dim
2✔
152
        periodogram = self.get_periodogram()
2✔
153
        true_psd = self.get_true_psd()
2✔
154
        freq_hz = self.freq
2✔
155
        # Setup axes
156
        if axs is None:
2✔
157
            fig, axs = plt.subplots(
2✔
158
                dim, dim, figsize=(4 * dim, 4 * dim), sharex=True
159
            )
160
        else:
161
            fig = axs[0, 0].figure
×
162
        data_kwgs = dict(alpha=0.3, lw=2, zorder=-10, color="k")
2✔
163
        true_kwgs = dict(lw=1, zorder=10, color="k")
2✔
164
        for i in range(dim):
2✔
165
            for j in range(dim):
2✔
166
                ax = axs[i, j]
2✔
167
                if i == j:
2✔
168
                    ax.plot(
2✔
169
                        freq_hz,
170
                        true_psd[:, i, i].real,
171
                        label="True PSD",
172
                        **true_kwgs,
173
                    )
174
                    ax.plot(
2✔
175
                        freq_hz,
176
                        periodogram[:, i, i].real,
177
                        label="Periodogram",
178
                        **data_kwgs,
179
                    )
180
                    ax.set_title(f"PSD: channel {i + 1}")
2✔
181
                    ax.set_yscale("log")
2✔
182
                elif i > j:
2✔
183
                    ax.plot(
2✔
184
                        freq_hz,
185
                        true_psd[:, i, j].real,
186
                        label="True Re(CSD)",
187
                        **true_kwgs,
188
                    )
189
                    ax.plot(
2✔
190
                        freq_hz,
191
                        periodogram[:, i, j].real,
192
                        label="Periodogram Re(CSD)",
193
                        **data_kwgs,
194
                    )
195
                    ax.set_title(f"Re(CSD): {i + 1},{j + 1}")
2✔
196
                else:
197
                    ax.plot(
2✔
198
                        freq_hz,
199
                        true_psd[:, i, j].imag,
200
                        label="True Im(CSD)",
201
                        **true_kwgs,
202
                    )
203
                    ax.plot(
2✔
204
                        freq_hz,
205
                        periodogram[:, i, j].imag,
206
                        label="Periodogram Im(CSD)",
207
                        **data_kwgs,
208
                    )
209
                    ax.set_title(f"Im(CSD): {i + 1},{j + 1}")
2✔
210
                if i == dim - 1:
2✔
211
                    ax.set_xlabel("Frequency (Hz)")
2✔
212
                if j == 0:
2✔
213
                    ax.set_ylabel("Power / CSD")
2✔
214
                ax.legend(fontsize=8)
2✔
215
        fig.tight_layout()
2✔
216
        if fname:
2✔
217
            fig.savefig(fname, bbox_inches="tight")
2✔
218
        return axs
2✔
219

220

221
def _calculate_true_varma_psd(
2✔
222
    freqs_hz: np.ndarray,
223
    dim: int,
224
    var_coeffs: np.ndarray,
225
    vma_coeffs: np.ndarray,
226
    sigma: np.ndarray,
227
    fs: float,
228
    channel_stds: np.ndarray,
229
    scaling_factor: float,
230
) -> np.ndarray:
231
    """
232
    Calculate the spectral matrix for given frequencies.
233

234
    Args:
235
        freqs_hz (np.ndarray): Positive frequency grid in Hz.
236
        dim (int): Process dimension.
237
        var_coeffs/vma_coeffs: VAR/MA coefficient arrays.
238
        sigma (np.ndarray): Innovation covariance matrix.
239
        fs (float): Sampling frequency in Hz.
240

241
    Returns:
242
        np.ndarray: One-sided PSD matrix evaluated at ``freqs_hz``.
243
    """
244
    freqs_hz = np.asarray(freqs_hz, dtype=np.float64)
2✔
245
    if freqs_hz.size:
2✔
246
        nyquist = fs / 2.0
2✔
247
        if freqs_hz.max() > nyquist + 1e-12:
2✔
NEW
248
            raise ValueError(
×
249
                "Frequency grid should be in Hz (0 .. fs/2). "
250
                "Angular-frequency input detected."
251
            )
252

253
    omega = 2.0 * np.pi * freqs_hz / fs  # radians per sample
2✔
254
    spec_matrix = np.empty((freqs_hz.size, dim, dim), dtype=np.complex128)
2✔
255
    for idx, w in enumerate(omega):
2✔
256
        spec_matrix[idx] = _calculate_spec_matrix_helper(
2✔
257
            float(w), dim, var_coeffs, vma_coeffs, sigma
258
        )
259

260
    # Convert to one-sided PSD: double positive frequencies except Nyquist
261
    psd = (2.0 * spec_matrix) / fs
2✔
262
    if freqs_hz.size and np.isclose(freqs_hz[-1], fs / 2.0):
2✔
263
        psd[-1] *= 0.5
2✔
264
    if channel_stds is not None and scaling_factor not in (None, 0):
2✔
265
        scale_matrix = np.outer(channel_stds, channel_stds) / float(
2✔
266
            scaling_factor
267
        )
268
        psd = psd * scale_matrix[None, :, :]
2✔
269
    return psd
2✔
270

271

272
def _calculate_spec_matrix_helper(omega, dim, var_coeffs, vma_coeffs, sigma):
2✔
273
    """
274
    Helper function to calculate spectral matrix for a single frequency.
275

276
    Args:
277
        omega (float): Angular frequency (rad/sample).
278
    """
279
    if sigma.shape[0] == 1:
2✔
280
        cov_matrix = np.identity(dim) * sigma
×
281
    else:
282
        cov_matrix = sigma
2✔
283

284
    k_ar = np.arange(1, var_coeffs.shape[0] + 1)
2✔
285
    angles_ar = k_ar[:, np.newaxis, np.newaxis] * omega
2✔
286
    A_f_re_ar = np.sum(var_coeffs * np.cos(angles_ar), axis=0)
2✔
287
    A_f_im_ar = -np.sum(var_coeffs * np.sin(angles_ar), axis=0)
2✔
288
    A_f_ar = A_f_re_ar + 1j * A_f_im_ar
2✔
289
    A_bar_f_ar = np.identity(dim) - A_f_ar
2✔
290
    H_f_ar = np.linalg.inv(A_bar_f_ar)
2✔
291

292
    k_ma = np.arange(vma_coeffs.shape[0])
2✔
293
    angles_ma = k_ma[:, np.newaxis, np.newaxis] * omega
2✔
294
    A_f_re_ma = np.sum(vma_coeffs * np.cos(angles_ma), axis=0)
2✔
295
    A_f_im_ma = -np.sum(vma_coeffs * np.sin(angles_ma), axis=0)
2✔
296
    A_f_ma = A_f_re_ma + 1j * A_f_im_ma
2✔
297
    A_bar_f_ma = A_f_ma
2✔
298
    H_f_ma = A_bar_f_ma
2✔
299

300
    spec_mat = H_f_ar @ H_f_ma @ cov_matrix @ H_f_ma.conj().T @ H_f_ar.conj().T
2✔
301
    return spec_mat
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc