• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

maurergroup / dfttoolkit / 15065810745

16 May 2025 09:56AM UTC coverage: 30.583% (+8.8%) from 21.747%
15065810745

Pull #59

github

b2c890
web-flow
Merge f52b00038 into e895278a4
Pull Request #59: Vibrations refactor

1164 of 3806 relevant lines covered (30.58%)

0.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
dfttoolkit/utils/vibrations_utils.py
1
import os
×
2
import warnings
×
3
from ast import literal_eval
×
4
from concurrent.futures import ThreadPoolExecutor
×
5

6
import numpy as np
×
7
import numpy.typing as npt
×
8
from numba import njit, prange
×
9
from scipy.interpolate import interp1d
×
10
from scipy.linalg import solve_toeplitz
×
11
from scipy.optimize import brentq, curve_fit
×
12
from scipy.signal import argrelextrema
×
13

14
import dfttoolkit.utils.math_utils as mu
×
15

16
# get environment variable for parallelisation in numba
17
parallel_numba = os.environ.get("PARALLEL_NUMBA")
×
18

19
if parallel_numba is None:
×
20
    warnings.warn(
×
21
        "System variable <parallel_numba> not set. Using default!", stacklevel=2
22
    )
23
    parallel_numba = True
×
24
else:
25
    parallel_numba = literal_eval(parallel_numba)
×
26

27

28
def get_cross_correlation_function(
×
29
    signal_0: npt.NDArray, signal_1: npt.NDArray, bootstrapping_blocks: int = 1
30
) -> npt.NDArray:
31
    """TODO."""
32
    if signal_0.size != signal_1.size:
×
33
        msg = (
×
34
            "The parameters signal_0 and signal_1 must have the same size but they "
35
            f" are {signal_0.size} and {signal_1.size}."
36
        )
37
        raise ValueError(msg)
×
38

39
    signal_length = len(signal_0)
×
40
    block_size = int(np.floor(signal_length / bootstrapping_blocks))
×
41

42
    cross_correlation = []
×
43

44
    for block in range(bootstrapping_blocks):
×
45
        block_start = block * block_size
×
46
        block_end = (block + 1) * block_size
×
47
        block_end = min(block_end, signal_length)
×
48

49
        signal_0_block = signal_0[block_start:block_end]
×
50
        signal_1_block = signal_1[block_start:block_end]
×
51

52
        cross_correlation_block = mu.get_cross_correlation_function(
×
53
            signal_0_block, signal_1_block
54
        )
55
        cross_correlation.append(cross_correlation_block)
×
56

57
    cross_correlation = np.atleast_2d(cross_correlation)
×
58

59
    return np.mean(cross_correlation, axis=0)
×
60

61

62
# TODO Fix docstrings and types
63
def get_cross_spectrum(
×
64
    signal_0: npt.NDArray,
65
    signal_1: npt.NDArray,
66
    time_step: float,
67
    bootstrapping_blocks: int = 1,
68
    bootstrapping_overlap: int = 0,
69
    zero_padding: int = 0,
70
    cutoff_at_last_maximum: bool = False,
71
    window_function: str = "none",
72
) -> tuple[npt.NDArray, npt.NDArray]:
73
    """
74
    Determine the cross spectrum for a given signal using bootstrapping.
75

76
        - Splitting the sigmal into blocks and for each block:
77
            * Determining the cross correlation function of the signal
78
            * Determining the fourire transform of the autocorrelation
79
              function to get the power spectrum for the block
80
        - Calculating the average power spectrum by averaging of the power
81
          spectra of all blocks.
82

83
    Parameters
84
    ----------
85
    signal_0 : 1D np.array
86
        First siganl for which the correlation function should be calculated.
87
    signal_1 : 1D np.array
88
        Second siganl for which the correlation function should be calculated.
89
    time_step : float
90
        DESCRIPTION.
91
    bootstrapping_blocks : int, default=1
92
        DESCRIPTION
93
    bootstrapping_overlap : int, default=0
94
        DESCRIPTION
95
    zero_padding : int, default=0
96
        Pad the cross correlation function with zeros to increase the frequency
97
        resolution of the FFT. This also avoids the effect of varying spectral
98
        leakage. However, it artificially broadens the resulting cross spectrum
99
        and introduces wiggles.
100
    cutoff_at_last_maximum : bool, default=False
101
        Cut off the cross correlation function at the last maximum to hide
102
        spectral leakage.
103

104
    Returns
105
    -------
106
    frequencies : np.array
107
        Frequiencies of the power spectrum in units depending on the
108
        tims_step.
109

110
    cross_spectrum : np.array
111
        Power spectrum.
112
    """
113
    if signal_0.size != signal_1.size:
×
114
        msg = (
×
115
            "The parameters signal_0 and signal_1 must have the same size but they are "
116
            f" {signal_0.size} and {signal_1.size}."
117
        )
118
        raise ValueError(msg)
×
119

120
    signal_length = len(signal_0)
×
121
    block_size = int(
×
122
        np.floor(
123
            signal_length
124
            * (1 + bootstrapping_overlap)
125
            / (bootstrapping_blocks + bootstrapping_overlap)
126
        )
127
    )
128

129
    frequencies = None
×
130
    cross_spectrum = []
×
131

132
    for block in range(bootstrapping_blocks):
×
133
        block_start = int(np.ceil(block * block_size / (1 + bootstrapping_overlap)))
×
134
        block_start = max(block_start, 0)
×
135

136
        block_end = block_start + block_size
×
137
        block_end = min(block_end, signal_length)
×
138

139
        signal_0_block = signal_0[block_start:block_end]
×
140
        signal_1_block = signal_1[block_start:block_end]
×
141

142
        if window_function == "gaussian":
×
143
            signal_0_block = mu.apply_gaussian_window(signal_0_block)
×
144
            signal_1_block = mu.apply_gaussian_window(signal_1_block)
×
145
        elif window_function == "hann":
×
146
            signal_0_block = mu.apply_hann_window(signal_0_block)
×
147
            signal_1_block = mu.apply_hann_window(signal_1_block)
×
148

149
        cross_correlation = mu.get_cross_correlation_function(
×
150
            signal_0_block, signal_1_block
151
        )
152

153
        # truncate cross correlation function at last maximum
154
        if cutoff_at_last_maximum:
×
155
            cutoff_index = get_last_maximum(cross_correlation)
×
156
            cross_correlation = cross_correlation[:cutoff_index]
×
157

158
        # add zero padding
159
        zero_padding = max(zero_padding, len(cross_correlation))
×
160

161
        cross_correlation = np.pad(
×
162
            cross_correlation,
163
            (0, zero_padding - len(cross_correlation)),
164
            "constant",
165
        )
166

167
        frequencies_block, cross_spectrum_block = mu.get_fourier_transform(
×
168
            cross_correlation, time_step
169
        )
170

171
        if block == 0:
×
172
            frequencies = frequencies_block
×
173
        else:
174
            f = interp1d(
×
175
                frequencies_block,
176
                cross_spectrum_block,
177
                kind="linear",
178
                fill_value="extrapolate",
179
            )
180
            cross_spectrum_block = f(frequencies)
×
181

182
        cross_spectrum.append(np.abs(cross_spectrum_block))
×
183

184
    cross_spectrum = np.atleast_2d(cross_spectrum)
×
185
    cross_spectrum = np.average(cross_spectrum, axis=0)
×
186

187
    return frequencies, cross_spectrum  # pyright: ignore
×
188

189

190
def get_cross_spectrum_mem(
×
191
    signal_0: npt.NDArray,
192
    signal_1: npt.NDArray,
193
    time_step: int,
194
    model_order: int,
195
    n_freqs: int = 512,
196
) -> tuple[npt.NDArray, npt.NDArray]:
197
    """
198
    Estimate the power spectral density (PSD) of a time series.
199

200
    Use the Maximum Entropy Method (MEM).
201

202
    Parameters
203
    ----------
204
    - x: array-like, time series data.
205
    - p: int, model order (number of poles). Controls the smoothness and resolution of
206
      the PSD.
207
    - n_freqs: int, number of frequency bins for the PSD.
208

209
    Returns
210
    -------
211
    - freqs: array of frequency bins.
212
    - psd: array of PSD values at each frequency.
213
    """
214
    # Calculate the autocorrelation of the time series
215
    autocorr = np.correlate(signal_0, signal_1, mode="full") / len(signal_0)
×
216
    autocorr = autocorr[len(autocorr) // 2 : len(autocorr) // 2 + model_order + 1]
×
217

218
    # Create a Toeplitz matrix from the autocorrelation function
219
    r = autocorr[1:]
×
220

221
    # Solve for the model coefficients using the Yule-Walker equations
222
    model_coeffs = solve_toeplitz((autocorr[:-1], autocorr[:-1]), r)
×
223

224
    # Compute the PSD from the model coefficients
225
    # Normalized frequency (Nyquist = 0.5)
226
    freqs = np.linspace(0, 0.5, n_freqs)
×
227
    psd = np.zeros(n_freqs)
×
228
    for i, f in enumerate(freqs):
×
229
        z = np.exp(-2j * np.pi * f)
×
230
        denominator = 1 - np.dot(
×
231
            model_coeffs, [z ** (-k) for k in range(1, model_order + 1)]
232
        )
233
        psd[i] = 1.0 / np.abs(denominator) ** 2
×
234

235
    return freqs / time_step, psd
×
236

237

238
def get_last_maximum(x: npt.NDArray) -> int:
×
239
    """TODO."""
240
    maxima = argrelextrema(x, np.greater_equal)[0]
×
241

242
    last_maximum = maxima[-1]
×
243

244
    if last_maximum == len(x) - 1:
×
245
        last_maximum = maxima[-2]
×
246

247
    return last_maximum
×
248

249

250
def lorentzian_fit(
×
251
    frequencies: npt.NDArray,
252
    power_spectrum: npt.NDArray,
253
    p_0: list[float] | None = None,
254
    filter_maximum: int = 0,
255
) -> npt.NDArray[np.float64]:
256
    """TODO."""
257
    if filter_maximum:
×
258
        delete_ind = np.argmax(power_spectrum)
×
259
        delete_ind = np.array(
×
260
            range(delete_ind - filter_maximum + 1, delete_ind + filter_maximum)
261
        )
262
        frequencies = np.delete(frequencies, delete_ind)
×
263
        power_spectrum = np.delete(power_spectrum, delete_ind)
×
264

265
    max_ind = np.argmax(power_spectrum)
×
266

267
    if p_0 is None:
×
268
        # determine reasonable starting parameters
269
        a_0 = frequencies[max_ind]
×
270
        b_0 = np.abs(frequencies[1] - frequencies[0])
×
271
        c_0 = np.max(power_spectrum)
×
272

273
        p_0 = [a_0, b_0, c_0]
×
274

275
    try:
×
276
        res, _ = curve_fit(mu.lorentzian, frequencies, power_spectrum, p0=p_0)
×
277

278
    except RuntimeError:
×
279
        res = np.array([np.nan, np.nan, np.nan])
×
280

281
    return res
×
282

283

284
def get_peak_parameters(frequencies: npt.NDArray, power_spectrum: npt.NDArray) -> list:
×
285
    """TODO."""
286
    max_ind = np.argmax(power_spectrum)
×
287
    frequency = frequencies[max_ind]
×
288

289
    half_max = power_spectrum[max_ind] / 2.0
×
290

291
    f_interp = interp1d(frequencies, power_spectrum, kind="cubic")
×
292

293
    # Define a function to find roots (y - half_max)
294
    def f_half_max(x_val: float) -> float:
×
295
        return f_interp(x_val) - half_max
×
296

297
    # Find roots (i.e., the points where the function crosses the half maximum)
298
    root1 = brentq(
×
299
        f_half_max, frequencies[0], frequencies[max_ind]
300
    )  # Left intersection
301
    root2 = brentq(
×
302
        f_half_max, frequencies[max_ind], frequencies[-1]
303
    )  # Right intersection
304

305
    # Calculate the FWHM
306
    line_width = np.abs(root1 - root2)
×
307

308
    return [frequency, line_width, power_spectrum[max_ind]]
×
309

310

311
def get_line_widths(
×
312
    frequencies: npt.NDArray,
313
    power_spectrum: npt.NDArray,
314
    filter_maximum: bool = True,
315
    use_lorentzian: bool = True,
316
) -> tuple[float, float, float]:
317
    """TODO."""
318
    res = [np.nan, np.nan, np.nan]
×
319

320
    if use_lorentzian:
×
321
        res = lorentzian_fit(frequencies, power_spectrum, filter_maximum=filter_maximum)
×
322

323
    if np.isnan(res[0]):
×
324
        res = get_peak_parameters(frequencies, power_spectrum)
×
325

326
    frequency = res[0]
×
327
    line_width = res[1]
×
328
    lifetime = 1.0 / (np.pi * line_width)
×
329

330
    return frequency, line_width, lifetime
×
331

332

333
def get_normal_mode_decomposition(
×
334
    velocities: npt.NDArray,
335
    eigenvectors: npt.NDArray,
336
    use_numba: bool = True,
337
) -> npt.NDArray:
338
    """
339
    Calculate the normal-mode-decomposition of the velocities.
340

341
    Projecting the atomic velocities onto the vibrational eigenvectors.
342
    See equation 10 in: https://doi.org/10.1016/j.cpc.2017.08.017.
343

344
    Parameters
345
    ----------
346
    velocities : npt.NDArray
347
        Array containing the velocities from an MD trajectory structured in
348
        the following way:
349
        [number of time steps, number of atoms, number of dimensions].
350
    eigenvectors : npt.NDArray
351
        Array of eigenvectors structured in the following way:
352
        [number of frequencies, number of atoms, number of dimensions].
353

354
    Returns
355
    -------
356
    velocities_projected : np.array
357
        Velocities projected onto the eigenvectors structured as follows:
358
        [number of time steps, number of frequencies]
359

360
    """
361
    # Projection in vibration coordinates
362
    velocities_projected = np.zeros(
×
363
        (velocities.shape[0], eigenvectors.shape[0]), dtype=np.complex128
364
    )
365

366
    if use_numba:
×
367
        # Get normal mode decompositon parallelised by numba
368
        _get_normal_mode_decomposition_numba(
×
369
            velocities_projected, velocities, eigenvectors.conj()
370
        )
371
    else:
372
        _get_normal_mode_decomposition_numpy(
×
373
            velocities_projected, velocities, eigenvectors.conj()
374
        )
375

376
    return velocities_projected
×
377

378

379
@njit(parallel=parallel_numba, fastmath=True)
×
380
def _get_normal_mode_decomposition_numba(
×
381
    velocities_projected: npt.NDArray,
382
    velocities: npt.NDArray,
383
    eigenvectors: npt.NDArray,
384
) -> None:
385
    number_of_timesteps, number_of_cell_atoms, velocity_components = velocities.shape
×
386
    number_of_frequencies = eigenvectors.shape[0]
×
387

388
    # Loop over all frequencies
389
    for k in prange(number_of_frequencies):
×
390
        # Loop over all timesteps
391
        for n in prange(number_of_timesteps):
×
392
            # Temporary variable to accumulate the projection result for this
393
            # frequency and timestep
394
            projection_sum = 0.0
×
395

396
            # Loop over atoms and components
397
            for i in range(number_of_cell_atoms):
×
398
                for m in range(velocity_components):
×
399
                    projection_sum += velocities[n, i, m] * eigenvectors[k, i, m]
×
400

401
            # Store the result in the projected velocities array
402
            velocities_projected[n, k] = projection_sum
×
403

404

405
def _get_normal_mode_decomposition_numpy(
×
406
    velocities_projected: npt.NDArray,
407
    velocities: npt.NDArray,
408
    eigenvectors: npt.NDArray,
409
) -> None:
410
    # Use einsum to perform the double summation over cell atoms and time steps
411
    velocities_projected += np.einsum("tij,kij->tk", velocities, eigenvectors.conj())
×
412

413

414
def get_coupling_matrix(
×
415
    velocities_proj: float,
416
    n_points: int,
417
    time_step: int,
418
    bootstrapping_blocks: int,
419
    bootstrapping_overlap: int,
420
    cutoff_at_last_maximum: bool,
421
    window_function: str,
422
    num_threads: int | None = 1,
423
) -> npt.NDArray:
424
    """TODO."""
425
    # Generate all index pairs
426
    index_pairs = []
×
427
    for index_0 in range(n_points):
×
428
        for index_1 in range(n_points):
×
429
            # Skip lower triangle
430
            if index_0 < index_1:
×
431
                continue
×
432

433
            index_pairs.append((index_0, index_1))
×
434

435
    if num_threads is None:
×
436
        num_threads = os.cpu_count()
×
437

438
    print(
×
439
        f"Using {num_threads} threads to determine coupling matrix.",
440
        flush=True,
441
    )
442

443
    # Parallel processing using ThreadPoolExecutor
444
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
×
445
        results = list(
×
446
            executor.map(
447
                get_coupling,
448
                index_pairs,
449
                [velocities_proj] * len(index_pairs),
450
                [time_step] * len(index_pairs),
451
                [bootstrapping_blocks] * len(index_pairs),
452
                [bootstrapping_overlap] * len(index_pairs),
453
                [cutoff_at_last_maximum] * len(index_pairs),
454
                [window_function] * len(index_pairs),
455
            )
456
        )
457

458
    coupling_matrix = np.zeros((n_points, n_points))
×
459

460
    # Populate the coupling matrix
461
    # TODO: fix return values from `results`
462
    for index_0, index_1, coupling_value, _, _ in results:
×
463
        if index_0 is not None and index_1 is not None:
×
464
            coupling_matrix[index_0, index_1] = coupling_value
×
465

466
    return coupling_matrix
×
467

468

469
def get_coupling(
×
470
    index_pair: tuple[int, int],
471
    velocities_proj: npt.NDArray,
472
    time_step: int,
473
    bootstrapping_blocks: int,
474
    bootstrapping_overlap: int,
475
    cutoff_at_last_maximum: bool,
476
    window_function: str,
477
) -> tuple[int, int, int, int, int]:
478
    """TODO."""
479
    index_0, index_1 = index_pair
×
480

481
    frequencies, power_spectrum = get_cross_spectrum(
×
482
        velocities_proj[:, index_1],
483
        velocities_proj[:, index_1],
484
        time_step,
485
        bootstrapping_blocks=bootstrapping_blocks,
486
        bootstrapping_overlap=bootstrapping_overlap,
487
        cutoff_at_last_maximum=cutoff_at_last_maximum,
488
        window_function=window_function,
489
    )
490

491
    frequencies, cross_spectrum = get_cross_spectrum(
×
492
        velocities_proj[:, index_0],
493
        velocities_proj[:, index_1],
494
        time_step,
495
        bootstrapping_blocks=bootstrapping_blocks,
496
        bootstrapping_overlap=bootstrapping_overlap,
497
        cutoff_at_last_maximum=cutoff_at_last_maximum,
498
        window_function=window_function,
499
    )
500

501
    power_spectrum_1 = np.real(power_spectrum)
×
502
    max_index_1 = np.argmax(power_spectrum_1)
×
503
    f_1 = frequencies[max_index_1]
×
504

505
    cross_spectrum_1 = np.real(cross_spectrum)
×
506

507
    max_f = argrelextrema(cross_spectrum_1, np.greater_equal)[0]
×
508

509
    coupling_index_0 = np.argmin(np.abs(frequencies[max_f] - f_1))
×
510
    coupling_index = max_f[coupling_index_0]
×
511

512
    a_0 = frequencies[coupling_index]
×
513
    b_0 = np.abs(frequencies[1] - frequencies[0])
×
514
    c_0 = cross_spectrum_1[coupling_index]
×
515

516
    p_0 = [a_0, b_0, c_0]
×
517

518
    res = lorentzian_fit(frequencies, cross_spectrum_1, p_0=p_0)
×
519

520
    print(index_0, index_1, res[2], flush=True)
×
521

522
    return index_0, index_1, res[0], res[1], res[2]
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc