• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 22233780237

20 Feb 2026 05:16PM UTC coverage: 84.188% (+0.03%) from 84.163%
22233780237

Pull #16

github

web-flow
Merge d4e371436 into 76ccbd7a0
Pull Request #16: Updated Documentation & ReadTheDocs

15 of 15 new or added lines in 3 files covered. (100.0%)

1 existing line in 1 file now uncovered.

2593 of 3080 relevant lines covered (84.19%)

1.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.7
/src/robotdataprocess/data_types/ImageData/ImageData.py
1
from __future__ import annotations
2✔
2

3
from ..Data import ROSMsgLibType
2✔
4
from ..SequentialData import SequentialData
2✔
5
import decimal
2✔
6
from decimal import Decimal
2✔
7
from enum import Enum
2✔
8
from ...ModuleImporter import ModuleImporter
2✔
9
import numpy as np
2✔
10
from numpy.lib.format import open_memmap
2✔
11
from pathlib import Path
2✔
12
from PIL import Image
2✔
13
from typeguard import typechecked
2✔
14
from typing import Union, Any
2✔
15
import tqdm
2✔
16
from rosbags.typesys import Stores, get_typestore
2✔
17

18
@typechecked
2✔
19
class ImageData(SequentialData):
2✔
20
    """ Generic ImageData class that should be overwritten by children """
21

22
    # Define image encodings enumeration
23
    class ImageEncoding(Enum):
        """ Pixel encodings known to ImageData, plus converters to/from other naming schemes. """

        Mono8 = 0
        RGB8 = 1
        _32FC1 = 2
        BGR8 = 3

        # ================ Class Methods ================
        @classmethod
        def from_str(cls, encoding_str: str):
            """ Return the member whose text form (e.g. "ImageEncoding.RGB8") equals ``encoding_str``. """
            known = (
                ("ImageEncoding.Mono8", cls.Mono8),
                ("ImageEncoding.RGB8", cls.RGB8),
                ("ImageEncoding._32FC1", cls._32FC1),
                ("ImageEncoding.BGR8", cls.BGR8),
            )
            for text, member in known:
                if encoding_str == text:
                    return member
            raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")

        @classmethod
        def from_ros_str(cls, encoding_str: str):
            """ Return the member for a ROS encoding name; matching is case-insensitive. """
            # The lowered text is also what the error message reports.
            encoding_str = encoding_str.lower()
            known = (
                ('mono8', cls.Mono8),
                ('rgb8', cls.RGB8),
                ("32fc1", cls._32FC1),
                ('bgr8', cls.BGR8),
            )
            for text, member in known:
                if encoding_str == text:
                    return member
            raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")

        @classmethod
        def from_dtype_and_channels(cls, dtype: np.dtype, channels: int):
            """
            Return the member implied by a numpy dtype and a channel count.

            Raises:
                NotImplementedError: For uint8 with 3 channels (RGB8 and BGR8 both
                    match, so the choice is ambiguous) and for any unmapped combination.
            """
            if dtype == np.uint8:
                if channels == 1:
                    return cls.Mono8
                if channels == 3:
                    raise NotImplementedError(f"dtype {dtype} w/ {channels} channel(s) can't determine which encoding the data is in!")
            elif dtype == np.float32 and channels == 1:
                return cls._32FC1
            raise NotImplementedError(f"dtype {dtype} w/ {channels} channel(s) has no corresponding encoding!")

        @classmethod
        def from_pillow_str(cls, encoding_str: str):
            """ Return the member for a Pillow mode string ("RGB" or "L"). """
            known = (
                ("RGB", cls.RGB8),
                ("L", cls.Mono8),
            )
            for text, member in known:
                if encoding_str == text:
                    return member
            raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")

        # ================ Export Methods ================
        @staticmethod
        def to_ros_str(encoding: ImageData.ImageEncoding):
            """ Return the ROS encoding name for ``encoding``. """
            names = (
                (ImageData.ImageEncoding.Mono8, 'mono8'),
                (ImageData.ImageEncoding.RGB8, 'rgb8'),
                (ImageData.ImageEncoding._32FC1, '32FC1'),
                (ImageData.ImageEncoding.BGR8, 'bgr8'),
            )
            for member, ros_name in names:
                if encoding == member:
                    return ros_name
            raise NotImplementedError(f"This ImageData.ImageEncoding.{encoding} is not yet implemented (or it doesn't exist)!")

        @staticmethod
        def to_dtype_and_channels(encoding):
            """ Return the ``(numpy dtype, channel count)`` pair for ``encoding``. """
            layouts = (
                (ImageData.ImageEncoding.Mono8, (np.uint8, 1)),
                (ImageData.ImageEncoding.RGB8, (np.uint8, 3)),
                (ImageData.ImageEncoding.BGR8, (np.uint8, 3)),
                (ImageData.ImageEncoding._32FC1, (np.float32, 1)),
            )
            for member, layout in layouts:
                if encoding == member:
                    return layout
            raise NotImplementedError(f"This encoding ({encoding}) is missing a mapping to dtype/channels!")
2✔
103
    
104
    height: int
2✔
105
    width: int
2✔
106
    encoding: ImageData.ImageEncoding
2✔
107
    images: Union[np.ndarray, Any] # With Any being a LazyImageArray
2✔
108

109
    def __init__(
        self,
        frame_id: str,
        timestamps: Union[np.ndarray, list],
        height: int,
        width: int,
        encoding: ImageData.ImageEncoding,
        images: Union[np.ndarray, Any],
    ):
        """
        Store the image sequence and its metadata.

        Args:
            frame_id: Forwarded to the SequentialData initializer.
            timestamps: Forwarded to the SequentialData initializer.
            height: Image height.
            width: Image width.
            encoding: Encoding of the stored images.
            images: The image data (a numpy array, or a lazy array object).
        """
        # The parent initializer handles frame_id and timestamps.
        super().__init__(frame_id, timestamps)

        self.height, self.width = height, width
        self.encoding = encoding
        self.images = images
2✔
116

117
    def _invalidate_cache(self):
2✔
118
        """ Hook for subclasses to clear cached data after mutations. No-op in ImageData. """
119
        pass
×
120

121
    # =========================================================================
122
    # ============================ Class Methods ==============================
123
    # =========================================================================
124

125
    @classmethod
2✔
126
    def from_image_files(cls, image_folder_path: Union[Path, str], frame_id: str) -> ImageData:
2✔
127
        """
128
        Creates a class structure from a folder with .png files.
129

130
        Args:
131
            image_folder_path: Path to the folder containing image files.
132
            frame_id: The frame ID to assign.
133

134
        Returns:
135
            ImageData: Instance of this class.
136
        """
UNCOV
137
        NotImplementedError("This method needs to be overwritten by the child Data class!")
×
138
    
139
    # =========================================================================
140
    # ========================= Manipulation Methods ========================== 
141
    # =========================================================================  
142

143
    def crop_data(self, start: Decimal, end: Union[Decimal, None] = None):
2✔
144
        """
145
        Will crop the data so only values within [start, end] inclusive are kept.
146

147
        Args:
148
            start: The earliest timestamp to keep.
149
            end: The latest timestamp to keep. If None, keeps all data from ``start`` onward.
150
        """
151

152
        # Create boolean mask of data to keep
153
        mask = ((self.timestamps >= start) & (self.timestamps <= end)) if end is not None else (self.timestamps >= start)
2✔
154
        
155
        # Apply mask
156
        self.timestamps = self.timestamps[mask]
2✔
157
        self.images = self.images[mask]
2✔
158

159
    # =========================================================================
160
    # ============================ Export Methods ============================= 
161
    # ========================================================================= 
162

163
    def to_npy(self, output_folder_path: Union[Path, str]):
        """
        Saves this ImageData into three files inside ``output_folder_path``:

        - imgs.npy (with image data)
        - times.npy (with timestamps)
        - attributes.txt

        Args:
            output_folder_path (Path | str): The folder to save the files in.
                It is created (including parents) if it does not exist.

        Raises:
            NotImplementedError: If the encoding is neither RGB8 nor 32FC1.
        """

        # Setup the output directory
        output_path = Path(output_folder_path)
        output_path.mkdir(parents=True, exist_ok=True)

        # Check that the encoding is supported
        if self.encoding != ImageData.ImageEncoding.RGB8 and self.encoding != ImageData.ImageEncoding._32FC1:
            raise NotImplementedError(f"Only RGB8 & 32FC1 images have been tested for export, not {self.encoding}")

        # Get dtype and channels
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(self.encoding)

        # Single-channel images are stored without a trailing channel axis
        num_images = self.len()
        if channels == 1:
            shape = (num_images, self.height, self.width)
        else:
            shape = (num_images, self.height, self.width, channels)

        # Copy images one at a time into a memory-mapped .npy file so the whole
        # array is never assembled in memory by this method
        img_memmap = open_memmap(str(output_path / "imgs.npy"), dtype=dtype, shape=shape, mode='w+')
        # The context manager closes the progress bar even if a copy fails
        with tqdm.tqdm(total=num_images, desc="Saving Images...", unit=" images") as pbar:
            for i in range(num_images):
                img_memmap[i] = self.images[i]
                pbar.update()
        img_memmap.flush()

        # Save the timestamps. np.longdouble is the same type as np.float128 on
        # platforms that define float128, and still exists on those that do not.
        np.save(str(output_path / "times.npy"), self.timestamps.astype(np.longdouble), allow_pickle=False)

        # Save attributes
        with open(str(output_path / "attributes.txt"), "w") as f:
            f.write(f"image_shape: ({self.height},{self.width})\n")
            f.write(f"frame_id: {self.frame_id}\n")
            f.write(f"height: {self.height}\n")
            f.write(f"width: {self.width}\n")
            f.write(f"encoding: {self.encoding}\n")
2✔
205

206
    def to_image_files(self, output_folder_path: Union[Path, str]):
        """
        Saves each image in this ImageData instance to the specified folder,
        using the timestamps as filenames in .png format (lossless compression).

        Args:
            output_folder_path (Path | str): The folder to save images into.
                It is created (including parents) if it does not exist.

        Raises:
            NotImplementedError: If the encoding is neither Mono8 nor RGB8.
        """

        # Setup the output directory
        output_path = Path(output_folder_path)
        output_path.mkdir(parents=True, exist_ok=True)

        # Resolve the Pillow mode once; it depends only on the encoding, and an
        # unsupported encoding is rejected before any file is written
        if self.encoding == ImageData.ImageEncoding.Mono8:
            mode = "L"
        elif self.encoding == ImageData.ImageEncoding.RGB8:
            mode = "RGB"
        else:
            raise NotImplementedError(f"Only Mono8 and RGB8 encoding currently supported for export, not {self.encoding}")

        # The context manager closes the progress bar even if a save fails
        with tqdm.tqdm(total=self.len(), desc="Saving Images...", unit=" images") as pbar:
            for i, timestamp in enumerate(self.timestamps):
                # Format timestamp to match input expectations
                filename = f"{timestamp:.9f}" + ".png"
                file_path = output_path / filename

                # PNG is lossless at every level; compress_level=1 is a low
                # (fast) compression setting
                img = Image.fromarray(self.images[i], mode=mode)
                img.save(file_path, format="PNG", compress_level=1)
                pbar.update()
2✔
246

247
    # =========================================================================
248
    # =========================== Conversion to ROS =========================== 
249
    # ========================================================================= 
250

251
    @staticmethod
    def get_ros_msg_type(lib_type: ROSMsgLibType) -> Any:
        """
        Look up the Image message type for the given ROS message library.

        Args:
            lib_type: The ROS message library to use.

        Returns:
            For ROSBAGS, the ``__msgtype__`` of the ROS2 Humble typestore's
            ``sensor_msgs/msg/Image``; for RCLPY / ROSPY, the ``Image``
            attribute of ``sensor_msgs.msg``.

        Raises:
            NotImplementedError: If ``lib_type`` is not supported.
        """

        if lib_type == ROSMsgLibType.ROSBAGS:
            image_type = get_typestore(Stores.ROS2_HUMBLE).types['sensor_msgs/msg/Image']
            return image_type.__msgtype__

        if lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
            return ModuleImporter.get_module_attribute('sensor_msgs.msg', 'Image')

        raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg_type!")
2✔
273

274
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int):
        """
        Gets an Image ROS message corresponding to the image represented by index i.

        Args:
            lib_type (ROSMsgLibType): The type of ROS message to return (e.g., ROSBAGS, RCLPY).
            i (int): The index of the image message to convert.

        Returns:
            An Image message object built with the library selected by ``lib_type``.

        Raises:
            ValueError: If i is outside the data bounds.
            NotImplementedError: If the encoding is neither RGB8 nor 32FC1, or
                if ``lib_type`` is not supported.
        """

        # Check to make sure index is within data bounds
        if i < 0 or i >= self.len():
            raise ValueError(f"Index {i} is out of bounds!")

        # Row stride in bytes and flat payload, chosen together per encoding
        if self.encoding == ImageData.ImageEncoding.RGB8:
            step = 3 * self.width
            data = self.images[i].flatten()
        elif self.encoding == ImageData.ImageEncoding._32FC1:
            step = 4 * self.width
            # Reinterpret the float32 pixels as raw bytes
            # TODO: Check endianness for _32FC1
            data = self.images[i].flatten().view(np.uint8)
        else:
            raise NotImplementedError(f"Unsupported encoding {self.encoding} for rosbag_get_ros_msg!")

        # Split the Decimal timestamp into whole seconds and nanoseconds. The
        # rounding has to apply to the scaled fraction, not to the 1e9 constant,
        # or sub-nanosecond digits are truncated instead of rounded.
        stamp = self.timestamps[i]
        whole = stamp.to_integral_value(rounding=decimal.ROUND_DOWN)
        seconds = int(whole)
        nanoseconds = int(((stamp - whole) * Decimal("1e9")).to_integral_value(rounding=decimal.ROUND_HALF_EVEN))

        # Rounding can reach a full second (e.g. 0.9999999996 s); carry it over
        if nanoseconds >= 1_000_000_000:
            seconds += 1
            nanoseconds -= 1_000_000_000

        # Write the data into the new class
        if lib_type == ROSMsgLibType.ROSBAGS:
            typestore = get_typestore(Stores.ROS2_HUMBLE)
            # Named *Msg so the module-level PIL Image import is not shadowed
            ImageMsg = typestore.types['sensor_msgs/msg/Image']
            HeaderMsg = typestore.types['std_msgs/msg/Header']
            TimeMsg = typestore.types['builtin_interfaces/msg/Time']

            return ImageMsg(HeaderMsg(stamp=TimeMsg(sec=seconds,
                                                    nanosec=nanoseconds),
                                      frame_id=self.frame_id),
                            height=self.height,
                            width=self.width,
                            encoding=ImageData.ImageEncoding.to_ros_str(self.encoding),
                            is_bigendian=0,
                            step=step,
                            data=data)

        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
            HeaderMsg = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
            ImageMsg = ModuleImporter.get_module_attribute('sensor_msgs.msg', 'Image')

            # Create the messages
            img_msg = ImageMsg()
            img_msg.header = HeaderMsg()
            if lib_type == ROSMsgLibType.RCLPY:
                Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
                img_msg.header.stamp = Time(seconds=seconds, nanoseconds=nanoseconds).to_msg()
            else:
                rospy = ModuleImporter.get_module('rospy')
                img_msg.header.stamp = rospy.Time(secs=seconds, nsecs=nanoseconds)

            # Populate the rest of the data
            img_msg.header.frame_id = self.frame_id
            img_msg.height = self.height
            img_msg.width = self.width
            img_msg.encoding = ImageData.ImageEncoding.to_ros_str(self.encoding)
            img_msg.is_bigendian = 0
            img_msg.step = step
            img_msg.data = data.tolist()
            return img_msg

        else:
            raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg()!")
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc