• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 20172884498

12 Dec 2025 04:18PM UTC coverage: 74.672% (+1.2%) from 73.457%
20172884498

push

github

web-flow
(v0.1.2) Path Metrics & Transformations, Python >=3.8 Support

269 of 364 new or added lines in 8 files covered. (73.9%)

6 existing lines in 4 files now uncovered.

1082 of 1449 relevant lines covered (74.67%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.32
/src/robotdataprocess/data_types/ImageData.py
1
from __future__ import annotations
1✔
2

3
from ..conversion_utils import col_to_dec_arr
1✔
4
import cv2
1✔
5
from .Data import Data
1✔
6
import decimal
1✔
7
from decimal import Decimal
1✔
8
from enum import Enum
1✔
9
import numpy as np
1✔
10
from numpy.lib.format import open_memmap
1✔
11
import os
1✔
12
from pathlib import Path
1✔
13
from PIL import Image
1✔
14
from ..rosbag.Ros2BagWrapper import Ros2BagWrapper
1✔
15
from rosbags.rosbag2 import Reader as Reader2
1✔
16
from rosbags.typesys import Stores, get_typestore
1✔
17
from rosbags.typesys.store import Typestore
1✔
18
from typeguard import typechecked
1✔
19
from typing import Tuple, Union
1✔
20
import tqdm
1✔
21

22
class ImageData(Data):
1✔
23

24
    # Define image encodings enum
25
    class ImageEncoding(Enum):
        """Closed set of pixel encodings handled by ImageData, with converters to/from other naming schemes."""

        Mono8 = 0
        RGB8 = 1
        _32FC1 = 2

        # ================ Class Methods ================
        @classmethod
        def from_str(cls, encoding_str: str):
            """
            Looks up a member from its "ImageEncoding.<Name>" string form.

            Raises:
                NotImplementedError: If the string matches no known member.
            """
            known = (
                ("ImageEncoding.Mono8", cls.Mono8),
                ("ImageEncoding.RGB8", cls.RGB8),
                ("ImageEncoding._32FC1", cls._32FC1),
            )
            for text, member in known:
                if encoding_str == text:
                    return member
            raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")

        @classmethod
        def from_ros_str(cls, encoding_str: str):
            """
            Looks up a member from a ROS encoding name, ignoring letter case.

            Raises:
                NotImplementedError: If the (lower-cased) name matches no known member.
            """
            # The comparison (and the error message) use the lower-cased name
            encoding_str = encoding_str.lower()
            known = (
                ('mono8', cls.Mono8),
                ('rgb8', cls.RGB8),
                ("32fc1", cls._32FC1),
            )
            for ros_name, member in known:
                if encoding_str == ros_name:
                    return member
            raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")

        @classmethod
        def from_dtype_and_channels(cls, dtype: np.dtype, channels: int):
            """
            Picks the member that corresponds to a numpy dtype and channel count.

            Raises:
                NotImplementedError: If the dtype/channel combination has no member.
            """
            if dtype == np.uint8 and channels == 1:
                return cls.Mono8
            if dtype == np.uint8 and channels == 3:
                return cls.RGB8
            if dtype == np.float32 and channels == 1:
                return cls._32FC1
            raise NotImplementedError(f"dtype {dtype} w/ {channels} channel(s) has no corresponding encoding!")

        @classmethod
        def from_pillow_str(cls, encoding_str: str):
            """
            Looks up a member from a Pillow image mode string ("RGB" or "L").

            Raises:
                NotImplementedError: If the mode is neither "RGB" nor "L".
            """
            for pillow_mode, member in (("RGB", cls.RGB8), ("L", cls.Mono8)):
                if encoding_str == pillow_mode:
                    return member
            raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")

        # ================ Export Methods ================
        @staticmethod
        def to_ros_str(encoding: ImageData.ImageEncoding):
            """
            Returns the ROS encoding name for the given member.

            Raises:
                NotImplementedError: If the value is not one of the known members.
            """
            Encoding = ImageData.ImageEncoding
            ros_names = (
                (Encoding.Mono8, 'mono8'),
                (Encoding.RGB8, 'rgb8'),
                (Encoding._32FC1, '32FC1'),
            )
            for member, ros_name in ros_names:
                if encoding == member:
                    return ros_name
            raise NotImplementedError(f"This ImageData.ImageEncoding.{encoding} is not yet implemented (or it doesn't exist)!")

        @staticmethod
        def to_dtype_and_channels(encoding):
            """
            Returns the (numpy dtype, channel count) pair for the given member.

            Raises:
                NotImplementedError: If the value is not one of the known members.
            """
            Encoding = ImageData.ImageEncoding
            layouts = (
                (Encoding.Mono8, (np.uint8, 1)),
                (Encoding.RGB8, (np.uint8, 3)),
                (Encoding._32FC1, (np.float32, 1)),
            )
            for member, layout in layouts:
                if encoding == member:
                    return layout
            raise NotImplementedError(f"This encoding ({encoding}) is missing a mapping to dtype/channels!")
×
96
            
97

98

99
    # Define image-specific data attributes
100
    height: int
1✔
101
    width: int
1✔
102
    encoding: ImageEncoding
1✔
103
    images: np.ndarray
1✔
104

105
    @typechecked
    def __init__(self, frame_id: str, timestamps: Union[np.ndarray, list], 
                 height: int, width: int, encoding: ImageData.ImageEncoding, images: np.ndarray):
        """
        Stores the image sequence together with its geometry and encoding.

        Args:
            frame_id (str): Forwarded to the Data base class.
            timestamps (np.ndarray | list): Forwarded to the Data base class.
            height (int): Image height.
            width (int): Image width.
            encoding (ImageEncoding): Encoding of the images.
            images (np.ndarray): Array holding the images.
        """

        # The base class takes care of frame_id and timestamps
        super().__init__(frame_id, timestamps)

        # Image-specific attributes
        self.height, self.width = height, width
        self.encoding = encoding
        self.images = images
1✔
115

116
    # =========================================================================
117
    # ============================ Class Methods ============================== 
118
    # =========================================================================  
119

120
    @classmethod
    @typechecked
    def from_ros2_bag(cls, bag_path: Union[Path, str], img_topic: str, save_folder: Union[Path, str]):
        """
        Creates a class structure from a ROS2 bag file with an Image topic. Will
        also save all the data into .npy and .txt files, as this is required if the
        image data doesn't fit into the RAM.

        The frame id, height, width and encoding are read from the first message
        on the topic and reused when decoding every later message.

        Args:
            bag_path (Path | str): Path to the ROS2 bag file.
            img_topic (str): Topic of the Image messages.
            save_folder (Path | str): Path to save class data into.
        Returns:
            ImageData: Instance of this class.
        Raises:
            ValueError: If no message could be read from img_topic.
        """

        # Get topic message count and typestore
        bag_wrapper = Ros2BagWrapper(bag_path, None)
        typestore: Typestore = bag_wrapper.get_typestore()
        num_msgs: int = bag_wrapper.get_topic_count(img_topic)

        # Extract relevant image parameters from the first message only
        image_shape, image_dtype = None, None
        frame_id, height, width, encoding = None, None, None, None
        with Reader2(bag_path) as reader:
            connections = [x for x in reader.connections if x.topic == img_topic]
            for conn, _, rawdata in reader.messages(connections=connections):
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
                frame_id = msg.header.frame_id
                height = msg.height
                width = msg.width
                encoding = ImageData.ImageEncoding.from_ros_str(msg.encoding)
                first_img = ImageData._decode_image_msg(msg, encoding, height, width)
                image_shape = first_img.shape
                image_dtype = first_img.dtype
                break

        # Without a first message there is no shape/dtype to allocate with
        if image_shape is None:
            raise ValueError(f"No messages found on topic '{img_topic}' in bag '{bag_path}'!")

        # Pre-allocate arrays (memory-mapped or otherwise)
        save_dir = Path(save_folder)
        imgs_path = str(save_dir / "imgs.npy")
        os.makedirs(save_folder, exist_ok=True)
        img_memmap = open_memmap(imgs_path, dtype=image_dtype, shape=(num_msgs, *image_shape), mode='w+')
        timestamps_np = np.zeros(num_msgs, dtype=np.float128)

        # Extract the images/timestamps and save (the bar is closed on exit)
        with tqdm.tqdm(total=num_msgs, desc="Extracting Images...", unit=" msgs") as pbar, \
                Reader2(bag_path) as reader:
            i = 0
            connections = [x for x in reader.connections if x.topic == img_topic]
            for conn, _, rawdata in reader.messages(connections=connections):
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)

                # Extract images (skipping malformed ones)
                img = None
                try:
                    img = ImageData._decode_image_msg(msg, encoding, height, width)
                except Exception as e:
                    print("Failure decoding image msg: ", e)

                # A skipped image leaves slot i unwritten, but the index still
                # advances so images and timestamps stay aligned.
                if img is not None and img.shape == image_shape:
                    img_memmap[i] = img

                # Extract timestamps
                timestamps_np[i] = Ros2BagWrapper.extract_timestamp(msg)

                # Update the count
                i += 1
                pbar.update(1)

        # Write all images to disk and save timestamps and other data
        img_memmap.flush()
        np.save(str(save_dir / "times.npy"), timestamps_np, allow_pickle=False)
        with open(str(save_dir / "attributes.txt"), "w") as f:
            f.write(f"image_shape: {image_shape}\n")
            f.write(f"frame_id: {frame_id}\n")
            f.write(f"height: {height}\n")
            f.write(f"width: {width}\n")
            f.write(f"encoding: {encoding}\n")

        # Create an ImageData class
        return cls(frame_id, timestamps_np, height, width, encoding, np.load(imgs_path, mmap_mode='r+'))
1✔
200
    
201
    @classmethod
    @typechecked
    def from_npy(cls, folder_path: Union[Path, str]):
        """
        Builds an instance from the three files that from_ros2_bag() writes.

        Args:
            folder_path (Path | str): Folder that contains:
                - imgs.npy
                - times.npy
                - attributes.txt
        Returns:
            ImageData: Instance of this class.
        """

        # Locate the three files inside the folder
        folder = Path(folder_path)
        imgs_path = str(folder / "imgs.npy")
        ts_path = str(folder / "times.npy")
        attr_path = str(folder / "attributes.txt")

        # attributes.txt holds one "key: value" pair per line; split only on the
        # first colon so that values may themselves contain colons
        with open(attr_path, "r") as f:
            attr_data = {
                key.strip(): val.strip()
                for key, val in (line.strip().split(":", 1) for line in f)
            }

        # Convert the textual attributes into their proper types
        frame_id = attr_data["frame_id"]
        height, width = int(attr_data["height"]), int(attr_data["width"])
        encoding = ImageData.ImageEncoding.from_str(attr_data["encoding"])

        # Timestamps are loaded fully; images stay memory-mapped
        timestamps = np.load(ts_path)
        images = np.load(imgs_path, mmap_mode='r+')
        return cls(frame_id, timestamps, height, width, encoding, images)
1✔
236

237
    @classmethod
    @typechecked
    def from_npy_files(cls, npy_folder_path: Union[Path, str], frame_id: str):
        """
        Creates a class structure from .npy files, where each individual image
        is stored in an .npy file with the timestamp as the name.

        Only single-channel float32 images (ImageEncoding._32FC1) are supported.
        The height, width and dtype are taken from the file with the smallest
        timestamp.

        Args:
            npy_folder_path (Path | str): Path to the folder with the npy images.
            frame_id (str): The frame where this image data was collected.
        Returns:
            ImageData: Instance of this class.
        Raises:
            ValueError: If the folder contains no .npy files.
            NotImplementedError: If the images are not of the _32FC1 encoding.
        """

        # Get all npy files in the designated folder (unsorted)
        all_image_files = [str(p) for p in Path(npy_folder_path).glob("*.npy")]
        if not all_image_files:
            raise ValueError(f"No .npy files found in '{npy_folder_path}'!")

        # Extract the timestamps (file name without the extension) and sort them.
        # Path.stem is used rather than splitting on '/' so that this also
        # works with other path separators.
        timestamps = col_to_dec_arr([Path(s).stem for s in all_image_files])
        sorted_indices = np.argsort(timestamps)
        timestamps_sorted = timestamps[sorted_indices]

        # Use sorted_indices to sort all_image_files in the same way
        all_image_files_sorted = [all_image_files[i] for i in sorted_indices]

        # Extract width, height, and channels (memory-mapped, read-only)
        first_image = np.load(all_image_files_sorted[0], 'r')
        assert len(first_image.shape) >= 2
        assert len(first_image.shape) < 4
        height = first_image.shape[0]
        width = first_image.shape[1]
        channels = 1
        if len(first_image.shape) > 2:
            channels = first_image.shape[2]

        # Extract mode and make sure it matches the supported type for this operation
        encoding = ImageData.ImageEncoding.from_dtype_and_channels(first_image.dtype, channels)
        if encoding != ImageData.ImageEncoding._32FC1:
            raise NotImplementedError(f"Only ImageData.ImageEncoding._32FC1 mode implemented for 'from_npy_files', not {encoding}")

        # Load the images as numpy arrays
        assert channels == 1
        images = np.zeros((len(all_image_files_sorted), height, width), dtype=np.float32)
        with tqdm.tqdm(total=len(all_image_files_sorted), desc="Extracting Images...", unit=" images") as pbar:
            for i, path in enumerate(all_image_files_sorted):
                images[i] = np.load(path, 'r')
                pbar.update()

        # Return an ImageData class
        return cls(frame_id, timestamps_sorted, height, width, encoding, images)
1✔
287

288
    @classmethod
    @typechecked
    def from_image_files(cls, image_folder_path: Union[Path, str], frame_id: str):
        """
        Creates a class structure from a folder with .png files, using the file names
        as the timestamps. This is the format that the HERCULES v1.4 dataset provides
        for image data.

        The encoding, height and width are taken from the image with the smallest
        timestamp.

        Args:
            image_folder_path (Path | str): Path to the folder with the images.
            frame_id (str): The frame where this image data was collected.
        Returns:
            ImageData: Instance of this class.
        Raises:
            ValueError: If the folder contains no .png files.
            NotImplementedError: If the Pillow mode of the first image is unsupported.
        """

        # Get all png files in the designated folder (unsorted)
        all_image_files = [str(p) for p in Path(image_folder_path).glob("*.png")]
        if not all_image_files:
            raise ValueError(f"No .png files found in '{image_folder_path}'!")

        # Extract the timestamps (file name without the extension) and sort them.
        # Path.stem is used rather than splitting on '/' so that this also
        # works with other path separators.
        timestamps = col_to_dec_arr([Path(s).stem for s in all_image_files])
        sorted_indices = np.argsort(timestamps)
        timestamps_sorted = timestamps[sorted_indices]

        # Use sorted_indices to sort all_image_files in the same way
        all_image_files_sorted = [all_image_files[i] for i in sorted_indices]

        # Make sure the mode is what we expect, and read the image size while
        # the file is still open
        with Image.open(all_image_files_sorted[0]) as first_image:
            encoding = ImageData.ImageEncoding.from_pillow_str(first_image.mode)
            if encoding != ImageData.ImageEncoding.RGB8 and encoding != ImageData.ImageEncoding.Mono8:
                raise NotImplementedError(f"Only RGB8 & Mono8 supported for 'from_image_files', not {encoding}")
            height, width = first_image.height, first_image.width

        # Get dtype and channels based on the encoding
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(encoding)

        # Define the image array shape
        if channels == 1:
            img_arr_shape = (len(all_image_files_sorted), height, width)
        else:
            img_arr_shape = (len(all_image_files_sorted), height, width, channels)

        # Load the images as numpy arrays, closing each file once it is copied
        images = np.zeros(img_arr_shape, dtype=dtype)
        with tqdm.tqdm(total=len(all_image_files_sorted), desc="Extracting Images...", unit=" images") as pbar:
            for i, path in enumerate(all_image_files_sorted):
                with Image.open(path) as img:
                    images[i] = np.array(img, dtype=dtype)
                pbar.update()

        # Return an ImageData class
        return cls(frame_id, timestamps_sorted, height, width, encoding, images)
1✔
339
    
340
    # =========================================================================
341
    # ========================= Manipulation Methods ========================== 
342
    # =========================================================================  
343

344
    @typechecked
1✔
345
    def downscale_by_factor(self, scale: int):
1✔
346
        """
347
        Scales down all images by the provided factor.
348

349
        Args:
350
            scale (int): The downscaling factor. Must evenly divide both height and width.
351
        """
352

353
        if self.height % scale != 0 or self.width % scale != 0:
×
354
            raise ValueError(f"Scale factor {scale} must evenly divide both height ({self.height}) and width ({self.width})")
×
355
        
356
        # Calculate new height/width
357
        self.height = self.height // scale
×
358
        self.width = self.width // scale
×
359

360
        # Ensure we're working with Mono8 data
361
        if self.encoding != ImageData.ImageEncoding.Mono8:
×
362
            raise NotImplementedError(f"This method is only currently implemented for Mono8 data, not {self.encoding}!")
×
363

364
        # Determine the number of channels in the image
365
        if len(self.images.shape) == 4: channels = self.images.shape[3]
×
366
        else: channels = 1
×
367

368
        # Create a new array to hold the resized images
369
        if channels == 1:
×
370
            rescaled_images = np.zeros((self.len(), self.height, self.width), dtype=self.images.dtype)
×
371
        else:
372
            rescaled_images = np.zeros((self.len(), self.height, self.width, channels), dtype=self.images.dtype)
×
373
        
374
        # Resize each image
375
        for i in range(self.len()):
×
376
            rescaled_images[i] = cv2.resize(self.images[i], (self.width, self.height), interpolation=cv2.INTER_LINEAR)
×
377
        self.images = rescaled_images
×
378

379
    def crop_data(self, start: Decimal, end: Decimal):
1✔
380
        """ Will crop the data so only values within [start, end] inclusive are kept. """
381

382
        # Create boolean mask of data to keep
383
        mask = (self.timestamps >= start) & (self.timestamps <= end)
1✔
384

385
        # Apply mask
386
        self.timestamps = self.timestamps[mask]
1✔
387
        self.images = self.images[mask]
1✔
388

389
    # =========================================================================
390
    # ============================ Export Methods ============================= 
391
    # ========================================================================= 
392

393
    @typechecked
    def to_npy(self, output_folder_path: Union[Path, str]):
        """
        Saves this ImageData into three files inside the output folder:
        
        - imgs.npy (with image data)
        - times.npy (with timestamps)
        - attributes.txt

        Args:
            output_folder_path (Path | str): The folder to save the files into.
                It is created if it does not exist.
        Raises:
            NotImplementedError: If the encoding is neither RGB8 nor _32FC1.
        """

        # Setup the output directory
        output_path = Path(output_folder_path)
        output_path.mkdir(parents=True, exist_ok=True)

        # Check that the encoding is supported
        if self.encoding != ImageData.ImageEncoding.RGB8 and self.encoding != ImageData.ImageEncoding._32FC1:
            raise NotImplementedError(f"Only RGB8 & 32FC1 images have been tested for export, not {self.encoding}")

        # Get dtype and channels
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(self.encoding)

        # Save images into memory-mapped array, one image at a time
        num_images = self.len()
        shape = (num_images, self.height, self.width) if channels == 1 else (num_images, self.height, self.width, channels)
        img_memmap = open_memmap(str(output_path / "imgs.npy"), dtype=dtype, shape=shape, mode='w+')
        with tqdm.tqdm(total=num_images, desc="Saving Images...", unit=" images") as pbar:
            for i in range(num_images):
                img_memmap[i] = self.images[i]
                pbar.update()
        img_memmap.flush()

        # Save the timestamps
        np.save(str(output_path / "times.npy"), self.timestamps.astype(np.float128), allow_pickle=False)

        # Save attributes
        with open(str(output_path / "attributes.txt"), "w") as f:
            f.write(f"image_shape: ({self.height},{self.width})\n")
            f.write(f"frame_id: {self.frame_id}\n")
            f.write(f"height: {self.height}\n")
            f.write(f"width: {self.width}\n")
            f.write(f"encoding: {self.encoding}\n")
1✔
436

437

438
    @typechecked
    def to_image_files(self, output_folder_path: Union[Path, str]):
        """
        Writes every image of this instance as a .png file (lossless) into the
        given folder, naming each file after its timestamp with nine decimals.

        Args:
            output_folder_path (Path | str): The folder to save images into.
                It is created if it does not exist.
        Raises:
            NotImplementedError: If the encoding is not Mono8.
        """

        # Make sure the destination exists
        destination = Path(output_folder_path)
        destination.mkdir(parents=True, exist_ok=True)

        # Only Mono8 can be exported for now
        if self.encoding != ImageData.ImageEncoding.Mono8:
            raise NotImplementedError(f"Only Mono8 encoding currently supported for export, not {self.encoding}")

        # One progress tick per image
        progress = tqdm.tqdm(total=self.images.shape[0], desc="Saving Images...", unit=" images")

        for idx, stamp in enumerate(self.timestamps):
            # File name mirrors the timestamp format expected when reading back
            target = destination / f"{stamp:.9f}.png"

            # PNG is lossless; compress_level=1 is passed to the encoder
            Image.fromarray(self.images[idx], mode="L").save(target, format="PNG", compress_level=1)
            progress.update()

        progress.close()
×
471

472
    # =========================================================================
473
    # ============================ Image Decoding ============================= 
474
    # ========================================================================= 
475

476
    @staticmethod
1✔
477
    @typechecked
1✔
478
    def _decode_image_msg(msg: object, encoding: ImageData.ImageEncoding, height: int, width: int):
1✔
479
        """
480
        Helper method that decodes image data from a ROS2 Image message.
481

482
        Args:
483
            msg (object): The ROS2 Image message.
484
            encoding (ImageEncoding): The encoding of the image data.
485
            height (int): Height of the image.
486
            width (int): Width of the image .
487
        """
488
        dtype, channels = ImageData.ImageEncoding.to_dtype_and_channels(encoding)
1✔
489
        if channels > 1:
1✔
490
            return np.frombuffer(msg.data, dtype=dtype).reshape((height, width, channels)) 
1✔
491
        else:
492
            return np.frombuffer(msg.data, dtype=dtype).reshape((height, width))
1✔
493
        
494
    # =========================================================================
495
    # ======================= Multi ImageData Methods ========================= 
496
    # ========================================================================= 
497

498
    def compare_timestamps(self, other: ImageData):
1✔
499
        """
500
        This method compares two ImageData objects based on the timestamps of their
501
        images.
502
        """
503

504
        # Find the locations in other where self timestamps would fit
505
        idxs = np.searchsorted(other.timestamps, self.timestamps, side='right')
×
506

507
        # Get the left indices and right indices
508
        idxs_right = np.clip(idxs, 0, len(other.timestamps)-1)
×
509
        idxs_left = np.clip(idxs - 1, 0, len(other.timestamps)-1)
×
510

511
        # Get distances to nearest on either side
512
        dists = np.minimum(np.abs(self.timestamps - other.timestamps[idxs_left]), 
×
513
                           np.abs(self.timestamps - other.timestamps[idxs_right]))
514
        
515
        # Print the mean and std of the distances
516
        print(f"Mean distance (left): {np.mean(np.abs(self.timestamps - other.timestamps[idxs_left]))}")
×
517
        print(f"Mean distance (right): {np.mean(np.abs(self.timestamps - other.timestamps[idxs_right]))}")
×
518
        print(f"Mean distance: {np.mean(dists)}")
×
519
        print(f"Std distance: {np.std(dists)}")
×
520

521
    @typechecked
    def stereo_undistort_and_rectify(self: ImageData, other: ImageData,
            K1: np.ndarray, D1: np.ndarray, K2: np.ndarray, D2: np.ndarray, 
            R: np.ndarray, T: np.ndarray) -> Tuple[ImageData, ImageData, np.ndarray, np.ndarray]:
        """
        Undistort and rectify stereo images using stereo calibration parameters. 
        Note that self NEEDS to be the left stereo image sequence.

        Only image pairs whose timestamps are exactly equal in both sequences
        are rectified; all other images are dropped from the result.

        Args:
            other (ImageData): The right stereo image sequence.
            K1, D1: Intrinsics and distortion for left camera.
            K2, D2: Intrinsics and distortion for right camera.
            R, T: Rotation and translation from left to right camera.

        Returns:
            Tuple[ImageData, ImageData, np.ndarray, np.ndarray]: 
                Rectified left and right ImageData, and new intrinsics matrices for the left and right cameras.

        Raises:
            ValueError: If the two sequences share no timestamp.
        """

        # Make sure the ImageData sequences are compatible
        assert self.width == other.width and self.height == other.height and self.encoding == other.encoding, \
            "Left and right images must have the same resolution and encoding."

        # Map each timestamp to the index of its first occurrence. A single pass
        # per sequence replaces a full array scan per matched timestamp.
        first_left = {}
        for idx, ts in enumerate(self.timestamps):
            first_left.setdefault(ts, idx)
        first_right = {}
        for idx, ts in enumerate(other.timestamps):
            first_right.setdefault(ts, idx)

        # Find matching timestamps between self and other
        common_timestamps = sorted(first_left.keys() & first_right.keys())
        if len(common_timestamps) == 0:
            raise ValueError("No matching timestamps between left and right images.")

        # Find indices of matching timestamps in each ImageData
        left_indices = [first_left[ts] for ts in common_timestamps]
        right_indices = [first_right[ts] for ts in common_timestamps]

        # Image size, passed to OpenCV as (width, height)
        image_size = (self.width, self.height)

        # Compute rectification transforms
        R1, R2, P1, P2, Q, _, _ = cv2.stereoRectify(K1, D1, K2, D2, image_size, R, T, flags=cv2.CALIB_ZERO_DISPARITY, alpha=0)

        # Compute intrinsics of rectified imagery (first three columns of the projection matrices)
        K1_new = P1[:, :3]
        K2_new = P2[:, :3]
        print("New left camera intrinsics after rectification:\n",  K1_new)
        print("New right camera intrinsics after rectification:\n", K2_new)
        print("Distortion coefficients after rectification are zero.")

        # Compute rectification maps
        map1_x, map1_y = cv2.initUndistortRectifyMap(K1, D1, R1, P1, image_size, cv2.CV_32FC1)
        map2_x, map2_y = cv2.initUndistortRectifyMap(K2, D2, R2, P2, image_size, cv2.CV_32FC1)

        # Allocate arrays for rectified images (only matching pairs); shape[3:]
        # carries the channel axis over when the images have one
        left_rectified = np.zeros((len(common_timestamps), self.height, self.width, *self.images.shape[3:]), dtype=self.images.dtype)
        right_rectified = np.zeros((len(common_timestamps), other.height, other.width, *other.images.shape[3:]), dtype=other.images.dtype)

        # Rectify/Undistort each image pair
        for i, (li, ri) in enumerate(tqdm.tqdm(zip(left_indices, right_indices), total=len(common_timestamps), desc="Rectifying stereo pairs")):
            left_rectified[i] = cv2.remap(self.images[li], map1_x, map1_y, interpolation=cv2.INTER_LINEAR)
            right_rectified[i] = cv2.remap(other.images[ri], map2_x, map2_y, interpolation=cv2.INTER_LINEAR)

        # Return new ImageData instances with rectified images and matched timestamps
        left = ImageData(self.frame_id, np.array(common_timestamps), self.height, self.width, self.encoding, left_rectified)
        right = ImageData(other.frame_id, np.array(common_timestamps), other.height, other.width, other.encoding, right_rectified)
        return left, right, K1_new, K2_new
×
585

586

587
    # =========================================================================
588
    # =========================== Conversion to ROS =========================== 
589
    # ========================================================================= 
590

591
    @typechecked
    @staticmethod
    def get_ros_msg_type() -> str:
        """ Looks up the __msgtype__ of sensor_msgs/msg/Image in the ROS2 Humble typestore. """
        image_msg_cls = get_typestore(Stores.ROS2_HUMBLE).types['sensor_msgs/msg/Image']
        return image_msg_cls.__msgtype__
1✔
597

598
    @typechecked
    def get_ros_msg(self, i: int):
        """
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.

        The timestamp is split into whole seconds and nanoseconds; the fractional
        part is rounded (half-even) to the nearest nanosecond, carrying into the
        seconds field if it rounds up to a full second.
        
        Args:
            i (int): The index of the image message to convert.
        Returns:
            The sensor_msgs/msg/Image message built from the ROS2 Humble typestore.
        Raises:
            ValueError: If i is outside the data bounds.
            NotImplementedError: If the encoding is neither RGB8 nor _32FC1.
        """

        # Check to make sure index is within data bounds
        if i < 0 or i >= self.len():
            raise ValueError(f"Index {i} is out of bounds!")

        # Get ROS2 message classes (ImageMsg avoids shadowing the PIL Image import)
        typestore = get_typestore(Stores.ROS2_HUMBLE)
        ImageMsg = typestore.types['sensor_msgs/msg/Image']
        Header = typestore.types['std_msgs/msg/Header']
        Time = typestore.types['builtin_interfaces/msg/Time']

        # Calculate the step (bytes per image row) and the flat byte buffer
        if self.encoding == ImageData.ImageEncoding.RGB8:
            step = 3 * self.width
            data = self.images[i].flatten()
        elif self.encoding == ImageData.ImageEncoding._32FC1:
            step = 4 * self.width
            # Reinterpret the float32 values as raw bytes
            data = self.images[i].flatten().view(np.uint8)
        else:
            raise NotImplementedError(f"Only RGB8 and 32FC1 encodings are currently supported for export, not {self.encoding}!")

        # Get the seconds and nanoseconds. The rounding is applied to the
        # product (fraction * 1e9), not to the constant 1e9.
        stamp = self.timestamps[i]
        whole_seconds = stamp.to_integral_value(rounding=decimal.ROUND_DOWN)
        seconds = int(whole_seconds)
        nanoseconds = int(((stamp - whole_seconds) * Decimal("1e9")).to_integral_value(rounding=decimal.ROUND_HALF_EVEN))

        # Rounding up can produce a full second; carry it so nanosec stays below 1e9
        if nanoseconds >= 1_000_000_000:
            seconds += 1
            nanoseconds -= 1_000_000_000

        # Write the data into the new class
        return ImageMsg(Header(stamp=Time(sec=seconds, 
                                          nanosec=nanoseconds), 
                               frame_id=self.frame_id),
                        height=self.height, 
                        width=self.width, 
                        encoding=ImageData.ImageEncoding.to_ros_str(self.encoding),
                        is_bigendian=0, 
                        step=step, 
                        data=data)
645
        
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc