• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MHKiT-Software / MHKiT-Python / 10852448225

13 Sep 2024 04:08PM UTC coverage: 86.281%. First build
10852448225

Pull #349

github

web-flow
Merge 452ddcefb into d364d565d
Pull Request #349: Acoustics Module

222 of 252 new or added lines in 5 files covered. (88.1%)

9220 of 10686 relevant lines covered (86.28%)

5.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.28
/mhkit/acoustics/io.py
1
"""
6✔
2
This submodule includes the main passive acoustics I/O functions.
3
`read_hydrophone` is the main function, with a number of wrapper
4
functions for specific manufacturers. The `export_audio` function
5
exists to improve audio if one is difficult to listen to.
6
"""
7

8
import struct
6✔
9
import wave
6✔
10
from pathlib import Path
6✔
11
import numpy as np
6✔
12
import pandas as pd
6✔
13
import xarray as xr
6✔
14
from scipy.io import wavfile
6✔
15

16

17
def read_hydrophone(
6✔
18
    filename,
19
    peak_voltage=None,
20
    sensitivity=None,
21
    gain=0,
22
    start_time="2024-06-06T00:00:00",
23
):
24
    """
25
    Read .wav file from a hydrophone. Returns voltage timeseries if sensitivity not
26
    provided, returns pressure timeseries if it is provided.
27

28
    Parameters
29
    ----------
30
    filename: str or pathlib.Path
31
        Input filename
32
    peak_voltage: numeric
33
        Peak voltage supplied to the analog to digital converter (ADC) in V.
34
        (Or 1/2 of the peak to peak voltage).
35
    sensitivity: numeric
36
        Hydrophone calibration sensitivity in dB re 1 V/uPa.
37
        Should be negative. Default: None.
38
    gain: numeric
39
        Amplifier gain in dB re 1 V/uPa. Default 0.
40
    start_time: str
41
        Start time in the format yyyy-mm-ddTHH:MM:SS
42

43
    Returns
44
    -------
45
    out: numpy.array
46
        Sound pressure [Pa] or Voltage [V] indexed by time[s]
47
    """
48

49
    if (not isinstance(filename, str)) and (not isinstance(filename.as_posix(), str)):
6✔
NEW
50
        raise TypeError("Filename should be a string.")
×
51
    if peak_voltage is None:
6✔
NEW
52
        raise ValueError(
×
53
            "Please provide the peak voltage of the hydrophone's ADC `peak_voltage`."
54
        )
55
    if (sensitivity is not None) and (sensitivity > 0):
6✔
NEW
56
        raise ValueError(
×
57
            "Hydrophone calibrated sensitivity should be entered as a negative number."
58
        )
59

60
    with open(filename, "rb") as f:
6✔
61
        f.read(4)  # riff_key
6✔
62
        f.read(4)  # file_size "<I"
6✔
63
        f.read(4)  # wave_key
6✔
64
        list_key = f.read(4)
6✔
65

66
        # Skip metadata if it exits (don't know how to parse)
67
        if "LIST" in list_key.decode():
6✔
68
            list_size = struct.unpack("<I", f.read(4))[0]
6✔
69
            f.seek(f.tell() + list_size)
6✔
70
        else:
71
            f.seek(f.tell() - 4)
6✔
72

73
        # Read this to get the bits/sample
74
        f.read(4)  # fmt_key
6✔
75
        fmt_size = struct.unpack("<I", f.read(4))[0]
6✔
76
        f.read(2)  # compression_code "<H"
6✔
77
        f.read(2)  # n_channels "<H"
6✔
78
        f.read(4)  # sample_rate "<I"
6✔
79
        f.read(4)  # bytes_per_sec "<I"
6✔
80
        f.read(2)  # block_align "<H"
6✔
81
        bits_per_sample = struct.unpack("<H", f.read(2))[0]
6✔
82
        f.seek(f.tell() + fmt_size - 16)
6✔
83

84
    # Read data using scipy cause that's easier (will auto drop as int16 or int32)
85
    fs, raw = wavfile.read(filename)
6✔
86
    length = raw.shape[0] // fs  # length of recording in seconds
6✔
87

88
    if bits_per_sample in [16, 32]:
6✔
89
        max_count = 2 ** (bits_per_sample - 1)
6✔
90
    elif bits_per_sample == 12:
6✔
NEW
91
        max_count = 2 ** (16 - 1) - 2**4  # 12 bit read in as 16 bit
×
92
    elif bits_per_sample == 24:
6✔
93
        max_count = 2 ** (32 - 1) - 2**8  # 24 bit read in as 32 bit
6✔
94
    else:
NEW
95
        raise ValueError(
×
96
            f"Unknown how to read {bits_per_sample} bit ADC."
97
            "Please notify MHKiT team."
98
        )
99

100
    # Normalize and then scale to peak voltage
101
    # Use 64 bit float for decimal accuracy
102
    raw_voltage = raw.astype(float) / max_count * peak_voltage
6✔
103

104
    # Get time
105
    end_time = np.datetime64(start_time) + np.timedelta64(length, "s")
6✔
106
    time = pd.date_range(start_time, end_time, raw.size + 1)
6✔
107

108
    if sensitivity is not None:
6✔
109
        # Subtract gain
110
        # hydrophone with sensitivity of -177 dB and gain of -3 dB = sensitivity of -174 dB
111
        if gain:
6✔
NEW
112
            sensitivity -= gain
×
113
        # Convert calibration from dB rel 1 V/uPa into ratio
114
        sensitivity = 10 ** (sensitivity / 20)  # V/uPa
6✔
115

116
        # Sound pressure
117
        pressure = raw_voltage / sensitivity  # uPa
6✔
118
        pressure = pressure / 1e6  # Pa
6✔
119

120
        # Min resolution
121
        min_res = peak_voltage / max_count / sensitivity  # uPa
6✔
122
        # Pressure at which sensor is saturated
123
        max_sat = peak_voltage / sensitivity  # uPa
6✔
124

125
        out = xr.DataArray(
6✔
126
            pressure,
127
            coords={"time": time[:-1]},
128
            attrs={
129
                "units": "Pa",
130
                "sensitivity": sensitivity,
131
                "resolution": np.round(min_res / 1e6, 9),
132
                "valid_min": np.round(
133
                    -max_sat / 1e6,
134
                    6,
135
                ),
136
                "valid_max": np.round(max_sat / 1e6, 6),
137
                "fs": fs,
138
                "filename": Path(filename).stem,
139
            },
140
        )
141

142
    else:
143
        # Voltage min resolution
144
        min_res = peak_voltage / max_count  # V
6✔
145
        # Voltage at which sensor is saturated
146
        max_sat = peak_voltage  # V
6✔
147

148
        out = xr.DataArray(
6✔
149
            raw_voltage,
150
            coords={"time": time[:-1]},
151
            attrs={
152
                "units": "V",
153
                "resolution": np.round(min_res, 6),
154
                "valid_min": -max_sat,
155
                "valid_max": max_sat,
156
                "fs": fs,
157
                "filename": Path(filename).stem,
158
            },
159
        )
160

161
    return out
6✔
162

163

164
def read_soundtrap(filename, sensitivity=None, gain=0):
6✔
165
    """
166
    Read .wav file from an Ocean Instruments SoundTrap hydrophone.
167
    Returns voltage timeseries if sensitivity not provided, returns pressure
168
    timeseries if it is provided.
169

170
    Parameters
171
    ----------
172
    filename: string
173
        Input filename
174
    sensitivity: numeric
175
        Hydrophone calibration sensitivity in dB re 1 V/uPa.
176
        Should be negative.
177
    gain: numeric
178
        Amplifier gain in dB re 1 V/uPa. Default 0.
179

180
    Returns
181
    -------
182
    out: numpy.array
183
        Sound pressure [Pa] or Voltage [V] indexed by time[s]
184
    """
185

186
    # Get time from filename
187
    st = filename.split(".")[-2]
6✔
188
    start_time = (
6✔
189
        "20"
190
        + st[:2]
191
        + "-"
192
        + st[2:4]
193
        + "-"
194
        + st[4:6]
195
        + "T"
196
        + st[6:8]
197
        + ":"
198
        + st[8:10]
199
        + ":"
200
        + st[10:12]
201
    )
202

203
    # Soundtrap uses a peak voltage of 1 V
204
    out = read_hydrophone(
6✔
205
        filename,
206
        peak_voltage=1,
207
        sensitivity=sensitivity,
208
        gain=gain,
209
        start_time=start_time,
210
    )
211
    out.attrs["make"] = "SoundTrap"
6✔
212

213
    return out
6✔
214

215

216
def read_iclisten(filename, sensitivity=None, use_metadata=True):
6✔
217
    """
218
    Read .wav file from an Ocean Sonics icListen "Smart" hydrophone.
219
    Returns voltage timeseries if sensitivity not provided, returns pressure
220
    timeseries if it is provided.
221

222
    Parameters
223
    ----------
224
    filename: string
225
        Input filename
226
    sensitivity: numeric
227
        Hydrophone calibration sensitivity in dB re 1 V/uPa.
228
        Should be negative. Default: None.
229
    use_metadata: bool
230
        If True and `sensitivity` = None, applies sensitivity value stored in .wav file LIST block.
231
        If False and `sensitivity` = None, a sensitivity value isn't applied
232

233
    Returns
234
    -------
235
    out: numpy.array
236
        Sound pressure [Pa] or [V] indexed by time[s]
237
    """
238

239
    # Read icListen metadata from file header
240
    with open(filename, "rb") as f:
6✔
241
        # Read header keys
242
        f.read(4)  # riff_key
6✔
243
        f.read(4)  # file_size "<I"
6✔
244
        f.read(4)  # wave_key
6✔
245
        f.read(4)  # list_key
6✔
246

247
        # Read metadata keys
248
        f.read(4)  # list_size "<I"
6✔
249
        f.read(4)  # info_key
6✔
250

251
        # Hydrophone make and SN
252
        f.read(4)  # iart_key
6✔
253
        iart_size = struct.unpack("<I", f.read(4))[0]
6✔
254
        iart = f.read(iart_size).decode().rstrip("\x00")
6✔
255

256
        # Hydrophone model
257
        f.read(4)  # iprd_key
6✔
258
        iprd_size = struct.unpack("<I", f.read(4))[0]
6✔
259
        iprd = f.read(iprd_size).decode().rstrip("\x00")
6✔
260

261
        # File creation date
262
        f.read(4)  # icrd_key
6✔
263
        icrd_size = struct.unpack("<I", f.read(4))[0]
6✔
264
        icrd = f.read(icrd_size).decode().rstrip("\x00")
6✔
265

266
        # Hydrophone make and software version
267
        f.read(4)  # isft_key
6✔
268
        isft_size = struct.unpack("<I", f.read(4))[0]
6✔
269
        isft = f.read(isft_size).decode().rstrip("\x00")
6✔
270

271
        # Original filename
272
        f.read(4)  # inam_key
6✔
273
        inam_size = struct.unpack("<I", f.read(4))[0]
6✔
274
        inam = f.read(inam_size).decode().rstrip("\x00")
6✔
275

276
        # Additional comments
277
        f.read(4)  # icmt_key
6✔
278
        icmt_size = struct.unpack("<I", f.read(4))[0]
6✔
279
        icmt = f.read(icmt_size).decode().rstrip("\x00")
6✔
280

281
        fields = icmt.split(",")
6✔
282
        peak_voltage = fields[0]
6✔
283
        stored_sensitivity = fields[1].lstrip()
6✔
284
        humidity = fields[2].lstrip()
6✔
285
        temp = fields[3].lstrip()
6✔
286
        if len(fields) > 6:
6✔
287
            accel = ",".join(fields[4:7]).lstrip()
6✔
288
            mag = ",".join(fields[7:10]).lstrip()
6✔
289
        else:
NEW
290
            accel = []
×
NEW
291
            mag = []
×
292
        count_at_peak_voltage = fields[-2].lstrip()
6✔
293
        n_sequence = fields[-1].lstrip()
6✔
294

295
    peak_voltage = float(peak_voltage.split(" ")[0])
6✔
296
    stored_sensitivity = int(stored_sensitivity.split(" ")[0])
6✔
297

298
    # Use stored sensitivity
299
    if use_metadata and (sensitivity is None):
6✔
300
        sensitivity = stored_sensitivity
6✔
301

302
    out = read_hydrophone(
6✔
303
        filename,
304
        peak_voltage=peak_voltage,
305
        sensitivity=sensitivity,
306
        gain=0,
307
        start_time=np.datetime64(icrd),
308
    )
309

310
    out.attrs.update(
6✔
311
        {
312
            "serial_num": iart,
313
            "model": iprd,
314
            "software_ver": isft,
315
            "filename": inam + ".wav",
316
            "peak_voltage": peak_voltage,
317
            "sensitivity": sensitivity,
318
            "humidity": humidity,
319
            "temperature": temp,
320
            "accelerometer": accel,
321
            "magnetometer": mag,
322
            "count_at_peak_voltage": count_at_peak_voltage,
323
            "sequence_num": n_sequence,
324
        }
325
    )
326

327
    return out
6✔
328

329

330
def export_audio(filename, pressure, gain=1):
6✔
331
    """
332
    Creates human-scaled audio file from underwater recording.
333

334
    Parameters
335
    ----------
336
    filename: string
337
        Output filename for the WAV file (without extension).
338
    pressure: object
339
        Sound pressure object with attributes 'values' (numpy array of pressure data),
340
        'sensitivity' (sensitivity of the hydrophone in dB), and 'fs' (sampling frequency in Hz).
341
    gain: numeric, optional
342
        Gain to multiply original time series by. Default is 1.
343
    """
344

345
    # Convert from Pascals to UPa
346
    upa = pressure.values.T * 1e6
6✔
347
    # Change to voltage waveform
348
    v = upa * 10 ** (pressure.sensitivity / 20)  # in V
6✔
349
    # Normalize
350
    v = v / max(abs(v)) * gain
6✔
351
    # Convert to (little-endian) 16 bit integers.
352
    audio = (v * (2**16 - 1)).astype("<h")
6✔
353

354
    # pylint incorrectly thinks this is opening in read mode
355
    with wave.open(f"{filename}.wav", mode="w") as f:
6✔
356
        f.setnchannels(1)  # pylint: disable=no-member
6✔
357
        f.setsampwidth(2)  # pylint: disable=no-member
6✔
358
        f.setframerate(pressure.fs)  # pylint: disable=no-member
6✔
359
        f.writeframes(audio.tobytes())  # pylint: disable=no-member
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc