• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 20820881882

08 Jan 2026 02:50PM UTC coverage: 62.482% (-12.2%) from 74.672%
20820881882

Pull #10

github

DanielChaseButterfield
Add publish script for OpenVINS
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

181 of 277 branches covered (65.34%)

Branch coverage included in aggregate %.

212 of 546 new or added lines in 11 files covered. (38.83%)

3 existing lines in 2 files now uncovered.

1118 of 1802 relevant lines covered (62.04%)

1.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/src/robotdataprocess/data_types/ImageData/ImageData.py
1
from __future__ import annotations
2✔
2

3
from ..Data import Data, ROSMsgLibType
2✔
4
import decimal
2✔
5
from decimal import Decimal
2✔
6
from enum import Enum
2✔
7
import numpy as np
2✔
8
from pathlib import Path
2✔
9
from typeguard import typechecked
2✔
10
from typing import Union, Any
2✔
11
from rosbags.typesys import Stores, get_typestore
2✔
12

13
@typechecked
2✔
14
class ImageData(Data):
2✔
15
    """ Generic ImageData class that should be overwritten by children """
16

17
    # Define image encodings enumeration
18
    class ImageEncoding(Enum):
2✔
19
        Mono8 = 0
2✔
20
        RGB8 = 1
2✔
21
        _32FC1 = 2
2✔
22

23
        # ================ Class Methods ================
24
        @classmethod
2✔
25
        def from_str(cls, encoding_str: str):
2✔
26
            if encoding_str == "ImageEncoding.Mono8":
2✔
NEW
27
                return cls.Mono8
×
28
            elif encoding_str == "ImageEncoding.RGB8":
2✔
29
                return cls.RGB8
2✔
30
            elif encoding_str == "ImageEncoding._32FC1":
2✔
31
                return cls._32FC1
2✔
32
            else:
NEW
33
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
×
34
        
35
        @classmethod
2✔
36
        def from_ros_str(cls, encoding_str: str):
2✔
37
            encoding_str = encoding_str.lower()
2✔
38
            if encoding_str == 'mono8':
2✔
NEW
39
                return cls.Mono8
×
40
            elif encoding_str == 'rgb8':
2✔
41
                return cls.RGB8
2✔
42
            elif encoding_str == "32fc1":
2✔
43
                return cls._32FC1
2✔
44
            else:
45
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
2✔
46
        
47
        @classmethod
2✔
48
        def from_dtype_and_channels(cls, dtype: np.dtype, channels: int):
2✔
49
            if dtype == np.uint8 and channels == 1:
2✔
NEW
50
                return cls.Mono8
×
51
            elif dtype == np.uint8 and channels == 3:
2✔
NEW
52
                return cls.RGB8
×
53
            elif dtype == np.float32 and channels == 1:
2✔
54
                return cls._32FC1
2✔
55
            else:
NEW
56
                raise NotImplementedError(f"dtype {dtype} w/ {channels} channel(s) has no corresponding encoding!")
×
57
        
58
        @classmethod
2✔
59
        def from_pillow_str(cls, encoding_str: str):
2✔
60
            if encoding_str == "RGB":
2✔
61
                return cls.RGB8
2✔
62
            elif encoding_str == "L":
2✔
63
                return cls.Mono8
2✔
64
            else:
65
                raise NotImplementedError(f"This encoding ({encoding_str}) is not yet implemented (or it doesn't exist)!")
2✔
66
        
67
        # ================ Export Methods ================
68
        @staticmethod
2✔
69
        def to_ros_str(encoding: ImageData.ImageEncoding):
2✔
70
            if encoding == ImageData.ImageEncoding.Mono8:
2✔
NEW
71
                return 'mono8'
×
72
            elif encoding == ImageData.ImageEncoding.RGB8:
2✔
73
                return 'rgb8'
2✔
74
            elif encoding == ImageData.ImageEncoding._32FC1:
2✔
75
                return '32FC1'
2✔
76
            else:
NEW
77
                raise NotImplementedError(f"This ImageData.ImageEncoding.{encoding} is not yet implemented (or it doesn't exist)!")
×
78
        
79
        @staticmethod
2✔
80
        def to_dtype_and_channels(encoding):
2✔
81
            if encoding == ImageData.ImageEncoding.Mono8:
2✔
82
                return (np.uint8, 1)
2✔
83
            elif encoding == ImageData.ImageEncoding.RGB8:
2✔
84
                return (np.uint8, 3)
2✔
85
            elif encoding == ImageData.ImageEncoding._32FC1:
2✔
86
                return (np.float32, 1)
2✔
87
            else:
NEW
88
                raise NotImplementedError(f"This encoding ({encoding}) is missing a mapping to dtype/channels!")
×
89
    
90
    height: int
2✔
91
    width: int
2✔
92
    encoding: ImageData.ImageEncoding
2✔
93
    images: Union[np.ndarray, Any] # With Any being a LazyImageArray
2✔
94

95
    def __init__(self, frame_id: str, timestamps: Union[np.ndarray, list], height: int,
2✔
96
                 width: int, encoding: ImageData.ImageEncoding, images: Union[np.ndarray, Any]):
97
        super().__init__(frame_id, timestamps)
2✔
98
        self.height = height
2✔
99
        self.width = width
2✔
100
        self.encoding = encoding
2✔
101
        self.images = images
2✔
102

103
    # =========================================================================
104
    # ============================ Class Methods ============================== 
105
    # =========================================================================  
106

107
    @classmethod
2✔
108
    def from_image_files(cls, image_folder_path: Union[Path, str], frame_id: str) -> ImageData:
2✔
109
        """ Creates a class structure from a folder with .png files. """
NEW
110
        NotImplementedError("This method needs to be overwritten by the child Data class!")
×
111
        
112
    # =========================================================================
113
    # =========================== Conversion to ROS =========================== 
114
    # ========================================================================= 
115

116
    @staticmethod
2✔
117
    def get_ros_msg_type(lib_type: ROSMsgLibType) -> Any:
2✔
118
        """ Return the __msgtype__ for an Image msg. """
119

120
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
121
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
122
            return typestore.types['sensor_msgs/msg/Image'].__msgtype__
2✔
NEW
123
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
×
NEW
124
            from sensor_msgs.msg import Image
×
NEW
125
            return Image
×
126
        else:
NEW
127
            raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg_type!")
×
128

129
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int):
2✔
130
        """
131
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
132
        
133
        Args:
134
            lib_type (ROSMsgLibType): The type of ROS message to return (e.g., ROSBAGS, RCLPY).
135
            i (int): The index of the image message to convert.
136
        Raises:
137
            ValueError: If i is outside the data bounds.
138
        """
139

140
        # Check to make sure index is within data bounds
141
        if i < 0 or i >= self.len():
2✔
NEW
142
            raise ValueError(f"Index {i} is out of bounds!")
×
143

144
        # Calculate the step
145
        if self.encoding == ImageData.ImageEncoding.RGB8:
2✔
146
            step = 3 * self.width
2✔
147
        elif self.encoding == ImageData.ImageEncoding._32FC1:
2✔
148
            step = 4 * self.width
2✔
149
        else:
NEW
150
            raise NotImplementedError(f"Unsupported encoding {self.encoding} for rosbag_get_ros_msg!")
×
151

152
        # Get the seconds and nanoseconds
153
        seconds = int(self.timestamps[i])
2✔
154
        nanoseconds = int((self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN))
2✔
155

156
        # Calculate the ROS2 Image data
157
        if self.encoding == ImageData.ImageEncoding.RGB8:
2✔
158
            data = self.images[i].flatten()
2✔
159
        elif self.encoding == ImageData.ImageEncoding._32FC1:
2✔
160
            data = self.images[i].flatten().view(np.uint8)
2✔
161
            # TODO: Check endianness for _32FC1
162
        else:
NEW
163
            raise NotImplementedError(f"Unsupported encoding {self.encoding} for rosbag_get_ros_msg!")
×
164

165
        # Write the data into the new class
166
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
167
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
168
            Image, Header, Time = typestore.types['sensor_msgs/msg/Image'], typestore.types['std_msgs/msg/Header'], typestore.types['builtin_interfaces/msg/Time']
2✔
169

170
            return Image(Header(stamp=Time(sec=int(seconds), 
2✔
171
                                        nanosec=int(nanoseconds)), 
172
                                frame_id=self.frame_id),
173
                        height=self.height, 
174
                        width=self.width, 
175
                        encoding=ImageData.ImageEncoding.to_ros_str(self.encoding),
176
                        is_bigendian=0, 
177
                        step=step, 
178
                        data=data)
179
        
NEW
180
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
×
NEW
181
            from std_msgs.msg import Header
×
NEW
182
            from sensor_msgs.msg import Image
×
183

184
            # Create the messages
NEW
185
            img_msg = Image()
×
NEW
186
            img_msg.header = Header()
×
NEW
187
            if lib_type == ROSMsgLibType.RCLPY:
×
NEW
188
                from rclpy.time import Time
×
NEW
189
                img_msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
×
190
            else:
NEW
191
                import rospy
×
NEW
192
                img_msg.header.stamp = rospy.Time(secs=int(seconds), nsecs=int(nanoseconds))
×
193

194
            # Populate the rest of the data
NEW
195
            img_msg.header.frame_id = self.frame_id 
×
NEW
196
            img_msg.height = self.height
×
NEW
197
            img_msg.width = self.width
×
NEW
198
            img_msg.encoding = ImageData.ImageEncoding.to_ros_str(self.encoding)
×
NEW
199
            img_msg.is_bigendian = 0
×
NEW
200
            img_msg.step = step
×
NEW
201
            img_msg.data = data.tolist()
×
NEW
202
            return img_msg
×
203

204
        else:
NEW
205
            raise NotImplementedError(f"Unsupported ROS_MSG_LIBRARY_TYPE {lib_type} for ImageData.get_ros_msg()!")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc