• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 20934864165

12 Jan 2026 09:01PM UTC coverage: 70.168% (-4.5%) from 74.672%
20934864165

Pull #10

github

DanielChaseButterfield
Swap ROS1 while loop for publishing with rospy.Timer for real-time image publishing
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

524 of 780 new or added lines in 12 files covered. (67.18%)

3 existing lines in 2 files now uncovered.

1423 of 2028 relevant lines covered (70.17%)

1.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.97
/src/robotdataprocess/data_types/ImageData/ImageData.py
1
from __future__ import annotations
2✔
2

3
from ..Data import Data, ROSMsgLibType
2✔
4
import decimal
2✔
5
from decimal import Decimal
2✔
6
from enum import Enum
2✔
7
import numpy as np
2✔
8
from pathlib import Path
2✔
9
from typeguard import typechecked
2✔
10
from typing import Union, Any
2✔
11
from rosbags.typesys import Stores, get_typestore
2✔
12

13
@typechecked
2✔
14
class ImageData(Data):
2✔
15
    """ Generic ImageData class that should be overwritten by children """
16

17
    # Define image encodings enumeration
18
    class ImageEncoding(Enum):
2✔
19
        Mono8 = 0
2✔
20
        RGB8 = 1
2✔
21
        _32FC1 = 2
2✔
22

23
        # ================ Class Methods ================
24
        @classmethod
2✔
25
        def from_str(cls, encoding_str: str):
2✔
26
            if encoding_str == "ImageEncoding.Mono8":
2✔
NEW
27
                return cls.Mono8
×
28
            elif encoding_str == "ImageEncoding.RGB8":
2✔
29
                return cls.RGB8
2✔
30
            elif encoding_str == "ImageEncoding._32FC1":
2✔
31
                return cls._32FC1
2✔
32
            else:
NEW
33
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
×
34
        
35
        @classmethod
2✔
36
        def from_ros_str(cls, encoding_str: str):
2✔
37
            encoding_str = encoding_str.lower()
2✔
38
            if encoding_str == 'mono8':
2✔
NEW
39
                return cls.Mono8
×
40
            elif encoding_str == 'rgb8':
2✔
41
                return cls.RGB8
2✔
42
            elif encoding_str == "32fc1":
2✔
43
                return cls._32FC1
2✔
44
            else:
45
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
2✔
46
        
47
        @classmethod
2✔
48
        def from_dtype_and_channels(cls, dtype: np.dtype, channels: int):
2✔
49
            if dtype == np.uint8 and channels == 1:
2✔
NEW
50
                return cls.Mono8
×
51
            elif dtype == np.uint8 and channels == 3:
2✔
NEW
52
                return cls.RGB8
×
53
            elif dtype == np.float32 and channels == 1:
2✔
54
                return cls._32FC1
2✔
55
            else:
NEW
56
                raise NotImplementedError(f"dtype {dtype} w/ {channels} channel(s) has no corresponding encoding!")
×
57
        
58
        @classmethod
2✔
59
        def from_pillow_str(cls, encoding_str: str):
2✔
60
            if encoding_str == "RGB":
2✔
61
                return cls.RGB8
2✔
62
            elif encoding_str == "L":
2✔
63
                return cls.Mono8
2✔
64
            else:
65
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
2✔
66
        
67
        # ================ Export Methods ================
68
        @staticmethod
2✔
69
        def to_ros_str(encoding: ImageData.ImageEncoding):
2✔
70
            if encoding == ImageData.ImageEncoding.Mono8:
2✔
NEW
71
                return 'mono8'
×
72
            elif encoding == ImageData.ImageEncoding.RGB8:
2✔
73
                return 'rgb8'
2✔
74
            elif encoding == ImageData.ImageEncoding._32FC1:
2✔
75
                return '32FC1'
2✔
76
            else:
NEW
77
                raise NotImplementedError(f"This ImageData.ImageEncoding.{encoding} is not yet implemented (or it doesn't exist)!")
×
78
        
79
        @staticmethod
2✔
80
        def to_dtype_and_channels(encoding):
2✔
81
            if encoding == ImageData.ImageEncoding.Mono8:
2✔
82
                return (np.uint8, 1)
2✔
83
            elif encoding == ImageData.ImageEncoding.RGB8:
2✔
84
                return (np.uint8, 3)
2✔
85
            elif encoding == ImageData.ImageEncoding._32FC1:
2✔
86
                return (np.float32, 1)
2✔
87
            else:
NEW
88
                raise NotImplementedError(f"This encoding ({encoding}) is missing a mapping to dtype/channels!")
×
89
    
90
    height: int
2✔
91
    width: int
2✔
92
    encoding: ImageData.ImageEncoding
2✔
93
    images: Union[np.ndarray, Any] # With Any being a LazyImageArray
2✔
94

95
    def __init__(self, frame_id: str, timestamps: Union[np.ndarray, list], height: int,
2✔
96
                 width: int, encoding: ImageData.ImageEncoding, images: Union[np.ndarray, Any]):
97
        super().__init__(frame_id, timestamps)
2✔
98
        self.height = height
2✔
99
        self.width = width
2✔
100
        self.encoding = encoding
2✔
101
        self.images = images
2✔
102

103
    # =========================================================================
104
    # ============================ Class Methods ============================== 
105
    # =========================================================================  
106

107
    @classmethod
2✔
108
    def from_image_files(cls, image_folder_path: Union[Path, str], frame_id: str) -> ImageData:
2✔
109
        """ Creates a class structure from a folder with .png files. """
NEW
110
        NotImplementedError("This method needs to be overwritten by the child Data class!")
×
111
    
112
    # =========================================================================
113
    # ========================= Manipulation Methods ========================== 
114
    # =========================================================================  
115

116
    def crop_data(self, start: Decimal, end: Decimal):
2✔
117
        """ Will crop the data so only values within [start, end] inclusive are kept. """
118

119
        # Create boolean mask of data to keep
120
        mask = (self.timestamps >= start) & (self.timestamps <= end)
2✔
121

122
        # Apply mask
123
        self.timestamps = self.timestamps[mask]
2✔
124
        self.images = self.images[mask]
2✔
125

126
    # =========================================================================
127
    # =========================== Conversion to ROS =========================== 
128
    # ========================================================================= 
129

130
    @staticmethod
2✔
131
    def get_ros_msg_type(lib_type: ROSMsgLibType) -> Any:
2✔
132
        """ Return the __msgtype__ for an Image msg. """
133

134
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
135
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
136
            return typestore.types['sensor_msgs/msg/Image'].__msgtype__
2✔
137
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
2✔
138
            from sensor_msgs.msg import Image
2✔
139
            return Image
2✔
140
        else:
NEW
141
            raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg_type!")
×
142

143
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int):
2✔
144
        """
145
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
146
        
147
        Args:
148
            lib_type (ROSMsgLibType): The type of ROS message to return (e.g., ROSBAGS, RCLPY).
149
            i (int): The index of the image message to convert.
150
        Raises:
151
            ValueError: If i is outside the data bounds.
152
        """
153

154
        # Check to make sure index is within data bounds
155
        if i < 0 or i >= self.len():
2✔
NEW
156
            raise ValueError(f"Index {i} is out of bounds!")
×
157

158
        # Calculate the step
159
        if self.encoding == ImageData.ImageEncoding.RGB8:
2✔
160
            step = 3 * self.width
2✔
161
        elif self.encoding == ImageData.ImageEncoding._32FC1:
2✔
162
            step = 4 * self.width
2✔
163
        else:
NEW
164
            raise NotImplementedError(f"Unsupported encoding {self.encoding} for rosbag_get_ros_msg!")
×
165

166
        # Get the seconds and nanoseconds
167
        seconds = int(self.timestamps[i])
2✔
168
        nanoseconds = int((self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN))
2✔
169

170
        # Calculate the ROS2 Image data
171
        if self.encoding == ImageData.ImageEncoding.RGB8:
2✔
172
            data = self.images[i].flatten()
2✔
173
        elif self.encoding == ImageData.ImageEncoding._32FC1:
2✔
174
            data = self.images[i].flatten().view(np.uint8)
2✔
175
            # TODO: Check endianness for _32FC1
176
        else:
NEW
177
            raise NotImplementedError(f"Unsupported encoding {self.encoding} for rosbag_get_ros_msg!")
×
178

179
        # Write the data into the new class
180
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
181
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
182
            Image, Header, Time = typestore.types['sensor_msgs/msg/Image'], typestore.types['std_msgs/msg/Header'], typestore.types['builtin_interfaces/msg/Time']
2✔
183

184
            return Image(Header(stamp=Time(sec=int(seconds), 
2✔
185
                                        nanosec=int(nanoseconds)), 
186
                                frame_id=self.frame_id),
187
                        height=self.height, 
188
                        width=self.width, 
189
                        encoding=ImageData.ImageEncoding.to_ros_str(self.encoding),
190
                        is_bigendian=0, 
191
                        step=step, 
192
                        data=data)
193
        
194
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
2✔
195
            from std_msgs.msg import Header
2✔
196
            from sensor_msgs.msg import Image
2✔
197

198
            # Create the messages
199
            img_msg = Image()
2✔
200
            img_msg.header = Header()
2✔
201
            if lib_type == ROSMsgLibType.RCLPY:
2✔
202
                from rclpy.time import Time
2✔
203
                img_msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
2✔
204
            else:
NEW
205
                import rospy
×
NEW
206
                img_msg.header.stamp = rospy.Time(secs=int(seconds), nsecs=int(nanoseconds))
×
207

208
            # Populate the rest of the data
209
            img_msg.header.frame_id = self.frame_id 
2✔
210
            img_msg.height = self.height
2✔
211
            img_msg.width = self.width
2✔
212
            img_msg.encoding = ImageData.ImageEncoding.to_ros_str(self.encoding)
2✔
213
            img_msg.is_bigendian = 0
2✔
214
            img_msg.step = step
2✔
215
            img_msg.data = data.tolist()
2✔
216
            return img_msg
2✔
217

218
        else:
219
            raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg()!")
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc