• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MHKiT-Software / MHKiT-Python / 11367210922

16 Oct 2024 01:57PM UTC coverage: 85.658%. First build
11367210922

Pull #349

github

web-flow
Merge 4f0c18b1d into 6445365d0
Pull Request #349: Acoustics Module

344 of 472 new or added lines in 5 files covered. (72.88%)

9341 of 10905 relevant lines covered (85.66%)

5.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.86
/mhkit/acoustics/analysis.py
1
"""
3✔
2
This file includes the main passive acoustics analysis functions. They
3
are designed to function on top of one another, starting from reading
4
in wav files from the io submodule.
5
"""
6

7
from typing import Union, Dict, Tuple, Optional
6✔
8
import warnings
6✔
9
import numpy as np
6✔
10
import xarray as xr
6✔
11

12
from mhkit.dolfyn import VelBinner
6✔
13
from mhkit.dolfyn.time import epoch2dt64, dt642epoch
6✔
14

15

16
def _fmax_warning(
6✔
17
    fn: Union[int, float, np.ndarray], fmax: Union[int, float, np.ndarray]
18
) -> Union[int, float, np.ndarray]:
19
    """
20
    Checks that the maximum frequency limit isn't greater than the Nyquist frequency.
21

22
    Parameters
23
    ----------
24
    fn: int, float, or numpy.ndarray
25
        The Nyquist frequency in Hz.
26
    fmax: float
27
        The maximum frequency limit in Hz.
28

29
    Returns
30
    -------
31
    fmax: float
32
        The adjusted maximum frequency limit, ensuring it does not exceed the Nyquist frequency.
33
    """
34

35
    if not isinstance(fn, (int, float, np.ndarray)):
6✔
36
        raise TypeError("'fn' must be a numeric type (int or float).")
6✔
37
    if not isinstance(fmax, (int, float, np.ndarray)):
6✔
38
        raise TypeError("'fmax' must be a numeric type (int or float).")
6✔
39

40
    if fmax > fn:
6✔
41
        warnings.warn(
6✔
42
            "`fmax` = {fmax} is greater than the Nyquist frequency. Setting"
43
            "fmax = {fn}"
44
        )
45
        fmax = fn
6✔
46

47
    return fmax
6✔
48

49

50
def minimum_frequency(
6✔
51
    water_depth: Union[int, float, np.ndarray, list],
52
    c: Union[int, float] = 1500,
53
    c_seabed: Union[int, float] = 1700,
54
) -> Union[float, np.ndarray]:
55
    """
56
    Estimate the shallow water cutoff frequency based on the speed of
57
    sound in the water column and the speed of sound in the seabed
58
    material (generally ranges from 1450 - 1800 m/s)
59

60
    Parameters
61
    ----------
62
    water_depth: int, float or array-like
63
        Depth of the water column in meters.
64
    c: float, optional
65
        Speed of sound in the water column in meters per second. Default is 1500 m/s.
66
    c_seabed: float, optional
67
        Speed of sound in the seabed material in meters per second. Default is 1700 m/s.
68

69
    Returns
70
    -------
71
    f_min: float or numpy.ndarray
72
        The minimum cutoff frequency in Hz.
73

74
    Reference
75
    ---------
76
    Jennings 2011 - Computational Ocean Acoustics, 2nd ed.
77
    """
78

79
    # Convert water_depth to a NumPy array for vectorized operations
80
    water_depth = np.asarray(water_depth)
6✔
81

82
    # Validate water_depth
83
    if not np.issubdtype(water_depth.dtype, np.number):
6✔
NEW
84
        raise TypeError("'water_depth' must be a numeric type or array of numerics.")
×
85

86
    if not isinstance(c, (int, float)):
6✔
NEW
87
        raise TypeError("'c' must be a numeric type (int or float).")
×
88
    if not isinstance(c_seabed, (int, float)):
6✔
NEW
89
        raise TypeError("'c_seabed' must be a numeric type (int or float).")
×
90

91
    if np.any(water_depth <= 0):
6✔
NEW
92
        raise ValueError("All elements of 'water_depth' must be positive numbers.")
×
93
    if c <= 0:
6✔
NEW
94
        raise ValueError("'c' must be a positive number.")
×
95
    if c_seabed <= 0:
6✔
NEW
96
        raise ValueError("'c_seabed' must be a positive number.")
×
97
    if c_seabed <= c:
6✔
NEW
98
        raise ValueError("'c_seabed' must be greater than 'c'.")
×
99

100
    fmin = c / (4 * water_depth * np.sqrt(1 - (c / c_seabed) ** 2))
6✔
101

102
    return fmin
6✔
103

104

105
def sound_pressure_spectral_density(
6✔
106
    pressure: xr.DataArray, fs: Union[int, float], bin_length: Union[int, float] = 1
107
) -> xr.DataArray:
108
    """
109
    Calculates the mean square sound pressure spectral density from audio
110
    samples split into FFTs with a specified bin length in seconds, using Hanning
111
    windowing with 50% overlap. The amplitude of the PSD is adjusted
112
    according to Parseval's theorem.
113

114
    Parameters
115
    ----------
116
    pressure: xarray.DataArray (time)
117
        Sound pressure in [Pa] or voltage [V]
118
    fs: int or float
119
        Data collection sampling rate [Hz]
120
    bin_length: int or float
121
        Length of time in seconds to create FFTs. Default: 1.
122

123
    Returns
124
    -------
125
    spsd: xarray.DataArray (time, freq)
126
        Spectral density [Pa^2/Hz] indexed by time and frequency
127
    """
128

129
    # Type checks
130
    if not isinstance(pressure, xr.DataArray):
6✔
NEW
131
        raise TypeError("'pressure' must be an xarray.DataArray.")
×
132
    if not isinstance(fs, (int, float)):
6✔
NEW
133
        raise TypeError("'fs' must be a numeric type (int or float).")
×
134
    if not isinstance(bin_length, (int, float)):
6✔
NEW
135
        raise TypeError("'bin_length' must be a numeric type (int or float).")
×
136

137
    # Ensure that 'pressure' has a 'time' coordinate
138
    if "time" not in pressure.dims:
6✔
NEW
139
        raise ValueError("'pressure' must be indexed by 'time' dimension.")
×
140

141
    # window length of each time series
142
    nbin = bin_length * fs
6✔
143

144
    # Use dolfyn PSD
145
    binner = VelBinner(n_bin=nbin, fs=fs, n_fft=nbin)
6✔
146
    # Always 50% overlap if numbers reshape perfectly
147
    # Mean square sound pressure
148
    psd = binner.power_spectral_density(pressure, freq_units="Hz")
6✔
149
    samples = binner.reshape(pressure.values) - binner.mean(pressure.values)[:, None]
6✔
150
    # Power in time domain
151
    t_power = np.sum(samples**2, axis=1) / nbin
6✔
152
    # Power in frequency domain
153
    f_power = psd.sum("freq") * (fs / nbin)
6✔
154
    # Adjust the amplitude of PSD according to Parseval's theorem
155
    psd_adj = psd * t_power[:, None] / f_power
6✔
156

157
    out = xr.DataArray(
6✔
158
        psd_adj,
159
        coords={"time": psd_adj["time"], "freq": psd_adj["freq"]},
160
        attrs={
161
            "units": pressure.units + "^2/Hz",
162
            "long_name": "Mean Square Sound Pressure Spectral Density",
163
            "fs": fs,
164
            "nbin": str(bin_length) + " s",
165
            "overlap": "50%",
166
            "nfft": nbin,
167
        },
168
    )
169

170
    return out
6✔
171

172

173
def apply_calibration(
6✔
174
    spsd: xr.DataArray,
175
    sensitivity_curve: xr.DataArray,
176
    fill_value: Union[float, int, np.ndarray],
177
) -> xr.DataArray:
178
    """
179
    Applies custom calibration to spectral density values.
180

181
    Parameters
182
    ----------
183
    spsd: xarray.DataArray (time, freq)
184
        Mean square sound pressure spectral density in V^2/Hz.
185
    sensitivity_curve: xarray.DataArray (freq)
186
        Calibrated sensitivity curve in units of dB rel 1 V^2/uPa^2.
187
        First column should be frequency, second column should be calibration values.
188
    fill_value: float or int
189
        Value with which to fill missing values from the calibration curve,
190
        in units of dB rel 1 V^2/uPa^2.
191

192
    Returns
193
    -------
194
    spsd_calibrated: xarray.DataArray (time, freq)
195
        Spectral density in Pa^2/Hz, indexed by time and frequency.
196
    """
197

198
    if not isinstance(spsd, xr.DataArray):
6✔
NEW
199
        raise TypeError("'spsd' must be an xarray.DataArray.")
×
200
    if not isinstance(sensitivity_curve, xr.DataArray):
6✔
NEW
201
        raise TypeError("'sensitivity_curve' must be an xarray.DataArray.")
×
202
    if not isinstance(fill_value, (int, float, np.ndarray)):
6✔
NEW
203
        raise TypeError("'fill_value' must be a numeric type (int or float).")
×
204

205
    # Ensure 'freq' dimension exists in 'spsd'
206
    if "freq" not in spsd.dims:
6✔
NEW
207
        if len(spsd.dims) > 1:
×
208
            # Issue a warning and assign the 0th dimension as 'freq'
NEW
209
            warnings.warn(
×
210
                f"'spsd' does not have 'freq' as a dimension and has multiple dimensions. "
211
                f"Using the first dimension '{spsd.dims[0]}' as 'freq'."
212
            )
213
        # Assign the 0th dimension as 'freq'
NEW
214
        spsd = spsd.rename({spsd.dims[0]: "freq"})
×
215

216
    # Ensure 'freq' dimension exists in 'sensitivity_curve'
217
    if "freq" not in sensitivity_curve.dims:
6✔
218
        if len(sensitivity_curve.dims) > 1:
6✔
219
            # Issue a warning and assign the 0th dimension as 'freq'
NEW
220
            warnings.warn(
×
221
                f"'sensitivity_curve' does not have 'freq' as a dimension \
222
                      and has multiple dimensions. "
223
                f"Using the first dimension '{sensitivity_curve.dims[0]}' as 'freq'."
224
            )
225
        # Assign the 0th dimension as 'freq'
226
        sensitivity_curve = sensitivity_curve.rename(
6✔
227
            {sensitivity_curve.dims[0]: "freq"}
228
        )
229

230
    # Create a copy of spsd to avoid in-place modification
231
    spsd_calibrated = spsd.copy()
6✔
232

233
    # Read calibration curve
234
    freq = sensitivity_curve.dims[0]
6✔
235
    # Interpolate calibration curve to desired value
236
    calibration = sensitivity_curve.interp(
6✔
237
        {freq: spsd_calibrated["freq"]}, method="linear"
238
    ).drop_vars(freq)
239
    # Fill missing with provided value
240
    calibration = calibration.fillna(fill_value)
6✔
241

242
    # Subtract from sound pressure spectral density
243
    sensitivity_ratio = 10 ** (calibration / 10)  # V^2/uPa^2
6✔
244
    spsd_calibrated /= sensitivity_ratio  # uPa^2/Hz
6✔
245
    spsd_calibrated /= 1e12  # Pa^2/Hz
6✔
246
    spsd_calibrated.attrs["units"] = "Pa^2/Hz"
6✔
247

248
    return spsd_calibrated
6✔
249

250

251
def sound_pressure_spectral_density_level(spsd: xr.DataArray) -> xr.DataArray:
6✔
252
    """
253
    Calculates the sound pressure spectral density level from
254
    the mean square sound pressure spectral density.
255

256
    Parameters
257
    ----------
258
    spsd: xarray.DataArray (time, freq)
259
        Mean square sound pressure spectral density in Pa^2/Hz
260

261
    Returns
262
    -------
263
    spsdl: xarray.DataArray (time, freq)
264
        Sound pressure spectral density level [dB re 1 uPa^2/Hz]
265
        indexed by time and frequency
266
    """
267

268
    # Reference value of sound pressure
269
    reference = 1e-12  # Pa^2 to 1 uPa^2
6✔
270

271
    # Sound pressure spectral density level from mean square values
272
    lpf = 10 * np.log10(spsd.values / reference)
6✔
273

274
    out = xr.DataArray(
6✔
275
        lpf,
276
        coords={"time": spsd["time"], "freq": spsd["freq"]},
277
        attrs={
278
            "units": "dB re 1 uPa^2/Hz",
279
            "long_name": "Sound Pressure Spectral Density Level",
280
        },
281
    )
282

283
    return out
6✔
284

285

286
def _validate_method(
6✔
287
    method: Union[str, Dict[str, Union[float, int]]]
288
) -> Tuple[str, Optional[Union[float, int]]]:
289
    """
290
    Validates the 'method' parameter and returns the method name and argument (if any).
291
    """
292
    allowed_methods = [
6✔
293
        "median",
294
        "mean",
295
        "min",
296
        "max",
297
        "sum",
298
        "quantile",
299
        "std",
300
        "var",
301
        "count",
302
    ]
303

304
    if isinstance(method, str):
6✔
305
        method_name = method.lower()
6✔
306
        if method_name not in allowed_methods:
6✔
307
            raise ValueError(
6✔
308
                f"Method '{method}' is not supported. Supported methods are: {allowed_methods}"
309
            )
310
        if method_name == "quantile":
6✔
NEW
311
            raise ValueError(
×
312
                "The 'quantile' method must be provided as a dictionary with "
313
                "the quantile value, e.g., {'quantile': 0.25}."
314
            )
315
        method_arg = None
6✔
316
    elif isinstance(method, dict):
6✔
317
        if len(method) != 1:
6✔
NEW
318
            raise ValueError(
×
319
                "'method' dictionary must contain exactly one key-value pair."
320
            )
321
        method_name, method_arg = list(method.items())[0]
6✔
322
        if not isinstance(method_name, str):
6✔
NEW
323
            raise TypeError("Key in 'method' dictionary must be a string.")
×
324
        method_name = method_name.lower()
6✔
325
        if method_name not in allowed_methods:
6✔
326
            raise ValueError(
6✔
327
                f"Method '{method_name}' is not supported. Supported methods are: {allowed_methods}"
328
            )
329
        if method_name == "quantile":
6✔
330
            if not isinstance(method_arg, (float, int)) or not 0 <= method_arg <= 1:
6✔
331
                raise ValueError(
6✔
332
                    "The 'quantile' method must have a float between 0 and 1 as an argument."
333
                )
334
    else:
NEW
335
        raise ValueError(
×
336
            f"Unsupported method type: {type(method)}. Must be a string or dictionary."
337
        )
338
    return method_name, method_arg
6✔
339

340

341
def band_average(
6✔
342
    spsdl: xr.DataArray,
343
    octave: int = 3,
344
    fmin: int = 10,
345
    fmax: int = 100000,
346
    method: Union[str, Dict[str, Union[float, int]]] = "median",
347
) -> xr.DataArray:
348
    """
349
    Reorganizes spectral density level frequency tensor into
350
    fractional octave bands and applies a function to them.
351

352
    Parameters
353
    ----------
354
    spsdl: xarray.DataArray (time, freq)
355
        Mean square sound pressure spectral density level in dB rel 1 uPa^2/Hz
356
    octave: int
357
        Octave to subdivide spectral density level by. Default = 3 (third octave)
358
    fmin: int
359
        Lower frequency band limit (lower limit of the hydrophone). Default: 10 Hz
360
    fmax: int
361
        Upper frequency band limit (Nyquist frequency). Default: 100000 Hz
362
    method: str or dict
363
        Method to run on the binned data. Can be a string (e.g., "median") or a dict
364
        where the key is the method and the value is its argument (e.g., {"quantile": 0.25}).
365
        Options: [median, mean, min, max, sum, quantile, std, var, count]
366

367
    Returns
368
    -------
369
    out: xarray.DataArray (time, freq_bins)
370
        Frequency band-averaged sound pressure spectral density level [dB re 1 uPa^2/Hz]
371
        indexed by time and frequency
372
    """
373

374
    # Type checks
375
    if not isinstance(spsdl, xr.DataArray):
6✔
NEW
376
        raise TypeError("'spsdl' must be an xarray.DataArray.")
×
377
    if not isinstance(octave, int):
6✔
NEW
378
        raise TypeError("'octave' must be an integer.")
×
379
    if not isinstance(fmin, int):
6✔
NEW
380
        raise TypeError("'fmin' must be an integer.")
×
381
    if not isinstance(fmax, int):
6✔
NEW
382
        raise TypeError("'fmax' must be an integer.")
×
383
    if not isinstance(method, (str, dict)):
6✔
NEW
384
        raise TypeError("'method' must be a string or a dictionary.")
×
385

386
    # Value checks
387
    if "freq" not in spsdl.dims or "time" not in spsdl.dims:
6✔
NEW
388
        raise ValueError("'spsdl' must have 'time' and 'freq' as dimensions.")
×
389
    if octave <= 0:
6✔
NEW
390
        raise ValueError("'octave' must be a positive integer.")
×
391
    if fmin <= 0:
6✔
NEW
392
        raise ValueError("'fmin' must be a positive integer.")
×
393
    if fmax <= fmin:
6✔
NEW
394
        raise ValueError("'fmax' must be greater than 'fmin'.")
×
395

396
    # Validate method and get method_name and method_arg
397
    method_name, method_arg = _validate_method(method)
6✔
398

399
    # Check fmax
400
    fn = spsdl["freq"].max().values
6✔
401
    fmax = _fmax_warning(fn, fmax)
6✔
402

403
    bandwidth = 2 ** (1 / octave)
6✔
404
    half_bandwidth = 2 ** (1 / (octave * 2))
6✔
405

406
    band = {}
6✔
407
    band["center_freq"] = 10 ** np.arange(
6✔
408
        np.log10(fmin),
409
        np.log10(fmax * bandwidth),
410
        step=np.log10(bandwidth),
411
    )
412
    band["lower_limit"] = band["center_freq"] / half_bandwidth
6✔
413
    band["upper_limit"] = band["center_freq"] * half_bandwidth
6✔
414
    octave_bins = np.append(band["lower_limit"], band["upper_limit"][-1])
6✔
415

416
    # Use xarray binning methods
417
    spsdl_group = spsdl.groupby_bins("freq", octave_bins, labels=band["center_freq"])
6✔
418

419
    # Handle method being a string or a dict
420
    if isinstance(method, str):
6✔
421
        func = getattr(spsdl_group, method.lower())
6✔
422
        out = func()
6✔
NEW
423
    elif isinstance(method, dict):
×
NEW
424
        method_name, method_arg = list(method.items())[0]
×
NEW
425
        func = getattr(spsdl_group, method_name.lower())
×
NEW
426
        out = func(method_arg)
×
427
    else:
NEW
428
        raise ValueError(
×
429
            f"Unsupported method type: {type(method)}. "
430
            "Must be a string or dictionary."
431
        )
432

433
    out.attrs.update(
6✔
434
        {"units": spsdl.units, "comment": f"Third octave frequency band {method}"}
435
    )
436

437
    return out
6✔
438

439

440
def time_average(
6✔
441
    spsdl: xr.DataArray,
442
    window: int = 60,
443
    method: Union[str, Dict[str, Union[float, int]]] = "median",
444
) -> xr.DataArray:
445
    """
446
    Reorganizes spectral density level frequency tensor into
447
    time windows and applies a function to them.
448

449
    If the window length is equivalent to the size of spsdl["time"],
450
    this function is equivalent to spsdl.<method>("time")
451

452
    Parameters
453
    ----------
454
    spsdl: xarray.DataArray (time, freq)
455
        Mean square sound pressure spectral density level in dB rel 1 uPa^2/Hz
456
    window: int
457
        Time in seconds to subdivide spectral density level into. Default: 60 s.
458
    method: str or dict
459
        Method to run on the binned data. Can be a string (e.g., "median") or a dict
460
        where the key is the method and the value is its argument (e.g., {"quantile": 0.25}).
461
        Options: [median, mean, min, max, sum, quantile, std, var, count]
462

463
    Returns
464
    -------
465
    out: xarray.DataArray (time_bins, freq)
466
        Time-averaged sound pressure spectral density level [dB re 1 uPa^2/Hz]
467
        indexed by time and frequency
468
    """
469

470
    # Type checks
471
    if not isinstance(spsdl, xr.DataArray):
6✔
NEW
472
        raise TypeError("'spsdl' must be an xarray.DataArray.")
×
473
    if not isinstance(window, int):
6✔
NEW
474
        raise TypeError("'window' must be an integer.")
×
475
    if not isinstance(method, (str, dict)):
6✔
NEW
476
        raise TypeError("'method' must be a string or dictionary.")
×
477
    if "time" not in spsdl.dims:
6✔
NEW
478
        raise ValueError("'spsdl' must have 'time' dimension.")
×
479

480
    # Value checks
481
    if window <= 0:
6✔
NEW
482
        raise ValueError("'window' must be a positive integer.")
×
483

484
    # Ensure 'time' coordinate is of datetime64 dtype
485
    if not np.issubdtype(spsdl["time"].dtype, np.datetime64):
6✔
NEW
486
        raise TypeError("'spsdl['time']' must be of dtype 'datetime64'.")
×
487

488
    # Validate method and get method_name and method_arg
489
    method_name, method_arg = _validate_method(method)
6✔
490

491
    window = np.timedelta64(window, "s")
6✔
492
    time_bins_lower = np.arange(
6✔
493
        spsdl["time"][0].values, spsdl["time"][-1].values, window
494
    )
495
    time_bins_upper = time_bins_lower + window
6✔
496
    time_bins = np.append(time_bins_lower, time_bins_upper[-1])
6✔
497
    center_time = epoch2dt64(
6✔
498
        0.5 * (dt642epoch(time_bins_lower) + dt642epoch(time_bins_upper))
499
    )
500

501
    # Use xarray binning methods
502
    spsdl_group = spsdl.groupby_bins("time", time_bins, labels=center_time)
6✔
503

504
    # Apply the aggregation method
505
    func = getattr(spsdl_group, method_name)
6✔
506
    if method_arg is not None:
6✔
507
        out = func(method_arg)
6✔
508
    else:
509
        out = func()
6✔
510

511
    # Update attributes
512
    out.attrs["units"] = spsdl.units
6✔
513
    out.attrs["comment"] = f"Time average {method}"
6✔
514

515
    # Remove 'quantile' coordinate if present
516
    if method == "quantile":
6✔
NEW
517
        out = out.drop_vars("quantile")
×
518

519
    return out
6✔
520

521

522
def sound_pressure_level(
6✔
523
    spsd: xr.DataArray, fmin: int = 10, fmax: int = 100000
524
) -> xr.DataArray:
525
    """
526
    Calculates the sound pressure level in a specified frequency band
527
    from the mean square sound pressure spectral density.
528

529
    Parameters
530
    ----------
531
    spsd: xarray.DataArray (time, freq)
532
        Mean square sound pressure spectral density in [Pa^2/Hz]
533
    fmin: int
534
        Lower frequency band limit (lower limit of the hydrophone). Default: 10 Hz
535
    fmax: int
536
        Upper frequency band limit (Nyquist frequency). Default: 100000 Hz
537

538
    Returns
539
    -------
540
    spl: xarray.DataArray (time)
541
        Sound pressure level [dB re 1 uPa] indexed by time
542
    """
543

544
    # Type checks
545
    if not isinstance(spsd, xr.DataArray):
6✔
NEW
546
        raise TypeError("'spsd' must be an xarray.DataArray.")
×
547
    if not isinstance(fmin, int):
6✔
NEW
548
        raise TypeError("'fmin' must be an integer.")
×
549
    if not isinstance(fmax, int):
6✔
NEW
550
        raise TypeError("'fmax' must be an integer.")
×
551

552
    # Ensure 'freq' and 'time' dimensions are present
553
    if "freq" not in spsd.dims or "time" not in spsd.dims:
6✔
NEW
554
        raise ValueError("'spsd' must have 'time' and 'freq' as dimensions.")
×
555

556
    # Check that 'fs' (sampling frequency) is available in attributes
557
    if "fs" not in spsd.attrs:
6✔
NEW
558
        raise ValueError(
×
559
            "'spsd' must have 'fs' (sampling frequency) in its attributes."
560
        )
561

562
    # Value checks
563
    if fmin <= 0:
6✔
NEW
564
        raise ValueError("'fmin' must be a positive integer.")
×
565
    if fmax <= fmin:
6✔
NEW
566
        raise ValueError("'fmax' must be greater than 'fmin'.")
×
567

568
    # Check fmax
569
    fn = spsd.attrs["fs"] // 2
6✔
570
    fmax = _fmax_warning(fn, fmax)
6✔
571

572
    # Reference value of sound pressure
573
    reference = 1e-12  # Pa^2, = 1 uPa^2
6✔
574

575
    # Mean square sound pressure in a specified frequency band from mean square values
576
    pressure_squared = np.trapz(
6✔
577
        spsd.sel(freq=slice(fmin, fmax)), spsd["freq"].sel(freq=slice(fmin, fmax))
578
    )
579

580
    # Mean square sound pressure level
581
    mspl = 10 * np.log10(pressure_squared / reference)
6✔
582

583
    out = xr.DataArray(
6✔
584
        mspl,
585
        coords={"time": spsd["time"]},
586
        attrs={
587
            "units": "dB re 1 uPa",
588
            "long_name": "Sound Pressure Level",
589
            "freq_band_min": fmin,
590
            "freq_band_max": fmax,
591
        },
592
    )
593

594
    return out
6✔
595

596

597
def _band_sound_pressure_level(
6✔
598
    spsd: xr.DataArray,
599
    bandwidth: int,
600
    half_bandwidth: int,
601
    fmin: int = 10,
602
    fmax: int = 100000,
603
) -> xr.DataArray:
604
    """
605
    Calculates band-averaged sound pressure levels
606

607
    Parameters
608
    ----------
609
    spsd: xarray.DataArray (time, freq)
610
        Mean square sound pressure spectral density.
611
    bandwidth : int or float
612
        Bandwidth to average over.
613
    half_bandwidth : int or float
614
        Half-bandwidth, used to set upper and lower bandwidth limits.
615
    fmin : int, optional
616
        Lower frequency band limit (lower limit of the hydrophone). Default is 10 Hz.
617
    fmax : int, optional
618
        Upper frequency band limit (Nyquist frequency). Default is 100,000 Hz.
619

620

621
    Returns
622
    -------
623
    out: xarray.DataArray (time, freq_bins)
624
        Sound pressure level [dB re 1 uPa] indexed by time and frequency of specified bandwidth
625
    """
626

627
    # Type checks
628
    if not isinstance(spsd, xr.DataArray):
6✔
NEW
629
        raise TypeError("'spsd' must be an xarray.DataArray.")
×
630
    if not isinstance(bandwidth, (int, float)):
6✔
NEW
631
        raise TypeError("'bandwidth' must be a numeric type (int or float).")
×
632
    if not isinstance(half_bandwidth, (int, float)):
6✔
NEW
633
        raise TypeError("'half_bandwidth' must be a numeric type (int or float).")
×
634
    if not isinstance(fmin, int):
6✔
NEW
635
        raise TypeError("'fmin' must be an integer.")
×
636
    if not isinstance(fmax, int):
6✔
NEW
637
        raise TypeError("'fmax' must be an integer.")
×
638

639
    # Ensure 'freq' and 'time' dimensions are present
640
    if "freq" not in spsd.dims or "time" not in spsd.dims:
6✔
NEW
641
        raise ValueError("'spsd' must have 'time' and 'freq' as dimensions.")
×
642

643
    # Check that 'fs' (sampling frequency) is available in attributes
644
    if "fs" not in spsd.attrs:
6✔
NEW
645
        raise ValueError(
×
646
            "'spsd' must have 'fs' (sampling frequency) in its attributes."
647
        )
648

649
    # Value checks
650
    if fmin <= 0:
6✔
NEW
651
        raise ValueError("'fmin' must be a positive integer.")
×
652
    if fmax <= fmin:
6✔
NEW
653
        raise ValueError("'fmax' must be greater than 'fmin'.")
×
654

655
    # Check fmax
656
    fn = spsd.attrs["fs"] // 2
6✔
657
    fmax = _fmax_warning(fn, fmax)
6✔
658

659
    # Reference value of sound pressure
660
    reference = 1e-12  # Pa^2, = 1 uPa^2
6✔
661

662
    band = {}
6✔
663
    band["center_freq"] = 10 ** np.arange(
6✔
664
        np.log10(fmin),
665
        np.log10(fmax * bandwidth),
666
        step=np.log10(bandwidth),
667
    )
668
    band["lower_limit"] = band["center_freq"] / half_bandwidth
6✔
669
    band["upper_limit"] = band["center_freq"] * half_bandwidth
6✔
670
    octave_bins = np.append(band["lower_limit"], band["upper_limit"][-1])
6✔
671

672
    # Manual trapezoidal rule to get Pa^2
673
    pressure_squared = xr.DataArray(
6✔
674
        coords={"time": spsd["time"], "freq_bins": band["center_freq"]},
675
        dims=["time", "freq_bins"],
676
    )
677
    for i, key in enumerate(band["center_freq"]):
6✔
678
        band_min = octave_bins[i]
6✔
679
        band_max = octave_bins[i + 1]
6✔
680
        pressure_squared.loc[{"freq_bins": key}] = np.trapz(
6✔
681
            spsd.sel(freq=slice(band_min, band_max)),
682
            spsd["freq"].sel(freq=slice(band_min, band_max)),
683
        )
684

685
    # Mean square sound pressure level in dB rel 1 uPa
686
    mspl = 10 * np.log10(pressure_squared / reference)
6✔
687

688
    return mspl
6✔
689

690

691
def third_octave_sound_pressure_level(
6✔
692
    spsd: xr.DataArray, fmin: int = 10, fmax: int = 100000
693
) -> xr.DataArray:
694
    """
695
    Calculates the sound pressure level in third octave bands directly
696
    from the mean square sound pressure spectral density.
697

698
    Parameters
699
    ----------
700
    spsd: xarray.DataArray (time, freq)
701
        Mean square sound pressure spectral density.
702
    fmin: int
703
        Lower frequency band limit (lower limit of the hydrophone). Default: 10 Hz
704
    fmax: int
705
        Upper frequency band limit (Nyquist frequency). Default: 100000 Hz
706

707
    Returns
708
    -------
709
    mspl: xarray.DataArray (time, freq_bins)
710
        Sound pressure level [dB re 1 uPa] indexed by time and third octave bands
711
    """
712

713
    # Type checks
714
    if not isinstance(spsd, xr.DataArray):
6✔
NEW
715
        raise TypeError("'spsd' must be an xarray.DataArray.")
×
716
    if not isinstance(fmin, int):
6✔
NEW
717
        raise TypeError("'fmin' must be an integer.")
×
718
    if not isinstance(fmax, int):
6✔
NEW
719
        raise TypeError("'fmax' must be an integer.")
×
720

721
    # Ensure 'freq' and 'time' dimensions are present
722
    if "freq" not in spsd.dims or "time" not in spsd.dims:
6✔
NEW
723
        raise ValueError("'spsd' must have 'time' and 'freq' as dimensions.")
×
724

725
    # Check that 'fs' (sampling frequency) is available in attributes
726
    if "fs" not in spsd.attrs:
6✔
NEW
727
        raise ValueError(
×
728
            "'spsd' must have 'fs' (sampling frequency) in its attributes."
729
        )
730

731
    # Value checks
732
    if fmin <= 0:
6✔
NEW
733
        raise ValueError("'fmin' must be a positive integer.")
×
734
    if fmax <= fmin:
6✔
NEW
735
        raise ValueError("'fmax' must be greater than 'fmin'.")
×
736

737
    # Third octave bin frequencies
738
    bandwidth = 2 ** (1 / 3)
6✔
739
    half_bandwidth = 2 ** (1 / 6)
6✔
740

741
    mspl = _band_sound_pressure_level(spsd, bandwidth, half_bandwidth, fmin, fmax)
6✔
742
    mspl.attrs = {
6✔
743
        "units": "dB re 1 uPa",
744
        "long_name": "Third Octave Sound Pressure Level",
745
    }
746

747
    return mspl
6✔
748

749

750
def decidecade_sound_pressure_level(
6✔
751
    spsd: xr.DataArray, fmin: int = 10, fmax: int = 100000
752
) -> xr.DataArray:
753
    """
754
    Calculates the sound pressure level in decidecade bands directly
755
    from the mean square sound pressure spectral density.
756

757
    Parameters
758
    ----------
759
    spsd: xarray.DataArray (time, freq)
760
        Mean square sound pressure spectral density.
761
    fmin: int
762
        Lower frequency band limit (lower limit of the hydrophone). Default: 10 Hz
763
    fmax: int
764
        Upper frequency band limit (Nyquist frequency). Default: 100000 Hz
765

766
    Returns
767
    -------
768
    mspl : xarray.DataArray (time, freq_bins)
769
        Sound pressure level [dB re 1 uPa] indexed by time and third octave bands
770
    """
771

772
    # Type checks
773
    if not isinstance(spsd, xr.DataArray):
6✔
NEW
774
        raise TypeError("'spsd' must be an xarray.DataArray.")
×
775
    if not isinstance(fmin, int):
6✔
NEW
776
        raise TypeError("'fmin' must be an integer.")
×
777
    if not isinstance(fmax, int):
6✔
NEW
778
        raise TypeError("'fmax' must be an integer.")
×
779

780
    # Ensure 'freq' and 'time' dimensions are present
781
    if "freq" not in spsd.dims or "time" not in spsd.dims:
6✔
NEW
782
        raise ValueError("'spsd' must have 'time' and 'freq' as dimensions.")
×
783

784
    # Check that 'fs' (sampling frequency) is available in attributes
785
    if "fs" not in spsd.attrs:
6✔
NEW
786
        raise ValueError(
×
787
            "'spsd' must have 'fs' (sampling frequency) in its attributes."
788
        )
789

790
    # Value checks
791
    if fmin <= 0:
6✔
NEW
792
        raise ValueError("'fmin' must be a positive integer.")
×
793
    if fmax <= fmin:
6✔
NEW
794
        raise ValueError("'fmax' must be greater than 'fmin'.")
×
795

796
    # Decidecade bin frequencies
797
    bandwidth = 2 ** (1 / 10)
6✔
798
    half_bandwidth = 2 ** (1 / 20)
6✔
799

800
    mspl = _band_sound_pressure_level(spsd, bandwidth, half_bandwidth, fmin, fmax)
6✔
801
    mspl.attrs = {
6✔
802
        "units": "dB re 1 uPa",
803
        "long_name": "Decidecade Sound Pressure Level",
804
    }
805

806
    return mspl
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc