• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 21483849472

29 Jan 2026 03:20PM UTC coverage: 74.142% (-0.5%) from 74.672%
21483849472

Pull #10

github

DanielChaseButterfield
Fix with dependency mismatch with typeguard in conversion_utils, ImageDataOnDisk for .npy files, linear interpolation for odometry data
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

881 of 1305 new or added lines in 14 files covered. (67.51%)

2 existing lines in 1 file now uncovered.

1749 of 2359 relevant lines covered (74.14%)

1.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.75
/src/robotdataprocess/data_types/ImuData.py
1
from ..conversion_utils import col_to_dec_arr, dec_arr_to_float_arr
2✔
2
from .Data import Data, CoordinateFrame, ROSMsgLibType
2✔
3
import decimal
2✔
4
from decimal import Decimal
2✔
5
from ..ModuleImporter import ModuleImporter
2✔
6
import matplotlib.pyplot as plt
2✔
7
import numpy as np
2✔
8
from numpy.typing import NDArray
2✔
9
from pathlib import Path
2✔
10
from robotdataprocess.data_types.PathData import PathData
2✔
11
from ..ros.Ros2BagWrapper import Ros2BagWrapper
2✔
12
from rosbags.rosbag2 import Reader as Reader2
2✔
13
from rosbags.typesys import Stores, get_typestore
2✔
14
from rosbags.typesys.store import Typestore
2✔
15
from scipy.spatial.transform import Rotation as R
2✔
16
from typeguard import typechecked
2✔
17
from typing import Union, Any, Optional, List
2✔
18
import tqdm
2✔
19

20
@typechecked
2✔
21
class ImuData(Data):
2✔
22

23
    # Define IMU-specific data attributes
24
    lin_acc: NDArray
2✔
25
    ang_vel: NDArray
2✔
26
    orientations: Optional[NDArray] # quaternions (x, y, z, w)
2✔
27
    frame: CoordinateFrame
2✔
28

29
    @typechecked
2✔
30
    def __init__(self, frame_id: str, frame: CoordinateFrame, timestamps: Union[np.ndarray, list], 
2✔
31
                 lin_acc: Union[np.ndarray, list], ang_vel: Union[np.ndarray, list],
32
                 orientations: Optional[NDArray]):
33
        
34
        # Copy initial values into attributes
35
        super().__init__(frame_id, timestamps)
2✔
36
        self.frame = frame
2✔
37
        self.lin_acc = col_to_dec_arr(lin_acc)
2✔
38
        self.ang_vel = col_to_dec_arr(ang_vel)
2✔
39
        if orientations is not None:
2✔
40
            self.orientations = col_to_dec_arr(orientations)
2✔
41
        else:
42
            self.orientations = None
2✔
43

44
        # Check to ensure that all arrays have same length
45
        if len(self.timestamps) != len(self.lin_acc) or len(self.lin_acc) != len(self.ang_vel) \
2✔
46
            or (self.orientations is not None and len(self.ang_vel) != len(self.orientations)):
47
            raise ValueError("Lengths of timestamp, lin_acc, ang_vel, and orientation arrays are not equal!")
2✔
48

49
    # =========================================================================
50
    # ============================ Class Methods ============================== 
51
    # =========================================================================  
52

53
    @classmethod
2✔
54
    @typechecked
2✔
55
    def from_ros2_bag(cls, bag_path: Union[Path, str], imu_topic: str, frame_id: str):
2✔
56
        """
57
        Creates a class structure from a ROS2 bag file with an Imu topic.
58

59
        Args:
60
            bag_path (Path | str): Path to the ROS2 bag file.
61
            img_topic (str): Topic of the Imu messages.
62
            frame_id (str): The frame where this IMU data was collected.
63
        Returns:
64
            ImageData: Instance of this class.
65
        """
66

67
        print("WARNING: This code does not check the orientation covariance to determine if the orientation is valid; may use invalid orientations!")
2✔
68

69
        # Get topic message count and typestore
70
        bag_wrapper = Ros2BagWrapper(bag_path, None)
2✔
71
        typestore: Typestore = bag_wrapper.get_typestore()
2✔
72
        num_msgs: int = bag_wrapper.get_topic_count(imu_topic)
2✔
73

74
        # TODO: Load the frame id directly from the ROS2 bag.
75

76
        # Setup arrays to hold data
77
        timestamps = np.zeros((num_msgs), dtype=object)
2✔
78
        lin_acc = np.zeros((num_msgs, 3), dtype=np.double)
2✔
79
        ang_vel = np.zeros((num_msgs, 3), dtype=np.double)
2✔
80
        orientation = np.zeros((num_msgs, 4), dtype=np.double)
2✔
81

82
        # Extract the images/timestamps and save
83
        with Reader2(bag_path) as reader: 
2✔
84
            i = 0
2✔
85
            connections = [x for x in reader.connections if x.topic == imu_topic]
2✔
86
            for conn, _, rawdata in reader.messages(connections=connections):
2✔
87
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
2✔
88

89
                # Extract imu data 
90
                lin_acc[i] = np.array([msg.linear_acceleration.x, 
2✔
91
                                       msg.linear_acceleration.y, 
92
                                       msg.linear_acceleration.z], dtype=np.double)
93
                ang_vel[i] = np.array([msg.angular_velocity.x, 
2✔
94
                                       msg.angular_velocity.y, 
95
                                       msg.angular_velocity.z], dtype=np.double)
96
                orientation[i] = np.array([msg.orientation.x, 
2✔
97
                                       msg.orientation.y, 
98
                                       msg.orientation.z,
99
                                       msg.orientation.w], dtype=np.double)
100

101
                # Extract timestamps
102
                timestamps[i] = Ros2BagWrapper.extract_timestamp(msg)
2✔
103

104
                # Update the count
105
                i += 1
2✔
106

107
        # Create an ImageData class
108
        return cls(frame_id, CoordinateFrame.FLU, timestamps, lin_acc, ang_vel, orientation)
2✔
109
    
110
    @classmethod
2✔
111
    @typechecked
2✔
112
    def from_txt_file(cls, file_path: Union[Path, str], frame_id: str, frame: CoordinateFrame, nine_axis: bool = False):
2✔
113
        """
114
        Creates a class structure from the TartanAir dataset format, which includes
115
        various .txt files with IMU data. It expects (in order) the timestamp, the linear
116
        acceleration, and the angular velocity seperated by spaces. Will also expect 
117
        orientation (xyzw) at the end if nine_axis is set to True.
118

119
        Args:
120
            file_path (Path | str): Path to the file containing the IMU data.
121
            frame_id (str): The frame where this IMU data was collected.
122
            frame (CoordinateFrame): The coordinate system convention of this data.
123
            nine_axis (bool): If true, will also load orientations from txt file.
124
        Returns:
125
            ImuData: Instance of this class.
126
        """
127
        
128
        # Count the number of lines in the file
129
        line_count = 0
2✔
130
        with open(str(file_path), 'r') as file:
2✔
131
            for _ in file: 
2✔
132
                line_count += 1
2✔
133

134
        # Setup arrays to hold data
135
        timestamps = np.zeros((line_count), dtype=object)
2✔
136
        lin_acc = np.zeros((line_count, 3), dtype=object)
2✔
137
        ang_vel = np.zeros((line_count, 3), dtype=object)
2✔
138
        if nine_axis:
2✔
139
            orientations = np.zeros((line_count, 4), dtype=object)
2✔
140
        else:
141
            orientations = None
2✔
142

143
        # Open the txt file and read in the data
144
        with open(str(file_path), 'r') as file:
2✔
145
            for i, line in enumerate(file):
2✔
146
                line_split = line.split(' ')
2✔
147
                timestamps[i] = line_split[0]
2✔
148
                lin_acc[i] = line_split[1:4]
2✔
149
                ang_vel[i] = line_split[4:7]
2✔
150
                if nine_axis:
2✔
151
                    orientations[i] = line_split[7:11]
2✔
152
        
153
        # Create the ImuData class
154
        return cls(frame_id, frame, timestamps, lin_acc, ang_vel, orientations)
2✔
155
    
156
    # =========================================================================
157
    # ========================= Manipulation Methods ========================== 
158
    # =========================================================================  
159

160
    def crop_data(self, start: Decimal, end: Decimal):
2✔
161
        """ Will crop the data so only values within [start, end] inclusive are kept. """
162

163
        # Create boolean mask of data to keep
164
        mask = (self.timestamps >= start) & (self.timestamps <= end)
2✔
165

166
        # Apply mask
167
        self.timestamps = self.timestamps[mask]
2✔
168
        self.lin_acc = self.lin_acc[mask]
2✔
169
        self.ang_vel = self.ang_vel[mask]
2✔
170
        if self.orientations is not None:
2✔
171
            self.orientations = self.orientations[mask]
2✔
172

173
    # =========================================================================
174
    # ============================ Visualization ============================== 
175
    # =========================================================================  
176

177
    def visualize(self, ts_start: float, ts_end: float):
2✔
178
        """ Plot the linear acceleration, angular velocity, and orientation data. """
179

NEW
180
        def multi_list_plotter(data: np.ndarray, timestamps: np.ndarray, title: str, keys: List[str], ylabel: str):
×
181
            """
182
            Helper function that plots each list in a dictionary in its own subplot,
183
            all in a single matplotlib figure.
184
            """
185

NEW
186
            num_plots = data.shape[1]
×
NEW
187
            fig, axes = plt.subplots(num_plots, 1, figsize=(10, 3 * num_plots), sharex=True)
×
188

189
            # Make it a list always
NEW
190
            if num_plots == 1:
×
NEW
191
                axes = [axes]
×
NEW
192
            timestamps_shifted = timestamps - timestamps[0]
×
193

NEW
194
            for i in range(0, num_plots):
×
NEW
195
                axes[i].plot(timestamps_shifted, data[:,i], label=keys[i])
×
NEW
196
                axes[i].set_ylabel(f"{keys[i]} - ({ylabel})")
×
NEW
197
                axes[i].grid(True)
×
NEW
198
                axes[i].legend(loc='upper right')
×
199

NEW
200
            axes[-1].set_xlabel("Time (s)")
×
NEW
201
            fig.suptitle(f"{title} vs Time", fontsize=14)
×
NEW
202
            fig.tight_layout(rect=[0, 0.03, 1, 0.95])
×
NEW
203
            plt.show()
×
NEW
204
            plt.close(fig)
×
205

206
        # Create all plots
NEW
207
        mask = (self.timestamps >= ts_start) & (self.timestamps <= ts_end)
×
NEW
208
        timestamps_float = dec_arr_to_float_arr(self.timestamps[mask])
×
NEW
209
        multi_list_plotter(dec_arr_to_float_arr(self.lin_acc[mask]), timestamps_float, "Linear Acceleration", ['x', 'y', 'z'], "m/s^2")
×
NEW
210
        multi_list_plotter(dec_arr_to_float_arr(self.ang_vel[mask]), timestamps_float, "Angular Velocity", ['roll', 'pitch', 'yaw'], "rad/s")
×
NEW
211
        multi_list_plotter(dec_arr_to_float_arr(self.orientations[mask]), timestamps_float, "Orientation", ['x', 'y', 'z', 'w'], "units")
×
212
        
213
    # =========================================================================
214
    # ============================ Export Methods ============================= 
215
    # =========================================================================  
216

217
    def to_PathData(self, initial_pos: NDArray, initial_vel: NDArray, 
2✔
218
                    initial_ori: NDArray, use_ang_vel: bool) -> PathData:
219
        """
220
        Converts this IMUData class into OdometeryData by integrating the IMU data using
221
        Euler's method.
222

223
        Parameters:
224
            initial_pos: The initial position as a numpy array.
225
            initial_vel: The initial velocity as a numpy array.
226
            initial_ori: The initial orientation as a numpy array (quaternion x, y, z, w),
227
                only used if use_ang_vel is True.
228
            use_ang_vel: If True, will use angular velocity data to calculate orientation.
229
                If False, will use orientation data directly from the IMUData class.
230

231
        Returns:
232
            PathData: The resulting PathData class.
233
        """
234

235
        print("WARNING: This code has not been extensively tested yet!")
×
236

237
        # Setup arrays to hold integrated data 
238
        pos = np.zeros((self.len(), 3), dtype=float)
×
239
        pos[0] = initial_pos
×
240
        vel = np.zeros((self.len(), 3), dtype=float)
×
241
        vel[0] = initial_vel
×
242

243
        # Setup array to hold orientation data
244
        if use_ang_vel:
×
245
            ori = np.zeros((self.len(), 4), dtype=float)  
×
246
            ori[:, 3] = np.ones((self.len()), dtype=float)
×
NEW
247
            ori[0] = initial_ori
×
248
        else:
NEW
249
            if self.orientations is not None:
×
NEW
250
                ori = dec_arr_to_float_arr(self.orientations)
×
251
            else:
NEW
252
                raise ValueError("use_ang_vel is False, but this ImuData hs no orientation data to use!")
×
253

254
        # Setup a tqdm progress bar
255
        pbar = tqdm.tqdm(total=self.len()-1, desc="Integrating IMU Data", unit="steps")
×
256

257
        # Integrate the IMU data
258
        for i in range(1, self.len()):
×
259
            # Get time difference
NEW
260
            dt: float = dec_arr_to_float_arr(np.array([self.timestamps[i] - self.timestamps[i-1]])).item()
×
261

262
            # Calculate orientation
263
            if use_ang_vel:
×
264
                cur_ori = R.from_quat(ori[i-1])
×
265
                delta_q = R.from_rotvec(dec_arr_to_float_arr(self.ang_vel[i-1]) * dt)
×
266
                new_ori = (cur_ori * delta_q).as_quat()
×
267
                ori[i] = new_ori / np.linalg.norm(new_ori)
×
268

269
            # Rotate linear acceleration into world frame
270
            r = R.from_quat(ori[i-1])
×
271
            lin_acc_world = r.apply(dec_arr_to_float_arr(self.lin_acc[i-1]))
×
272
            lin_acc_world = lin_acc_world
×
273

274
            # Subtract gravity
275
            GRAVITY_CONST = 9.80665
×
276
            if self.frame == CoordinateFrame.FLU:
×
277
                lin_acc_world[2] -= GRAVITY_CONST
×
278
            elif self.frame == CoordinateFrame.NED:
×
279
                lin_acc_world[2] += GRAVITY_CONST
×
280
            else:
281
                raise RuntimeError(f"to_PathData() doesn't currently support this frame: {self.frame}!")
×
282

283
            # Calculate velocity
284
            vel[i] = vel[i-1] + lin_acc_world * dt
×
285

286
            # Calculate position
287
            pos[i] = pos[i-1] + vel[i-1] * dt + 0.5 * lin_acc_world * dt * dt
×
288
            pbar.update(1)
×
289

290
        # Return the resulting PathData class
291
        return PathData(self.frame_id, self.timestamps, pos, ori, self.frame)
×
292

293
    # =========================================================================
294
    # =========================== Conversion to ROS =========================== 
295
    # ========================================================================= 
296

297
    @staticmethod
2✔
298
    def get_ros_msg_type(lib_type: ROSMsgLibType) -> Any:
2✔
299
        """ Return the __msgtype__ for an Imu msg. """
300

301
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
302
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
303
            return typestore.types['sensor_msgs/msg/Imu'].__msgtype__
2✔
304
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
2✔
305
            return ModuleImporter.get_module_attribute('sensor_msgs.msg', 'Imu')
2✔
306
        else:
NEW
307
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for ImuData.get_ros_msg_type()!")
×
308
            
309
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int):
2✔
310
        """
311
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
312
        
313
        Args:
314
            i (int): The index of the image message to convert.
315
        Raises:
316
            ValueError: If i is outside the data bounds.
317

318
        # NOTE: Assumes covariances of 0.
319
        """
320

321
        # Check to make sure index is within data bounds
322
        if i < 0 or i >= self.len():
2✔
323
            raise ValueError(f"Index {i} is out of bounds!")
×
324

325
        # Get the seconds and nanoseconds
326
        seconds = int(self.timestamps[i])
2✔
327
        nanoseconds = (self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN)
2✔
328

329
        # Write the data into the new msg
330
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
331
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
332
            Imu = typestore.types['sensor_msgs/msg/Imu']
2✔
333
            Header = typestore.types['std_msgs/msg/Header']
2✔
334
            Time = typestore.types['builtin_interfaces/msg/Time']
2✔
335
            Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
2✔
336
            Vector3 = typestore.types['geometry_msgs/msg/Vector3']
2✔
337

338
            if self.orientations is not None:
2✔
339
                ori = Quaternion(x=self.orientations[i][0], y=self.orientations[i][1],
2✔
340
                           z=self.orientations[i][2], w=self.orientations[i][3])
341
                ori_cov = np.zeros(9, dtype=np.float64)
2✔
342
            else:
343
                ori = Quaternion(x=0, y=0, z=0, w=1)
2✔
344
                ori_cov = np.zeros(9, dtype=np.float64)
2✔
345
                ori_cov[0] = -1
2✔
346

347
            return Imu(Header(stamp=Time(sec=int(seconds), 
2✔
348
                                        nanosec=int(nanoseconds)), 
349
                            frame_id=self.frame_id),
350
                        orientation=ori,
351
                        orientation_covariance=ori_cov,
352
                        angular_velocity=Vector3(x=self.ang_vel[i][0],
353
                                                y=self.ang_vel[i][1],
354
                                                z=self.ang_vel[i][2]),
355
                        angular_velocity_covariance=np.zeros(9),
356
                        linear_acceleration=Vector3(x=self.lin_acc[i][0],
357
                                                    y=self.lin_acc[i][1], 
358
                                                    z=self.lin_acc[i][2]),
359
                        linear_acceleration_covariance=np.zeros(9))
360
        
361
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
2✔
362
            Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
2✔
363
            Imu = ModuleImporter.get_module_attribute('sensor_msgs.msg', 'Imu')
2✔
364
            Quaternion = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Quaternion')
2✔
365
            Vector3 = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Vector3')
2✔
366

367
            # Create the message objects
368
            imu_msg = Imu()
2✔
369
            imu_msg.header = Header()
2✔
370
            if lib_type == ROSMsgLibType.RCLPY: 
2✔
371
                Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
2✔
372
                imu_msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
2✔
373
            else:
NEW
374
                rospy = ModuleImporter.get_module('rospy')
×
NEW
375
                imu_msg.header.stamp = rospy.Time(secs=seconds, nsecs=int(nanoseconds))
×
376

377
            # Populate the rest of the data
378
            imu_msg.header.frame_id = self.frame_id
2✔
379
            imu_msg.orientation = Quaternion()
2✔
380
            if self.orientations is not None:
2✔
381
                imu_msg.orientation.x = float(self.orientations[i][0])  
2✔
382
                imu_msg.orientation.y = float(self.orientations[i][1])
2✔
383
                imu_msg.orientation.z = float(self.orientations[i][2])
2✔
384
                imu_msg.orientation.w = float(self.orientations[i][3])
2✔
385
                imu_msg.orientation_covariance = np.zeros(9, dtype=np.float64)
2✔
386
            else:
NEW
387
                imu_msg.orientation.x = 0.0
×
NEW
388
                imu_msg.orientation.y = 0.0
×
NEW
389
                imu_msg.orientation.z = 0.0
×
NEW
390
                imu_msg.orientation.w = 1.0
×
NEW
391
                covariance = np.zeros(9, dtype=np.float64)
×
NEW
392
                covariance[0] = -1
×
NEW
393
                imu_msg.orientation_covariance = covariance
×
394

395
            imu_msg.angular_velocity = Vector3()
2✔
396
            imu_msg.angular_velocity.x = float(self.ang_vel[i][0])
2✔
397
            imu_msg.angular_velocity.y = float(self.ang_vel[i][1])
2✔
398
            imu_msg.angular_velocity.z = float(self.ang_vel[i][2])
2✔
399
            imu_msg.angular_velocity_covariance = np.zeros(9)
2✔
400
            imu_msg.linear_acceleration = Vector3()
2✔
401
            imu_msg.linear_acceleration.x = float(self.lin_acc[i][0])
2✔
402
            imu_msg.linear_acceleration.y = float(self.lin_acc[i][1])
2✔
403
            imu_msg.linear_acceleration.z = float(self.lin_acc[i][2])
2✔
404
            imu_msg.linear_acceleration_covariance = np.zeros(9)
2✔
405
            return imu_msg
2✔
406

407
        else:
408
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for ImuData.get_ros_msg()!")
2✔
409
    
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc