• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 21483849472

29 Jan 2026 03:20PM UTC coverage: 74.142% (-0.5%) from 74.672%
21483849472

Pull #10

github

DanielChaseButterfield
Fix with dependency mismatch with typeguard in conversion_utils, ImageDataOnDisk for .npy files, linear interpolation for odometry data
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

881 of 1305 new or added lines in 14 files covered. (67.51%)

2 existing lines in 1 file now uncovered.

1749 of 2359 relevant lines covered (74.14%)

1.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.97
/src/robotdataprocess/data_types/ImageData/ImageData.py
1
from __future__ import annotations
2✔
2

3
from ..Data import Data, ROSMsgLibType
2✔
4
import decimal
2✔
5
from decimal import Decimal
2✔
6
from enum import Enum
2✔
7
from ...ModuleImporter import ModuleImporter
2✔
8
import numpy as np
2✔
9
from pathlib import Path
2✔
10
from typeguard import typechecked
2✔
11
from typing import Union, Any
2✔
12
from rosbags.typesys import Stores, get_typestore
2✔
13

14
@typechecked
2✔
15
class ImageData(Data):
2✔
16
    """ Generic ImageData class that should be overwritten by children """
17

18
    # Define image encodings enumeration
19
    class ImageEncoding(Enum):
2✔
20
        Mono8 = 0
2✔
21
        RGB8 = 1
2✔
22
        _32FC1 = 2
2✔
23

24
        # ================ Class Methods ================
25
        @classmethod
2✔
26
        def from_str(cls, encoding_str: str):
2✔
27
            if encoding_str == "ImageEncoding.Mono8":
2✔
NEW
28
                return cls.Mono8
×
29
            elif encoding_str == "ImageEncoding.RGB8":
2✔
30
                return cls.RGB8
2✔
31
            elif encoding_str == "ImageEncoding._32FC1":
2✔
32
                return cls._32FC1
2✔
33
            else:
NEW
34
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
×
35
        
36
        @classmethod
2✔
37
        def from_ros_str(cls, encoding_str: str):
2✔
38
            encoding_str = encoding_str.lower()
2✔
39
            if encoding_str == 'mono8':
2✔
NEW
40
                return cls.Mono8
×
41
            elif encoding_str == 'rgb8':
2✔
42
                return cls.RGB8
2✔
43
            elif encoding_str == "32fc1":
2✔
44
                return cls._32FC1
2✔
45
            else:
46
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
2✔
47
        
48
        @classmethod
2✔
49
        def from_dtype_and_channels(cls, dtype: np.dtype, channels: int):
2✔
50
            if dtype == np.uint8 and channels == 1:
2✔
NEW
51
                return cls.Mono8
×
52
            elif dtype == np.uint8 and channels == 3:
2✔
NEW
53
                return cls.RGB8
×
54
            elif dtype == np.float32 and channels == 1:
2✔
55
                return cls._32FC1
2✔
56
            else:
NEW
57
                raise NotImplementedError(f"dtype {dtype} w/ {channels} channel(s) has no corresponding encoding!")
×
58
        
59
        @classmethod
2✔
60
        def from_pillow_str(cls, encoding_str: str):
2✔
61
            if encoding_str == "RGB":
2✔
62
                return cls.RGB8
2✔
63
            elif encoding_str == "L":
2✔
64
                return cls.Mono8
2✔
65
            else:
66
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
2✔
67
        
68
        # ================ Export Methods ================
69
        @staticmethod
2✔
70
        def to_ros_str(encoding: ImageData.ImageEncoding):
2✔
71
            if encoding == ImageData.ImageEncoding.Mono8:
2✔
NEW
72
                return 'mono8'
×
73
            elif encoding == ImageData.ImageEncoding.RGB8:
2✔
74
                return 'rgb8'
2✔
75
            elif encoding == ImageData.ImageEncoding._32FC1:
2✔
76
                return '32FC1'
2✔
77
            else:
NEW
78
                raise NotImplementedError(f"This ImageData.ImageEncoding.{encoding} is not yet implemented (or it doesn't exist)!")
×
79
        
80
        @staticmethod
2✔
81
        def to_dtype_and_channels(encoding):
2✔
82
            if encoding == ImageData.ImageEncoding.Mono8:
2✔
83
                return (np.uint8, 1)
2✔
84
            elif encoding == ImageData.ImageEncoding.RGB8:
2✔
85
                return (np.uint8, 3)
2✔
86
            elif encoding == ImageData.ImageEncoding._32FC1:
2✔
87
                return (np.float32, 1)
2✔
88
            else:
NEW
89
                raise NotImplementedError(f"This encoding ({encoding}) is missing a mapping to dtype/channels!")
×
90
    
91
    height: int
2✔
92
    width: int
2✔
93
    encoding: ImageData.ImageEncoding
2✔
94
    images: Union[np.ndarray, Any] # With Any being a LazyImageArray
2✔
95

96
    def __init__(self, frame_id: str, timestamps: Union[np.ndarray, list], height: int,
2✔
97
                 width: int, encoding: ImageData.ImageEncoding, images: Union[np.ndarray, Any]):
98
        super().__init__(frame_id, timestamps)
2✔
99
        self.height = height
2✔
100
        self.width = width
2✔
101
        self.encoding = encoding
2✔
102
        self.images = images
2✔
103

104
    # =========================================================================
105
    # ============================ Class Methods ============================== 
106
    # =========================================================================  
107

108
    @classmethod
2✔
109
    def from_image_files(cls, image_folder_path: Union[Path, str], frame_id: str) -> ImageData:
2✔
110
        """ Creates a class structure from a folder with .png files. """
NEW
111
        NotImplementedError("This method needs to be overwritten by the child Data class!")
×
112
    
113
    # =========================================================================
114
    # ========================= Manipulation Methods ========================== 
115
    # =========================================================================  
116

117
    def crop_data(self, start: Decimal, end: Decimal):
2✔
118
        """ Will crop the data so only values within [start, end] inclusive are kept. """
119

120
        # Create boolean mask of data to keep
121
        mask = (self.timestamps >= start) & (self.timestamps <= end)
2✔
122

123
        # Apply mask
124
        self.timestamps = self.timestamps[mask]
2✔
125
        self.images = self.images[mask]
2✔
126

127
    # =========================================================================
128
    # =========================== Conversion to ROS =========================== 
129
    # ========================================================================= 
130

131
    @staticmethod
2✔
132
    def get_ros_msg_type(lib_type: ROSMsgLibType) -> Any:
2✔
133
        """ Return the __msgtype__ for an Image msg. """
134

135
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
136
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
137
            return typestore.types['sensor_msgs/msg/Image'].__msgtype__
2✔
138
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
2✔
139
            return ModuleImporter.get_module_attribute('sensor_msgs.msg', 'Image')
2✔
140
        else:
NEW
141
            raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg_type!")
×
142

143
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int):
2✔
144
        """
145
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
146
        
147
        Args:
148
            lib_type (ROSMsgLibType): The type of ROS message to return (e.g., ROSBAGS, RCLPY).
149
            i (int): The index of the image message to convert.
150
        Raises:
151
            ValueError: If i is outside the data bounds.
152
        """
153

154
        # Check to make sure index is within data bounds
155
        if i < 0 or i >= self.len():
2✔
NEW
156
            raise ValueError(f"Index {i} is out of bounds!")
×
157

158
        # Calculate the step
159
        if self.encoding == ImageData.ImageEncoding.RGB8:
2✔
160
            step = 3 * self.width
2✔
161
        elif self.encoding == ImageData.ImageEncoding._32FC1:
2✔
162
            step = 4 * self.width
2✔
163
        else:
NEW
164
            raise NotImplementedError(f"Unsupported encoding {self.encoding} for rosbag_get_ros_msg!")
×
165

166
        # Get the seconds and nanoseconds
167
        seconds = int(self.timestamps[i])
2✔
168
        nanoseconds = int((self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN))
2✔
169

170
        # Calculate the ROS2 Image data
171
        if self.encoding == ImageData.ImageEncoding.RGB8:
2✔
172
            data = self.images[i].flatten()
2✔
173
        elif self.encoding == ImageData.ImageEncoding._32FC1:
2✔
174
            data = self.images[i].flatten().view(np.uint8)
2✔
175
            # TODO: Check endianness for _32FC1
176
        else:
NEW
177
            raise NotImplementedError(f"Unsupported encoding {self.encoding} for rosbag_get_ros_msg!")
×
178

179
        # Write the data into the new class
180
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
181
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
182
            Image, Header, Time = typestore.types['sensor_msgs/msg/Image'], typestore.types['std_msgs/msg/Header'], typestore.types['builtin_interfaces/msg/Time']
2✔
183

184
            return Image(Header(stamp=Time(sec=int(seconds), 
2✔
185
                                        nanosec=int(nanoseconds)), 
186
                                frame_id=self.frame_id),
187
                        height=self.height, 
188
                        width=self.width, 
189
                        encoding=ImageData.ImageEncoding.to_ros_str(self.encoding),
190
                        is_bigendian=0, 
191
                        step=step, 
192
                        data=data)
193
        
194
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
2✔
195
            Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
2✔
196
            Image = ModuleImporter.get_module_attribute('sensor_msgs.msg', 'Image')
2✔
197

198
            # Create the messages
199
            img_msg = Image()
2✔
200
            img_msg.header = Header()
2✔
201
            if lib_type == ROSMsgLibType.RCLPY:
2✔
202
                Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
2✔
203
                img_msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
2✔
204
            else:
NEW
205
                rospy = ModuleImporter.get_module('rospy')
×
NEW
206
                img_msg.header.stamp = rospy.Time(secs=int(seconds), nsecs=int(nanoseconds))
×
207

208
            # Populate the rest of the data
209
            img_msg.header.frame_id = self.frame_id 
2✔
210
            img_msg.height = self.height
2✔
211
            img_msg.width = self.width
2✔
212
            img_msg.encoding = ImageData.ImageEncoding.to_ros_str(self.encoding)
2✔
213
            img_msg.is_bigendian = 0
2✔
214
            img_msg.step = step
2✔
215
            img_msg.data = data.tolist()
2✔
216
            return img_msg
2✔
217

218
        else:
219
            raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg()!")
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc