• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nz-gravity / sgvb_psd / 23607120213

26 Mar 2026 04:56PM UTC coverage: 86.509% (+0.05%) from 86.459%
23607120213

push

github

jianan-joseph-liu
Robust frequency index handling in FFT utilities

Make frequency index handling safer and clearer in analysis and utils modules.

- backend/analysis_data.py: when fmax_for_analysis is None use the full frequency range by setting fmax_idx = len(ftrue_y); only call searchsorted when an explicit fmax is provided.
- utils/utils.py: unify and harden get_freq slicing logic by computing fmin_idx and fmax_idx up front, clipping them to valid bounds, and ensuring fmax_idx >= fmin_idx before slicing. This prevents out-of-range indices and inverted slices when fmin/fmax are None or out of range.

These changes avoid errors from searchsorted on unexpected inputs and make frequency-range extraction more robust.

9 of 11 new or added lines in 2 files covered. (81.82%)

3 existing lines in 2 files now uncovered.

1898 of 2194 relevant lines covered (86.51%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
/src/sgvb_psd/backend/analysis_data.py
1
from typing import Tuple
1✔
2

3
import numpy as np
1✔
4
import tensorflow as tf
1✔
5

6
from ..logging import logger
1✔
7

8

9
class AnalysisData:  # Parent used to create BayesianModel object
    """
    Holds the FFT'd time series and design matrices used to build a BayesianModel.

    Computes the chunked, scaled FFT of the input time series, the
    Demmler-Reinsch design matrices for delta and theta, and the per-frequency
    Z design matrices, then stores real/imaginary parts as TensorFlow tensors.
    """

    def __init__(
        self,
        x: np.ndarray,
        nchunks: int = 128,
        fmax_for_analysis: float = 128,
        fs: float = 2048.0,
        N_theta: int = 15,
        N_delta: int = 15,
        fmin_for_analysis: float = None,
    ):
        # x:      N-by-p, multivariate timeseries with N samples and p dimensions
        # y_ft:   fourier transformed time series
        # freq:   frequencies w/ y_ft
        # p:      dimension of x
        # Xmat:   basis matrix
        # Zar:    array of design matrix Z_k for every freq k
        self.x = x
        if x.shape[1] < 2:
            # ValueError (not a bare Exception) for consistency with
            # compute_chunked_fft; callers catching Exception still work.
            raise ValueError("Time series should be at least 2 dimensional.")
        self.p = x.shape[1]
        self.nchunks = nchunks
        self.N_theta = N_theta
        self.N_delta = N_delta

        self.fs = fs
        self.fmax_for_analysis = fmax_for_analysis
        self.fmin_for_analysis = fmin_for_analysis

        # Compute the required datasets
        self.y_ft, self.freq = compute_chunked_fft(
            self.x,
            self.nchunks,
            self.fmax_for_analysis,
            self.fs,
            self.fmin_for_analysis,
        )
        self.Zar = _compute_chunked_Zmatrix(self.y_ft)
        Xmat_delta, Xmat_theta = _compute_Xmatrices(
            self.freq, N_delta, N_theta
        )

        # Setup tensors: split complex arrays into real/imaginary parts,
        # since the downstream model works with real-valued tensors.
        y_ft = tf.convert_to_tensor(self.y_ft, dtype=tf.complex64)
        self.y_re = tf.math.real(y_ft)
        self.y_im = tf.math.imag(y_ft)
        self.Xmat_delta = tf.convert_to_tensor(Xmat_delta, dtype=tf.float32)
        self.Xmat_theta = tf.convert_to_tensor(Xmat_theta, dtype=tf.float32)

        Zar = tf.convert_to_tensor(self.Zar, dtype=tf.complex64)
        self.Z_re = tf.math.real(Zar)
        self.Z_im = tf.math.imag(Zar)

        logger.info(f"Loaded {self}")

    def __repr__(self):
        """Summarise the shapes of the stored arrays/tensors."""
        x = self.x.shape
        y = self.y_ft.shape
        Xd = self.Xmat_delta.shape
        Xt = self.Xmat_theta.shape
        Z = self.Zar.shape
        return f"AnalysisData(x(t)={x}, y(f)={y}, Xmat_delta={Xd}, Xmat_theta={Xt}, Z={Z})"
71

72

73
def _compute_Xmatrices(
    freq, N_delta: int = 15, N_theta: int = 15
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Build the design matrices for delta and theta from the given frequencies.

    Each matrix is [1 | freq | DR_basis(freq, N)]: an intercept column, the
    raw frequencies, then N Demmler-Reinsch basis columns.

    Parameters:
    freq (np.ndarray): vector of frequencies
    N_delta (int): The number of basis functions to use for delta (default is 15).
    N_theta (int): The number of basis functions to use for theta (default is 15).

    Returns:
    Xd (np.ndarray): The design matrix for Demmler-Reinsch basis functions of delta,
                     the shape is (n, 2 + N_delta).
    Xt (np.ndarray): The design matrix for Demmler-Reinsch basis functions of theta,
                     the shape is (n, 2 + N_theta).
    """
    n_freq = freq.shape[0]
    intercept_and_freq = np.column_stack([np.ones(n_freq), freq])
    Xd = np.hstack([intercept_and_freq, DR_basis(freq, N=N_delta)])
    Xt = np.hstack([intercept_and_freq, DR_basis(freq, N=N_theta)])
    return Xd, Xt
95

96

97
def _compute_Zmatrix(y_k: np.ndarray) -> np.ndarray:
1✔
98
    """
99
    Compute the design matrix Z_k for each frequency k.
100

101
    Parameters:
102
    y_k (np.ndarray): Fourier transformed time series data of shape (n, p).
103

104
    Returns:
105
    np.ndarray: Design matrix Z_k of shape (n, p, p*(p-1)/2).
106
    """
107
    n, p = y_k.shape
1✔
108
    Z_k = np.zeros((n, p, int(p * (p - 1) / 2)), dtype=np.complex64)
1✔
109

110
    for j in range(n):
1✔
111
        count = 0
1✔
112
        for i in range(1, p):
1✔
113
            Z_k[j, i, count : count + i] = y_k[j, :i]
1✔
114
            count += i
1✔
115

116
    return Z_k
1✔
117

118

119
def _compute_chunked_Zmatrix(y_ft: np.ndarray) -> np.ndarray:
    """
    Compute the design matrices Z_k for every chunk and frequency.

    Parameters:
    y_ft (np.ndarray): Fourier transformed time series data of shape
        (chunks, n_per_chunk, p).

    Returns:
    np.ndarray: For a single chunk, the Z matrix of shape
        (n_per_chunk, p, p*(p-1)/2); for multiple chunks, a stacked array
        with an extra leading chunk axis. For p == 1 there are no cross
        terms, so an empty (chunks, n_per_chunk, 0) array is returned.
    """
    chunks, n_per_chunk, p = y_ft.shape
    if p == 1:
        # Univariate series: Z has no columns at all.
        return np.zeros((chunks, n_per_chunk, 0), dtype=np.complex64)

    if chunks == 1:
        # Single chunk: drop the chunk axis (callers rely on this 3D shape).
        return _compute_Zmatrix(y_ft[0])

    # Iterate chunks directly rather than np.squeeze(np.split(y_ft, chunks)):
    # squeeze would also collapse a singleton n_per_chunk axis and hand
    # _compute_Zmatrix a 2D array with the wrong meaning.
    return np.array([_compute_Zmatrix(chunk) for chunk in y_ft])
141

142

143
def DR_basis(freq: np.ndarray, N=10):
    """
    Return the basis matrix for the Demmler-Reinsch basis
    for linear smoothing splines (Eubank, 1999).

    Parameters:
    freq (np.ndarray): vector of frequencies
    N (int): number of basis functions used

    Returns:
    np.ndarray: a len(freq)-by-N matrix whose k-th column is
    sqrt(2) * cos(2 * pi * k * freq) for k = 1..N.
    """
    columns = [
        np.sqrt(2) * np.cos(k * np.pi * freq * 2)
        for k in range(1, N + 1)
    ]
    return np.column_stack(columns)
158

159

160
def compute_chunked_fft(
    x: np.ndarray,
    nchunks: int,
    fmax_for_analysis: float,
    fs: float,
    fmin_for_analysis: float = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Split x into nchunks chunks, FFT each chunk (scaled by 1/sqrt(n_per_chunk)),
    and keep the positive frequencies 1:[Nyquist) — optionally restricted to
    [fmin_for_analysis, fmax_for_analysis) — discarding the rest.

    Parameters:
    x (np.ndarray): time series of shape (N, p); expected to be standardised
        (zero mean, unit std per column) — a warning is logged otherwise.
    nchunks (int): number of chunks; trailing samples beyond
        n_per_chunk * nchunks are discarded.
    fmax_for_analysis (float): upper frequency cut in Hz; None keeps all.
    fs (float): sampling frequency in Hz.
    fmin_for_analysis (float): lower frequency cut in Hz; None keeps all.

    Returns:
    Tuple[np.ndarray, np.ndarray]: (y_ft of shape (nchunks, nfreq, p),
        scaled frequencies fq_y of length nfreq).

    Raises:
    ValueError: if the number of samples is less than the dimension p.
    """
    if np.any(np.mean(x, axis=0) != 0) or np.any(np.std(x, axis=0) != 1):
        logger.warning("Input data not standardised!")

    orig_n, p = x.shape
    if orig_n < p:
        raise ValueError(
            f"Number of samples {orig_n} is less than number of dimensions {p}."
        )

    # Split x into equal chunks, discarding any trailing remainder samples.
    n_per_chunk = orig_n // nchunks
    chunked_x = np.array(np.split(x[0 : n_per_chunk * nchunks, :], nchunks))
    assert chunked_x.shape == (nchunks, n_per_chunk, p)

    # FFT each chunk along its time axis and scale by 1/sqrt(n_per_chunk).
    y_ft = np.apply_along_axis(np.fft.fft, 1, chunked_x)
    y_ft = y_ft / np.sqrt(n_per_chunk)

    Ts = 1  # for VB backend we use Duration of 1.0 (rescale later)
    fq_y = np.fft.fftfreq(n_per_chunk, Ts)  # scaled frequencies (returned)
    ftrue_y = np.fft.fftfreq(n_per_chunk, d=1 / fs)  # physical frequencies in Hz

    # Truncate to positive frequencies below Nyquist (drops DC at index 0).
    if n_per_chunk % 2 == 0:  # n is even
        idx = n_per_chunk // 2
    else:  # n is odd
        idx = (n_per_chunk - 1) // 2

    y_ft = y_ft[:, 1:idx, :]
    fq_y = fq_y[1:idx]
    ftrue_y = ftrue_y[1:idx]

    # Convert the requested [fmin, fmax) band to indices, clipping to valid
    # bounds and guaranteeing fmax_idx >= fmin_idx so the slice can never be
    # out of range or inverted (mirrors the hardened utils.get_freq logic).
    if fmax_for_analysis is None:
        fmax_idx = len(ftrue_y)
    else:
        fmax_idx = int(np.searchsorted(ftrue_y, fmax_for_analysis))
    fmin_idx = 0
    if fmin_for_analysis is not None:
        fmin_idx = int(np.searchsorted(ftrue_y, fmin_for_analysis))
    fmin_idx = min(max(fmin_idx, 0), len(ftrue_y))
    fmax_idx = min(max(fmax_idx, fmin_idx), len(ftrue_y))

    y_ft = y_ft[:, fmin_idx:fmax_idx, :]
    fq_y = fq_y[fmin_idx:fmax_idx]
    return y_ft, fq_y
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc