• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EIT-ALIVE / eitprocessing / 17213080321

25 Aug 2025 03:19PM UTC coverage: 84.761% (+2.0%) from 82.774%
17213080321

push

github

psomhorst
Bump version: 1.7.3 → 1.8.0

745 of 958 branches covered (77.77%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

37 existing lines in 9 files now uncovered.

2737 of 3150 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.39
/eitprocessing/datahandling/loading/timpel.py
1
from __future__ import annotations
1✔
2

3
import warnings
1✔
4
from functools import partial
1✔
5
from typing import TYPE_CHECKING
1✔
6

7
import numpy as np
1✔
8

9
from eitprocessing.datahandling.breath import Breath
1✔
10
from eitprocessing.datahandling.continuousdata import ContinuousData
1✔
11
from eitprocessing.datahandling.datacollection import DataCollection
1✔
12
from eitprocessing.datahandling.eitdata import EITData, Vendor
1✔
13
from eitprocessing.datahandling.intervaldata import IntervalData
1✔
14
from eitprocessing.datahandling.loading import load_eit_data
1✔
15
from eitprocessing.datahandling.sparsedata import SparseData
1✔
16

17
if TYPE_CHECKING:
18
    from pathlib import Path
19

20
    from numpy.typing import NDArray
21

22

23
# Expected number of columns in a Timpel CSV file: 1024 pixel impedance values
# (reshaped to 32x32 on load) plus 6 waveform/marker columns (1024-1029).
_COLUMN_WIDTH = 1030
# Sentinel value Timpel uses for missing pixel data; replaced by NaN on load.
_NAN_VALUE = -1000

# Default sample frequency used when none is provided to the loader.
TIMPEL_SAMPLE_FREQUENCY = 50


# Convenience loader pre-configured for the Timpel vendor.
load_timpel_data = partial(load_eit_data, vendor=Vendor.TIMPEL)
30

31

32
def load_from_single_path(
    path: Path,
    sample_frequency: float | None = TIMPEL_SAMPLE_FREQUENCY,
    first_frame: int = 0,
    max_frames: int | None = None,
) -> dict[str, DataCollection]:
    """Load Timpel EIT data from path.

    Args:
        path: location of the Timpel CSV data file.
        sample_frequency: sample frequency of the data; `None` (or 0) falls
            back to `TIMPEL_SAMPLE_FREQUENCY`.
        first_frame: index of the first frame to load.
        max_frames: maximum number of frames to load; `None` loads all frames.

    Returns:
        Dictionary with the EIT, continuous, sparse and interval data
        collections extracted from the file.

    Raises:
        OSError: if the file cannot be read or does not have the expected
            number of columns.
        ValueError: if `first_frame` lies beyond the end of the file.
    """
    # Both None and 0 count as "not set"; fall back to the Timpel default.
    if not sample_frequency:
        sample_frequency = TIMPEL_SAMPLE_FREQUENCY

    try:
        data: NDArray = np.loadtxt(
            str(path),
            dtype=float,
            delimiter=",",
            skiprows=first_frame,
            max_rows=max_frames,
        )
    except UnicodeDecodeError as e:
        msg = (
            f"File {path} could not be read as Timpel data.\n"
            "Make sure this is a valid and uncorrupted Timpel data file.\n"
            f"Original error message: {e}"
        )
        raise OSError(msg) from e

    if data.shape[1] != _COLUMN_WIDTH:
        msg = (
            f"Input does not have a width of {_COLUMN_WIDTH} columns.\n"
            "Make sure this is a valid and uncorrupted Timpel data file."
        )
        raise OSError(msg)
    if data.shape[0] == 0:
        msg = f"Invalid input: `first_frame` {first_frame} is larger than the total number of frames in the file."
        raise ValueError(msg)

    if max_frames and data.shape[0] == max_frames:
        nframes = max_frames
    else:
        # Fewer frames than requested were available; warn but load what exists.
        if max_frames:
            warnings.warn(
                f"The number of frames requested ({max_frames}) is larger "
                f"than the available number ({data.shape[0]}) of frames after "
                f"the first frame selected ({first_frame}).\n"
                f"{data.shape[0]} frames have been loaded.",
                RuntimeWarning,
                stacklevel=2,
            )
        nframes = data.shape[0]

    # TODO (#80): QUESTION: check whether below issue was only a Drager problem or also
    # applicable to Timpel.
    # The implemented method seems convoluted: it's easier to create an array
    # with nframes and add a time_offset. However, this results in floating
    # point errors, creating issues with comparing times later on.
    time = np.arange(nframes + first_frame) / sample_frequency
    time = time[first_frame:]

    # Columns 0-1023 hold pixel impedance; reshape to (frames, 32, 32) and
    # replace the Timpel missing-value sentinel with NaN.
    pixel_impedance = data[:, :1024]
    pixel_impedance = np.reshape(pixel_impedance, (-1, 32, 32), order="C")
    pixel_impedance = np.where(pixel_impedance == _NAN_VALUE, np.nan, pixel_impedance)

    eit_data = EITData(
        vendor=Vendor.TIMPEL,
        label="raw",
        path=path,
        nframes=nframes,
        time=time,
        sample_frequency=sample_frequency,
        pixel_impedance=pixel_impedance,
    )
    eitdata_collection = DataCollection(EITData, raw=eit_data)

    # extract waveform data
    # TODO: properly export waveform data

    continuousdata_collection = DataCollection(ContinuousData)
    continuousdata_collection.add(
        ContinuousData(
            "global_impedance_(raw)",
            "Global impedance",
            "a.u.",
            "global_impedance",
            "Global impedance calculated from raw EIT data",
            time=time,
            values=eit_data.calculate_global_impedance(),
            sample_frequency=sample_frequency,
        ),
    )

    # The three device waveform channels (columns 1024-1026) share the same
    # structure; add them in one loop.
    # NOTE: "Flow measured by" fixes the earlier "Flow measures by" typo.
    waveform_channels = (
        (1024, "airway_pressure_(timpel)", "Airway pressure", "cmH2O", "pressure", "Airway pressure measured by Timpel device"),
        (1025, "flow_(timpel)", "Flow", "L/s", "flow", "Flow measured by Timpel device"),
        (1026, "volume_(timpel)", "Volume", "L", "volume", "Volume measured by Timpel device"),
    )
    for column, label, name, unit, category, description in waveform_channels:
        continuousdata_collection.add(
            ContinuousData(
                label=label,
                name=name,
                unit=unit,
                category=category,
                description=description,
                time=time,
                values=data[:, column],
                sample_frequency=sample_frequency,
            ),
        )

    # extract sparse data; columns 1027-1029 are 0/1 event markers
    sparsedata_collection = DataCollection(SparseData)

    min_indices = np.nonzero(data[:, 1027] == 1)[0]
    sparsedata_collection.add(
        SparseData(
            label="minvalues_(timpel)",
            name="Minimum values detected by Timpel device.",
            unit=None,
            category="minvalue",
            derived_from=[eit_data],
            time=time[min_indices],
        ),
    )

    max_indices = np.nonzero(data[:, 1028] == 1)[0]
    sparsedata_collection.add(
        SparseData(
            label="maxvalues_(timpel)",
            name="Maximum values detected by Timpel device.",
            unit=None,
            category="maxvalue",
            derived_from=[eit_data],
            time=time[max_indices],
        ),
    )

    gi = continuousdata_collection["global_impedance_(raw)"].values

    # Combine the device's min/max markers into breath intervals.
    time_ranges, breaths = _make_breaths(time, min_indices, max_indices, gi)
    intervaldata_collection = DataCollection(IntervalData)
    intervaldata_collection.add(
        IntervalData(
            label="breaths_(timpel)",
            name="Breaths (Timpel)",
            unit=None,
            category="breaths",
            intervals=time_ranges,
            values=breaths,
            default_partial_inclusion=False,
        ),
    )

    qrs_indices = np.nonzero(data[:, 1029] == 1)[0]
    sparsedata_collection.add(
        SparseData(
            label="qrscomplexes_(timpel)",
            name="QRS complexes detected by Timpel device",
            unit=None,
            category="qrs_complex",
            derived_from=[eit_data],
            time=time[qrs_indices],
        ),
    )

    return {
        "eitdata_collection": eitdata_collection,
        "continuousdata_collection": continuousdata_collection,
        "sparsedata_collection": sparsedata_collection,
        "intervaldata_collection": intervaldata_collection,
    }
222

223

224
def _make_breaths(
    time: np.ndarray,
    min_indices: np.ndarray,
    max_indices: np.ndarray,
    gi: np.ndarray,
) -> tuple[list[tuple[float, float]], list[Breath]]:
    """Pair device-detected minima (valleys) and maxima (peaks) into breaths.

    Prunes valleys/peaks until exactly one peak lies between each pair of
    adjacent valleys, then builds one `Breath` per valley-peak-valley triple.

    Args:
        time: time axis of the loaded data, indexed by frame.
        min_indices: frame indices the device marked as minima.
        max_indices: frame indices the device marked as maxima.
        gi: global impedance per frame, used to decide which extremum to drop.

    Returns:
        A tuple `(time_ranges, breaths)`; both lists are empty when no
        complete breath can be constructed.
    """
    # TODO: replace section with BreathDetection._remove_doubles() and BreathDetection._remove_edge_cases() from
    # 41_breath_detection_psomhorst; this code was directly copied from b59ac54

    # A breath needs at least two boundaries (valleys) and one peak.
    if len(min_indices) < 2 or len(max_indices) < 1:  # noqa: PLR2004
        return [], []

    valley_indices = min_indices.copy()
    peak_indices = max_indices.copy()

    # Peaks before the first or after the last valley cannot belong to a
    # complete breath; discard them.
    peak_indices = peak_indices[peak_indices > valley_indices[0]]
    peak_indices = peak_indices[peak_indices < valley_indices[-1]]

    valley_values = gi[valley_indices]
    # BUGFIX: index gi with the *filtered* peak_indices (previously
    # gi[max_indices]), so peak_values stays aligned with peak_indices when
    # edge peaks were discarded above.
    peak_values = gi[peak_indices]

    current_valley_index = 0
    while current_valley_index < len(valley_indices) - 1:
        start_index = valley_indices[current_valley_index]
        end_index = valley_indices[current_valley_index + 1]
        peaks_between_valleys = np.argwhere(
            (peak_indices > start_index) & (peak_indices < end_index),
        ).flatten()
        if not len(peaks_between_valleys):
            # No peak between adjacent valleys: merge them by removing the
            # higher valley, then re-check the same position.
            delete_valley_index = (
                current_valley_index
                if valley_values[current_valley_index] > valley_values[current_valley_index + 1]
                else current_valley_index + 1
            )
            valley_indices = np.delete(valley_indices, delete_valley_index)
            valley_values = np.delete(valley_values, delete_valley_index)
            continue

        if len(peaks_between_valleys) > 1:
            # Multiple peaks between adjacent valleys: remove the lower of the
            # first two peaks, then re-check the same position.
            delete_peak_index = (
                peaks_between_valleys[0]
                if peak_values[peaks_between_valleys[0]] < peak_values[peaks_between_valleys[1]]
                else peaks_between_valleys[1]
            )
            peak_indices = np.delete(peak_indices, delete_peak_index)
            peak_values = np.delete(peak_values, delete_peak_index)
            continue

        current_valley_index += 1

    # After pruning there is exactly one peak between each remaining pair of
    # adjacent valleys.
    breaths = []
    for start, end, middle in zip(valley_indices[:-1], valley_indices[1:], peak_indices, strict=True):
        breaths.append(((time[start], time[end]), Breath(time[start], time[middle], time[end])))

    # BUGFIX: pruning may leave no complete breath; zip(*[]) below would raise
    # ValueError, so return the documented empty result instead.
    if not breaths:
        return [], []

    time_ranges, values = zip(*breaths, strict=True)
    return list(time_ranges), list(values)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc