• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 20820881882

08 Jan 2026 02:50PM UTC coverage: 62.482% (-12.2%) from 74.672%
20820881882

Pull #10

github

DanielChaseButterfield
Add publish script for OpenVINS
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

181 of 277 branches covered (65.34%)

Branch coverage included in aggregate %.

212 of 546 new or added lines in 11 files covered. (38.83%)

3 existing lines in 2 files now uncovered.

1118 of 1802 relevant lines covered (62.04%)

1.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.63
/src/robotdataprocess/data_types/ImuData.py
1
from ..conversion_utils import col_to_dec_arr, dec_arr_to_float_arr
2✔
2
from .Data import Data, CoordinateFrame, ROSMsgLibType
2✔
3
import decimal
2✔
4
from decimal import Decimal
2✔
5
import numpy as np
2✔
6
from numpy.typing import NDArray
2✔
7
from pathlib import Path
2✔
8
from robotdataprocess.data_types.PathData import PathData
2✔
9
from ..ros.Ros2BagWrapper import Ros2BagWrapper
2✔
10
from rosbags.rosbag2 import Reader as Reader2
2✔
11
from rosbags.typesys import Stores, get_typestore
2✔
12
from rosbags.typesys.store import Typestore
2✔
13
from scipy.spatial.transform import Rotation as R
2✔
14
from typeguard import typechecked
2✔
15
from typing import Union, Any
2✔
16
import tqdm
2✔
17

18
@typechecked
2✔
19
class ImuData(Data):
2✔
20

21
    # Define IMU-specific data attributes
22
    lin_acc: NDArray[Decimal]
2✔
23
    ang_vel: NDArray[Decimal]
2✔
24
    orientations: NDArray[Decimal] # quaternions (x, y, z, w)
2✔
25
    frame: CoordinateFrame
2✔
26

27
    @typechecked
2✔
28
    def __init__(self, frame_id: str, frame: CoordinateFrame, timestamps: Union[np.ndarray, list], 
2✔
29
                 lin_acc: Union[np.ndarray, list], ang_vel: Union[np.ndarray, list],
30
                 orientations: Union[np.ndarray, list]):
31
        
32
        # Copy initial values into attributes
33
        super().__init__(frame_id, timestamps)
2✔
34
        self.frame = frame
2✔
35
        self.lin_acc = col_to_dec_arr(lin_acc)
2✔
36
        self.ang_vel = col_to_dec_arr(ang_vel)
2✔
37
        self.orientations = col_to_dec_arr(orientations)
2✔
38

39
        # Check to ensure that all arrays have same length
40
        if len(self.timestamps) != len(self.lin_acc) or len(self.lin_acc) != len(self.ang_vel) \
2✔
41
            or len(self.ang_vel) != len(self.orientations):
42
            raise ValueError("Lengths of timestamp, lin_acc, ang_vel, and orientation arrays are not equal!")
×
43

44
    # =========================================================================
45
    # ============================ Class Methods ============================== 
46
    # =========================================================================  
47

48
    @classmethod
2✔
49
    @typechecked
2✔
50
    def from_ros2_bag(cls, bag_path: Union[Path, str], imu_topic: str, frame_id: str):
2✔
51
        """
52
        Creates a class structure from a ROS2 bag file with an Imu topic.
53

54
        Args:
55
            bag_path (Path | str): Path to the ROS2 bag file.
56
            img_topic (str): Topic of the Imu messages.
57
            frame_id (str): The frame where this IMU data was collected.
58
        Returns:
59
            ImageData: Instance of this class.
60
        """
61

62
        # Get topic message count and typestore
63
        bag_wrapper = Ros2BagWrapper(bag_path, None)
2✔
64
        typestore: Typestore = bag_wrapper.get_typestore()
2✔
65
        num_msgs: int = bag_wrapper.get_topic_count(imu_topic)
2✔
66

67
        # TODO: Load the frame id directly from the ROS2 bag.
68

69
        # Setup arrays to hold data
70
        timestamps = np.zeros((num_msgs), dtype=object)
2✔
71
        lin_acc = np.zeros((num_msgs, 3), dtype=np.double)
2✔
72
        ang_vel = np.zeros((num_msgs, 3), dtype=np.double)
2✔
73
        orientation = np.zeros((num_msgs, 4), dtype=np.double)
2✔
74

75
        # Extract the images/timestamps and save
76
        with Reader2(bag_path) as reader: 
2✔
77
            i = 0
2✔
78
            connections = [x for x in reader.connections if x.topic == imu_topic]
2✔
79
            for conn, _, rawdata in reader.messages(connections=connections):
2✔
80
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
2✔
81

82
                # Extract imu data 
83
                lin_acc[i] = np.array([msg.linear_acceleration.x, 
2✔
84
                                       msg.linear_acceleration.y, 
85
                                       msg.linear_acceleration.z], dtype=np.double)
86
                ang_vel[i] = np.array([msg.angular_velocity.x, 
2✔
87
                                       msg.angular_velocity.y, 
88
                                       msg.angular_velocity.z], dtype=np.double)
89
                orientation[i] = np.array([msg.orientation.x, 
2✔
90
                                       msg.orientation.y, 
91
                                       msg.orientation.z,
92
                                       msg.orientation.w], dtype=np.double)
93

94
                # Extract timestamps
95
                timestamps[i] = Ros2BagWrapper.extract_timestamp(msg)
2✔
96

97
                # Update the count
98
                i += 1
2✔
99

100
        # Create an ImageData class
101
        return cls(frame_id, CoordinateFrame.FLU, timestamps, lin_acc, ang_vel, orientation)
2✔
102
    
103
    @classmethod
2✔
104
    @typechecked
2✔
105
    def from_txt_file(cls, file_path: Union[Path, str], frame_id: str, frame: CoordinateFrame):
2✔
106
        """
107
        Creates a class structure from the TartanAir dataset format, which includes
108
        various .txt files with IMU data. It expects the timestamp, the linear
109
        acceleration, and the angular velocity, seperated by spaces in that order.
110

111
        Args:
112
            file_path (Path | str): Path to the file containing the IMU data.
113
            frame_id (str): The frame where this IMU data was collected.
114
            frame (CoordinateFrame): The coordinate system convention of this data.
115
        Returns:
116
            ImuData: Instance of this class.
117

118
        NOTE: Sets orientation to identity! 
119
        """
120
        
121
        # Count the number of lines in the file
122
        line_count = 0
2✔
123
        with open(str(file_path), 'r') as file:
2✔
124
            for _ in file: 
2✔
125
                line_count += 1
2✔
126

127
        # Setup arrays to hold data
128
        timestamps = np.zeros((line_count), dtype=object)
2✔
129
        lin_acc = np.zeros((line_count, 3), dtype=object)
2✔
130
        ang_vel = np.zeros((line_count, 3), dtype=object)
2✔
131

132
        # Open the txt file and read in the data
133
        with open(str(file_path), 'r') as file:
2✔
134
            for i, line in enumerate(file):
2✔
135
                line_split = line.split(' ')
2✔
136
                timestamps[i] = line_split[0]
2✔
137
                lin_acc[i] = line_split[1:4]
2✔
138
                ang_vel[i] = line_split[4:7]
2✔
139
        
140
        # Set orientation to identity, as it is assumed this text file doesn't have it
141
        orientation = np.zeros((lin_acc.shape[0], 4), dtype=int)
2✔
142
        orientation[:,3] = np.ones((lin_acc.shape[0]), dtype=int)
2✔
143

144
        # Create the ImuData class
145
        return cls(frame_id, frame, timestamps, lin_acc, ang_vel, orientation)
2✔
146
    
147
    # =========================================================================
148
    # ========================= Manipulation Methods ========================== 
149
    # =========================================================================  
150

151
    def crop_data(self, start: Decimal, end: Decimal):
2✔
152
        """ Will crop the data so only values within [start, end] inclusive are kept. """
153

154
        # Create boolean mask of data to keep
155
        mask = (self.timestamps >= start) & (self.timestamps <= end)
2✔
156

157
        # Apply mask
158
        self.timestamps = self.timestamps[mask]
2✔
159
        self.lin_acc = self.lin_acc[mask]
2✔
160
        self.ang_vel = self.ang_vel[mask]
2✔
161
        self.orientations = self.orientations[mask]
2✔
162
        
163
    # =========================================================================
164
    # ============================ Export Methods ============================= 
165
    # =========================================================================  
166

167
    def to_PathData(self, initial_pos: NDArray[float], initial_vel: NDArray[float], 
2✔
168
                    initial_ori: NDArray[float], use_ang_vel: bool) -> PathData:
169
        """
170
        Converts this IMUData class into OdometeryData by integrating the IMU data using
171
        Euler's method.
172

173
        Parameters:
174
            initial_pos: The initial position as a numpy array.
175
            initial_vel: The initial velocity as a numpy array.
176
            initial_ori: The initial orientation as a numpy array (quaternion x, y, z, w).
177
            use_ang_vel: If True, will use angular velocity data to calculate orientation.
178
                If False, will use orientation data directly from the IMUData class.
179

180
        Returns:
181
            PathData: The resulting PathData class.
182
        """
183

184
        print("WARNING: This code has not been extensively tested yet!")
×
185

186
        # Setup arrays to hold integrated data 
187
        pos = np.zeros((self.len(), 3), dtype=float)
×
188
        pos[0] = initial_pos
×
189
        vel = np.zeros((self.len(), 3), dtype=float)
×
190
        vel[0] = initial_vel
×
191

192
        # Setup array to hold orientation data
193
        if use_ang_vel:
×
194
            ori = np.zeros((self.len(), 4), dtype=float)  
×
195
            ori[:, 3] = np.ones((self.len()), dtype=float)
×
196
        else:
197
            ori = dec_arr_to_float_arr(self.orientations)
×
198
        ori[0] = initial_ori
×
199

200
        # Setup a tqdm progress bar
201
        pbar = tqdm.tqdm(total=self.len()-1, desc="Integrating IMU Data", unit="steps")
×
202

203
        # Integrate the IMU data
204
        for i in range(1, self.len()):
×
205
            # Get time difference
NEW
206
            dt: float = dec_arr_to_float_arr(self.timestamps[i] - self.timestamps[i-1]).item()
×
207

208
            # Calculate orientation
209
            if use_ang_vel:
×
210
                cur_ori = R.from_quat(ori[i-1])
×
211
                delta_q = R.from_rotvec(dec_arr_to_float_arr(self.ang_vel[i-1]) * dt)
×
212
                new_ori = (cur_ori * delta_q).as_quat()
×
213
                ori[i] = new_ori / np.linalg.norm(new_ori)
×
214

215
            # Rotate linear acceleration into world frame
216
            r = R.from_quat(ori[i-1])
×
217
            lin_acc_world = r.apply(dec_arr_to_float_arr(self.lin_acc[i-1]))
×
218
            lin_acc_world = lin_acc_world
×
219

220
            # Subtract gravity
221
            GRAVITY_CONST = 9.80665
×
222
            if self.frame == CoordinateFrame.FLU:
×
223
                lin_acc_world[2] -= GRAVITY_CONST
×
224
            elif self.frame == CoordinateFrame.NED:
×
225
                lin_acc_world[2] += GRAVITY_CONST
×
226
            else:
227
                raise RuntimeError(f"to_PathData() doesn't currently support this frame: {self.frame}!")
×
228

229
            # Calculate velocity
230
            vel[i] = vel[i-1] + lin_acc_world * dt
×
231

232
            # Calculate position
233
            pos[i] = pos[i-1] + vel[i-1] * dt + 0.5 * lin_acc_world * dt * dt
×
234
            pbar.update(1)
×
235

236
        # Return the resulting PathData class
237
        return PathData(self.frame_id, self.timestamps, pos, ori, self.frame)
×
238

239
    # =========================================================================
240
    # =========================== Conversion to ROS =========================== 
241
    # ========================================================================= 
242

243
    @typechecked
2✔
244
    @staticmethod
2✔
245
    def get_ros_msg_type(lib_type: ROSMsgLibType) -> Any:
2✔
246
        """ Return the __msgtype__ for an Imu msg. """
247

248
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
249
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
250
            return typestore.types['sensor_msgs/msg/Imu'].__msgtype__
2✔
NEW
251
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
×
NEW
252
            from sensor_msgs.msg import Imu
×
NEW
253
            return Imu
×
254
        else:
NEW
255
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for ImuData.get_ros_msg_type()!")
×
256
            
257
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int):
2✔
258
        """
259
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
260
        
261
        Args:
262
            i (int): The index of the image message to convert.
263
        Raises:
264
            ValueError: If i is outside the data bounds.
265

266
        # NOTE: Currently ignores orientation data, and doesn't set covariance values.
267
        """
268

269
        # Check to make sure index is within data bounds
270
        if i < 0 or i >= self.len():
2✔
271
            raise ValueError(f"Index {i} is out of bounds!")
×
272

273
        # Get the seconds and nanoseconds
274
        seconds = int(self.timestamps[i])
2✔
275
        nanoseconds = (self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN)
2✔
276

277
        # Write the data into the new msg
278
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
279
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
280
            Imu = typestore.types['sensor_msgs/msg/Imu']
2✔
281
            Header = typestore.types['std_msgs/msg/Header']
2✔
282
            Time = typestore.types['builtin_interfaces/msg/Time']
2✔
283
            Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
2✔
284
            Vector3 = typestore.types['geometry_msgs/msg/Vector3']
2✔
285

286
            return Imu(Header(stamp=Time(sec=int(seconds), 
2✔
287
                                        nanosec=int(nanoseconds)), 
288
                            frame_id=self.frame_id),
289
                        orientation=Quaternion(x=0,
290
                                            y=0,
291
                                            z=0,
292
                                            w=1), # Currently ignores data in orientation
293
                        orientation_covariance=np.zeros(9),
294
                        angular_velocity=Vector3(x=self.ang_vel[i][0],
295
                                                y=self.ang_vel[i][1],
296
                                                z=self.ang_vel[i][2]),
297
                        angular_velocity_covariance=np.zeros(9),
298
                        linear_acceleration=Vector3(x=self.lin_acc[i][0],
299
                                                    y=self.lin_acc[i][1], 
300
                                                    z=self.lin_acc[i][2]),
301
                        linear_acceleration_covariance=np.zeros(9))
302
        
NEW
303
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
×
NEW
304
            from std_msgs.msg import Header
×
NEW
305
            from sensor_msgs.msg import Imu
×
NEW
306
            from geometry_msgs.msg import Quaternion, Vector3
×
307

308
            # Create the message objects
NEW
309
            imu_msg = Imu()
×
NEW
310
            imu_msg.header = Header()
×
NEW
311
            if lib_type == ROSMsgLibType.RCLPY: 
×
NEW
312
                from rclpy.time import Time
×
NEW
313
                imu_msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
×
314
            else:
NEW
315
                import rospy
×
NEW
316
                imu_msg.header.stamp = rospy.Time(secs=seconds, nsecs=int(nanoseconds))
×
317

318
            # Populate the rest of the data
NEW
319
            imu_msg.header.frame_id = self.frame_id
×
NEW
320
            imu_msg.orientation = Quaternion()
×
NEW
321
            imu_msg.orientation.x = 0.0  # NOTE: Currently ignores data in orientation
×
NEW
322
            imu_msg.orientation.y = 0.0
×
NEW
323
            imu_msg.orientation.z = 0.0
×
NEW
324
            imu_msg.orientation.w = 1.0
×
NEW
325
            imu_msg.orientation_covariance = np.zeros(9)
×
NEW
326
            imu_msg.angular_velocity = Vector3()
×
NEW
327
            imu_msg.angular_velocity.x = float(self.ang_vel[i][0])
×
NEW
328
            imu_msg.angular_velocity.y = float(self.ang_vel[i][1])
×
NEW
329
            imu_msg.angular_velocity.z = float(self.ang_vel[i][2])
×
NEW
330
            imu_msg.angular_velocity_covariance = np.zeros(9)
×
NEW
331
            imu_msg.linear_acceleration = Vector3()
×
NEW
332
            imu_msg.linear_acceleration.x = float(self.lin_acc[i][0])
×
NEW
333
            imu_msg.linear_acceleration.y = float(self.lin_acc[i][1])
×
NEW
334
            imu_msg.linear_acceleration.z = float(self.lin_acc[i][2])
×
NEW
335
            imu_msg.linear_acceleration_covariance = np.zeros(9)
×
NEW
336
            return imu_msg
×
337

338
        else:
NEW
339
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for ImuData.get_ros_msg()!")
×
340
    
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc