• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EIT-ALIVE / eitprocessing / 15254232358

26 May 2025 12:41PM UTC coverage: 82.774% (-0.5%) from 83.253%
15254232358

push

github

psomhorst
Bump version: 1.7.1 → 1.7.2

389 of 520 branches covered (74.81%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

21 existing lines in 2 files now uncovered.

1509 of 1773 relevant lines covered (85.11%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.32
/eitprocessing/features/pixel_breath.py
1
import itertools
1✔
2
import warnings
1✔
3
from collections.abc import Callable
1✔
4
from dataclasses import InitVar, dataclass, field
1✔
5
from typing import Final, Literal
1✔
6

7
import numpy as np
1✔
8
from scipy import signal
1✔
9

10
from eitprocessing.datahandling.breath import Breath
1✔
11
from eitprocessing.datahandling.continuousdata import ContinuousData
1✔
12
from eitprocessing.datahandling.eitdata import EITData
1✔
13
from eitprocessing.datahandling.intervaldata import IntervalData
1✔
14
from eitprocessing.datahandling.sequence import Sequence
1✔
15
from eitprocessing.features.breath_detection import BreathDetection
1✔
16

17
_SENTINEL_BREATH_DETECTION: Final = BreathDetection()
1✔
18
MAX_XCORR_LAG = 1
1✔
19
ALLOW_FRACTION_BREATHS_SKIPPED = 0.25
1✔
20

21

22
def _return_sentinel_breath_detection() -> BreathDetection:
1✔
23
    # Returns a sential of a BreathDetection, which only exists to signal that the default value for breath_detection
24
    # was used.
25
    return _SENTINEL_BREATH_DETECTION
1✔
26

27

28
@dataclass(kw_only=True)
1✔
29
class PixelBreath:
1✔
30
    """Algorithm for detecting timing of pixel breaths in pixel impedance data.
31

32
    This algorithm detects the position of start of inspiration, end of inspiration and
33
    end of expiration in pixel impedance data. It uses BreathDetection to find the global start and end
34
    of inspiration and expiration. These points are then used to find the start/end of pixel
35
    inspiration/expiration in pixel impedance data.
36

37
    Since this algorithm uses the previous and next global breath to determine the start and end of a pixel breath, the
38
    first and last global breaths can not be used to determine pixel breaths. They are always set to `None` in the
39
    return list. Breaths that could not properly be detected are set to `None` as well.
40

41
    Some pixel breaths may be phase shifted (inflation starts and ends later compared to others, e.g., due to pendelluft
42
    or late airway opening). Other pixel breaths may have a negative amplitude (impedance decreases during inspiration,
43
    e.g., due to pleural effusion or reconstruction artifacts). It is not always possible to determine whether a pixel
44
    is out of phase or has a negative amplitude. PixelBreath has three different phase correction modes. In 'negative
45
    amplitude' mode (default), pixels that have a decrease in amplitude between the start and end of globally defined
46
    inspiration, will have a negative amplitude and smaller phase shift. In 'phase shift' mode, all pixel breaths will
47
    have positive amplitudes, but can have large phase shifts. In 'none'/`None` mode, all pixels are assumed to be
48
    within rouglhy -90 to 90 degrees of phase. Note that the 'none' mode can lead to unexpected results, such as
49
    ultra-short (down to 2 frames) or very long breaths.
50

51
    Example:
52
    ```
53
    >>> pi = PixelBreath()
54
    >>> eit_data = sequence.eit_data['raw']
55
    >>> continuous_data = sequence.continuous_data['global_impedance_(raw)']
56
    >>> pixel_breaths = pi.find_pixel_breaths(eit_data, continuous_data, sequence)
57
    ```
58

59
    Args:
60
        breath_detection (BreathDetection): BreathDetection object to use for detecting breaths.
61
        phase_correction_mode: How to resolve pixels that are out-of-phase. Defaults to "negative amplitude".
62
    """
63

64
    breath_detection: BreathDetection = field(default_factory=_return_sentinel_breath_detection)
1✔
65
    breath_detection_kwargs: InitVar[dict | None] = None
1✔
66
    phase_correction_mode: Literal["negative amplitude", "phase shift", "none"] | None = "negative amplitude"
1✔
67

68
    def __post_init__(self, breath_detection_kwargs: dict | None):
1✔
69
        if breath_detection_kwargs is not None:
1✔
70
            if self.breath_detection is not _SENTINEL_BREATH_DETECTION:
1✔
71
                msg = (
1✔
72
                    "`breath_detection_kwargs` is deprecated, and can't be used at the same time as `breath_detection`."
73
                )
74
                raise TypeError(msg)
1✔
75

76
            self.breath_detection = BreathDetection(**breath_detection_kwargs)
1✔
77
            warnings.warn(
1✔
78
                "`breath_detection_kwargs` is deprecated and will be removed soon. "
79
                "Replace with `breath_detection=BreathDetection(**breath_detection_kwargs)`.",
80
                DeprecationWarning,
81
            )
82

83
    def find_pixel_breaths(  # noqa: C901, PLR0912, PLR0915
1✔
84
        self,
85
        eit_data: EITData,
86
        continuous_data: ContinuousData,
87
        sequence: Sequence | None = None,
88
        store: bool | None = None,
89
        result_label: str = "pixel_breaths",
90
    ) -> IntervalData:
91
        """Find pixel breaths in the data.
92

93
        This method finds the pixel start/end of inspiration/expiration based on the start/end of inspiration/expiration
94
        as detected in the continuous data.
95

96
        For most pixels, the start of a breath (start inspiration) is the valley between the middles (start of
97
        expiration) of the globally defined breaths on either side. The end of a pixel breath is the start of the next
98
        pixel breath. The middle of the pixel breath is the peak between the start and end of the pixel breath.
99

100
        If the pixel is out of phase or has negative amplitude, the definition of the breath depends on the phase
101
        correction mode. In 'negative amplitude' mode, the start of a breath is the peak between the middles of the
102
        globally defined breaths on either side, while the middle of the pixel breath is the valley of the start and end
103
        of the pixel breath. In 'phase shift' mode, first the phase shift between the pixel impedance and global
104
        impedance is determined as the highest crosscorrelation between the signals near a phase shift of 0. The start
105
        of breath is the valley between the phase shifted middles of the globally defined breaths on either side.
106

107
        Pixel breaths are constructed as a valley-peak-valley combination, representing the start of inspiration, the
108
        end of inspiration/start of expiration, and end of expiration.
109

110
        Args:
111
            eit_data: EITData to apply the algorithm to.
112
            continuous_data: ContinuousData to use for global breath detection.
113
            result_label: label of the returned IntervalData object, defaults to `'pixel_breaths'`.
114
            sequence: optional, Sequence that contains the object to detect pixel breaths in, and/or to store the result
115
            in.
116
            store: whether to store the result in the sequence, defaults to `True` if a Sequence if provided.
117

118
        Returns:
119
            An IntervalData object containing Breath objects.
120

121
        Raises:
122
            RuntimeError: If store is set to true but no sequence is provided.
123
            ValueError: If the provided sequence is not an instance of the Sequence dataclass.
124
        """
125
        if store is None and isinstance(sequence, Sequence):
1✔
126
            store = True
1✔
127

128
        if store and sequence is None:
1✔
129
            msg = "Can't store the result if no Sequence is provided."
1✔
130
            raise RuntimeError(msg)
1✔
131

132
        if store and not isinstance(sequence, Sequence):
1✔
133
            msg = "To store the result a Sequence dataclass must be provided."
1✔
134
            raise ValueError(msg)
1✔
135

136
        continuous_breaths = self.breath_detection.find_breaths(continuous_data)
1✔
137

138
        indices_breath_middles = np.searchsorted(
1✔
139
            eit_data.time,
140
            [breath.middle_time for breath in continuous_breaths.values],
141
        )
142

143
        _, n_rows, n_cols = eit_data.pixel_impedance.shape
1✔
144

145
        from eitprocessing.parameters.tidal_impedance_variation import TIV
1✔
146

147
        pixel_tivs = TIV().compute_pixel_parameter(
1✔
148
            eit_data,
149
            continuous_data,
150
            sequence,
151
            tiv_method="inspiratory",
152
            tiv_timing="continuous",
153
            store=False,
154
        )  # Set store to false as to not save these pixel tivs as SparseData.
155

156
        pixel_tiv_with_continuous_data_timing = (
1✔
157
            np.empty((0, n_rows, n_cols)) if not len(pixel_tivs.values) else np.stack(pixel_tivs.values)
158
        )
159

160
        # Create a mask to detect slices that are entirely NaN
161
        all_nan_mask = np.isnan(pixel_tiv_with_continuous_data_timing).all(axis=0)
1✔
162

163
        # Initialize the mean_tiv_pixel array with NaNs
164
        mean_tiv_pixel = np.full((n_rows, n_cols), np.nan)
1✔
165

166
        # Only compute nanmean for slices that are not entirely NaN
167
        if not all_nan_mask.all():  # Check if there are any valid (non-all-NaN) slices
1✔
168
            mean_tiv_pixel[~all_nan_mask] = np.nanmean(pixel_tiv_with_continuous_data_timing[:, ~all_nan_mask], axis=0)
1✔
169

170
        time = eit_data.time
1✔
171
        pixel_impedance = eit_data.pixel_impedance
1✔
172

173
        pixel_breaths = np.full((len(continuous_breaths), n_rows, n_cols), None, dtype=object)
1✔
174

175
        lags = signal.correlation_lags(len(continuous_data), len(continuous_data), mode="same")
1✔
176

177
        match self.phase_correction_mode:
1✔
178
            case "negative amplitude":
1!
179
                allow_negative_amplitude = True
1✔
180
                correct_for_phase_shift = None
1✔
181
            case "phase shift":
×
182
                allow_negative_amplitude = False
×
183
                correct_for_phase_shift = True
×
184
            case "none" | None:
×
UNCOV
185
                allow_negative_amplitude = False
×
UNCOV
186
                correct_for_phase_shift = False
×
UNCOV
187
            case _:
×
UNCOV
188
                msg = f"Unknown phase correction mode ({self.phase_correction_mode})."
×
UNCOV
189
                raise ValueError(msg)
×
190

191
        for row, col in itertools.product(range(n_rows), range(n_cols)):
1✔
192
            mean_tiv = mean_tiv_pixel[row, col]
1✔
193

194
            if np.std(pixel_impedance[:, row, col]) == 0:
1✔
195
                # pixel has no amplitude
196
                continue
1✔
197

198
            if allow_negative_amplitude and mean_tiv < 0:
1✔
199
                start_func, middle_func = np.argmax, np.argmin
1✔
200
                lagged_indices_breath_middles = indices_breath_middles
1✔
201
            else:
202
                start_func, middle_func = np.argmin, np.argmax
1✔
203

204
                cd = np.copy(continuous_data.values)
1✔
205
                cd = signal.detrend(cd, type="linear")
1✔
206
                pi = np.copy(pixel_impedance[:, row, col])
1✔
207
                if not np.all(np.isnan(pi)):
1!
208
                    pi = signal.detrend(pi, type="linear")
1✔
209

210
                if correct_for_phase_shift:
1!
211
                    # search for closest or maximum cross correlation within MAX_XCORR_LAG times the average duration of
212
                    # a breath
UNCOV
213
                    xcorr = signal.correlate(cd, pi, mode="same")
×
214
                    max_lag = MAX_XCORR_LAG * np.mean(np.diff(indices_breath_middles))
×
UNCOV
215
                    lag_range = (lags >= -max_lag) & (lags <= max_lag)
×
216

217
                    # find the peaks in the cross correlation
218
                    peaks, _ = signal.find_peaks(xcorr[lag_range], height=0, prominence=xcorr.std())
×
219

220
                    # find the closest peak to zero lag
UNCOV
221
                    peak_lags = lags[lag_range][peaks]
×
UNCOV
222
                    peak_distances = np.abs(peak_lags)
×
UNCOV
223
                    min_peak_distance = np.min(peak_distances)
×
UNCOV
224
                    candidates = peak_lags[peak_distances == min_peak_distance]
×
225

226
                    # if there are multiple candidates, take the one with the highest cross correlation
UNCOV
227
                    if len(candidates) == 1:
×
UNCOV
228
                        lag = candidates[0]
×
UNCOV
229
                    elif len(candidates) == 2:  # noqa: PLR2004
×
230
                        # take the lag with the highest cross correlation
UNCOV
231
                        lag = candidates[np.argmax(xcorr[np.searchsorted(lags, candidates)])]
×
232
                    else:
233
                        msg = "Too many peaks found in cross correlation."
×
UNCOV
234
                        raise RuntimeError(msg)
×
235

236
                    # shift search area
UNCOV
237
                    lagged_indices_breath_middles = indices_breath_middles - lag
×
238
                else:
239
                    lagged_indices_breath_middles = indices_breath_middles
1✔
240

241
            skip = np.concatenate(
1✔
242
                (
243
                    np.flatnonzero(lagged_indices_breath_middles < 0),
244
                    np.flatnonzero(lagged_indices_breath_middles >= len(continuous_data)),
245
                )
246
            )
247
            lagged_indices_breath_middles = lagged_indices_breath_middles.clip(min=0, max=len(continuous_data) - 1)
1✔
248
            outsides = self._find_extreme_indices(pixel_impedance, lagged_indices_breath_middles, row, col, start_func)
1✔
249
            starts = outsides[:-1]
1✔
250
            ends = outsides[1:]
1✔
251
            middles = self._find_extreme_indices(pixel_impedance, outsides, row, col, middle_func)
1✔
252

253
            skip = np.concatenate((skip, np.flatnonzero(starts >= middles), np.flatnonzero(middles >= ends)))
1✔
254

255
            if len(skip) > len(outsides) * ALLOW_FRACTION_BREATHS_SKIPPED:
1!
UNCOV
256
                warnings.warn(
×
257
                    f"Skipping pixel ({row}, {col}) because more than half ({len(skip) / len(outsides)}) "
258
                    "of breaths skipped."
259
                )
UNCOV
260
                continue
×
261

262
            pixel_breaths[:, row, col] = self._construct_breaths(starts, middles, ends, time, skip=skip)
1✔
263

264
        intervals = [(breath.start_time, breath.end_time) for breath in continuous_breaths.values]
1✔
265

266
        pixel_breaths_container = IntervalData(
1✔
267
            label=result_label,
268
            name="Pixel in- and deflation timing as determined by Pixelbreath",
269
            unit=None,
270
            category="breath",
271
            intervals=intervals,
272
            values=list(
273
                pixel_breaths,
274
            ),  ## TODO: change back to pixel_breaths array when IntervalData works with 3D array
275
            derived_from=[eit_data],
276
        )
277
        if store:
1✔
278
            sequence.interval_data.add(pixel_breaths_container)
1✔
279

280
        return pixel_breaths_container
1✔
281

282
    def _construct_breaths(
1✔
283
        self, start: list[int], middle: list[int], end: list[int], time: np.ndarray, skip: np.ndarray | None = None
284
    ) -> list:
285
        skip_ = skip if skip is not None else np.array([], dtype=int)
1✔
286
        breaths = [
1✔
287
            Breath(time[s], time[m], time[e]) if i not in skip_ else None
288
            for i, (s, m, e) in enumerate(zip(start, middle, end, strict=True))
289
        ]
290
        # First and last breath are not detected by definition (need two breaths to find one breath)
291
        return [None, *breaths, None]
1✔
292

293
    def _find_extreme_indices(
1✔
294
        self,
295
        pixel_impedance: np.ndarray,
296
        times: np.ndarray,
297
        row: int,
298
        col: int,
299
        function: Callable,
300
    ) -> np.ndarray:
301
        """Finds extreme indices in pixel impedance.
302

303
        This method divides the pixel impedance for a single pixel (selected using row and col) into smaller segments
304
        based on the `times` array. The times array consists of indices to divide these segments. The function iterates
305
        over each index in the times array to select consecutive segments of pixel impedance.
306
        For each segment, the method applies the `function` (either np.argmax or np.argmin) to extract an extreme value
307
        (local maximum or minimum).
308

309
        Args:
310
            pixel_impedance (np.ndarray): The pixel impedance array from which the function will extract values.
311
                Assumed to be 3-dimensional (e.g., time, rows, and columns).
312
            times (np.ndarray): 1D array of time indices. These times define the start
313
                and end of each segment in the pixel impedance.
314
            row (int): The row index in the pixel impedance
315
            col (int): The column index in the pixel impedance
316
            function (Callable): A function that is applied to each segment of data to find
317
                an extreme value (np.argmax or np.argmin)
318

319
        Returns:
320
            np.ndarray: An array of indices where the extreme values (based on the function)
321
            are located for each time segment.
322
        """
323
        return np.array(
1✔
324
            [function(pixel_impedance[t1:t2, row, col]) + t1 for t1, t2 in itertools.pairwise(times)],
325
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc