lunarlab-gatech / robotdataprocess / build 20934864165

12 Jan 2026 09:01PM UTC coverage: 70.168% (-4.5%) from 74.672%

Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class
DanielChaseButterfield: Swap ROS1 while loop for publishing with rospy.Timer for real-time image publishing

524 of 780 new or added lines in 12 files covered (67.18%).
3 existing lines in 2 files are now uncovered.
1423 of 2028 relevant lines covered (70.17%).
1.4 hits per line.

Source file: /src/robotdataprocess/data_types/ImageData/ImageDataInMemory.py (70.18% covered)
from __future__ import annotations

from ...conversion_utils import col_to_dec_arr
import cv2

from decimal import Decimal
from enum import Enum
from .ImageData import ImageData
import numpy as np
from numpy.lib.format import open_memmap
import os
from pathlib import Path
from PIL import Image
from ...ros.Ros2BagWrapper import Ros2BagWrapper
from rosbags.rosbag2 import Reader as Reader2
from rosbags.typesys.store import Typestore
from typeguard import typechecked
from typing import Tuple, Union
import tqdm

@typechecked
class ImageDataInMemory(ImageData):

    def __init__(self, frame_id: str, timestamps: Union[np.ndarray, list],
                 height: int, width: int, encoding: ImageData.ImageEncoding, images: np.ndarray):
        super().__init__(frame_id, timestamps, height, width, encoding, images)

    # =========================================================================
    # ============================ Class Methods ==============================
    # =========================================================================

    @classmethod
    def from_ros2_bag(cls, bag_path: Union[Path, str], img_topic: str, save_folder: Union[Path, str]):
        """
        Creates a class structure from a ROS2 bag file with an Image topic. Also
        saves all the data into .npy and .txt files, as this is required when the
        image data doesn't fit into RAM.

        Args:
            bag_path (Path | str): Path to the ROS2 bag file.
            img_topic (str): Topic of the Image messages.
            save_folder (Path | str): Path to save class data into.
        Returns:
            ImageData: Instance of this class.
        """

        # Get topic message count and typestore
        bag_wrapper = Ros2BagWrapper(bag_path, None)
        typestore: Typestore = bag_wrapper.get_typestore()
        num_msgs: int = bag_wrapper.get_topic_count(img_topic)

        # Extract relevant image parameters
        image_shape, frame_id, height, width, encoding = None, None, None, None, None
        with Reader2(bag_path) as reader:
            connections = [x for x in reader.connections if x.topic == img_topic]
            for conn, _, rawdata in reader.messages(connections=connections):
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
                frame_id = msg.header.frame_id
                height = msg.height
                width = msg.width
                encoding = ImageData.ImageEncoding.from_ros_str(msg.encoding)
                img = ImageDataInMemory._decode_image_msg(msg, encoding, height, width)
                image_shape = img.shape
                break

        # Pre-allocate arrays (memory-mapped or otherwise)
        imgs_path = str(Path(save_folder) / "imgs.npy")
        os.makedirs(save_folder, exist_ok=True)
        img_memmap = open_memmap(imgs_path, dtype=img.dtype, shape=(num_msgs, *image_shape), mode='w+')
        timestamps_np = np.zeros(num_msgs, dtype=np.float128)

        # Setup tqdm bar
        pbar = tqdm.tqdm(total=num_msgs, desc="Extracting Images...", unit=" msgs")

        # Extract the images/timestamps and save
        with Reader2(bag_path) as reader:
            i = 0
            connections = [x for x in reader.connections if x.topic == img_topic]
            for conn, _, rawdata in reader.messages(connections=connections):
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)

                # Extract images (skipping malformed ones)
                img = None
                try:
                    img = ImageDataInMemory._decode_image_msg(msg, encoding, height, width)
                except Exception as e:
                    print("Failure decoding image msg: ", e)
                if img is not None and img.shape == image_shape:
                    img_memmap[i] = img

                # Extract timestamps
                ts = Ros2BagWrapper.extract_timestamp(msg)
                timestamps_np[i] = ts

                # Update the count
                i += 1
                pbar.update(1)

        # Write all images to disk and save timestamps and other data
        img_memmap.flush()
        np.save(str(Path(save_folder) / "times.npy"), timestamps_np, allow_pickle=False)
        with open(str(Path(save_folder) / "attributes.txt"), "w") as f:
            f.write(f"image_shape: {image_shape}\n")
            f.write(f"frame_id: {frame_id}\n")
            f.write(f"height: {height}\n")
            f.write(f"width: {width}\n")
            f.write(f"encoding: {encoding}\n")

        # Create an ImageData class
        return cls(frame_id, timestamps_np, height, width, encoding, np.load(imgs_path, mmap_mode='r+'))
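
    # A minimal usage sketch for from_ros2_bag() (not part of the original file).
    # The bag path, topic name, and output folder below are hypothetical; any bag
    # containing sensor_msgs/msg/Image messages on the given topic would work.
    #
    #   data = ImageDataInMemory.from_ros2_bag(
    #       "bags/run_01",          # hypothetical ROS2 bag directory
    #       "/camera/image_raw",    # hypothetical Image topic
    #       "extracted/camera")     # receives imgs.npy, times.npy, attributes.txt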

    @classmethod
    def from_npy(cls, folder_path: Union[Path, str]):
        """
        Creates a class structure from .npy and .txt files (the ones written by from_ros2_bag()).

        Args:
            folder_path (Path | str): Path to the folder with:
                - imgs.npy
                - times.npy
                - attributes.txt
        Returns:
            ImageData: Instance of this class.
        """

        # Calculate other paths from folder path
        imgs_path = str(Path(folder_path) / "imgs.npy")
        ts_path = str(Path(folder_path) / "times.npy")
        attr_path = str(Path(folder_path) / "attributes.txt")

        # Read in the attributes
        attr_data = {}
        with open(attr_path, "r") as f:
            for line in f:
                key, val = line.strip().split(":", 1)
                attr_data[key.strip()] = val.strip()

        # Parse and assign values to variables
        frame_id = attr_data["frame_id"]
        height = int(attr_data["height"])
        width = int(attr_data["width"])
        encoding = ImageData.ImageEncoding.from_str(attr_data["encoding"])

        # Create an ImageData class
        return cls(frame_id, np.load(ts_path), height, width, encoding, np.load(imgs_path, mmap_mode='r+'))
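
    # A sketch of reloading the folder written by from_ros2_bag() (hypothetical path):
    #
    #   data = ImageDataInMemory.from_npy("extracted/camera")
    #
    # Because imgs.npy is opened with mmap_mode='r+', the image array is memory-mapped
    # rather than read fully into RAM.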

    @classmethod
    def from_npy_files(cls, npy_folder_path: Union[Path, str], frame_id: str):
        """
        Creates a class structure from .npy files, where each individual image
        is stored in an .npy file with the timestamp as the name.

        Args:
            npy_folder_path (Path | str): Path to the folder with the npy images.
            frame_id (str): The frame where this image data was collected.
        Returns:
            ImageData: Instance of this class.
        """

        # Get all npy files in the designated folder (sorted)
        all_image_files = [str(p) for p in Path(npy_folder_path).glob("*.npy")]

        # Extract the timestamps and sort them
        timestamps = col_to_dec_arr([s.split('/')[-1][:-4] for s in all_image_files])
        sorted_indices = np.argsort(timestamps)
        timestamps_sorted = timestamps[sorted_indices]

        # Use sorted_indices to sort all_image_files in the same way
        all_image_files_sorted = [all_image_files[i] for i in sorted_indices]

        # Extract width, height, and channels
        first_image = np.load(all_image_files_sorted[0], 'r')
        assert len(first_image.shape) >= 2
        assert len(first_image.shape) < 4
        height = first_image.shape[0]
        width = first_image.shape[1]
        channels = 1
        if len(first_image.shape) > 2:
            channels = first_image.shape[2]

        # Extract mode and make sure it matches the supported type for this operation
        encoding = ImageData.ImageEncoding.from_dtype_and_channels(first_image.dtype, channels)
        if encoding != ImageData.ImageEncoding._32FC1:
            raise NotImplementedError(f"Only ImageData.ImageEncoding._32FC1 mode implemented for 'from_npy_files', not {encoding}")

        # Load the images as numpy arrays
        assert channels == 1
        images = np.zeros((len(all_image_files_sorted), height, width), dtype=np.float32)
        pbar = tqdm.tqdm(total=len(all_image_files_sorted), desc="Extracting Images...", unit=" images")
        for i, path in enumerate(all_image_files_sorted):
            images[i] = np.load(path, 'r')
            pbar.update()

        # Return an ImageData class
        return cls(frame_id, timestamps_sorted, height, width, encoding, images)
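
    # A sketch of loading a folder of per-image .npy files (hypothetical layout and names):
    #
    #   depth/
    #     1694030400.100000000.npy   <- one float32 (32FC1) image per file,
    #     1694030400.200000000.npy      named by its timestamp in seconds
    #
    #   depth_data = ImageDataInMemory.from_npy_files("depth", frame_id="depth_camera_link")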

    @classmethod
    def from_image_files(cls, image_folder_path: Union[Path, str], frame_id: str):
        """
        Creates a class structure from a folder with .png files, using the file names
        as the timestamps. This is the format that the HERCULES v1.4 dataset provides
        for image data.

        Args:
            image_folder_path (Path | str): Path to the folder with the images.
            frame_id (str): The frame where this image data was collected.
        Returns:
            ImageData: Instance of this class.
        """

        # Get all png files in the designated folder (sorted)
        all_image_files = [str(p) for p in Path(image_folder_path).glob("*.png")]

        # Extract the timestamps and sort them
        timestamps = col_to_dec_arr([s.split('/')[-1][:-4] for s in all_image_files])
        sorted_indices = np.argsort(timestamps)
        timestamps_sorted = timestamps[sorted_indices]

        # Use sorted_indices to sort all_image_files in the same way
        all_image_files_sorted = [all_image_files[i] for i in sorted_indices]

        # Make sure the mode is what we expect
        with Image.open(all_image_files_sorted[0]) as first_image:
            encoding = ImageData.ImageEncoding.from_pillow_str(first_image.mode)
            if encoding != ImageData.ImageEncoding.RGB8 and encoding != ImageData.ImageEncoding.Mono8:
                raise NotImplementedError(f"Only RGB8 & Mono8 supported for 'from_image_files', not {encoding}")

        # Get dtype and channels based on the encoding
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(encoding)

        # Define the image array shape
        if channels == 1:
            img_arr_shape = (len(all_image_files_sorted), first_image.height, first_image.width)
        else:
            img_arr_shape = (len(all_image_files_sorted), first_image.height, first_image.width, channels)

        # Load the images as numpy arrays
        images = np.zeros(img_arr_shape, dtype=dtype)
        pbar = tqdm.tqdm(total=len(all_image_files_sorted), desc="Extracting Images...", unit=" images")
        for i, path in enumerate(all_image_files_sorted):
            images[i] = np.array(Image.open(path), dtype=dtype)
            pbar.update()

        # Return an ImageData class
        return cls(frame_id, timestamps_sorted, first_image.height, first_image.width, encoding, images)
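
    # A sketch of loading a HERCULES-style folder of PNGs (hypothetical paths):
    #
    #   cam_data = ImageDataInMemory.from_image_files("hercules/cam0", frame_id="cam0_link")
    #
    # Each file is expected to be named "<timestamp>.png"; the timestamps become the
    # ImageData timestamps after sorting.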

    # =========================================================================
    # ========================= Manipulation Methods ==========================
    # =========================================================================

    def downscale_by_factor(self, scale: int):
        """
        Scales down all images by the provided factor.

        Args:
            scale (int): The downscaling factor. Must evenly divide both height and width.
        """

        if self.height % scale != 0 or self.width % scale != 0:
            raise ValueError(f"Scale factor {scale} must evenly divide both height ({self.height}) and width ({self.width})")

        # Ensure we're working with Mono8 data (checked before mutating height/width)
        if self.encoding != ImageData.ImageEncoding.Mono8:
            raise NotImplementedError(f"This method is only currently implemented for Mono8 data, not {self.encoding}!")

        # Calculate new height/width
        self.height = self.height // scale
        self.width = self.width // scale

        # Determine the number of channels in the image
        if len(self.images.shape) == 4: channels = self.images.shape[3]
        else: channels = 1

        # Create a new array to hold the resized images
        if channels == 1:
            rescaled_images = np.zeros((self.len(), self.height, self.width), dtype=self.images.dtype)
        else:
            rescaled_images = np.zeros((self.len(), self.height, self.width, channels), dtype=self.images.dtype)

        # Resize each image
        for i in range(self.len()):
            rescaled_images[i] = cv2.resize(self.images[i], (self.width, self.height), interpolation=cv2.INTER_LINEAR)
        self.images = rescaled_images
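
    # A sketch of downscaling Mono8 imagery in place; cam_data is hypothetical:
    #
    #   cam_data.downscale_by_factor(2)   # e.g. 640x480 -> 320x240, via cv2.INTER_LINEAR
    #
    # The factor must evenly divide both dimensions, and only Mono8 data is supported.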

    # =========================================================================
    # ============================ Export Methods =============================
    # =========================================================================

    def to_npy(self, output_folder_path: Union[Path, str]):
        """
        Saves this ImageData into three files:

        - imgs.npy (with image data)
        - times.npy (with timestamps)
        - attributes.txt

        Args:
            output_folder_path (Path | str): The folder to save the .npy files at.
        """

        # Setup the output directory
        output_path = Path(output_folder_path)
        output_path.mkdir(parents=True, exist_ok=True)

        # Check that the encoding is supported
        if self.encoding != ImageData.ImageEncoding.RGB8 and self.encoding != ImageData.ImageEncoding._32FC1:
            raise NotImplementedError(f"Only RGB8 & 32FC1 images have been tested for export, not {self.encoding}")

        # Get dtype and channels
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(self.encoding)

        # Save images into memory-mapped array
        shape = (self.len(), self.height, self.width) if channels == 1 else (self.len(), self.height, self.width, channels)
        img_memmap = open_memmap(str(Path(output_folder_path) / "imgs.npy"), dtype=dtype, shape=shape, mode='w+')
        pbar = tqdm.tqdm(total=self.len(), desc="Saving Images...", unit=" images")
        for i in range(self.len()):
            img_memmap[i] = self.images[i]
            pbar.update()
        img_memmap.flush()

        # Save the timestamps
        np.save(str(Path(output_folder_path) / "times.npy"), self.timestamps.astype(np.float128), allow_pickle=False)

        # Save attributes
        with open(str(Path(output_folder_path) / "attributes.txt"), "w") as f:
            f.write(f"image_shape: ({self.height},{self.width})\n")
            f.write(f"frame_id: {self.frame_id}\n")
            f.write(f"height: {self.height}\n")
            f.write(f"width: {self.width}\n")
            f.write(f"encoding: {self.encoding}\n")
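
    # A sketch of exporting and reloading; the folder name is hypothetical:
    #
    #   cam_data.to_npy("export/cam0")
    #   reloaded = ImageDataInMemory.from_npy("export/cam0")
    #
    # to_npy() writes the same imgs.npy / times.npy / attributes.txt layout that
    # from_npy() reads, so the two methods round-trip.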

    def to_image_files(self, output_folder_path: Union[Path, str]):
        """
        Saves each image in this ImageData instance to the specified folder,
        using the timestamps as filenames in .png format (lossless compression).

        Args:
            output_folder_path (Path | str): The folder to save images into.
        """

        # Setup the output directory
        output_path = Path(output_folder_path)
        output_path.mkdir(parents=True, exist_ok=True)

        # Check that the encoding is Mono8
        if self.encoding != ImageData.ImageEncoding.Mono8:
            raise NotImplementedError(f"Only Mono8 encoding currently supported for export, not {self.encoding}")

        # Setup a progress bar
        pbar = tqdm.tqdm(total=self.images.shape[0], desc="Saving Images...", unit=" images")

        # Save each image
        for i, timestamp in enumerate(self.timestamps):
            # Format timestamp to match input expectations
            filename = f"{timestamp:.9f}" + ".png"
            file_path = output_path / filename

            # Save as lossless PNG with fast (level 1) compression
            img = Image.fromarray(self.images[i], mode="L")
            img.save(file_path, format="PNG", compress_level=1)
            pbar.update()

        pbar.close()
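
    # A sketch of exporting Mono8 frames back to timestamp-named PNGs and re-importing
    # them; folder names are hypothetical:
    #
    #   cam_data.to_image_files("export/cam0_png")   # writes <timestamp>.png files
    #   again = ImageDataInMemory.from_image_files("export/cam0_png", frame_id="cam0_link")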

    # =========================================================================
    # ============================ Image Decoding =============================
    # =========================================================================

    @staticmethod
    def _decode_image_msg(msg: object, encoding: ImageData.ImageEncoding, height: int, width: int):
        """
        Helper method that decodes image data from a ROS2 Image message.

        Args:
            msg (object): The ROS2 Image message.
            encoding (ImageEncoding): The encoding of the image data.
            height (int): Height of the image.
            width (int): Width of the image.
        """
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(encoding)
        if channels > 1:
            return np.frombuffer(msg.data, dtype=dtype).reshape((height, width, channels))
        else:
            return np.frombuffer(msg.data, dtype=dtype).reshape((height, width))
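
    # A sketch of the decoding idea on synthetic data (values are illustrative only):
    # an rgb8 message carries height*width*3 bytes, which reshape to (height, width, 3).
    #
    #   raw = bytes(range(2 * 2 * 3))                        # fake 2x2 rgb8 payload
    #   arr = np.frombuffer(raw, dtype=np.uint8).reshape((2, 2, 3))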

    # =========================================================================
    # ======================= Multi ImageData Methods =========================
    # =========================================================================

    def compare_timestamps(self, other: ImageDataInMemory):
        """
        This method compares two ImageData objects based on the timestamps of their
        images.
        """

        # Find the locations in other where self timestamps would fit
        idxs = np.searchsorted(other.timestamps, self.timestamps, side='right')

        # Get the left indices and right indices
        idxs_right = np.clip(idxs, 0, len(other.timestamps)-1)
        idxs_left = np.clip(idxs - 1, 0, len(other.timestamps)-1)

        # Get distances to nearest on either side
        dists = np.minimum(np.abs(self.timestamps - other.timestamps[idxs_left]),
                           np.abs(self.timestamps - other.timestamps[idxs_right]))

        # Print the mean and std of the distances
        print(f"Mean distance (left): {np.mean(np.abs(self.timestamps - other.timestamps[idxs_left]))}")
        print(f"Mean distance (right): {np.mean(np.abs(self.timestamps - other.timestamps[idxs_right]))}")
        print(f"Mean distance: {np.mean(dists)}")
        print(f"Std distance: {np.std(dists)}")
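
    # A toy illustration of the nearest-timestamp logic above (values made up):
    # for self timestamps [0.10, 0.25] and other timestamps [0.0, 0.2, 0.4],
    # searchsorted(..., side='right') gives [1, 2], so the left/right candidates are
    # (0.0, 0.2) and (0.2, 0.4), and the per-frame nearest distances are [0.10, 0.05].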

    def stereo_undistort_and_rectify(self: ImageDataInMemory, other: ImageDataInMemory,
            K1: np.ndarray, D1: np.ndarray, K2: np.ndarray, D2: np.ndarray,
            R: np.ndarray, T: np.ndarray) -> Tuple[ImageDataInMemory, ImageDataInMemory, np.ndarray, np.ndarray]:
        """
        Undistort and rectify stereo images using stereo calibration parameters.
        Note that self NEEDS to be the left stereo image sequence.

        Args:
            other (ImageData): The right stereo image sequence.
            K1, D1: Intrinsics and distortion for left camera.
            K2, D2: Intrinsics and distortion for right camera.
            R, T: Rotation and translation from left to right camera.

        Returns:
            Tuple[ImageData, ImageData, np.ndarray, np.ndarray]:
                Rectified left and right ImageData, and new intrinsics matrices for the left and right cameras.
        """

        # Make sure the ImageData sequences are compatible
        assert self.width == other.width and self.height == other.height and self.encoding == other.encoding, \
            "Left and right images must have the same resolution and encoding."

        # Find matching timestamps between self and other
        set_self = set(self.timestamps)
        set_other = set(other.timestamps)
        common_timestamps = sorted(set_self.intersection(set_other))
        if len(common_timestamps) == 0:
            raise ValueError("No matching timestamps between left and right images.")

        # Find indices of matching timestamps in each ImageData
        left_indices = [np.where(self.timestamps == ts)[0][0] for ts in common_timestamps]
        right_indices = [np.where(other.timestamps == ts)[0][0] for ts in common_timestamps]

        # Image size
        image_size = (self.width, self.height)

        # Compute rectification transforms
        R1, R2, P1, P2, Q, _, _ = cv2.stereoRectify(K1, D1, K2, D2, image_size, R, T, flags=cv2.CALIB_ZERO_DISPARITY, alpha=0)

        # Compute intrinsics of rectified imagery
        K1_new = P1[:, :3]
        K2_new = P2[:, :3]
        print("New left camera intrinsics after rectification:\n", K1_new)
        print("New right camera intrinsics after rectification:\n", K2_new)
        print("Distortion coefficients after rectification are zero.")

        # Compute rectification maps
        map1_x, map1_y = cv2.initUndistortRectifyMap(K1, D1, R1, P1, image_size, cv2.CV_32FC1)
        map2_x, map2_y = cv2.initUndistortRectifyMap(K2, D2, R2, P2, image_size, cv2.CV_32FC1)

        # Allocate arrays for rectified images (only matching pairs)
        left_rectified = np.zeros((len(common_timestamps), self.height, self.width, *self.images.shape[3:]), dtype=self.images.dtype)
        right_rectified = np.zeros((len(common_timestamps), other.height, other.width, *other.images.shape[3:]), dtype=other.images.dtype)

        # Rectify/Undistort each image pair
        for i, (li, ri) in enumerate(tqdm.tqdm(zip(left_indices, right_indices), total=len(common_timestamps), desc="Rectifying stereo pairs")):
            left_rectified[i] = cv2.remap(self.images[li], map1_x, map1_y, interpolation=cv2.INTER_LINEAR)
            right_rectified[i] = cv2.remap(other.images[ri], map2_x, map2_y, interpolation=cv2.INTER_LINEAR)

        # Return new ImageData instances with rectified images and matched timestamps
        left = ImageDataInMemory(self.frame_id, np.array(common_timestamps), self.height, self.width, self.encoding, left_rectified)
        right = ImageDataInMemory(other.frame_id, np.array(common_timestamps), other.height, other.width, other.encoding, right_rectified)
        return left, right, K1_new, K2_new
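
    # A usage sketch for the stereo pipeline (not part of the original file).
    # left_data and right_data are hypothetical ImageDataInMemory instances (self must
    # be the left sequence), and the calibration values below are placeholders; real
    # K1/D1/K2/D2/R/T would come from a stereo calibration of the actual cameras.
    #
    #   K1 = K2 = np.array([[500.0,   0.0, 320.0],
    #                       [  0.0, 500.0, 240.0],
    #                       [  0.0,   0.0,   1.0]])
    #   D1 = D2 = np.zeros(5)
    #   R = np.eye(3)
    #   T = np.array([0.1, 0.0, 0.0])   # hypothetical 10 cm baseline
    #
    #   left_rect, right_rect, K1_new, K2_new = left_data.stereo_undistort_and_rectify(
    #       right_data, K1, D1, K2, D2, R, T)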