• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EIT-ALIVE / eitprocessing / 17213080321

25 Aug 2025 03:19PM UTC coverage: 84.761% (+2.0%) from 82.774%
17213080321

push

github

psomhorst
Bump version: 1.7.3 → 1.8.0

745 of 958 branches covered (77.77%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

37 existing lines in 9 files now uncovered.

2737 of 3150 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.67
/eitprocessing/features/breath_detection.py
1
import itertools
1✔
2
import math
1✔
3
from collections.abc import Callable
1✔
4
from dataclasses import dataclass
1✔
5

6
import numpy as np
1✔
7
from numpy.typing import ArrayLike
1✔
8
from scipy import signal
1✔
9

10
from eitprocessing.datahandling.breath import Breath
1✔
11
from eitprocessing.datahandling.continuousdata import ContinuousData
1✔
12
from eitprocessing.datahandling.intervaldata import IntervalData
1✔
13
from eitprocessing.datahandling.sequence import Sequence
1✔
14
from eitprocessing.features.moving_average import MovingAverage
1✔
15

16

17
@dataclass(kw_only=True)
class BreathDetection:
    """Algorithm for detecting breaths in data representing respiration.

    This algorithm detects the position of breaths in data by detecting valleys (local minimum values) and peaks (local
    maximum values) in data. BreathDetection has a default minimum duration of breaths to be detected. The minimum
    duration should be short enough to include the shortest expected breath in the data. The minimum duration is
    implemented as the minimum time between peaks and between valleys.

    Examples:
    ```
    >>> bd = BreathDetection(minimum_duration=0.5)
    >>> breaths = bd.find_breaths(
    ...     continuous_data=seq.continuous_data["global_impedance_(raw)"],
    ...     sequence=seq,
    ... )
    ```

    ```
    >>> global_impedance = seq.continuous_data["global_impedance_(raw)"]
    >>> breaths = bd.find_breaths(continuous_data=global_impedance)
    ```

    Args:
        minimum_duration: minimum expected duration of breaths, defaults to 2/3 of a second
        averaging_window_duration: duration of window used for averaging the data, defaults to 15 seconds
        averaging_window_function: function used to create a window for averaging the data, defaults to np.blackman
        amplitude_cutoff_fraction: fraction of the median amplitude below which breaths are removed, defaults to 0.25
        invalid_data_removal_window_length: window around invalid data in which breaths are removed, defaults to 0.5
        invalid_data_removal_percentile: the nth percentile of values used to remove outliers, defaults to 5
        invalid_data_removal_multiplier: the multiplier used to remove outliers, defaults to 4
    """

    # All attributes are keyword-only (kw_only=True); durations/window lengths are in seconds.
    minimum_duration: float = 2 / 3  # minimum time between two peaks and between two valleys
    averaging_window_duration: float = 15  # length of the moving-average window
    averaging_window_function: Callable[[int], ArrayLike] | None = np.blackman  # None means a flat window
    amplitude_cutoff_fraction: float | None = 0.25  # None disables low-amplitude breath removal
    invalid_data_removal_window_length: float = 0.5  # margin around invalid data in which breaths are removed
    invalid_data_removal_percentile: int = 5  # percentile defining the outlier reference distance
    invalid_data_removal_multiplier: int = 4  # multiplier on that distance to set the outlier cutoffs
57

58
    def find_breaths(
        self,
        continuous_data: ContinuousData,
        result_label: str = "breaths",
        sequence: Sequence | None = None,
        store: bool | None = None,
    ) -> IntervalData:
        """Find breaths based on peaks and valleys, removing edge cases and breaths during invalid data.

        First, it naively finds any peaks that are a certain distance apart and higher than the moving average, and
        similarly valleys that are a certain distance apart and below the moving average.

        Next, valleys at the start and end of the signal are removed to ensure the first and last valleys are actual
        valleys, and not just the start or end of the signal. Peaks before the first or after the last valley are
        removed, to ensure peaks always fall between two valleys.

        At this point, it is possible multiple peaks exist between two valleys. Lower peaks are removed leaving only the
        highest peak between two valleys. Similarly, multiple valleys between two peaks are reduced to only the lowest
        valley.

        As a last step, breaths with a low amplitude (the average between the inspiratory and expiratory amplitudes) are
        removed.

        Breaths are constructed as a valley-peak-valley combination, representing the start of inspiration, the end of
        inspiration/start of expiration, and end of expiration.

        Args:
            continuous_data: a ContinuousData object that contains the data.
            result_label: label of the returned IntervalData object, defaults to `'breaths'`.
            sequence: optional, Sequence that contains the object to detect breaths in, and/or to store the result in.
            store: whether to store the result in the sequence, defaults to `True` if a Sequence is provided.

        Returns:
            An IntervalData object containing Breath objects.

        Raises:
            TypeError: if `continuous_data` is not a ContinuousData object.
            RuntimeError: if `store` is truthy while no `sequence` is provided.
        """
        if not isinstance(continuous_data, ContinuousData):
            msg = f"`continuous_data` should be a ContinuousData object, not {type(continuous_data)}"
            raise TypeError(msg)

        # Storing defaults to True when a sequence is provided, and to False otherwise.
        if store is None and sequence:
            store = True

        if store and sequence is None:
            msg = "Can't store the result if no Sequence is provided."
            raise RuntimeError(msg)

        data = continuous_data.values
        time = continuous_data.time
        sample_frequency = continuous_data.sample_frequency

        # Replace outliers with neighbouring values before peak/valley detection; breaths
        # overlapping the outlier regions are removed again after detection.
        invalid_data_indices = self._detect_invalid_data(data)
        data = self._remove_invalid_data(data, invalid_data_indices)

        peak_indices, valley_indices = self._detect_peaks_and_valleys(data, sample_frequency)

        breaths = self._create_breaths_from_peak_valley_data(
            time,
            peak_indices,
            valley_indices,
        )
        breaths = self._remove_breaths_around_invalid_data(breaths, time, sample_frequency, invalid_data_indices)
        breaths_container = IntervalData(
            label=result_label,
            name="Breaths as determined by BreathDetection",
            unit=None,
            category="breath",
            intervals=[(breath.start_time, breath.end_time) for breath in breaths],
            values=breaths,
            # store a snapshot of the algorithm settings used to produce this result
            parameters={type(self): dict(vars(self))},
            derived_from=[continuous_data],
        )

        if store:
            sequence.interval_data.add(breaths_container)

        return breaths_container
134

135
    def _detect_invalid_data(self, data: np.ndarray) -> np.ndarray:
1✔
136
        """Detects invalid data as outliers outside an upper and lower cutoff.
137

138
        This function defines a lower and upper cutoff. Data beyond those cutoffs is considered invalid for the purposes
139
        of breath detection.
140

141
        The lower cutoff is a distance away from the mean. The distance is m times the distance between the mean and the
142
        nth percentile of the data. The upper cutoff is m times the distance between the mean and the (100 - n)th
143
        percentile. m is given by `invalid_data_removal_multiplier` and n is given by `invalid_data_removal_percentile`.
144

145
        For example, with m = 4 and n = 5, the mean = 100, 5% of the data is below/equal to 90, and 5% of the data is
146
        above/equal to 120, all data below 100 - (4 * 10) = 60 and above 100 + (4 * 20) = 180 is considerd invalid.
147

148
        Args:
149
            data (np.ndarray): 1D array with impedance data
150

151
        Returns:
152
            np.ndarray: the indices of the data points with values outside the lower and upper cutoff values.
153
        """
154
        data_mean = np.mean(data)
1✔
155

156
        lower_percentile = np.percentile(data, self.invalid_data_removal_percentile)
1✔
157
        cutoff_low = data_mean - (data_mean - lower_percentile) * self.invalid_data_removal_multiplier
1✔
158

159
        upper_percentile = np.percentile(data, 100 - self.invalid_data_removal_percentile)
1✔
160
        cutoff_high = data_mean + (upper_percentile - data_mean) * self.invalid_data_removal_multiplier
1✔
161

162
        # detect indices of outliers
163
        return np.flatnonzero((data < cutoff_low) | (data > cutoff_high))
1✔
164

165
    def _remove_invalid_data(self, data: np.ndarray, invalid_data_indices: np.ndarray) -> np.ndarray:
1✔
166
        """Removes invalid data points and replace them with the nearest non-np.nan value."""
167
        data = np.copy(data)
1✔
168
        data[invalid_data_indices] = np.nan
1✔
169
        return self._fill_nan_with_nearest_neighbour(data)
1✔
170

171
    def _detect_peaks_and_valleys(self, data: np.ndarray, sample_frequency: float) -> tuple[np.ndarray, np.ndarray]:
        """Detect peak and valley indices in `data`, cleaned of edge cases, doubles and low amplitudes."""
        n_window_samples = int(sample_frequency * self.averaging_window_duration)
        averager = MovingAverage(window_size=n_window_samples, window_function=self.averaging_window_function)
        moving_average = averager.apply(data)

        peak_indices = self._find_extrema(data, moving_average, sample_frequency)
        valley_indices = self._find_extrema(data, moving_average, sample_frequency, invert=True)

        # At least one complete breath (valley-peak-valley) is required to continue.
        min_n_valleys = 2
        min_n_peaks = 1
        if len(valley_indices) < min_n_valleys or len(peak_indices) < min_n_peaks:
            return np.array([], dtype=int), np.array([], dtype=int)

        # The clean-up steps depend on each other's invariants and must run in this order.
        peak_indices, valley_indices = self._remove_edge_cases(data, peak_indices, valley_indices, moving_average)
        peak_indices, valley_indices = self._remove_doubles(data, peak_indices, valley_indices)
        peak_indices, valley_indices = self._remove_low_amplitudes(data, peak_indices, valley_indices)
        return peak_indices, valley_indices
186

187
    def _find_extrema(
1✔
188
        self,
189
        data: np.ndarray,
190
        moving_average: np.ndarray,
191
        sample_frequency: float,
192
        invert: bool = False,
193
    ) -> np.ndarray:
194
        """Find extrema (peaks or valleys) in the data.
195

196
        This method finds extrema (either peaks or valleys) in the data using the `scipy.signal.find_peaks()` function.
197
        The minimum distance (in time) between peaks is determined by the `minimum_duration` attribute.
198

199
        To find peaks, `invert` should be False. To find valleys, `invert` should be True, which inverts the data before
200
        finding peaks.
201

202
        Args:
203
            data (np.ndarray): a 1D array containing the data.
204
            moving_average (np.ndarray): a 1D array containing the moving average of the data.
205
            sample_frequency (float): sample frequency of the data and moving average
206
            invert (float, optional): whether to invert the data before
207
            detecting peaks. Defaults to False.
208

209
        Returns:
210
            np.ndarray: a 1D-array containing the indices of peaks or valleys.
211
        """
212
        data_ = -data if invert else data
1✔
213
        moving_average_ = -moving_average if invert else moving_average
1✔
214
        extrema_indices, _ = signal.find_peaks(
1✔
215
            data_,
216
            distance=max(self.minimum_duration * sample_frequency, 1),
217
            height=moving_average_,
218
        )
219

220
        return extrema_indices
1✔
221

222
    def _remove_edge_cases(
1✔
223
        self,
224
        data: np.ndarray,
225
        peak_indices: np.ndarray,
226
        valley_indices: np.ndarray,
227
        moving_average: np.ndarray,
228
    ) -> tuple[np.ndarray, np.ndarray]:
229
        """Remove overdetected peaks/valleys at the start and end of the data.
230

231
        A valley at the start of the data is deemed invalid if the data before the first valley stays below the moving
232
        average at the valley. The same is true for the last valley and the data after that valley. This ensures a
233
        valley is a true valley and not just a local minimum with the true valley cut off.
234

235
        Then, all peaks that occur before the first and after the last valley are removed. This ensures peaks only fall
236
        between valleys.
237

238
        Args:
239
            data (np.ndarray): the data in which the peaks/valleys were detected
240
            peak_indices (np.ndarray): indices of the peaks
241
            valley_indices (np.ndarray): indices of the valleys
242
            moving_average (np.ndarray): the moving average of data
243

244
        Returns:
245
            A tuple (peak_indices, peak_values) with edge cases removed.
246
        """
247
        if max(data[: valley_indices[0]]) < moving_average[valley_indices[0]]:
1✔
248
            # remove the first valley, if the data before that valley is not
249
            # high enough to be sure it's a valley
250
            valley_indices = np.delete(valley_indices, 0)
1✔
251

252
        if max(data[valley_indices[-1] :]) < moving_average[valley_indices[-1]]:
1✔
253
            # remove the last valley, if the data after that valley is not high
254
            # enough to be sure it's a valley
255
            valley_indices = np.delete(valley_indices, -1)
1✔
256

257
        # remove peaks that come before the first valley
258
        keep_peaks = peak_indices > valley_indices[0]
1✔
259
        peak_indices = peak_indices[keep_peaks]
1✔
260

261
        # remove peaks that come after the last valley
262
        keep_peaks = peak_indices < valley_indices[-1]
1✔
263
        peak_indices = peak_indices[keep_peaks]
1✔
264

265
        return peak_indices, valley_indices
1✔
266

267
    def _remove_doubles(
        self,
        data: np.ndarray,
        peak_indices: np.ndarray,
        valley_indices: np.ndarray,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Remove double peaks/valleys.

        This method ensures there is only one peak between valleys, and only one valley between peaks. If there are
        multiple peaks between two valleys, the peak with the highest value is kept and the others are removed. If there
        are no peaks between several valleys (i.e. multiple valleys between peaks) the valley with the lowest value is
        kept, while the others are removed.

        This method does not remove peaks before the first or after the last valley.

        Args:
            data: data the peaks and valleys were found in
            peak_indices: indices of the peaks
            valley_indices: indices of the valleys

        Returns:
            tuple: a tuple of length 2 with the peak_indices and valley_indices with double peaks/valleys removed.
        """
        peak_values = data[peak_indices]
        valley_values = data[valley_indices]

        # Walk over consecutive valley pairs. The index only advances when exactly one peak lies
        # between the current pair; after every removal the same pair is re-examined (`continue`).
        current_valley_index = 0
        while current_valley_index < len(valley_indices) - 1:
            start_index = valley_indices[current_valley_index]
            end_index = valley_indices[current_valley_index + 1]
            # positions (into peak_indices) of peaks strictly between the two valleys
            peaks_between_valleys = np.argwhere(
                (peak_indices > start_index) & (peak_indices < end_index),
            )
            if not len(peaks_between_valleys):
                # no peak between valleys, remove highest valley
                delete_valley_index = (
                    current_valley_index
                    if valley_values[current_valley_index] > valley_values[current_valley_index + 1]
                    else current_valley_index + 1
                )
                valley_indices = np.delete(valley_indices, delete_valley_index)
                valley_values = np.delete(valley_values, delete_valley_index)
                continue

            if len(peaks_between_valleys) > 1:
                # multiple peaks between valleys, remove lowest peak
                # NOTE: only the first two peaks in the window are compared; with three or more
                # peaks this branch is re-entered (index not advanced) until one peak remains.
                delete_peak_index = (
                    peaks_between_valleys[0]
                    if peak_values[peaks_between_valleys[0]] < peak_values[peaks_between_valleys[1]]
                    else peaks_between_valleys[1]
                )
                peak_indices = np.delete(peak_indices, delete_peak_index)
                peak_values = np.delete(peak_values, delete_peak_index)
                continue

            # exactly one peak between this valley pair: move to the next pair
            current_valley_index += 1

        return peak_indices, valley_indices
325

326
    def _remove_low_amplitudes(
1✔
327
        self,
328
        data: np.ndarray,
329
        peak_indices: np.ndarray,
330
        valley_indices: np.ndarray,
331
    ) -> tuple[np.ndarray, np.ndarray]:
332
        """Remove peaks if the amplitude is low compared to the median amplitude.
333

334
        The amplitude of a peak is determined as the average vertical distance between the peak value and the two valley
335
        values besides it. The cutoff value for the amplitude is calculated as the median amplitude times
336
        `amplitude_cutoff_fraction`. Peaks that have an amplitude below the cutoff are removed. Then,
337
        `_remove_doubles()` is called to remove either of the valleys next to the peak.
338

339
        If `amplitude_cutoff_fraction` is None, the input is returned unchanged.
340

341
        Args:
342
            data: the data the peaks and valleys were found in
343
            peak_indices (np.ndarray): the indices of the peaks
344
            valley_indices (np.ndarray): the indices of the valleys
345

346
        Returns:
347
            A tuple (peak_indices, valley_indices) with low-amplitude breaths removed.
348
        """
349
        if len(peak_indices) == 0 or len(valley_indices) == 0:
1!
UNCOV
350
            return peak_indices, valley_indices
×
351

352
        if not self.amplitude_cutoff_fraction:
1✔
353
            return peak_indices, valley_indices
1✔
354

355
        peak_values = data[peak_indices]
1✔
356
        valley_values = data[valley_indices]
1✔
357

358
        inspiratory_amplitude = peak_values - valley_values[:-1]
1✔
359
        expiratory_amplitude = peak_values - valley_values[1:]
1✔
360
        amplitude = (inspiratory_amplitude + expiratory_amplitude) / 2
1✔
361

362
        amplitude_cutoff = self.amplitude_cutoff_fraction * np.median(amplitude)
1✔
363
        delete_peaks = np.argwhere(amplitude < amplitude_cutoff)
1✔
364

365
        peak_indices = np.delete(peak_indices, delete_peaks)
1✔
366
        peak_values = np.delete(peak_values, delete_peaks)
1✔
367

368
        return self._remove_doubles(data, peak_indices, valley_indices)
1✔
369

370
    def _create_breaths_from_peak_valley_data(
        self,
        time: np.ndarray,
        peak_indices: np.ndarray,
        valley_indices: np.ndarray,
    ) -> list[Breath]:
        """Combine each valley-peak-valley triplet into a Breath.

        A breath runs from one valley (start of inspiration), via the peak between two valleys (end of
        inspiration/start of expiration), to the next valley (end of expiration). `strict=True` raises if the number of
        peaks does not match the number of valley pairs.
        """
        breaths = []
        valley_pairs = itertools.pairwise(valley_indices)
        for peak, (start_valley, end_valley) in zip(peak_indices, valley_pairs, strict=True):
            breaths.append(Breath(time[start_valley], time[peak], time[end_valley]))
        return breaths
384

385
    def _remove_breaths_around_invalid_data(
        self,
        breaths: list[Breath],
        time: np.ndarray,
        sample_frequency: float,
        invalid_data_indices: np.ndarray,
    ) -> list[Breath]:
        """Remove breaths overlapping with invalid data.

        Breaths that fall within a window length (given by `invalid_data_removal_window_length`) of invalid data are
        removed.

        Args:
            breaths: list of detected breath objects
            time: time axis belonging to the data
            sample_frequency: sample frequency of the data and time
            invalid_data_indices: indices of invalid data points

        Returns:
            A new list containing only the breaths that do not overlap with invalid data.
        """
        # TODO: write more general(ized) method of determining invalid data

        if not len(invalid_data_indices):
            return breaths[:]

        # Mark invalid samples with 1 so a whole window can be tested with a single max().
        invalid_marker = np.zeros(time.shape)
        invalid_marker[invalid_data_indices] = 1

        n_window_samples = math.ceil(self.invalid_data_removal_window_length * sample_frequency)

        def keep(breath) -> bool:
            # `time == breath.start_time` is exact: breath times are taken from this same time axis
            window_start = max(0, np.argmax(time == breath.start_time) - n_window_samples)
            window_end = min(len(invalid_marker), np.argmax(time == breath.end_time) + n_window_samples)
            # max() is 1 iff any invalid sample falls inside the (padded) breath window
            return not np.max(invalid_marker[window_start:window_end])

        return [breath for breath in breaths if keep(breath)]
425

426
    @staticmethod
1✔
427
    def _fill_nan_with_nearest_neighbour(data: np.ndarray) -> np.ndarray:
1✔
428
        """Fill np.nan values in a 1D array with the nearest non-np.nan value.
429

430
        Each np.nan-value is replaced with the nearest (backwards and forwards) non-np.nan value. If the nearest earlier
431
        and a later value are the same distance away, the earlier value is preferred. np.nan-values at the start are
432
        filled with the first non-nan value.
433

434
        Example:
435
            foo = np.ndarray([np.nan, 1, np.nan, np.nan, np.nan, 3, np.nan, np.nan])
436
            bar = BreathDetection._fill_nan_with_nearest_neighbour(foo)
437
            assert bar == np.ndarray([1, 1, 1, 1, 3, 3, 3, 3])
438
        """
439
        data = np.copy(data)
1✔
440
        nan_indices = np.flatnonzero(np.isnan(data))
1✔
441

442
        if not len(nan_indices):
1✔
443
            return data
1✔
444

445
        if len(nan_indices) == len(data):
1!
UNCOV
446
            msg = "`data` only contains np.nan values. "
×
UNCOV
447
            raise ValueError(msg)
×
448

449
        grouped_nan_indices = np.split(nan_indices, np.where(np.diff(nan_indices) != 1)[0] + 1)
1✔
450

451
        for group in grouped_nan_indices:
1✔
452
            if group[0] == 0:
1!
UNCOV
453
                data[group] = data[group[-1] + 1]
×
454
                continue
×
455

456
            if group[-1] == len(data) - 1:
1!
UNCOV
457
                data[group] = data[group[0] - 1]
×
UNCOV
458
                continue
×
459

460
            middle = len(group) // 2
1✔
461
            data[group[:middle]] = data[group[0] - 1]
1✔
462
            data[group[middle:]] = data[group[-1] + 1]
1✔
463
        return data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc