• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MHKiT-Software / MHKiT-Python / 11367210922

16 Oct 2024 01:57PM UTC coverage: 85.658%. First build
11367210922

Pull #349

github

web-flow
Merge 4f0c18b1d into 6445365d0
Pull Request #349: Acoustics Module

344 of 472 new or added lines in 5 files covered. (72.88%)

9341 of 10905 relevant lines covered (85.66%)

5.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.81
/mhkit/acoustics/io.py
1
"""
3✔
2
This submodule includes the main passive acoustics I/O functions.
3
`read_hydrophone` is the main function, with a number of wrapper
4
functions for specific manufacturers. The `export_audio` function
5
exists to improve audio if one is difficult to listen to.
6
"""
7

8
from typing import BinaryIO, Tuple, Dict, Union, Optional, Any
6✔
9
import io
6✔
10
import struct
6✔
11
import wave
6✔
12
from pathlib import Path
6✔
13
import numpy as np
6✔
14
import pandas as pd
6✔
15
import xarray as xr
6✔
16
from scipy.io import wavfile
6✔
17

18

19
def _read_wav_metadata(f: BinaryIO) -> dict:
6✔
20
    """
21
    Extracts the bit depth from a WAV file and skips over any metadata blocks
22
    that might be present (e.g., 'LIST' chunks).
23

24
    Parameters
25
    ----------
26
    f : BinaryIO
27
        An open WAV file in binary mode.
28

29
    Returns
30
    -------
31
    header : dict
32
        Dictionary containing .wav file's header data
33
    """
34

35
    header = {}
6✔
36
    f.read(4)  # riff_key
6✔
37
    header["filesize"] = struct.unpack("<I", f.read(4))[0]
6✔
38
    f.read(4)  # wave_key
6✔
39
    list_key = f.read(4)
6✔
40

41
    # Skip metadata if it exists
42
    if "LIST" in list_key.decode():
6✔
43
        list_size = struct.unpack("<I", f.read(4))[0]
6✔
44
        f.seek(f.tell() + list_size)
6✔
45
    else:
46
        f.seek(f.tell() - 4)
6✔
47

48
    f.read(4)  # fmt_key
6✔
49
    fmt_size = struct.unpack("<I", f.read(4))[0]
6✔
50
    header["compression_code"] = struct.unpack("<H", f.read(2))[0]
6✔
51
    header["n_channels"] = struct.unpack("<H", f.read(2))[0]
6✔
52
    header["sample_rate"] = struct.unpack("<I", f.read(4))[0]
6✔
53
    header["bytes_per_sec"] = struct.unpack("<I", f.read(4))[0]
6✔
54
    header["block_align"] = struct.unpack("<H", f.read(2))[0]
6✔
55
    header["bits_per_sample"] = struct.unpack("<H", f.read(2))[0]
6✔
56
    f.seek(f.tell() + fmt_size - 16)
6✔
57

58
    return header
6✔
59

60

61
def _calculate_voltage_and_time(
6✔
62
    fs: int,
63
    raw: np.ndarray,
64
    bits_per_sample: int,
65
    peak_voltage: Union[int, float],
66
    start_time: str,
67
) -> Tuple[np.ndarray, pd.DatetimeIndex, int]:
68
    """
69
    Normalizes the raw data from the WAV file to the appropriate voltage and
70
    calculates the time array based on the sampling frequency.
71

72
    Parameters
73
    ----------
74
    fs : int
75
        Sampling frequency of the audio data in Hertz.
76
    raw : numpy.ndarray
77
        Raw audio data extracted from the WAV file.
78
    bits_per_sample : int
79
        Number of bits per sample in the WAV file.
80
    peak_voltage : int or float
81
        Peak voltage supplied to the analog-to-digital converter (ADC) in volts.
82
    start_time : str, np.datetime64
83
        Start time of the recording in ISO 8601 format (e.g., '2024-06-06T00:00:00').
84

85
    Returns
86
    -------
87
    raw_voltage : numpy.ndarray
88
        Normalized voltage values corresponding to the raw audio data.
89
    time : pandas.DatetimeIndex
90
        Time index for the audio data based on the sample rate and start time.
91
    max_count : int
92
        Maximum possible count value for the given bit depth, used for normalization.
93
    """
94

95
    if not isinstance(fs, int):
6✔
NEW
96
        raise TypeError("Sampling frequency 'fs' must be an integer.")
×
97
    if not isinstance(raw, np.ndarray):
6✔
NEW
98
        raise TypeError("Raw audio data 'raw' must be a numpy.ndarray.")
×
99
    if not isinstance(bits_per_sample, int):
6✔
NEW
100
        raise TypeError("'bits_per_sample' must be an integer.")
×
101
    if not isinstance(peak_voltage, (int, float)):
6✔
NEW
102
        raise TypeError("'peak_voltage' must be numeric (int or float).")
×
103
    if not isinstance(start_time, (str, np.datetime64)):
6✔
NEW
104
        raise TypeError("'start_time' must be a string or np.datetime64.")
×
105

106
    length = raw.shape[0] // fs  # length of recording in seconds
6✔
107

108
    if bits_per_sample in [16, 32]:
6✔
109
        max_count = 2 ** (bits_per_sample - 1)
6✔
110
    elif bits_per_sample == 12:
6✔
NEW
111
        max_count = 2 ** (16 - 1) - 2**4  # 12 bit read in as 16 bit
×
112
    elif bits_per_sample == 24:
6✔
113
        max_count = 2 ** (32 - 1) - 2**8  # 24 bit read in as 32 bit
6✔
114
    else:
NEW
115
        raise IOError(
×
116
            f"Unknown how to read {bits_per_sample} bit ADC."
117
            "Please notify MHKiT team."
118
        )
119

120
    # Normalize and then scale to peak voltage
121
    # Use 64 bit float for decimal accuracy
122
    raw_voltage = raw.astype(float) / max_count * peak_voltage
6✔
123

124
    # Get time
125
    end_time = np.datetime64(start_time) + np.timedelta64(length * 1000, "ms")
6✔
126
    time = pd.date_range(start_time, end_time, raw.size + 1)
6✔
127

128
    return raw_voltage, time, max_count
6✔
129

130

131
def read_hydrophone(
6✔
132
    filename: Union[str, Path],
133
    peak_voltage: Union[int, float],
134
    sensitivity: Optional[Union[int, float]] = None,
135
    gain: Union[int, float] = 0,
136
    start_time: str = "2024-01-01T00:00:00",
137
) -> xr.DataArray:
138
    """
139
    Read .wav file from a hydrophone. Returns voltage timeseries if sensitivity not
140
    provided, returns pressure timeseries if it is provided.
141

142
    Parameters
143
    ----------
144
    filename: str or pathlib.Path
145
        Input filename
146
    peak_voltage: int or float
147
        Peak voltage supplied to the analog to digital converter (ADC) in V.
148
        (Or 1/2 of the peak to peak voltage).
149
    sensitivity: int or float
150
        Hydrophone calibration sensitivity in dB re 1 V/uPa.
151
        Should be negative. Default: None.
152
    gain: int or float
153
        Amplifier gain in dB re 1 V/uPa. Default 0.
154
    start_time: str
155
        Start time in the format yyyy-mm-ddTHH:MM:SS
156

157
    Returns
158
    -------
159
    out: numpy.array
160
        Sound pressure [Pa] or Voltage [V] indexed by time[s]
161
    """
162

163
    if not isinstance(filename, (str, Path)):
6✔
NEW
164
        raise TypeError("Filename must be a string or a pathlib.Path object.")
×
165
    if not isinstance(peak_voltage, (int, float)):
6✔
NEW
166
        raise TypeError("'peak_voltage' must be numeric (int or float).")
×
167
    if sensitivity is not None and not isinstance(sensitivity, (int, float)):
6✔
NEW
168
        raise TypeError("'sensitivity' must be numeric (int, float) or None.")
×
169
    if not isinstance(gain, (int, float)):
6✔
NEW
170
        raise TypeError("'gain' must be numeric (int or float).")
×
171
    if not isinstance(start_time, (str, np.datetime64)):
6✔
NEW
172
        raise TypeError("'start_time' must be a string or np.datetime64")
×
173

174
    if (sensitivity is not None) and (sensitivity > 0):
6✔
NEW
175
        raise ValueError(
×
176
            "Hydrophone calibrated sensitivity should be entered as a negative number."
177
        )
178

179
    # Read metadata from WAV file
180
    with open(filename, "rb") as f:
6✔
181
        header = _read_wav_metadata(f)
6✔
182

183
    # Read data using scipy (will auto drop as int16 or int32)
184
    fs, raw = wavfile.read(filename)
6✔
185

186
    # Calculate raw voltage and time array
187
    raw_voltage, time, max_count = _calculate_voltage_and_time(
6✔
188
        fs, raw, header["bits_per_sample"], peak_voltage, start_time
189
    )
190

191
    # If sensitivity is provided, convert to sound pressure
192
    if sensitivity is not None:
6✔
193
        # Subtract gain
194
        # Hydrophone with sensitivity of -177 dB and gain of -3 dB = sensitivity of -174 dB
195
        if gain:
6✔
NEW
196
            sensitivity -= gain
×
197
        # Convert calibration from dB rel 1 V/uPa into ratio
198
        sensitivity = 10 ** (sensitivity / 20)  # V/uPa
6✔
199

200
        # Sound pressure
201
        pressure = raw_voltage / sensitivity  # uPa
6✔
202
        pressure = pressure / 1e6  # Pa
6✔
203

204
        out = xr.DataArray(
6✔
205
            pressure,
206
            coords={"time": time[:-1]},
207
            attrs={
208
                "units": "Pa",
209
                "sensitivity": np.round(sensitivity, 12),
210
                # Pressure min resolution
211
                "resolution": np.round(peak_voltage / max_count / sensitivity / 1e6, 9),
212
                # Minimum pressure sensor can read
213
                "valid_min": np.round(-peak_voltage / sensitivity / 1e6, 6),
214
                # Pressure at which sensor is saturated
215
                "valid_max": np.round(peak_voltage / sensitivity / 1e6, 6),
216
                "fs": fs,
217
                "filename": Path(filename).stem,
218
            },
219
        )
220
    else:
221
        out = xr.DataArray(
6✔
222
            raw_voltage,
223
            coords={"time": time[:-1]},
224
            attrs={
225
                "units": "V",
226
                # Voltage min resolution
227
                "resolution": np.round(peak_voltage / max_count, 6),
228
                # Minimum voltage sensor can read
229
                "valid_min": -peak_voltage,
230
                # Voltage at which sensor is saturated
231
                "valid_max": peak_voltage,
232
                "fs": fs,
233
                "filename": Path(filename).stem,
234
            },
235
        )
236

237
    return out
6✔
238

239

240
def read_soundtrap(
6✔
241
    filename: str,
242
    sensitivity: Optional[Union[int, float]] = None,
243
    gain: Union[int, float] = 0,
244
) -> xr.DataArray:
245
    """
246
    Read .wav file from an Ocean Instruments SoundTrap hydrophone.
247
    Returns voltage timeseries if sensitivity not provided, returns pressure
248
    timeseries if it is provided.
249

250
    Parameters
251
    ----------
252
    filename : str
253
        Input filename.
254
    sensitivity : int or float, optional
255
        Hydrophone calibration sensitivity in dB re 1 V/μPa.
256
        Should be negative. Default is None.
257
    gain : int or float
258
        Amplifier gain in dB re 1 V/μPa. Default is 0.
259

260
    Returns
261
    -------
262
    out : xarray.DataArray
263
        Sound pressure [Pa] or Voltage [V] indexed by time[s].
264
    """
265

266
    # Get time from filename
267
    st = filename.split(".")[-2]
6✔
268
    start_time = (
6✔
269
        "20"
270
        + st[:2]
271
        + "-"
272
        + st[2:4]
273
        + "-"
274
        + st[4:6]
275
        + "T"
276
        + st[6:8]
277
        + ":"
278
        + st[8:10]
279
        + ":"
280
        + st[10:12]
281
    )
282

283
    # Soundtrap uses a peak voltage of 1 V
284
    out = read_hydrophone(
6✔
285
        filename,
286
        peak_voltage=1,
287
        sensitivity=sensitivity,
288
        gain=gain,
289
        start_time=start_time,
290
    )
291
    out.attrs["make"] = "SoundTrap"
6✔
292

293
    return out
6✔
294

295

296
def _read_iclisten_metadata(f: io.BufferedIOBase) -> Dict[str, Any]:
6✔
297
    """
298
    Reads the metadata from the icListen .wav file and
299
    returns the metadata in a dictionary.
300

301
    Parameters
302
    ----------
303
    f: io.BufferedIOBase
304
        Opened .wav file for reading metadata.
305

306
    Returns
307
    -------
308
    metadata: dict
309
        A dictionary containing metadata such as peak_voltage,
310
        stored_sensitivity, humidity, temperature, etc.
311
    """
312

313
    def read_string(f: io.BufferedIOBase) -> dict:
6✔
314
        """Reads a string from the file based on its size."""
315
        key = f.read(4).decode().lower()  # skip 4 bytes to bypass key name
6✔
316
        item = struct.unpack("<I", f.read(4))[0]
6✔
317
        return {key: f.read(item).decode().rstrip("\x00")}
6✔
318

319
    metadata: Dict[str, Any] = {}
6✔
320

321
    # Read header keys
322
    riff_key = f.read(4)
6✔
323
    metadata["filesize"] = struct.unpack("<I", f.read(4))[0]
6✔
324
    wave_key = f.read(4)
6✔
325
    # Check if headers are as expected
326
    if riff_key != b"RIFF" or wave_key != b"WAVE":
6✔
NEW
327
        raise IOError("Invalid file format or file corrupted.")
×
328

329
    # Read metadata keys
330
    list_key = f.read(4)
6✔
331
    if list_key != b"LIST":
6✔
NEW
332
        raise KeyError("Missing LIST chunk in WAV file.")
×
333
    list_size_bytes = f.read(4)
6✔
334
    if len(list_size_bytes) < 4:
6✔
NEW
335
        raise EOFError("Unexpected end of file when reading list size.")
×
336
    info_key = f.read(4)
6✔
337
    if info_key != b"INFO":
6✔
NEW
338
        raise KeyError("Expected INFO key in metadata but got different key.")
×
339

340
    # Read metadata and store in the dictionary
341
    metadata.update(read_string(f))  # Hydrophone make and SN
6✔
342
    metadata.update(read_string(f))  # Hydrophone model
6✔
343
    metadata.update(read_string(f))  # File creation date
6✔
344
    metadata.update(read_string(f))  # Hydrophone software version
6✔
345
    metadata.update(read_string(f))  # Original filename
6✔
346

347
    # Additional comments
348
    icmt_key = f.read(4)
6✔
349
    if icmt_key != b"ICMT":
6✔
NEW
350
        raise KeyError("Expected ICMT key in metadata but got different key.")
×
351
    icmt_size_bytes = f.read(4)
6✔
352
    if len(icmt_size_bytes) < 4:
6✔
NEW
353
        raise EOFError("Unexpected end of file when reading ICMT size.")
×
354
    icmt_size = struct.unpack("<I", icmt_size_bytes)[0]
6✔
355
    icmt_bytes = f.read(icmt_size)
6✔
356
    if len(icmt_bytes) < icmt_size:
6✔
NEW
357
        raise EOFError("Unexpected end of file when reading ICMT data.")
×
358
    icmt = icmt_bytes.decode().rstrip("\x00")
6✔
359

360
    # Parse the fields from comments and update the metadata dictionary
361
    fields = icmt.split(",")
6✔
362
    try:
6✔
363
        metadata["peak_voltage"] = float(fields[0].split(" ")[0])
6✔
364
        metadata["stored_sensitivity"] = int(fields[1].strip().split(" ")[0])
6✔
365
        metadata["humidity"] = fields[2].strip()
6✔
366
        metadata["temperature"] = fields[3].strip()
6✔
367
        metadata["accelerometer"] = (
6✔
368
            ",".join(fields[4:7]).strip() if len(fields) > 6 else None
369
        )
370
        metadata["magnetometer"] = (
6✔
371
            ",".join(fields[7:10]).strip() if len(fields) > 9 else None
372
        )
373
        metadata["count_at_peak_voltage"] = fields[-2].strip()
6✔
374
        metadata["sequence_num"] = fields[-1].strip()
6✔
NEW
375
    except (IndexError, ValueError) as e:
×
NEW
376
        raise ValueError(f"Error parsing metadata comments: {e}") from e
×
377

378
    # Return a dictionary with metadata
379
    return metadata
6✔
380

381

382
def read_iclisten(
6✔
383
    filename: str,
384
    sensitivity: Optional[Union[int, float]] = None,
385
    use_metadata: bool = True,
386
) -> xr.DataArray:
387
    """
388
    Read .wav file from an Ocean Sonics icListen "Smart" hydrophone.
389
    Returns voltage timeseries if sensitivity not provided, returns pressure
390
    timeseries if it is provided.
391

392
    Parameters
393
    ----------
394
    filename : str
395
        Input filename.
396
    sensitivity : int or float, optional
397
        Hydrophone calibration sensitivity in dB re 1 V/μPa.
398
        Should be negative. Default is None.
399
    use_metadata : bool
400
        If True and `sensitivity` is None, applies sensitivity value stored
401
        in the .wav file's LIST block. If False and `sensitivity` is None,
402
        a sensitivity value isn't applied.
403

404
    Returns
405
    -------
406
    out : xarray.DataArray
407
        Sound pressure [Pa] or Voltage [V] indexed by time[s].
408
    """
409

410
    if not isinstance(use_metadata, bool):
6✔
NEW
411
        raise TypeError("'use_metadata' must be a boolean value.")
×
412

413
    # Read icListen metadata from file header
414
    with open(filename, "rb") as f:
6✔
415
        metadata = _read_iclisten_metadata(f)
6✔
416

417
    # Use stored sensitivity
418
    if use_metadata and sensitivity is None:
6✔
419
        sensitivity = metadata["stored_sensitivity"]
6✔
420
        if sensitivity is None:
6✔
NEW
421
            raise ValueError("Stored sensitivity not found in metadata.")
×
422

423
    # Convert metadata creation date to datetime64
424
    try:
6✔
425
        start_time = np.datetime64(metadata["icrd"])
6✔
NEW
426
    except ValueError as e:
×
NEW
427
        raise ValueError(f"Invalid creation date format in metadata: {e}") from e
×
428

429
    out = read_hydrophone(
6✔
430
        filename,
431
        peak_voltage=metadata["peak_voltage"],
432
        sensitivity=sensitivity,
433
        gain=0,
434
        start_time=start_time,
435
    )
436

437
    # Update attributes with metadata
438
    out.attrs.update(
6✔
439
        {
440
            "serial_num": metadata["iart"],
441
            "model": metadata["iprd"],
442
            "software_ver": metadata["isft"],
443
            "filename": metadata["inam"] + ".wav",
444
            "peak_voltage": metadata["peak_voltage"],
445
            "sensitivity": sensitivity,
446
            "humidity": metadata["humidity"],
447
            "temperature": metadata["temperature"],
448
            "accelerometer": metadata["accelerometer"],
449
            "magnetometer": metadata["magnetometer"],
450
            "count_at_peak_voltage": metadata["count_at_peak_voltage"],
451
            "sequence_num": metadata["sequence_num"],
452
        }
453
    )
454

455
    return out
6✔
456

457

458
def export_audio(
6✔
459
    filename: str, pressure: xr.DataArray, gain: Union[int, float] = 1
460
) -> None:
461
    """
462
    Creates human-scaled audio file from underwater recording.
463

464
    Parameters
465
    ----------
466
    filename : str
467
        Output filename for the WAV file (without extension).
468
    pressure : xarray.DataArray
469
        Sound pressure data with attributes:
470
            - 'values' (numpy.ndarray): Pressure data array.
471
            - 'sensitivity' (int or float): Sensitivity of the hydrophone in dB.
472
            - 'fs' (int or float): Sampling frequency in Hz.
473
    gain : int or float, optional
474
        Gain to multiply the original time series by. Default is 1.
475

476
    Returns
477
    -------
478
    None
479
    """
480
    if not isinstance(filename, str):
6✔
NEW
481
        raise TypeError("'filename' must be a string.")
×
482

483
    if not isinstance(pressure, xr.DataArray):
6✔
NEW
484
        raise TypeError("'pressure' must be an xarray.DataArray.")
×
485

486
    if not hasattr(pressure, "values") or not isinstance(pressure.values, np.ndarray):
6✔
NEW
487
        raise TypeError("'pressure.values' must be a numpy.ndarray.")
×
488

489
    if not hasattr(pressure, "sensitivity") or not isinstance(
6✔
490
        pressure.sensitivity, (int, float)
491
    ):
NEW
492
        raise TypeError("'pressure.sensitivity' must be a numeric type (int or float).")
×
493

494
    if not hasattr(pressure, "fs") or not isinstance(pressure.fs, (int, float)):
6✔
NEW
495
        raise TypeError("'pressure.fs' must be a numeric type (int or float).")
×
496

497
    if not isinstance(gain, (int, float)):
6✔
NEW
498
        raise TypeError("'gain' must be a numeric type (int or float).")
×
499

500
    # Convert from Pascals to UPa
501
    upa = pressure.values.T * 1e6
6✔
502
    # Change to voltage waveform
503
    v = upa * 10 ** (pressure.sensitivity / 20)  # in V
6✔
504
    # Normalize
505
    v = v / max(abs(v)) * gain
6✔
506
    # Convert to (little-endian) 16 bit integers.
507
    audio = (v * (2**16 - 1)).astype("<h")
6✔
508

509
    # pylint incorrectly thinks this is opening in read mode
510
    with wave.open(f"{filename}.wav", mode="w") as f:
6✔
511
        f.setnchannels(1)  # pylint: disable=no-member
6✔
512
        f.setsampwidth(2)  # pylint: disable=no-member
6✔
513
        f.setframerate(pressure.fs)  # pylint: disable=no-member
6✔
514
        f.writeframes(audio.tobytes())  # pylint: disable=no-member
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc