• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 20798125136

07 Jan 2026 10:09PM UTC coverage: 62.111% (-12.6%) from 74.672%
20798125136

Pull #10

github

DanielChaseButterfield
Merge ROS2 and ROS1 publishers into single and rehaul unit tests to handle ROS2 tests
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

212 of 545 new or added lines in 11 files covered. (38.9%)

3 existing lines in 2 files now uncovered.

1118 of 1800 relevant lines covered (62.11%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.09
/src/robotdataprocess/data_types/ImageData/ImageData.py
1
from __future__ import annotations
1✔
2

3
from ..Data import Data, ROSMsgLibType
1✔
4
import decimal
1✔
5
from decimal import Decimal
1✔
6
from enum import Enum
1✔
7
import numpy as np
1✔
8
from pathlib import Path
1✔
9
from typeguard import typechecked
1✔
10
from typing import Union, Any
1✔
11
from rosbags.typesys import Stores, get_typestore
1✔
12

13
@typechecked
1✔
14
class ImageData(Data):
1✔
15
    """ Generic ImageData class that should be overwritten by children """
16

17
    # Define image encodings enumeration
18
    class ImageEncoding(Enum):
1✔
19
        Mono8 = 0
1✔
20
        RGB8 = 1
1✔
21
        _32FC1 = 2
1✔
22

23
        # ================ Class Methods ================
24
        @classmethod
1✔
25
        def from_str(cls, encoding_str: str):
1✔
26
            if encoding_str == "ImageEncoding.Mono8":
1✔
NEW
27
                return cls.Mono8
×
28
            elif encoding_str == "ImageEncoding.RGB8":
1✔
29
                return cls.RGB8
1✔
30
            elif encoding_str == "ImageEncoding._32FC1":
1✔
31
                return cls._32FC1
1✔
32
            else:
NEW
33
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
×
34
        
35
        @classmethod
1✔
36
        def from_ros_str(cls, encoding_str: str):
1✔
37
            encoding_str = encoding_str.lower()
1✔
38
            if encoding_str == 'mono8':
1✔
NEW
39
                return cls.Mono8
×
40
            elif encoding_str == 'rgb8':
1✔
41
                return cls.RGB8
1✔
42
            elif encoding_str == "32fc1":
1✔
43
                return cls._32FC1
1✔
44
            else:
45
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
1✔
46
        
47
        @classmethod
1✔
48
        def from_dtype_and_channels(cls, dtype: np.dtype, channels: int):
1✔
49
            if dtype == np.uint8 and channels == 1:
1✔
NEW
50
                return cls.Mono8
×
51
            elif dtype == np.uint8 and channels == 3:
1✔
NEW
52
                return cls.RGB8
×
53
            elif dtype == np.float32 and channels == 1:
1✔
54
                return cls._32FC1
1✔
55
            else:
NEW
56
                raise NotImplementedError(f"dtype {dtype} w/ {channels} channel(s) has no corresponding encoding!")
×
57
        
58
        @classmethod
1✔
59
        def from_pillow_str(cls, encoding_str: str):
1✔
60
            if encoding_str == "RGB":
1✔
61
                return cls.RGB8
1✔
62
            elif encoding_str == "L":
1✔
63
                return cls.Mono8
1✔
64
            else:
65
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
1✔
66
        
67
        # ================ Export Methods ================
68
        @staticmethod
1✔
69
        def to_ros_str(encoding: ImageData.ImageEncoding):
1✔
70
            if encoding == ImageData.ImageEncoding.Mono8:
1✔
NEW
71
                return 'mono8'
×
72
            elif encoding == ImageData.ImageEncoding.RGB8:
1✔
73
                return 'rgb8'
1✔
74
            elif encoding == ImageData.ImageEncoding._32FC1:
1✔
75
                return '32FC1'
1✔
76
            else:
NEW
77
                raise NotImplementedError(f"This ImageData.ImageEncoding.{encoding} is not yet implemented (or it doesn't exist)!")
×
78
        
79
        @staticmethod
1✔
80
        def to_dtype_and_channels(encoding):
1✔
81
            if encoding == ImageData.ImageEncoding.Mono8:
1✔
82
                return (np.uint8, 1)
1✔
83
            elif encoding == ImageData.ImageEncoding.RGB8:
1✔
84
                return (np.uint8, 3)
1✔
85
            elif encoding == ImageData.ImageEncoding._32FC1:
1✔
86
                return (np.float32, 1)
1✔
87
            else:
NEW
88
                raise NotImplementedError(f"This encoding ({encoding}) is missing a mapping to dtype/channels!")
×
89
    
90
    height: int
1✔
91
    width: int
1✔
92
    encoding: ImageData.ImageEncoding
1✔
93
    images: Union[np.ndarray, Any] # With Any being a LazyImageArray
1✔
94

95
    def __init__(self, frame_id: str, timestamps: Union[np.ndarray, list], height: int,
1✔
96
                 width: int, encoding: ImageData.ImageEncoding, images: Union[np.ndarray, Any]):
97
        super().__init__(frame_id, timestamps)
1✔
98
        self.height = height
1✔
99
        self.width = width
1✔
100
        self.encoding = encoding
1✔
101
        self.images = images
1✔
102

103
    # =========================================================================
104
    # ============================ Class Methods ============================== 
105
    # =========================================================================  
106

107
    @classmethod
1✔
108
    def from_image_files(cls, image_folder_path: Union[Path, str], frame_id: str) -> ImageData:
1✔
109
        """ Creates a class structure from a folder with .png files. """
NEW
110
        NotImplementedError("This method needs to be overwritten by the child Data class!")
×
111
        
112
    # =========================================================================
113
    # =========================== Conversion to ROS =========================== 
114
    # ========================================================================= 
115

116
    @staticmethod
1✔
117
    def get_ros_msg_type(lib_type: ROSMsgLibType) -> Any:
1✔
118
        """ Return the __msgtype__ for an Image msg. """
119

120
        if lib_type == ROSMsgLibType.ROSBAGS:
1✔
121
            typestore = get_typestore(Stores.ROS2_HUMBLE)
1✔
122
            return typestore.types['sensor_msgs/msg/Image'].__msgtype__
1✔
NEW
123
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
×
NEW
124
            from sensor_msgs.msg import Image
×
NEW
125
            return Image
×
126
        else:
NEW
127
            raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg_type!")
×
128

129
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int):
1✔
130
        """
131
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
132
        
133
        Args:
134
            lib_type (ROSMsgLibType): The type of ROS message to return (e.g., ROSBAGS, RCLPY).
135
            i (int): The index of the image message to convert.
136
        Raises:
137
            ValueError: If i is outside the data bounds.
138
        """
139

140
        # Check to make sure index is within data bounds
141
        if i < 0 or i >= self.len():
1✔
NEW
142
            raise ValueError(f"Index {i} is out of bounds!")
×
143

144
        # Calculate the step
145
        if self.encoding == ImageData.ImageEncoding.RGB8:
1✔
146
            step = 3 * self.width
1✔
147
        elif self.encoding == ImageData.ImageEncoding._32FC1:
1✔
148
            step = 4 * self.width
1✔
149
        else:
NEW
150
            raise NotImplementedError(f"Unsupported encoding {self.encoding} for rosbag_get_ros_msg!")
×
151

152
        # Get the seconds and nanoseconds
153
        seconds = int(self.timestamps[i])
1✔
154
        nanoseconds = int((self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN))
1✔
155

156
        # Calculate the ROS2 Image data
157
        if self.encoding == ImageData.ImageEncoding.RGB8:
1✔
158
            data = self.images[i].flatten()
1✔
159
        elif self.encoding == ImageData.ImageEncoding._32FC1:
1✔
160
            data = self.images[i].flatten().view(np.uint8)
1✔
161
            # TODO: Check endianness for _32FC1
162
        else:
NEW
163
            raise NotImplementedError(f"Unsupported encoding {self.encoding} for rosbag_get_ros_msg!")
×
164

165
        # Write the data into the new class
166
        if lib_type == ROSMsgLibType.ROSBAGS:
1✔
167
            typestore = get_typestore(Stores.ROS2_HUMBLE)
1✔
168
            Image, Header, Time = typestore.types['sensor_msgs/msg/Image'], typestore.types['std_msgs/msg/Header'], typestore.types['builtin_interfaces/msg/Time']
1✔
169

170
            return Image(Header(stamp=Time(sec=int(seconds), 
1✔
171
                                        nanosec=int(nanoseconds)), 
172
                                frame_id=self.frame_id),
173
                        height=self.height, 
174
                        width=self.width, 
175
                        encoding=ImageData.ImageEncoding.to_ros_str(self.encoding),
176
                        is_bigendian=0, 
177
                        step=step, 
178
                        data=data)
179
        
NEW
180
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
×
NEW
181
            from std_msgs.msg import Header
×
NEW
182
            from sensor_msgs.msg import Image
×
183

184
            # Create the messages
NEW
185
            img_msg = Image()
×
NEW
186
            img_msg.header = Header()
×
NEW
187
            if lib_type == ROSMsgLibType.RCLPY:
×
NEW
188
                from rclpy.time import Time
×
NEW
189
                img_msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
×
190
            else:
NEW
191
                import rospy
×
NEW
192
                img_msg.header.stamp = rospy.Time(secs=int(seconds), nsecs=int(nanoseconds))
×
193

194
            # Populate the rest of the data
NEW
195
            img_msg.header.frame_id = self.frame_id 
×
NEW
196
            img_msg.height = self.height
×
NEW
197
            img_msg.width = self.width
×
NEW
198
            img_msg.encoding = ImageData.ImageEncoding.to_ros_str(self.encoding)
×
NEW
199
            img_msg.is_bigendian = 0
×
NEW
200
            img_msg.step = step
×
NEW
201
            img_msg.data = data.tolist()
×
NEW
202
            return img_msg
×
203

204
        else:
NEW
205
            raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg()!")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc