• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EIT-ALIVE / eitprocessing / 17213080321

25 Aug 2025 03:19PM UTC coverage: 84.761% (+2.0%) from 82.774%
17213080321

push

github

psomhorst
Bump version: 1.7.3 → 1.8.0

745 of 958 branches covered (77.77%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

37 existing lines in 9 files now uncovered.

2737 of 3150 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/eitprocessing/features/pixel_breath.py
1
import itertools
1✔
2
import warnings
1✔
3
from collections.abc import Callable
1✔
4
from dataclasses import InitVar, dataclass, field
1✔
5
from typing import Final, Literal
1✔
6

7
import numpy as np
1✔
8
from scipy import signal
1✔
9

10
from eitprocessing.datahandling.breath import Breath
1✔
11
from eitprocessing.datahandling.continuousdata import ContinuousData
1✔
12
from eitprocessing.datahandling.eitdata import EITData
1✔
13
from eitprocessing.datahandling.intervaldata import IntervalData
1✔
14
from eitprocessing.datahandling.sequence import Sequence
1✔
15
from eitprocessing.features.breath_detection import BreathDetection
1✔
16

17
_SENTINEL_BREATH_DETECTION: Final = BreathDetection()
1✔
18
MAX_XCORR_LAG = 1
1✔
19
ALLOW_FRACTION_BREATHS_SKIPPED = 0.25
1✔
20

21

22
def _return_sentinel_breath_detection() -> BreathDetection:
1✔
23
    # Returns a sential of a BreathDetection, which only exists to signal that the default value for breath_detection
24
    # was used.
25
    return _SENTINEL_BREATH_DETECTION
1✔
26

27

28
@dataclass(kw_only=True)
1✔
29
class PixelBreath:
1✔
30
    """Algorithm for detecting timing of pixel breaths in pixel impedance data.
31

32
    This algorithm detects the position of start of inspiration, end of inspiration and
33
    end of expiration in pixel impedance data. It uses BreathDetection to find the global start and end
34
    of inspiration and expiration. These points are then used to find the start/end of pixel
35
    inspiration/expiration in pixel impedance data.
36

37
    Since this algorithm uses the previous and next global breath to determine the start and end of a pixel breath, the
38
    first and last global breaths can not be used to determine pixel breaths. They are always set to `None` in the
39
    return list. Breaths that could not properly be detected are set to `None` as well.
40

41
    Some pixel breaths may be phase shifted (inflation starts and ends later compared to others, e.g., due to pendelluft
42
    or late airway opening). Other pixel breaths may have a negative amplitude (impedance decreases during inspiration,
43
    e.g., due to pleural effusion or reconstruction artifacts). It is not always possible to determine whether a pixel
44
    is out of phase or has a negative amplitude. PixelBreath has three different phase correction modes. In 'negative
45
    amplitude' mode (default), pixels that have a decrease in amplitude between the start and end of globally defined
46
    inspiration, will have a negative amplitude and smaller phase shift. In 'phase shift' mode, all pixel breaths will
47
    have positive amplitudes, but can have large phase shifts. In 'none'/`None` mode, all pixels are assumed to be
48
    within rouglhy -90 to 90 degrees of phase. Note that the 'none' mode can lead to unexpected results, such as
49
    ultra-short (down to 2 frames) or very long breaths.
50

51
    Example:
52
    ```
53
    >>> pi = PixelBreath()
54
    >>> eit_data = sequence.eit_data['raw']
55
    >>> continuous_data = sequence.continuous_data['global_impedance_(raw)']
56
    >>> pixel_breaths = pi.find_pixel_breaths(eit_data, continuous_data, sequence)
57
    ```
58

59
    Args:
60
        breath_detection (BreathDetection): BreathDetection object to use for detecting breaths.
61
        phase_correction_mode: How to resolve pixels that are out-of-phase. Defaults to "negative amplitude".
62
    """
63

64
    breath_detection: BreathDetection = field(default_factory=_return_sentinel_breath_detection)
1✔
65
    breath_detection_kwargs: InitVar[dict | None] = None
1✔
66
    phase_correction_mode: Literal["negative amplitude", "phase shift", "none"] | None = "negative amplitude"
1✔
67

68
    def __post_init__(self, breath_detection_kwargs: dict | None):
1✔
69
        if breath_detection_kwargs is not None:
1✔
70
            if self.breath_detection is not _SENTINEL_BREATH_DETECTION:
1✔
71
                msg = (
1✔
72
                    "`breath_detection_kwargs` is deprecated, and can't be used at the same time as `breath_detection`."
73
                )
74
                raise TypeError(msg)
1✔
75

76
            self.breath_detection = BreathDetection(**breath_detection_kwargs)
1✔
77
            warnings.warn(
1✔
78
                "`breath_detection_kwargs` is deprecated and will be removed soon. "
79
                "Replace with `breath_detection=BreathDetection(**breath_detection_kwargs)`.",
80
                DeprecationWarning,
81
                stacklevel=2,
82
            )
83

84
    def find_pixel_breaths(  # noqa: C901, PLR0912, PLR0915
1✔
85
        self,
86
        eit_data: EITData,
87
        continuous_data: ContinuousData,
88
        sequence: Sequence | None = None,
89
        store: bool | None = None,
90
        result_label: str = "pixel_breaths",
91
    ) -> IntervalData:
92
        """Find pixel breaths in the data.
93

94
        This method finds the pixel start/end of inspiration/expiration based on the start/end of inspiration/expiration
95
        as detected in the continuous data.
96

97
        For most pixels, the start of a breath (start inspiration) is the valley between the middles (start of
98
        expiration) of the globally defined breaths on either side. The end of a pixel breath is the start of the next
99
        pixel breath. The middle of the pixel breath is the peak between the start and end of the pixel breath.
100

101
        If the pixel is out of phase or has negative amplitude, the definition of the breath depends on the phase
102
        correction mode. In 'negative amplitude' mode, the start of a breath is the peak between the middles of the
103
        globally defined breaths on either side, while the middle of the pixel breath is the valley of the start and end
104
        of the pixel breath. In 'phase shift' mode, first the phase shift between the pixel impedance and global
105
        impedance is determined as the highest crosscorrelation between the signals near a phase shift of 0. The start
106
        of breath is the valley between the phase shifted middles of the globally defined breaths on either side.
107

108
        Pixel breaths are constructed as a valley-peak-valley combination, representing the start of inspiration, the
109
        end of inspiration/start of expiration, and end of expiration.
110

111
        Args:
112
            eit_data: EITData to apply the algorithm to.
113
            continuous_data: ContinuousData to use for global breath detection.
114
            result_label: label of the returned IntervalData object, defaults to `'pixel_breaths'`.
115
            sequence: optional, Sequence that contains the object to detect pixel breaths in, and/or to store the result
116
            in.
117
            store: whether to store the result in the sequence, defaults to `True` if a Sequence if provided.
118

119
        Returns:
120
            An IntervalData object containing Breath objects.
121

122
        Raises:
123
            RuntimeError: If store is set to true but no sequence is provided.
124
            ValueError: If the provided sequence is not an instance of the Sequence dataclass.
125
        """
126
        if store is None and isinstance(sequence, Sequence):
1✔
127
            store = True
1✔
128

129
        if store and sequence is None:
1✔
130
            msg = "Can't store the result if no Sequence is provided."
1✔
131
            raise RuntimeError(msg)
1✔
132

133
        if store and not isinstance(sequence, Sequence):
1✔
134
            msg = "To store the result a Sequence dataclass must be provided."
1✔
135
            raise ValueError(msg)
1✔
136

137
        continuous_breaths = self.breath_detection.find_breaths(continuous_data)
1✔
138

139
        indices_breath_middles = np.searchsorted(
1✔
140
            eit_data.time,
141
            [breath.middle_time for breath in continuous_breaths.values],
142
        )
143

144
        _, n_rows, n_cols = eit_data.pixel_impedance.shape
1✔
145

146
        from eitprocessing.parameters.tidal_impedance_variation import TIV  # noqa: PLC0415
1✔
147

148
        pixel_tivs = TIV().compute_pixel_parameter(
1✔
149
            eit_data,
150
            continuous_data,
151
            sequence,
152
            tiv_method="inspiratory",
153
            tiv_timing="continuous",
154
            store=False,
155
        )  # Set store to false as to not save these pixel tivs as SparseData.
156

157
        pixel_tiv_with_continuous_data_timing = (
1✔
158
            np.empty((0, n_rows, n_cols)) if not len(pixel_tivs.values) else np.stack(pixel_tivs.values)
159
        )
160

161
        # Create a mask to detect slices that are entirely NaN
162
        all_nan_mask = np.isnan(pixel_tiv_with_continuous_data_timing).all(axis=0)
1✔
163

164
        # Initialize the mean_tiv_pixel array with NaNs
165
        mean_tiv_pixel = np.full((n_rows, n_cols), np.nan)
1✔
166

167
        # Only compute nanmean for slices that are not entirely NaN
168
        if not all_nan_mask.all():  # Check if there are any valid (non-all-NaN) slices
1✔
169
            mean_tiv_pixel[~all_nan_mask] = np.nanmean(pixel_tiv_with_continuous_data_timing[:, ~all_nan_mask], axis=0)
1✔
170

171
        time = eit_data.time
1✔
172
        pixel_impedance = eit_data.pixel_impedance
1✔
173

174
        pixel_breaths = np.full((len(continuous_breaths), n_rows, n_cols), None, dtype=object)
1✔
175

176
        lags = signal.correlation_lags(len(continuous_data), len(continuous_data), mode="same")
1✔
177

178
        match self.phase_correction_mode:
1✔
179
            case "negative amplitude":
1✔
180
                allow_negative_amplitude = True
1✔
181
                correct_for_phase_shift = None
1✔
182
            case "phase shift":
1!
183
                allow_negative_amplitude = False
1✔
184
                correct_for_phase_shift = True
1✔
185
            case "none" | None:
×
186
                allow_negative_amplitude = False
×
187
                correct_for_phase_shift = False
×
188
            case _:
×
189
                msg = f"Unknown phase correction mode ({self.phase_correction_mode})."
×
UNCOV
190
                raise ValueError(msg)
×
191

192
        for row, col in itertools.product(range(n_rows), range(n_cols)):
1✔
193
            mean_tiv = mean_tiv_pixel[row, col]
1✔
194

195
            if np.std(pixel_impedance[:, row, col]) == 0:
1✔
196
                # pixel has no amplitude
197
                continue
1✔
198

199
            if allow_negative_amplitude and mean_tiv < 0:
1✔
200
                start_func, middle_func = np.argmax, np.argmin
1✔
201
                lagged_indices_breath_middles = indices_breath_middles
1✔
202
            else:
203
                start_func, middle_func = np.argmin, np.argmax
1✔
204

205
                cd = np.copy(continuous_data.values)
1✔
206
                cd = signal.detrend(cd, type="linear")
1✔
207
                pi = np.copy(pixel_impedance[:, row, col])
1✔
208
                if not np.all(np.isnan(pi)):
1!
209
                    pi = signal.detrend(pi, type="linear")
1✔
210

211
                if correct_for_phase_shift:
1✔
212
                    # search for closest or maximum cross correlation within MAX_XCORR_LAG times the average duration of
213
                    # a breath
214
                    xcorr = signal.correlate(cd, pi, mode="same")
1✔
215
                    max_lag = MAX_XCORR_LAG * np.mean(np.diff(indices_breath_middles))
1✔
216
                    lag_range = (lags >= -max_lag) & (lags <= max_lag)
1✔
217

218
                    # find the peaks in the cross correlation
219
                    peaks, _ = signal.find_peaks(xcorr[lag_range], height=0, prominence=xcorr.std())
1✔
220

221
                    # find the closest peak to zero lag
222
                    peak_lags = lags[lag_range][peaks]
1✔
223
                    peak_distances = np.abs(peak_lags)
1✔
224
                    min_peak_distance = np.min(peak_distances)
1✔
225
                    candidates = peak_lags[peak_distances == min_peak_distance]
1✔
226

227
                    # if there are multiple candidates, take the one with the highest cross correlation
228
                    if len(candidates) == 1:
1✔
229
                        lag = candidates[0]
1✔
230
                    elif len(candidates) == 2:  # noqa: PLR2004
1!
231
                        # take the lag with the highest cross correlation
232
                        lag = candidates[np.argmax(xcorr[np.searchsorted(lags, candidates)])]
1✔
233
                    else:
234
                        msg = "Too many peaks found in cross correlation."
×
UNCOV
235
                        raise RuntimeError(msg)
×
236

237
                    # shift search area
238
                    lagged_indices_breath_middles = indices_breath_middles - lag
1✔
239
                else:
240
                    lagged_indices_breath_middles = indices_breath_middles
1✔
241

242
            skip = np.concatenate(
1✔
243
                (
244
                    np.flatnonzero(lagged_indices_breath_middles < 0),
245
                    np.flatnonzero(lagged_indices_breath_middles >= len(continuous_data)),
246
                )
247
            )
248
            lagged_indices_breath_middles = lagged_indices_breath_middles.clip(min=0, max=len(continuous_data) - 1)
1✔
249
            outsides = self._find_extreme_indices(pixel_impedance, lagged_indices_breath_middles, row, col, start_func)
1✔
250
            starts = outsides[:-1]
1✔
251
            ends = outsides[1:]
1✔
252
            middles = self._find_extreme_indices(pixel_impedance, outsides, row, col, middle_func)
1✔
253

254
            skip = np.concatenate((skip, np.flatnonzero(starts >= middles), np.flatnonzero(middles >= ends)))
1✔
255

256
            if len(starts) > len(skip) > len(starts) * ALLOW_FRACTION_BREATHS_SKIPPED:
1!
UNCOV
257
                warnings.warn(
×
258
                    f"Skipping pixel ({row}, {col}) because more than half ({len(skip) / len(starts):.0%}) "
259
                    "of breaths skipped.",
260
                    RuntimeWarning,
261
                    stacklevel=2,
262
                )
UNCOV
263
                continue
×
264

265
            pixel_breaths[:, row, col] = self._construct_breaths(starts, middles, ends, time, skip=skip)
1✔
266

267
        intervals = [(breath.start_time, breath.end_time) for breath in continuous_breaths.values]
1✔
268

269
        pixel_breaths_container = IntervalData(
1✔
270
            label=result_label,
271
            name="Pixel in- and deflation timing as determined by Pixelbreath",
272
            unit=None,
273
            category="breath",
274
            intervals=intervals,
275
            values=list(
276
                pixel_breaths,
277
            ),  ## TODO: change back to pixel_breaths array when IntervalData works with 3D array
278
            derived_from=[eit_data],
279
        )
280
        if store:
1✔
281
            sequence.interval_data.add(pixel_breaths_container)
1✔
282

283
        return pixel_breaths_container
1✔
284

285
    def _construct_breaths(
1✔
286
        self, start: list[int], middle: list[int], end: list[int], time: np.ndarray, skip: np.ndarray | None = None
287
    ) -> list:
288
        skip_ = skip if skip is not None else np.array([], dtype=int)
1✔
289
        breaths = [
1✔
290
            Breath(time[s], time[m], time[e]) if i not in skip_ else None
291
            for i, (s, m, e) in enumerate(zip(start, middle, end, strict=True))
292
        ]
293
        # First and last breath are not detected by definition (need two breaths to find one breath)
294
        return [None, *breaths, None]
1✔
295

296
    def _find_extreme_indices(
1✔
297
        self,
298
        pixel_impedance: np.ndarray,
299
        times: np.ndarray,
300
        row: int,
301
        col: int,
302
        function: Callable,
303
    ) -> np.ndarray:
304
        """Finds extreme indices in pixel impedance.
305

306
        This method divides the pixel impedance for a single pixel (selected using row and col) into smaller segments
307
        based on the `times` array. The times array consists of indices to divide these segments. The function iterates
308
        over each index in the times array to select consecutive segments of pixel impedance.
309
        For each segment, the method applies the `function` (either np.argmax or np.argmin) to extract an extreme value
310
        (local maximum or minimum).
311

312
        Args:
313
            pixel_impedance (np.ndarray): The pixel impedance array from which the function will extract values.
314
                Assumed to be 3-dimensional (e.g., time, rows, and columns).
315
            times (np.ndarray): 1D array of time indices. These times define the start
316
                and end of each segment in the pixel impedance.
317
            row (int): The row index in the pixel impedance
318
            col (int): The column index in the pixel impedance
319
            function (Callable): A function that is applied to each segment of data to find
320
                an extreme value (np.argmax or np.argmin)
321

322
        Returns:
323
            np.ndarray: An array of indices where the extreme values (based on the function)
324
            are located for each time segment.
325
        """
326
        return np.array(
1✔
327
            [function(pixel_impedance[t1:t2, row, col]) + t1 for t1, t2 in itertools.pairwise(times)],
328
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc