• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nz-gravity / LogPSplinePSD / 19384398576

15 Nov 2025 04:24AM UTC coverage: 79.66% (-0.1%) from 79.76%
19384398576

push

github

avivajpeyi
fix scaling and one sided psd

727 of 864 branches covered (84.14%)

Branch coverage included in aggregate %.

42 of 47 new or added lines in 6 files covered. (89.36%)

6 existing lines in 2 files now uncovered.

4662 of 5901 relevant lines covered (79.0%)

1.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.35
/src/log_psplines/example_datasets/varma_data.py
1
from typing import Optional
2✔
2

3
import matplotlib.pyplot as plt
2✔
4
import numpy as np
2✔
5
from numpy.fft import rfft
2✔
6

7

8
class VARMAData:
2✔
9
    """
10
    Simulate Vector Autoregressive Moving Average (VARMA) processes and compute related spectral properties.
11
    """
12

13
    def __init__(
2✔
14
        self,
15
        n_samples: int = 1024,
16
        sigma: np.ndarray = np.array([[1.0, 0.9], [0.9, 1.0]]),
17
        var_coeffs: np.ndarray = np.array(
18
            [[[0.5, 0.0], [0.0, -0.3]], [[0.0, 0.0], [0.0, -0.5]]]
19
        ),
20
        vma_coeffs: np.ndarray = np.array([[[1.0, 0.0], [0.0, 1.0]]]),
21
        seed: int = None,
22
        fs: float = 1.0,
23
    ):
24
        """
25
        Initialize the SimVARMA class.
26

27
        Args:
28
            n_samples (int): Number of samples to generate.
29
            var_coeffs (np.ndarray): VAR coefficient array.
30
            vma_coeffs (np.ndarray): VMA coefficient array.
31
            sigma (np.ndarray): Covariance matrix or scalar variance.
32
        """
33
        self.n_samples = n_samples
2✔
34
        self.var_coeffs = var_coeffs
2✔
35
        self.vma_coeffs = vma_coeffs
2✔
36
        self.sigma = sigma
2✔
37
        self.dim = vma_coeffs.shape[1]
2✔
38
        self.psd_scaling = 1
2✔
39
        self.n_freq_samples = n_samples // 2
2✔
40

41
        self.fs = float(fs)
2✔
42
        self.freq = np.fft.rfftfreq(n_samples, d=1 / self.fs)[1:]
2✔
43
        self.time = np.arange(n_samples) / self.fs
2✔
44
        self.data = None  # set in "resimulate"
2✔
45
        self.periodogram = None  # set in "resimulate"
2✔
46
        self.welch_psd = None  # set in "resimulate"
2✔
47
        self.welch_f = None  # set in "resimulate"
2✔
48
        self.resimulate(seed=seed)
2✔
49

50
        self.psd = _calculate_true_varma_psd(
2✔
51
            self.freq,
52
            self.dim,
53
            self.var_coeffs,
54
            self.vma_coeffs,
55
            self.sigma,
56
            self.fs,
57
        )
58

59
    def resimulate(self, seed=None):
2✔
60
        """
61
        Simulate VARMA process.
62

63
        Args:
64
            seed (int, optional): Random seed for reproducibility.
65

66
        Returns:
67
            np.ndarray: Simulated VARMA process data.
68
        """
69
        if seed is not None:
2✔
70
            np.random.seed(seed)
2✔
71

72
        lag_ma = self.vma_coeffs.shape[0]
2✔
73
        lag_ar = self.var_coeffs.shape[0]
2✔
74

75
        if self.sigma.shape[0] == 1:
2✔
76
            cov_matrix = np.identity(self.dim) * self.sigma
×
77
        else:
78
            cov_matrix = self.sigma
2✔
79

80
        x_init = np.zeros((lag_ar + 1, self.dim))
2✔
81
        x = np.empty((self.n_samples + 101, self.dim))
2✔
82
        x[:] = np.nan
2✔
83
        x[: lag_ar + 1] = x_init
2✔
84
        epsilon = np.random.multivariate_normal(
2✔
85
            np.zeros(self.dim), cov_matrix, size=[lag_ma]
86
        )
87

88
        for i in range(lag_ar + 1, x.shape[0]):
2✔
89
            epsilon = np.concatenate(
2✔
90
                [
91
                    np.random.multivariate_normal(
92
                        np.zeros(self.dim), cov_matrix, size=[1]
93
                    ),
94
                    epsilon[:-1],
95
                ]
96
            )
97
            x[i] = np.sum(
2✔
98
                np.matmul(
99
                    self.var_coeffs,
100
                    x[i - 1 : i - lag_ar - 1 : -1][..., np.newaxis],
101
                ),
102
                axis=(0, -1),
103
            ) + np.sum(
104
                np.matmul(self.vma_coeffs, epsilon[..., np.newaxis]),
105
                axis=(0, -1),
106
            )
107

108
        self.data = x[101:]
2✔
109

110
    def get_periodogram(self):
2✔
111
        """
112
        Return a one-sided PSD matrix estimate for the simulated data.
113
        """
114
        n_freq = self.n_freq_samples
2✔
115
        dim = self.dim
2✔
116
        data = self.data
2✔
117
        N = data.shape[0]
2✔
118
        data = data - np.mean(data, axis=0)
2✔
119
        fft_vals = rfft(data, axis=0)[1 : n_freq + 1]
2✔
120
        if fft_vals.shape[0] != self.freq.shape[0]:
2✔
NEW
121
            raise ValueError("FFT output and frequency grid mismatch.")
×
122
        scale = np.full(self.freq.shape, 2.0 / (N * self.fs), dtype=np.float64)
2✔
123
        if N % 2 == 0 and scale.size > 0:
2✔
124
            scale[-1] = 1.0 / (N * self.fs)
2✔
125
        periodogram = scale[:, None, None] * (
2✔
126
            fft_vals[:, :, None] * np.conj(fft_vals[:, None, :])
127
        )
128
        # Add epsilon to avoid log(0) and extremely small values
129
        eps = 1e-12
2✔
130
        periodogram = np.where(np.abs(periodogram) < eps, eps, periodogram)
2✔
131
        return periodogram
2✔
132

133
    def get_true_psd(self):
2✔
134
        """Return the theoretical one-sided PSD matrix."""
135
        eps = 1e-12
2✔
136
        true_psd = np.where(np.abs(self.psd) < eps, eps, self.psd)
2✔
137
        return true_psd
2✔
138

139
    def plot(self, axs=None, fname: Optional[str] = None):
2✔
140
        """
141
        Matrix plot: diagonal is the PSD, below diagonal is real CSD, above diagonal is imag CSD.
142
        Plots both the true PSD and the periodogram using consistent scaling.
143
        """
144
        if self.data is None:
2✔
145
            raise ValueError("No data to plot. Run resimulate first.")
×
146
        dim = self.dim
2✔
147
        periodogram = self.get_periodogram()
2✔
148
        true_psd = self.get_true_psd()
2✔
149
        freq = self.freq
2✔
150
        # Setup axes
151
        if axs is None:
2✔
152
            fig, axs = plt.subplots(
2✔
153
                dim, dim, figsize=(4 * dim, 4 * dim), sharex=True
154
            )
155
        else:
156
            fig = axs[0, 0].figure
×
157
        data_kwgs = dict(alpha=0.3, lw=2, zorder=-10, color="k")
2✔
158
        true_kwgs = dict(lw=1, zorder=10, color="k")
2✔
159
        for i in range(dim):
2✔
160
            for j in range(dim):
2✔
161
                ax = axs[i, j]
2✔
162
                if i == j:
2✔
163
                    ax.plot(
2✔
164
                        freq,
165
                        true_psd[:, i, i].real,
166
                        label="True PSD",
167
                        **true_kwgs,
168
                    )
169
                    ax.plot(
2✔
170
                        freq,
171
                        periodogram[:, i, i].real,
172
                        label="Periodogram",
173
                        **data_kwgs,
174
                    )
175
                    ax.set_title(f"PSD: channel {i + 1}")
2✔
176
                    ax.set_yscale("log")
2✔
177
                elif i > j:
2✔
178
                    ax.plot(
2✔
179
                        freq,
180
                        true_psd[:, i, j].real,
181
                        label="True Re(CSD)",
182
                        **true_kwgs,
183
                    )
184
                    ax.plot(
2✔
185
                        freq,
186
                        periodogram[:, i, j].real,
187
                        label="Periodogram Re(CSD)",
188
                        **data_kwgs,
189
                    )
190
                    ax.set_title(f"Re(CSD): {i + 1},{j + 1}")
2✔
191
                else:
192
                    ax.plot(
2✔
193
                        freq,
194
                        true_psd[:, i, j].imag,
195
                        label="True Im(CSD)",
196
                        **true_kwgs,
197
                    )
198
                    ax.plot(
2✔
199
                        freq,
200
                        periodogram[:, i, j].imag,
201
                        label="Periodogram Im(CSD)",
202
                        **data_kwgs,
203
                    )
204
                    ax.set_title(f"Im(CSD): {i + 1},{j + 1}")
2✔
205
                if i == dim - 1:
2✔
206
                    ax.set_xlabel("Frequency (Hz)")
2✔
207
                if j == 0:
2✔
208
                    ax.set_ylabel("Power / CSD")
2✔
209
                ax.legend(fontsize=8)
2✔
210
        fig.tight_layout()
2✔
211
        if fname:
2✔
212
            fig.savefig(fname, bbox_inches="tight")
2✔
213
        return axs
2✔
214

215

216
def _calculate_true_varma_psd(
2✔
217
    freqs: np.ndarray,
218
    dim: int,
219
    var_coeffs: np.ndarray,
220
    vma_coeffs: np.ndarray,
221
    sigma: np.ndarray,
222
    fs: float,
223
) -> np.ndarray:
224
    """
225
    Calculate the spectral matrix for given frequencies.
226

227
    Args:
228
        freqs (np.ndarray): Positive frequency grid in Hz.
229
        dim (int): Process dimension.
230
        var_coeffs/vma_coeffs: VAR/MA coefficient arrays.
231
        sigma (np.ndarray): Innovation covariance matrix.
232
        fs (float): Sampling frequency in Hz.
233

234
    Returns:
235
        np.ndarray: One-sided PSD matrix evaluated at ``freqs``.
236
    """
237
    omega = 2.0 * np.pi * freqs / fs
2✔
238
    spec_matrix = np.empty((freqs.size, dim, dim), dtype=np.complex128)
2✔
239
    for idx, w in enumerate(omega):
2✔
240
        spec_matrix[idx] = _calculate_spec_matrix_helper(
2✔
241
            float(w), dim, var_coeffs, vma_coeffs, sigma
242
        )
243
    return spec_matrix / fs
2✔
244

245

246
def _calculate_spec_matrix_helper(omega, dim, var_coeffs, vma_coeffs, sigma):
2✔
247
    """
248
    Helper function to calculate spectral matrix for a single frequency.
249

250
    Args:
251
        omega (float): Angular frequency (rad/sample).
252
    """
253
    if sigma.shape[0] == 1:
2✔
254
        cov_matrix = np.identity(dim) * sigma
×
255
    else:
256
        cov_matrix = sigma
2✔
257

258
    k_ar = np.arange(1, var_coeffs.shape[0] + 1)
2✔
259
    angles_ar = k_ar[:, np.newaxis, np.newaxis] * omega
2✔
260
    A_f_re_ar = np.sum(var_coeffs * np.cos(angles_ar), axis=0)
2✔
261
    A_f_im_ar = -np.sum(var_coeffs * np.sin(angles_ar), axis=0)
2✔
262
    A_f_ar = A_f_re_ar + 1j * A_f_im_ar
2✔
263
    A_bar_f_ar = np.identity(dim) - A_f_ar
2✔
264
    H_f_ar = np.linalg.inv(A_bar_f_ar)
2✔
265

266
    k_ma = np.arange(vma_coeffs.shape[0])
2✔
267
    angles_ma = k_ma[:, np.newaxis, np.newaxis] * omega
2✔
268
    A_f_re_ma = np.sum(vma_coeffs * np.cos(angles_ma), axis=0)
2✔
269
    A_f_im_ma = -np.sum(vma_coeffs * np.sin(angles_ma), axis=0)
2✔
270
    A_f_ma = A_f_re_ma + 1j * A_f_im_ma
2✔
271
    A_bar_f_ma = A_f_ma
2✔
272
    H_f_ma = A_bar_f_ma
2✔
273

274
    spec_mat = H_f_ar @ H_f_ma @ cov_matrix @ H_f_ma.conj().T @ H_f_ar.conj().T
2✔
275
    return spec_mat
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc