• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 16598293332

29 Jul 2025 01:59PM UTC coverage: 73.457% (+2.9%) from 70.588%
16598293332

push

github

web-flow
(v0.1.1) ImageData Improvements & PathData

151 of 181 new or added lines in 6 files covered. (83.43%)

1 existing line in 1 file now uncovered.

952 of 1296 relevant lines covered (73.46%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.52
/src/robotdataprocess/data_types/ImageData.py
1
from __future__ import annotations
1✔
2

3
from ..conversion_utils import convert_collection_into_decimal_array
1✔
4
import cv2
1✔
5
from .Data import Data
1✔
6
import decimal
1✔
7
from decimal import Decimal
1✔
8
from enum import Enum
1✔
9
import numpy as np
1✔
10
from numpy.lib.format import open_memmap
1✔
11
import os
1✔
12
from pathlib import Path
1✔
13
from PIL import Image
1✔
14
from ..rosbag.Ros2BagWrapper import Ros2BagWrapper
1✔
15
from rosbags.rosbag2 import Reader as Reader2
1✔
16
from rosbags.typesys import Stores, get_typestore
1✔
17
from rosbags.typesys.store import Typestore
1✔
18
from typeguard import typechecked
1✔
19
from typing import Tuple
1✔
20
import tqdm
1✔
21

22
class ImageData(Data):
1✔
23

24
    # Define image encodings enum
25
    class ImageEncoding(Enum):
1✔
26
        Mono8 = 0
1✔
27
        RGB8 = 1
1✔
28
        _32FC1 = 2
1✔
29

30
        # ================ Class Methods ================
31
        @classmethod
1✔
32
        def from_str(cls, encoding_str: str):
1✔
33
            if encoding_str == "ImageEncoding.Mono8":
1✔
34
                return cls.Mono8
×
35
            elif encoding_str == "ImageEncoding.RGB8":
1✔
36
                return cls.RGB8
1✔
37
            elif encoding_str == "ImageEncoding._32FC1":
1✔
38
                return cls._32FC1
1✔
39
            else:
40
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
×
41
        
42
        @classmethod
1✔
43
        def from_ros_str(cls, encoding_str: str):
1✔
44
            encoding_str = encoding_str.lower()
1✔
45
            if encoding_str == 'mono8':
1✔
NEW
46
                return cls.Mono8
×
47
            elif encoding_str == 'rgb8':
1✔
48
                return cls.RGB8
1✔
49
            elif encoding_str == "32fc1":
1✔
50
                return cls._32FC1
1✔
51
            else:
52
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
1✔
53
        
54
        @classmethod
1✔
55
        def from_dtype_and_channels(cls, dtype: np.dtype, channels: int):
1✔
56
            if dtype == np.uint8 and channels == 1:
1✔
NEW
57
                return cls.Mono8
×
58
            elif dtype == np.uint8 and channels == 3:
1✔
NEW
59
                return cls.RGB8
×
60
            elif dtype == np.float32 and channels == 1:
1✔
61
                return cls._32FC1
1✔
62
            else:
NEW
63
                raise NotImplementedError(f"dtype {dtype} w/ {channels} channel(s) has no corresponding encoding!")
×
64
        
65
        @classmethod
1✔
66
        def from_pillow_str(cls, encoding_str: str):
1✔
67
            if encoding_str == "RGB":
1✔
68
                return cls.RGB8
1✔
69
            elif encoding_str == "L":
1✔
70
                return cls.Mono8
1✔
71
            else:
72
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
1✔
73
        
74
        # ================ Export Methods ================
75
        @staticmethod
1✔
76
        def to_ros_str(encoding: ImageData.ImageEncoding):
1✔
77
            if encoding == ImageData.ImageEncoding.Mono8:
1✔
78
                return 'mono8'
×
79
            elif encoding == ImageData.ImageEncoding.RGB8:
1✔
80
                return 'rgb8'
1✔
81
            elif encoding == ImageData.ImageEncoding._32FC1:
×
82
                return '32FC1'
×
83
            else:
84
                raise NotImplementedError(f"This ImageData.ImageEncoding.{encoding} is not yet implemented (or it doesn't exist)!")
×
85
        
86
        @staticmethod
1✔
87
        def to_dtype_and_channels(encoding):
1✔
88
            match encoding:
1✔
89
                case ImageData.ImageEncoding.Mono8:
1✔
90
                    return (np.uint8, 1)
1✔
91
                case ImageData.ImageEncoding.RGB8:
1✔
92
                    return (np.uint8, 3)
1✔
93
                case ImageData.ImageEncoding._32FC1:
1✔
94
                    return (np.float32, 1)
1✔
NEW
95
                case _:
×
NEW
96
                    raise NotImplementedError(f"This encoding ({encoding}) is missing a mapping to dtype/channels!")
×
97
            
98

99

100
    # Define image-specific data attributes
101
    height: int
1✔
102
    width: int
1✔
103
    encoding: ImageEncoding
1✔
104
    images: np.ndarray
1✔
105

106
    @typechecked
    def __init__(self, frame_id: str, timestamps: np.ndarray | list, 
                 height: int, width: int, encoding: ImageData.ImageEncoding, images: np.ndarray):
        """
        Store an image sequence together with its metadata.

        Args:
            frame_id (str): Forwarded to Data.__init__.
            timestamps (np.ndarray | list): Forwarded to Data.__init__.
            height (int): Image height.
            width (int): Image width.
            encoding (ImageData.ImageEncoding): Encoding of the images.
            images (np.ndarray): The image array itself (kept by reference).
        """

        # The base class owns frame_id / timestamps handling
        super().__init__(frame_id, timestamps)

        # Image-specific attributes
        self.height, self.width = height, width
        self.encoding, self.images = encoding, images
1✔
116

117
    # =========================================================================
118
    # ============================ Class Methods ============================== 
119
    # =========================================================================  
120

121
    @classmethod
    @typechecked
    def from_ros2_bag(cls, bag_path: Path | str, img_topic: str, save_folder: Path | str):
        """
        Creates a class structure from a ROS2 bag file with an Image topic. Will
        also save all the data into .npy and .txt files, as this is required if the
        image data doesn't fit into RAM.

        Messages that fail to decode, or whose shape differs from the first
        message, keep their timestamp but leave their image slot zero-filled.

        Args:
            bag_path (Path | str): Path to the ROS2 bag file.
            img_topic (str): Topic of the Image messages.
            save_folder (Path | str): Path to save class data into.
        Returns:
            ImageData: Instance of this class, backed by the memory-mapped imgs.npy.
        Raises:
            ValueError: If no message can be read from img_topic.
        """

        # Get topic message count and typestore
        bag_wrapper = Ros2BagWrapper(bag_path, None)
        typestore: Typestore = bag_wrapper.get_typestore()
        num_msgs: int = bag_wrapper.get_topic_count(img_topic)

        # The first message on the topic fixes the metadata, shape and dtype
        # that every later message is expected to share
        first_img, frame_id, height, width, encoding = None, None, None, None, None
        with Reader2(bag_path) as reader:
            connections = [x for x in reader.connections if x.topic == img_topic]
            for conn, _, rawdata in reader.messages(connections=connections):
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
                frame_id = msg.header.frame_id
                height = msg.height
                width = msg.width
                encoding = ImageData.ImageEncoding.from_ros_str(msg.encoding)
                first_img = ImageData._decode_image_msg(msg, encoding, height, width)
                break

        # Without a first image there is no shape/dtype to allocate storage with
        if first_img is None:
            raise ValueError(f"No messages found on topic '{img_topic}' in bag '{bag_path}'.")
        image_shape = first_img.shape

        # Pre-allocate arrays (images are memory-mapped straight onto disk)
        out_dir = Path(save_folder)
        out_dir.mkdir(parents=True, exist_ok=True)
        imgs_path = str(out_dir / "imgs.npy")
        img_memmap = open_memmap(imgs_path, dtype=first_img.dtype, shape=(num_msgs, *image_shape), mode='w+')
        timestamps_np = np.zeros(num_msgs, dtype=np.float128)

        # Extract the images/timestamps and save; the context manager makes
        # sure the progress bar is closed even if extraction fails part-way
        with tqdm.tqdm(total=num_msgs, desc="Extracting Images...", unit=" msgs") as pbar, \
                Reader2(bag_path) as reader:
            connections = [x for x in reader.connections if x.topic == img_topic]
            for i, (conn, _, rawdata) in enumerate(reader.messages(connections=connections)):
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)

                # Extract images (skipping malformed ones; deliberately best-effort)
                img = None
                try:
                    img = ImageData._decode_image_msg(msg, encoding, height, width)
                except Exception as e:
                    print("Failure decoding image msg: ", e)
                if img is not None and img.shape == image_shape:
                    img_memmap[i] = img

                # Extract timestamps
                timestamps_np[i] = Ros2BagWrapper.extract_timestamp(msg)
                pbar.update(1)

        # Write all images to disk and save timestamps and other data
        img_memmap.flush()
        np.save(str(out_dir / "times.npy"), timestamps_np, allow_pickle=False)
        with open(str(out_dir / "attributes.txt"), "w") as f:
            f.write(f"image_shape: {image_shape}\n")
            f.write(f"frame_id: {frame_id}\n")
            f.write(f"height: {height}\n")
            f.write(f"width: {width}\n")
            f.write(f"encoding: {encoding}\n")

        # Create an ImageData class
        return cls(frame_id, timestamps_np, height, width, encoding, np.load(imgs_path, mmap_mode='r+'))
1✔
201
    
202
    @classmethod
    @typechecked
    def from_npy(cls, folder_path: Path | str):
        """
        Creates a class structure from .npy and .txt files (the ones written by from_ros2_bag()).

        Args:
            folder_path (Path | str): Path to the folder with:
                - imgs.npy
                - times.npy
                - attributes.txt
        Returns:
            ImageData: Instance of this class; the images are opened as a
                memory map in 'r+' mode rather than loaded into RAM.
        """

        # Calculate other paths from folder path
        folder = Path(folder_path)
        imgs_path = str(folder / "imgs.npy")
        ts_path = str(folder / "times.npy")
        attr_path = str(folder / "attributes.txt")

        # Read in the "key: value" attributes
        attr_data = {}
        with open(attr_path, "r") as f:
            for line in f:
                line = line.strip()
                # A blank (e.g. trailing) line carries no attribute; skipping it
                # avoids a confusing tuple-unpacking error below
                if not line:
                    continue
                # Split on the first ':' only, so values may contain colons
                key, val = line.split(":", 1)
                attr_data[key.strip()] = val.strip()

        # Parse and assign values to variables
        frame_id = attr_data["frame_id"]
        height = int(attr_data["height"])
        width = int(attr_data["width"])
        encoding = ImageData.ImageEncoding.from_str(attr_data["encoding"])

        # Create an ImageData class
        return cls(frame_id, np.load(ts_path), height, width, encoding, np.load(imgs_path, mmap_mode='r+'))
1✔
237

238
    @classmethod
    @typechecked
    def from_npy_files(cls, npy_folder_path: Path | str, frame_id: str):
        """
        Creates a class structure from .npy files, where each individual image
        is stored in an .npy file with the timestamp as the name.

        Args:
            npy_folder_path (Path | str): Path to the folder with the npy images.
            frame_id (str): The frame where this image data was collected.
        Returns:
            ImageData: Instance of this class.
        Raises:
            ValueError: If the folder contains no .npy files.
            NotImplementedError: If the images are not single-channel float32.
        """

        # Get all npy files in the designated folder (unsorted at this point)
        all_image_files = list(Path(npy_folder_path).glob("*.npy"))
        if not all_image_files:
            raise ValueError(f"No .npy files found in '{npy_folder_path}'.")

        # Extract the timestamps (file name without the .npy suffix) and sort them;
        # Path.stem is used so this works regardless of the path separator
        timestamps = convert_collection_into_decimal_array([p.stem for p in all_image_files])
        sorted_indices = np.argsort(timestamps)
        timestamps_sorted = timestamps[sorted_indices]

        # Use sorted_indices to sort all_image_files in the same way
        all_image_files_sorted = [all_image_files[i] for i in sorted_indices]

        # Extract width, height, and channels from the first image
        first_image = np.load(all_image_files_sorted[0], mmap_mode='r')
        assert len(first_image.shape) >= 2
        assert len(first_image.shape) < 4
        height = first_image.shape[0]
        width = first_image.shape[1]
        channels = 1
        if len(first_image.shape) > 2: 
            channels = first_image.shape[2]

        # Extract mode and make sure it matches the supported type for this operation
        encoding = ImageData.ImageEncoding.from_dtype_and_channels(first_image.dtype, channels)
        if encoding != ImageData.ImageEncoding._32FC1:
            raise NotImplementedError(f"Only ImageData.ImageEncoding._32FC1 mode implemented for 'from_npy_files', not {encoding}")
        
        # Load the images as numpy arrays
        assert channels == 1
        images = np.zeros((len(all_image_files_sorted), height, width), dtype=np.float32)
        with tqdm.tqdm(total=len(all_image_files_sorted), desc="Extracting Images...", unit=" images") as pbar:
            for i, path in enumerate(all_image_files_sorted):
                images[i] = np.load(path, mmap_mode='r')
                pbar.update()

        # Return an ImageData class
        return cls(frame_id, timestamps_sorted, height, width, encoding, images)
1✔
288

289
    @classmethod
    @typechecked
    def from_image_files(cls, image_folder_path: Path | str, frame_id: str):
        """
        Creates a class structure from a folder with .png files, using the file names
        as the timestamps. This is the format that the HERCULES v1.4 dataset provides
        for image data.

        Args:
            image_folder_path (Path | str): Path to the folder with the images.
            frame_id (str): The frame where this image data was collected.
        Returns:
            ImageData: Instance of this class.
        Raises:
            ValueError: If the folder contains no .png files.
            NotImplementedError: If the image mode is not RGB8 or Mono8.
        """

        # Get all png files in the designated folder (unsorted at this point)
        all_image_files = list(Path(image_folder_path).glob("*.png"))
        if not all_image_files:
            raise ValueError(f"No .png files found in '{image_folder_path}'.")

        # Extract the timestamps (file name without the .png suffix) and sort them;
        # Path.stem is used so this works regardless of the path separator
        timestamps = convert_collection_into_decimal_array([p.stem for p in all_image_files])
        sorted_indices = np.argsort(timestamps)
        timestamps_sorted = timestamps[sorted_indices]

        # Use sorted_indices to sort all_image_files in the same way
        all_image_files_sorted = [all_image_files[i] for i in sorted_indices]

        # Read mode and size from the first image while its file is still open
        with Image.open(all_image_files_sorted[0]) as first_image:
            encoding = ImageData.ImageEncoding.from_pillow_str(first_image.mode)
            height, width = first_image.height, first_image.width

        # Make sure the mode is what we expect
        if encoding not in (ImageData.ImageEncoding.RGB8, ImageData.ImageEncoding.Mono8):
            raise NotImplementedError(f"Only RGB8 & Mono8 supported for 'from_image_files', not {encoding}")
        
        # Get dtype and channels based on the encoding
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(encoding)

        # Define the image array shape (no channel axis for single-channel data)
        if channels == 1:
            img_arr_shape = (len(all_image_files_sorted), height, width)
        else: 
            img_arr_shape = (len(all_image_files_sorted), height, width, channels)

        # Load the images as numpy arrays, closing each file once it is copied
        images = np.zeros(img_arr_shape, dtype=dtype)
        with tqdm.tqdm(total=len(all_image_files_sorted), desc="Extracting Images...", unit=" images") as pbar:
            for i, path in enumerate(all_image_files_sorted):
                with Image.open(path) as img:
                    images[i] = np.array(img, dtype=dtype)
                pbar.update()

        # Return an ImageData class
        return cls(frame_id, timestamps_sorted, height, width, encoding, images)
1✔
340
    
341
    # =========================================================================
342
    # ========================= Manipulation Methods ========================== 
343
    # =========================================================================  
344

345
    @typechecked
1✔
346
    def downscale_by_factor(self, scale: int):
1✔
347
        """
348
        Scales down all images by the provided factor.
349

350
        Args:
351
            scale (int): The downscaling factor. Must evenly divide both height and width.
352
        """
353

NEW
354
        if self.height % scale != 0 or self.width % scale != 0:
×
NEW
355
            raise ValueError(f"Scale factor {scale} must evenly divide both height ({self.height}) and width ({self.width})")
×
356
        
357
        # Calculate new height/width
NEW
358
        self.height = self.height // scale
×
NEW
359
        self.width = self.width // scale
×
360

361
        # Ensure we're working with Mono8 data
NEW
362
        if self.encoding != ImageData.ImageEncoding.Mono8:
×
NEW
363
            raise NotImplementedError(f"This method is only currently implemented for Mono8 data, not {self.encoding}!")
×
364

365
        # Determine the number of channels in the image
NEW
366
        if len(self.images.shape) == 4: channels = self.images.shape[3]
×
NEW
367
        else: channels = 1
×
368

369
        # Create a new array to hold the resized images
NEW
370
        if channels == 1:
×
NEW
371
            rescaled_images = np.zeros((self.len(), self.height, self.width), dtype=self.images.dtype)
×
372
        else:
NEW
373
            rescaled_images = np.zeros((self.len(), self.height, self.width, channels), dtype=self.images.dtype)
×
374
        
375
        # Resize each image
NEW
376
        for i in range(self.len()):
×
NEW
377
            rescaled_images[i] = cv2.resize(self.images[i], (self.width, self.height), interpolation=cv2.INTER_LINEAR)
×
NEW
378
        self.images = rescaled_images
×
379

380
    # =========================================================================
381
    # ============================ Export Methods ============================= 
382
    # ========================================================================= 
383

384
    @typechecked
    def to_npy(self, output_folder_path: Path | str):
        """
        Saves this ImageData into three files inside the output folder:
        - imgs.npy (with image data)
        - times.npy (with timestamps)
        - attributes.txt

        Args:
            output_folder_path (Path | str): The folder to save the files in
                (created if it does not exist).
        Raises:
            NotImplementedError: If the encoding is neither RGB8 nor 32FC1.
        """

        # Setup the output directory
        output_path = Path(output_folder_path)
        output_path.mkdir(parents=True, exist_ok=True)

        # Check that the encoding is supported
        if self.encoding != ImageData.ImageEncoding.RGB8 and self.encoding != ImageData.ImageEncoding._32FC1:
            raise NotImplementedError(f"Only RGB8 & 32FC1 images have been tested for export, not {self.encoding}")

        # Get dtype and channels
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(self.encoding)

        # Save images into memory-mapped array, one image at a time; the
        # context manager closes the progress bar even if a copy fails
        shape = (self.len(), self.height, self.width) if channels == 1 else (self.len(), self.height, self.width, channels)
        img_memmap = open_memmap(str(output_path / "imgs.npy"), dtype=dtype, shape=shape, mode='w+')
        with tqdm.tqdm(total=self.len(), desc="Saving Images...", unit=" images") as pbar:
            for i in range(self.len()):
                img_memmap[i] = self.images[i]
                pbar.update()
        img_memmap.flush()

        # Save the timestamps
        np.save(str(output_path / "times.npy"), self.timestamps.astype(np.float128), allow_pickle=False)

        # Save attributes as "key: value" lines
        with open(str(output_path / "attributes.txt"), "w") as f:
            f.write(f"image_shape: ({self.height},{self.width})\n")
            f.write(f"frame_id: {self.frame_id}\n")
            f.write(f"height: {self.height}\n")
            f.write(f"width: {self.width}\n")
            f.write(f"encoding: {self.encoding}\n")
1✔
426

427

428
    @typechecked
    def to_image_files(self, output_folder_path: Path | str):
        """
        Writes every image of this ImageData instance into the given folder as
        a .png file (lossless), named after its timestamp with nine decimals.

        Args:
            output_folder_path (Path | str): The folder to save images into
                (created if it does not exist).
        Raises:
            NotImplementedError: If the encoding is not Mono8.
        """

        # Make sure the destination exists
        destination = Path(output_folder_path)
        destination.mkdir(parents=True, exist_ok=True)

        # Only single-channel 8-bit data can be exported for now
        if self.encoding != ImageData.ImageEncoding.Mono8:
            raise NotImplementedError(f"Only Mono8 encoding currently supported for export, not {self.encoding}")

        # Progress reporting
        pbar = tqdm.tqdm(total=self.images.shape[0], desc="Saving Images...", unit=" images")

        for idx, stamp in enumerate(self.timestamps):
            # File name mirrors the timestamp-as-name convention of the loaders
            target = destination / f"{stamp:.9f}.png"

            # Lossless PNG, written with compress_level=1
            Image.fromarray(self.images[idx], mode="L").save(target, format="PNG", compress_level=1)
            pbar.update()

        pbar.close()
×
461

462
    # =========================================================================
463
    # ============================ Image Decoding ============================= 
464
    # ========================================================================= 
465

466
    @staticmethod
1✔
467
    @typechecked
1✔
468
    def _decode_image_msg(msg: object, encoding: ImageData.ImageEncoding, height: int, width: int):
1✔
469
        """
470
        Helper method that decodes image data from a ROS2 Image message.
471

472
        Args:
473
            msg (object): The ROS2 Image message.
474
            encoding (ImageEncoding): The encoding of the image data.
475
            height (int): Height of the image.
476
            width (int): Width of the image .
477
        """
478
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(encoding)
1✔
479
        if channels > 1:
1✔
480
            return np.frombuffer(msg.data, dtype=dtype).reshape((height, width, channels)) 
1✔
481
        else:
482
            return np.frombuffer(msg.data, dtype=dtype).reshape((height, width))
1✔
483
        
484
    # =========================================================================
485
    # ======================= Multi ImageData Methods ========================= 
486
    # ========================================================================= 
487

488
    def compare_timestamps(self, other: ImageData):
1✔
489
        """
490
        This method compares two ImageData objects based on the timestamps of their
491
        images.
492
        """
493

494
        # Find the locations in other where self timestamps would fit
495
        idxs = np.searchsorted(other.timestamps, self.timestamps, side='right')
×
496

497
        # Get the left indices and right indices
498
        idxs_right = np.clip(idxs, 0, len(other.timestamps)-1)
×
499
        idxs_left = np.clip(idxs - 1, 0, len(other.timestamps)-1)
×
500

501
        # Get distances to nearest on either side
502
        dists = np.minimum(np.abs(self.timestamps - other.timestamps[idxs_left]), 
×
503
                           np.abs(self.timestamps - other.timestamps[idxs_right]))
504
        
505
        # Print the mean and std of the distances
506
        print(f"Mean distance (left): {np.mean(np.abs(self.timestamps - other.timestamps[idxs_left]))}")
×
507
        print(f"Mean distance (right): {np.mean(np.abs(self.timestamps - other.timestamps[idxs_right]))}")
×
508
        print(f"Mean distance: {np.mean(dists)}")
×
509
        print(f"Std distance: {np.std(dists)}")
×
510

511
    @typechecked
    def stereo_undistort_and_rectify(self: ImageData, other: ImageData,
            K1: np.ndarray, D1: np.ndarray, K2: np.ndarray, D2: np.ndarray, 
            R: np.ndarray, T: np.ndarray) -> Tuple[ImageData, ImageData, np.ndarray, np.ndarray]:
        """
        Undistort and rectify stereo images using stereo calibration parameters. 
        Note that self NEEDS to be the left stereo image sequence.

        Only image pairs whose timestamps are exactly equal in both sequences
        are rectified; all other images are dropped from the result.

        Args:
            other (ImageData): The right stereo image sequence.
            K1, D1: Intrinsics and distortion for left camera.
            K2, D2: Intrinsics and distortion for right camera.
            R, T: Rotation and translation from left to right camera.

        Returns:
            Tuple[ImageData, ImageData, np.ndarray, np.ndarray]: Rectified left and right 
                ImageData, and new intrinsics matrices for the left and right cameras.

        Raises:
            ValueError: If the two sequences share no timestamp.
        """

        # Make sure the ImageData sequences are compatible
        assert self.width == other.width and self.height == other.height and self.encoding == other.encoding, \
            "Left and right images must have the same resolution and encoding."

        # Map each timestamp to the index of its FIRST occurrence. This gives
        # the same indices as scanning the array once per timestamp, but in a
        # single pass per sequence instead of quadratic time.
        left_lookup, right_lookup = {}, {}
        for idx, ts in enumerate(self.timestamps):
            left_lookup.setdefault(ts, idx)
        for idx, ts in enumerate(other.timestamps):
            right_lookup.setdefault(ts, idx)

        # Find matching timestamps between self and other
        common_timestamps = sorted(set(left_lookup).intersection(set(right_lookup)))
        if len(common_timestamps) == 0:
            raise ValueError("No matching timestamps between left and right images.")
        
        # Find indices of matching timestamps in each ImageData
        left_indices = [left_lookup[ts] for ts in common_timestamps]
        right_indices = [right_lookup[ts] for ts in common_timestamps]

        # Image size (cv2 expects (width, height))
        image_size = (self.width, self.height)

        # Compute rectification transforms
        R1, R2, P1, P2, Q, _, _ = cv2.stereoRectify(K1, D1, K2, D2, image_size, R, T, flags=cv2.CALIB_ZERO_DISPARITY, alpha=0)

        # Compute intrinsics of rectified imagery (left 3x3 part of each projection matrix)
        K1_new = P1[:, :3]
        K2_new = P2[:, :3]
        print("New left camera intrinsics after rectification:\n",  K1_new)
        print("New right camera intrinsics after rectification:\n", K2_new)
        print("Distortion coefficients after rectification are zero.")

        # Compute rectification maps
        map1_x, map1_y = cv2.initUndistortRectifyMap(K1, D1, R1, P1, image_size, cv2.CV_32FC1)
        map2_x, map2_y = cv2.initUndistortRectifyMap(K2, D2, R2, P2, image_size, cv2.CV_32FC1)

        # Allocate arrays for rectified images (only matching pairs); shape[3:]
        # carries the channel axis along when the images have one
        left_rectified = np.zeros((len(common_timestamps), self.height, self.width, *self.images.shape[3:]), dtype=self.images.dtype)
        right_rectified = np.zeros((len(common_timestamps), other.height, other.width, *other.images.shape[3:]), dtype=other.images.dtype)

        # Rectify/Undistort each image pair
        for i, (li, ri) in enumerate(tqdm.tqdm(zip(left_indices, right_indices), total=len(common_timestamps), desc="Rectifying stereo pairs")):
            left_rectified[i] = cv2.remap(self.images[li], map1_x, map1_y, interpolation=cv2.INTER_LINEAR)
            right_rectified[i] = cv2.remap(other.images[ri], map2_x, map2_y, interpolation=cv2.INTER_LINEAR)

        # Return new ImageData instances with rectified images and matched timestamps
        left = ImageData(self.frame_id, np.array(common_timestamps), self.height, self.width, self.encoding, left_rectified)
        right = ImageData(other.frame_id, np.array(common_timestamps), other.height, other.width, other.encoding, right_rectified)
        return left, right, K1_new, K2_new
×
575

576

577
    # =========================================================================
578
    # =========================== Conversion to ROS =========================== 
579
    # ========================================================================= 
580

581
    @typechecked
    @staticmethod
    def get_ros_msg_type() -> str:
        """ Return the __msgtype__ of the ROS2 Humble sensor_msgs/msg/Image type. """
        image_cls = get_typestore(Stores.ROS2_HUMBLE).types['sensor_msgs/msg/Image']
        return image_cls.__msgtype__
1✔
587

588
    @typechecked
    def get_ros_msg(self, i: int):
        """
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
        
        Args:
            i (int): The index of the image message to convert.
        Returns:
            A sensor_msgs/msg/Image instance from the ROS2 Humble typestore.
        Raises:
            ValueError: If i is outside the data bounds.
            NotImplementedError: If the encoding has no dtype/ROS mapping.
        """

        # Check to make sure index is within data bounds
        if i < 0 or i >= self.len():
            raise ValueError(f"Index {i} is out of bounds!")

        # Get ROS2 message classes
        typestore = get_typestore(Stores.ROS2_HUMBLE)
        ImageMsg = typestore.types['sensor_msgs/msg/Image']
        Header = typestore.types['std_msgs/msg/Header']
        Time = typestore.types['builtin_interfaces/msg/Time']

        # Calculate the step (bytes per image row) from the encoding itself, so
        # every supported encoding (including Mono8) gets a value
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(self.encoding)
        step = np.dtype(dtype).itemsize * channels * self.width

        # Split the timestamp into whole seconds and nanoseconds. The rounding
        # is applied to the scaled fraction (not to the 1e9 constant), and a
        # fraction that rounds up to a full second is carried over.
        stamp = self.timestamps[i]
        whole = stamp.to_integral_value(rounding=decimal.ROUND_DOWN)
        seconds = int(whole)
        nanoseconds = int(((stamp - whole) * Decimal("1e9")).to_integral_value(rounding=decimal.ROUND_HALF_EVEN))
        if nanoseconds >= 1_000_000_000:
            seconds += 1
            nanoseconds -= 1_000_000_000

        # The message payload is the raw bytes of the image, which is what
        # _decode_image_msg reinterprets on the way back in; for uint8 images
        # the view is a no-op
        data = self.images[i].flatten().view(np.uint8)

        # Write the data into the new class
        return ImageMsg(Header(stamp=Time(sec=seconds, 
                                          nanosec=nanoseconds), 
                               frame_id=self.frame_id),
                        height=self.height, 
                        width=self.width, 
                        encoding=ImageData.ImageEncoding.to_ros_str(self.encoding),
                        is_bigendian=0, 
                        step=step, 
                        data=data)
627
        
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc