• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EIT-ALIVE / eitprocessing / 11745726274

08 Nov 2024 04:20PM UTC coverage: 82.129% (+2.1%) from 80.0%
11745726274

push

github

actions-user
Bump version: 1.4.4 → 1.4.5

336 of 452 branches covered (74.34%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

30 existing lines in 5 files now uncovered.

1346 of 1596 relevant lines covered (84.34%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.58
/eitprocessing/features/breath_detection.py
1
import itertools
1✔
2
import math
1✔
3
from collections.abc import Callable
1✔
4
from dataclasses import dataclass
1✔
5

6
import numpy as np
1✔
7
from numpy.typing import ArrayLike
1✔
8
from scipy import signal
1✔
9

10
from eitprocessing.datahandling.breath import Breath
1✔
11
from eitprocessing.datahandling.continuousdata import ContinuousData
1✔
12
from eitprocessing.datahandling.intervaldata import IntervalData
1✔
13
from eitprocessing.datahandling.sequence import Sequence
1✔
14
from eitprocessing.features.moving_average import MovingAverage
1✔
15

16

17
@dataclass(kw_only=True)
class BreathDetection:
    """Algorithm for detecting breaths in data representing respiration.

    This algorithm detects the position of breaths in data by detecting valleys (local minimum values) and peaks (local
    maximum values) in data. BreathDetection has a default minimum duration of breaths to be detected. The minimum
    duration should be short enough to include the shortest expected breath in the data. The minimum duration is
    implemented as the minimum time between peaks and between valleys.

    Examples:
    ```
    >>> bd = BreathDetection(minimum_duration=0.5)
    >>> breaths = bd.find_breaths(
    ...     continuous_data=seq.continuous_data["global_impedance_(raw)"],
    ...     sequence=seq,
    ... )
    ```

    ```
    >>> global_impedance = seq.continuous_data["global_impedance_(raw)"]
    >>> breaths = bd.find_breaths(continuous_data=global_impedance)
    ```

    Args:
        minimum_duration: minimum expected duration of breaths, defaults to 2/3 of a second
        averaging_window_duration: duration of window used for averaging the data, defaults to 15 seconds
        averaging_window_function: function used to create a window for averaging the data, defaults to np.blackman
        amplitude_cutoff_fraction: fraction of the median amplitude below which breaths are removed, defaults to 0.25
        invalid_data_removal_window_length: window around invalid data in which breaths are removed, defaults to 0.5
        invalid_data_removal_percentile: the nth percentile of values used to remove outliers, defaults to 5
        invalid_data_removal_multiplier: the multiplier used to remove outliers, defaults to 4
    """

    minimum_duration: float = 2 / 3  # seconds; minimum time between peaks and between valleys
    averaging_window_duration: float = 15  # seconds
    averaging_window_function: Callable[[int], ArrayLike] | None = np.blackman
    amplitude_cutoff_fraction: float | None = 0.25  # None disables low-amplitude breath removal
    invalid_data_removal_window_length: float = 0.5  # seconds
    invalid_data_removal_percentile: int = 5  # percentile n used to compute outlier cutoffs
    invalid_data_removal_multiplier: int = 4  # multiplier m used to compute outlier cutoffs
57

58
    def find_breaths(
        self,
        continuous_data: ContinuousData,
        result_label: str = "breaths",
        sequence: Sequence | None = None,
        store: bool | None = None,
    ) -> IntervalData:
        """Find breaths based on peaks and valleys, removing edge cases and breaths during invalid data.

        First, it naively finds any peaks that are a certain distance apart and higher than the moving average, and
        similarly valleys that are a certain distance apart and below the moving average.

        Next, valleys at the start and end of the signal are removed to ensure the first and last valleys are actual
        valleys, and not just the start or end of the signal. Peaks before the first or after the last valley are
        removed, to ensure peaks always fall between two valleys.

        At this point, it is possible multiple peaks exist between two valleys. Lower peaks are removed leaving only the
        highest peak between two valleys. Similarly, multiple valleys between two peaks are reduced to only the lowest
        valley.

        As a last step, breaths with a low amplitude (the average between the inspiratory and expiratory amplitudes) are
        removed.

        Breaths are constructed as a valley-peak-valley combination, representing the start of inspiration, the end of
        inspiration/start of expiration, and end of expiration.

        Args:
            continuous_data: a ContinuousData object that contains the data
            result_label: label of the returned IntervalData object, defaults to `'breaths'`.
            sequence: optional, Sequence that contains the object to detect breaths in, and/or to store the result in
            store: whether to store the result in the sequence, defaults to `True` if a Sequence is provided.

        Returns:
            An IntervalData object containing Breath objects.

        Raises:
            TypeError: if `continuous_data` is not a ContinuousData object.
            RuntimeError: if `store` is truthy while no Sequence is provided.
        """
        if not isinstance(continuous_data, ContinuousData):
            msg = f"`continuous_data` should be a ContinuousData object, not {type(continuous_data)}"
            raise TypeError(msg)

        # store by default when a sequence is provided
        if store is None and sequence:
            store = True

        if store and sequence is None:
            msg = "Can't store the result if no Sequence is provided."
            raise RuntimeError(msg)

        data = continuous_data.values
        time = continuous_data.time
        sample_frequency = continuous_data.sample_frequency

        # replace outliers before peak/valley detection, but remember where they were
        invalid_data_indices = self._detect_invalid_data(data)
        data = self._remove_invalid_data(data, invalid_data_indices)

        peak_indices, valley_indices = self._detect_peaks_and_valleys(data, sample_frequency)

        breaths = self._create_breaths_from_peak_valley_data(
            time,
            peak_indices,
            valley_indices,
        )
        breaths = self._remove_breaths_around_invalid_data(breaths, time, sample_frequency, invalid_data_indices)
        breaths_container = IntervalData(
            label=result_label,
            name="Breaths as determined by BreathDetection",
            unit=None,
            category="breath",
            intervals=[(breath.start_time, breath.end_time) for breath in breaths],
            values=breaths,
            parameters={type(self): dict(vars(self))},
            derived_from=[continuous_data],
        )

        if store:
            sequence.interval_data.add(breaths_container)

        return breaths_container
134

135
    def _detect_invalid_data(self, data: np.ndarray) -> np.ndarray:
1✔
136
        """Detects invalid data as outliers outside an upper and lower cutoff.
137

138
        This function defines a lower and upper cutoff. Data beyond those cutoffs is considered invalid for the purposes
139
        of breath detection.
140

141
        The lower cutoff is a distance away from the mean. The distance is m times the distance between the mean and the
142
        nth percentile of the data. The upper cutoff is m times the distance between the mean and the (100 - n)th
143
        percentile. m is given by `invalid_data_removal_multiplier` and n is given by `invalid_data_removal_percentile`.
144

145
        For example, with m = 4 and n = 5, the mean = 100, 5% of the data is below/equal to 90, and 5% of the data is
146
        above/equal to 120, all data below 100 - (4 * 10) = 60 and above 100 + (4 * 20) = 180 is considerd invalid.
147

148
        Args:
149
            data (np.ndarray): 1D array with impedance data
150

151
        Returns:
152
            np.ndarray: the indices of the data points with values outside the lower and upper cutoff values.
153
        """
154
        data_mean = np.mean(data)
1✔
155

156
        lower_percentile = np.percentile(data, self.invalid_data_removal_percentile)
1✔
157
        cutoff_low = data_mean - (data_mean - lower_percentile) * self.invalid_data_removal_multiplier
1✔
158

159
        upper_percentile = np.percentile(data, 100 - self.invalid_data_removal_percentile)
1✔
160
        cutoff_high = data_mean + (upper_percentile - data_mean) * self.invalid_data_removal_multiplier
1✔
161

162
        # detect indices of outliers
163
        return np.flatnonzero((data < cutoff_low) | (data > cutoff_high))
1✔
164

165
    def _remove_invalid_data(self, data: np.ndarray, invalid_data_indices: np.ndarray) -> np.ndarray:
1✔
166
        """Removes invalid data points and replace them with the nearest non-np.nan value."""
167
        data = np.copy(data)
1✔
168
        data[invalid_data_indices] = np.nan
1✔
169
        return self._fill_nan_with_nearest_neighbour(data)
1✔
170

171
    def _detect_peaks_and_valleys(self, data: np.ndarray, sample_frequency: float) -> tuple[np.ndarray, np.ndarray]:
        """Detect peak and valley indices in `data`, cleaned of edge cases, doubles and low-amplitude breaths."""
        # the moving average acts as the minimum height for peaks and (inverted) for valleys
        n_window_samples = int(sample_frequency * self.averaging_window_duration)
        moving_average = MovingAverage(
            window_size=n_window_samples,
            window_function=self.averaging_window_function,
        ).apply(data)

        peak_indices = self._find_extrema(data, moving_average, sample_frequency)
        valley_indices = self._find_extrema(data, moving_average, sample_frequency, invert=True)

        # clean up naively detected extrema in three passes
        peak_indices, valley_indices = self._remove_edge_cases(data, peak_indices, valley_indices, moving_average)
        peak_indices, valley_indices = self._remove_doubles(data, peak_indices, valley_indices)
        return self._remove_low_amplitudes(data, peak_indices, valley_indices)
183

184
    def _find_extrema(
1✔
185
        self,
186
        data: np.ndarray,
187
        moving_average: np.ndarray,
188
        sample_frequency: float,
189
        invert: bool = False,
190
    ) -> np.ndarray:
191
        """Find extrema (peaks or valleys) in the data.
192

193
        This method finds extrema (either peaks or valleys) in the data using the `scipy.signal.find_peaks()` function.
194
        The minimum distance (in time) between peaks is determined by the `minimum_duration` attribute.
195

196
        To find peaks, `invert` should be False. To find valleys, `invert` should be True, which inverts the data before
197
        finding peaks.
198

199
        Args:
200
            data (np.ndarray): a 1D array containing the data.
201
            moving_average (np.ndarray): a 1D array containing the moving average of the data.
202
            sample_frequency (float): sample frequency of the data and moving average
203
            invert (float, optional): whether to invert the data before
204
            detecting peaks. Defaults to False.
205

206
        Returns:
207
            np.ndarray: a 1D-array containing the indices of peaks or valleys.
208
        """
209
        data_ = -data if invert else data
1✔
210
        moving_average_ = -moving_average if invert else moving_average
1✔
211
        extrema_indices, _ = signal.find_peaks(
1✔
212
            data_,
213
            distance=max(self.minimum_duration * sample_frequency, 1),
214
            height=moving_average_,
215
        )
216

217
        return extrema_indices
1✔
218

219
    def _remove_edge_cases(
1✔
220
        self,
221
        data: np.ndarray,
222
        peak_indices: np.ndarray,
223
        valley_indices: np.ndarray,
224
        moving_average: np.ndarray,
225
    ) -> tuple[np.ndarray, np.ndarray]:
226
        """Remove overdetected peaks/valleys at the start and end of the data.
227

228
        A valley at the start of the data is deemed invalid if the data before the first valley stays below the moving
229
        average at the valley. The same is true for the last valley and the data after that valley. This ensures a
230
        valley is a true valley and not just a local minimum with the true valley cut off.
231

232
        Then, all peaks that occur before the first and after the last valley are removed. This ensures peaks only fall
233
        between valleys.
234

235
        Args:
236
            data (np.ndarray): the data in which the peaks/valleys were detected
237
            peak_indices (np.ndarray): indices of the peaks
238
            valley_indices (np.ndarray): indices of the valleys
239
            moving_average (np.ndarray): the moving average of data
240

241
        Returns:
242
            A tuple (peak_indices, peak_values) with edge cases removed.
243
        """
244
        if max(data[: valley_indices[0]]) < moving_average[valley_indices[0]]:
1✔
245
            # remove the first valley, if the data before that valley is not
246
            # high enough to be sure it's a valley
247
            valley_indices = np.delete(valley_indices, 0)
1✔
248

249
        if max(data[valley_indices[-1] :]) < moving_average[valley_indices[-1]]:
1✔
250
            # remove the last valley, if the data after that valley is not high
251
            # enough to be sure it's a valley
252
            valley_indices = np.delete(valley_indices, -1)
1✔
253

254
        # remove peaks that come before the first valley
255
        keep_peaks = peak_indices > valley_indices[0]
1✔
256
        peak_indices = peak_indices[keep_peaks]
1✔
257

258
        # remove peaks that come after the last valley
259
        keep_peaks = peak_indices < valley_indices[-1]
1✔
260
        peak_indices = peak_indices[keep_peaks]
1✔
261

262
        return peak_indices, valley_indices
1✔
263

264
    def _remove_doubles(
1✔
265
        self,
266
        data: np.ndarray,
267
        peak_indices: np.ndarray,
268
        valley_indices: np.ndarray,
269
    ) -> tuple[np.ndarray, np.ndarray]:
270
        """Remove double peaks/valleys.
271

272
        This method ensures there is only one peak between valleys, and only one valley between peaks. If there are
273
        multiple peaks between two valleys, the peak with the highest value is kept and the others are removed. If there
274
        are no peaks between several valleys (i.e. multiple valleys between peaks) the valley with the lowest value is
275
        kept, while the others are removed.
276

277
        This method does not remove peaks before the first or after the last valley.
278

279
        Args:
280
            data: data the peaks and valleys were found in
281
            peak_indices: indices of the peaks
282
            valley_indices: indices of the valleys
283

284
        Returns:
285
            tuple: a tuple of length 2 with the peak_indices and valley_indices with double peaks/valleys removed.
286
        """
287
        peak_values = data[peak_indices]
1✔
288
        valley_values = data[valley_indices]
1✔
289

290
        current_valley_index = 0
1✔
291
        while current_valley_index < len(valley_indices) - 1:
1✔
292
            start_index = valley_indices[current_valley_index]
1✔
293
            end_index = valley_indices[current_valley_index + 1]
1✔
294
            peaks_between_valleys = np.argwhere(
1✔
295
                (peak_indices > start_index) & (peak_indices < end_index),
296
            )
297
            if not len(peaks_between_valleys):
1✔
298
                # no peak between valleys, remove highest valley
299
                delete_valley_index = (
1✔
300
                    current_valley_index
301
                    if valley_values[current_valley_index] > valley_values[current_valley_index + 1]
302
                    else current_valley_index + 1
303
                )
304
                valley_indices = np.delete(valley_indices, delete_valley_index)
1✔
305
                valley_values = np.delete(valley_values, delete_valley_index)
1✔
306
                continue
1✔
307

308
            if len(peaks_between_valleys) > 1:
1✔
309
                # multiple peaks between valleys, remove lowest peak
310
                delete_peak_index = (
1✔
311
                    peaks_between_valleys[0]
312
                    if peak_values[peaks_between_valleys[0]] < peak_values[peaks_between_valleys[1]]
313
                    else peaks_between_valleys[1]
314
                )
315
                peak_indices = np.delete(peak_indices, delete_peak_index)
1✔
316
                peak_values = np.delete(peak_values, delete_peak_index)
1✔
317
                continue
1✔
318

319
            current_valley_index += 1
1✔
320

321
        return peak_indices, valley_indices
1✔
322

323
    def _remove_low_amplitudes(
1✔
324
        self,
325
        data: np.ndarray,
326
        peak_indices: np.ndarray,
327
        valley_indices: np.ndarray,
328
    ) -> tuple[np.ndarray, np.ndarray]:
329
        """Remove peaks if the amplitude is low compared to the median amplitude.
330

331
        The amplitude of a peak is determined as the average vertical distance between the peak value and the two valley
332
        values besides it. The cutoff value for the amplitude is calculated as the median amplitude times
333
        `amplitude_cutoff_fraction`. Peaks that have an amplitude below the cutoff are removed. Then,
334
        `_remove_doubles()` is called to remove either of the valleys next to the peak.
335

336
        If `amplitude_cutoff_fraction` is None, the input is returned unchanged.
337

338
        Args:
339
            data: the data the peaks and valleys were found in
340
            peak_indices (np.ndarray): the indices of the peaks
341
            valley_indices (np.ndarray): the indices of the valleys
342

343
        Returns:
344
            A tuple (peak_indices, valley_indices) with low-amplitude breaths removed.
345
        """
346
        if len(peak_indices) == 0 or len(valley_indices) == 0:
1✔
347
            return peak_indices, valley_indices
1✔
348

349
        if not self.amplitude_cutoff_fraction:
1✔
350
            return peak_indices, valley_indices
1✔
351

352
        peak_values = data[peak_indices]
1✔
353
        valley_values = data[valley_indices]
1✔
354

355
        inspiratory_amplitude = peak_values - valley_values[:-1]
1✔
356
        expiratory_amplitude = peak_values - valley_values[1:]
1✔
357
        amplitude = (inspiratory_amplitude + expiratory_amplitude) / 2
1✔
358

359
        amplitude_cutoff = self.amplitude_cutoff_fraction * np.median(amplitude)
1✔
360
        delete_peaks = np.argwhere(amplitude < amplitude_cutoff)
1✔
361

362
        peak_indices = np.delete(peak_indices, delete_peaks)
1✔
363
        peak_values = np.delete(peak_values, delete_peaks)
1✔
364

365
        return self._remove_doubles(data, peak_indices, valley_indices)
1✔
366

367
    def _create_breaths_from_peak_valley_data(
        self,
        time: np.ndarray,
        peak_indices: np.ndarray,
        valley_indices: np.ndarray,
    ) -> list[Breath]:
        """Combine each valley-peak-valley triplet into a Breath with start, middle and end times."""
        breaths = []
        # each peak is paired with the valleys directly before and after it;
        # strict=True raises if the peak and valley counts don't line up
        for peak, (start, end) in zip(peak_indices, itertools.pairwise(valley_indices), strict=True):
            breaths.append(Breath(time[start], time[peak], time[end]))
        return breaths
381

382
    def _remove_breaths_around_invalid_data(
1✔
383
        self,
384
        breaths: list[Breath],
385
        time: np.ndarray,
386
        sample_frequency: float,
387
        invalid_data_indices: np.ndarray,
388
    ) -> list[Breath]:
389
        """Remove breaths overlapping with invalid data.
390

391
        Breaths that start within a window length (given by invalid_data_removal_window_length) of invalid data are
392
        removed.
393

394
        Args:
395
            breaths: list of detected breath objects
396
            time: time axis belonging to the data
397
            sample_frequency: sample frequency of the data and time
398
            invalid_data_indices: indices of invalid data points
399
        """
400
        # TODO: write more general(ized) method of determining invalid data
401

402
        new_breaths = breaths[:]
1✔
403

404
        if not len(invalid_data_indices):
1✔
405
            return new_breaths
1✔
406

407
        invalid_data_values = np.zeros(time.shape)
1✔
408
        invalid_data_values[invalid_data_indices] = 1  # gives the value 1 to each invalid datapoint
1✔
409

410
        window_length = math.ceil(self.invalid_data_removal_window_length * sample_frequency)
1✔
411

412
        for breath in new_breaths[:]:
1✔
413
            breath_start_minus_window = max(0, np.argmax(time == breath.start_time) - window_length)
1✔
414
            breath_end_plus_window = min(len(invalid_data_values), np.argmax(time == breath.end_time) + window_length)
1✔
415

416
            # if no invalid datapoints are within the window, np.max() will return 0
417
            # if any invalid datapoints are within the window, np.max() will return 1
418
            if np.max(invalid_data_values[breath_start_minus_window:breath_end_plus_window]):
1✔
419
                new_breaths.remove(breath)
1✔
420

421
        return new_breaths
1✔
422

423
    @staticmethod
1✔
424
    def _fill_nan_with_nearest_neighbour(data: np.ndarray) -> np.ndarray:
1✔
425
        """Fill np.nan values in a 1D array with the nearest non-np.nan value.
426

427
        Each np.nan-value is replaced with the nearest (backwards and forwards) non-np.nan value. If the nearest earlier
428
        and a later value are the same distance away, the earlier value is preferred. np.nan-values at the start are
429
        filled with the first non-nan value.
430

431
        Example:
432
            foo = np.ndarray([np.nan, 1, np.nan, np.nan, np.nan, 3, np.nan, np.nan])
433
            bar = BreathDetection._fill_nan_with_nearest_neighbour(foo)
434
            assert bar == np.ndarray([1, 1, 1, 1, 3, 3, 3, 3])
435
        """
436
        data = np.copy(data)
1✔
437
        nan_indices = np.flatnonzero(np.isnan(data))
1✔
438

439
        if not len(nan_indices):
1✔
440
            return data
1✔
441

442
        if len(nan_indices) == len(data):
1!
443
            msg = "`data` only contains np.nan values. "
×
444
            raise ValueError(msg)
×
445

446
        grouped_nan_indices = np.split(nan_indices, np.where(np.diff(nan_indices) != 1)[0] + 1)
1✔
447

448
        for group in grouped_nan_indices:
1✔
449
            if group[0] == 0:
1!
UNCOV
450
                data[group] = data[group[-1] + 1]
×
UNCOV
451
                continue
×
452

453
            if group[-1] == len(data) - 1:
1!
UNCOV
454
                data[group] = data[group[0] - 1]
×
UNCOV
455
                continue
×
456

457
            middle = len(group) // 2
1✔
458
            data[group[:middle]] = data[group[0] - 1]
1✔
459
            data[group[middle:]] = data[group[-1] + 1]
1✔
460
        return data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc