• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EIT-ALIVE / eitprocessing / 19312054818

12 Nov 2025 09:12PM UTC coverage: 86.671% (+1.9%) from 84.795%
19312054818

push

github

psomhorst
Bump version: 1.8.4 → 1.8.5

781 of 976 branches covered (80.02%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

34 existing lines in 5 files now uncovered.

2828 of 3188 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.8
/eitprocessing/datahandling/loading/sentec.py
1
from __future__ import annotations
1✔
2

3
import mmap
1✔
4
import os
1✔
5
import warnings
1✔
6
from enum import IntEnum
1✔
7
from functools import partial
1✔
8
from typing import TYPE_CHECKING
1✔
9

10
import numpy as np
1✔
11

12
from eitprocessing.datahandling.continuousdata import ContinuousData
1✔
13
from eitprocessing.datahandling.datacollection import DataCollection
1✔
14
from eitprocessing.datahandling.eitdata import EITData, Vendor
1✔
15
from eitprocessing.datahandling.intervaldata import IntervalData
1✔
16
from eitprocessing.datahandling.loading import load_eit_data
1✔
17
from eitprocessing.datahandling.loading.binreader import BinReader
1✔
18
from eitprocessing.datahandling.sparsedata import SparseData
1✔
19

20
if TYPE_CHECKING:
21
    from pathlib import Path
22

23
    from numpy.typing import NDArray
24

25
# Fallback sample frequency (Hz), used by `load_from_single_path` when no sample frequency is passed by the
# caller and none is found in the file.
SENTEC_SAMPLE_FREQUENCY = 50.2
1✔
26

27
# Convenience loader: `load_eit_data` with the vendor preset to `Vendor.SENTEC`.
load_sentec_data = partial(load_eit_data, vendor=Vendor.SENTEC)
1✔
28

29

30
def load_from_single_path(
    path: Path,
    sample_frequency: float | None = None,
    first_frame: int = 0,
    max_frames: int | None = None,
) -> dict[str, DataCollection]:
    """Load Sentec EIT data from a single file.

    Args:
        path: file to load the data from.
        sample_frequency: expected sample frequency. When the file contains a sample frequency, that value is
            used instead (a warning is issued if both differ). When neither is available,
            `SENTEC_SAMPLE_FREQUENCY` is used.
        first_frame: index of the first frame to load; frames before it are skipped.
        max_frames: maximum number of frames to load. `None` loads everything from `first_frame` onwards.

    Returns:
        A dictionary with the keys `eitdata_collection`, `continuousdata_collection`,
        `sparsedata_collection` and `intervaldata_collection`. Only the EIT data collection is populated
        (with a single `EITData` object labelled "raw"); the other collections are empty.

    Raises:
        ValueError: if `first_frame` is larger than or equal to the number of frames found in the file.

    Warns:
        RuntimeWarning: if `max_frames` is given and a different number of frames was loaded.
    """
    time: list[float] = []
    index = 0
    with path.open("br") as fo, mmap.mmap(fo.fileno(), length=0, access=mmap.ACCESS_READ) as fh:
        file_length = os.fstat(fo.fileno()).st_size
        reader = BinReader(fh, endian="little")
        version = reader.uint8()

        # Upper bound on the number of frames in the file, based on 32 x 32 four-byte values per frame.
        frames_in_file = int(file_length / 32 / 32 / 4)
        max_n_images = frames_in_file if max_frames is None else min(frames_in_file, max_frames)

        # `_read_data_field` stores each frame at its absolute index in the file, so the buffer must also have
        # room for the skipped frames before `first_frame`. Sizing it with `max_frames` alone makes any load
        # with `first_frame > 0` write past the end of the buffer.
        n_slots = min(frames_in_file, max(first_frame, 0) + max_n_images)
        image = np.full(shape=(n_slots, 32, 32), fill_value=np.nan)

        # while there are still data to be read and the number of read data points is lower
        # than the maximum specified, keep reading
        while fh.tell() < file_length and (len(time) < max_n_images):
            _ = reader.uint64()  # skip timestamp reading
            domain_id = reader.uint8()
            number_data_fields = reader.uint8()

            for _ in range(number_data_fields):
                index, sample_frequency = _read_data_field(
                    reader, domain_id, index, first_frame, fh, time, version, image, sample_frequency
                )

    if first_frame >= index:
        msg = f"`first_frame` ({first_frame}) is larger than or equal to the number of frames in the file ({index})."
        raise ValueError(msg)

    # Drop the (unfilled) slots of skipped frames and any unused slots at the end of the buffer.
    image = image[first_frame:index, :, :]
    n_frames = len(image)

    if max_frames and n_frames != max_frames:
        msg = (
            f"The number of frames requested ({max_frames}) is larger "
            f"than the available number ({n_frames}) of frames after "
            f"the first frame selected ({first_frame}, total frames: "
            f"{index}).\n {n_frames} frames were loaded."
        )
        warnings.warn(msg, RuntimeWarning, stacklevel=2)

    if sample_frequency is None:
        sample_frequency = SENTEC_SAMPLE_FREQUENCY

    # Time stamps are read as uint32 values and wrap around after 2**32 counts (not 2**32 - 1); undo the
    # wrap-around before scaling by 1e6.
    time_array = np.unwrap(np.array(time), period=2**32) / 1_000_000

    eitdata_collection = DataCollection(EITData)
    eitdata_collection.add(
        EITData(
            vendor=Vendor.SENTEC,
            path=path,
            sample_frequency=sample_frequency,
            nframes=n_frames,
            time=time_array,
            label="raw",
            pixel_impedance=image,
        ),
    )

    return {
        "eitdata_collection": eitdata_collection,
        "continuousdata_collection": DataCollection(ContinuousData),
        "sparsedata_collection": DataCollection(SparseData),
        "intervaldata_collection": DataCollection(IntervalData),
    }
102

103

104
def _read_data_field(
1✔
105
    reader: BinReader,
106
    domain_id: int,
107
    index: int,
108
    first_frame: int,
109
    fh: mmap.mmap,
110
    time: list[float],
111
    version: int,
112
    image: np.ndarray,
113
    sample_frequency: float | None,
114
) -> tuple[int, float | None]:
115
    """Reads the specified data field from file, and returns an updated index and sample frequency."""
116
    data_id = reader.uint8()
1✔
117
    payload_size = reader.uint16()
1✔
118

119
    if payload_size == 0:
1!
UNCOV
120
        return index, sample_frequency
×
121

122
    match domain_id, data_id:
1✔
123
        case Domain.MEASUREMENT, MeasurementDataID.TIMESTAMP:
1✔
124
            if index < first_frame:
1✔
125
                fh.seek(payload_size, os.SEEK_CUR)
1✔
126
            else:
127
                time.append(reader.uint32())
1✔
128

129
        case Domain.MEASUREMENT, MeasurementDataID.ZERO_REF_IMAGE:
1✔
130
            if index < first_frame:
1✔
131
                fh.seek(payload_size, os.SEEK_CUR)
1✔
132
            else:
133
                ref = _read_frame(version=version, payload_size=payload_size, reader=reader)
1✔
134
                image[index, :, :] = ref
1✔
135

136
            index += 1
1✔
137

138
        case Domain.CONFIGURATION, ConfigurationDataID.SAMPLE_FREQUENCY:
1✔
139
            loaded_sample_frequency = np.round(reader.float32(), 4)
1✔
140
            if sample_frequency and not np.isclose(loaded_sample_frequency, sample_frequency):
1!
UNCOV
141
                msg = (
×
142
                    f"Sample frequency provided ({sample_frequency:.2f} Hz) "
143
                    f"differs from value found in file "
144
                    f"({loaded_sample_frequency:.2f} Hz). "
145
                    f"The sample frequency value found in the file will be used."
146
                )
UNCOV
147
                warnings.warn(msg, RuntimeWarning, stacklevel=2)
×
148
            sample_frequency = loaded_sample_frequency
1✔
149

150
        case _, _:
1!
151
            fh.seek(payload_size, os.SEEK_CUR)
1✔
152

153
    return index, sample_frequency
1✔
154

155

156
def _read_frame(
1✔
157
    version: int,
158
    payload_size: int,
159
    reader: BinReader,
160
) -> NDArray | None:
161
    """Read a single frame in the file.
162

163
    The current position of the file has to be already set to the point where the image should be read (data_id 5).
164

165
    Args:
166
                fh: opened file object
167
                version: version of the Sentec file
168
                index: current number of read frames
169
                payload_size: size of the payload of the data to be read.
170
                reader: bites reader object
171
                first_frame: index of first time point of sequence.
172

173
    Returns: A 32 x 32 matrix, containing the pixels values.
174

175
    """
176
    if version > 1:
1!
177
        # read quality index. We don't use it, so we skip the bytes
178
        _ = reader.uint8()
×
UNCOV
179
        n_pixels = (payload_size - 3) // 4
×
180
    else:
181
        n_pixels = (payload_size - 2) // 4
1✔
182

183
    image_width = reader.uint8()
1✔
184
    image_height = reader.uint8()
1✔
185

186
    if image_width * image_height != n_pixels:
1!
187
        msg = (
×
188
            f"The length of image array is {n_pixels} which is not equal to "
189
            f"the product of the width ({image_width}) and height "
190
            f"({image_height}) of the frame."
191
        )
UNCOV
192
        raise OSError(msg)
×
193

194
    # the sign of the zero_ref values has to be inverted
195
    zero_ref = -reader.npfloat32(n_pixels)
1✔
196
    return np.reshape(zero_ref, (image_width, image_height), order="C")
1✔
197

198

199
class Domain(IntEnum):
    """Domain loaded data falls in.

    The values are compared to the domain ID read from each record in the file.
    """

    # Records containing measurement data (time stamps, images).
    MEASUREMENT = 16
    # Records containing configuration data (e.g. the sample frequency).
    CONFIGURATION = 64
1✔
204

205

206
class MeasurementDataID(IntEnum):
    """ID of measured data.

    The values are compared to the data ID of data fields in the measurement domain.
    """

    # Time stamp of a frame.
    TIMESTAMP = 0
    # Image data of a frame (zero-reference image).
    ZERO_REF_IMAGE = 5
1✔
211

212

213
class ConfigurationDataID(IntEnum):
    """ID of configuration data.

    The values are compared to the data ID of data fields in the configuration domain.
    """

    # Sample frequency stored in the file.
    SAMPLE_FREQUENCY = 1
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc