• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 21218149827

21 Jan 2026 04:52PM UTC coverage: 75.402% (+0.7%) from 74.672%
21218149827

Pull #10

github

DanielChaseButterfield
Refactor hertz_analysis to make it part of Data
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

804 of 1098 new or added lines in 13 files covered. (73.22%)

3 existing lines in 2 files now uncovered.

1689 of 2240 relevant lines covered (75.4%)

1.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.14
/src/robotdataprocess/data_types/ImuData.py
1
from ..conversion_utils import col_to_dec_arr, dec_arr_to_float_arr
2✔
2
from .Data import Data, CoordinateFrame, ROSMsgLibType
2✔
3
import decimal
2✔
4
from decimal import Decimal
2✔
5
from ..ModuleImporter import ModuleImporter
2✔
6
import numpy as np
2✔
7
from numpy.typing import NDArray
2✔
8
from pathlib import Path
2✔
9
from robotdataprocess.data_types.PathData import PathData
2✔
10
from ..ros.Ros2BagWrapper import Ros2BagWrapper
2✔
11
from rosbags.rosbag2 import Reader as Reader2
2✔
12
from rosbags.typesys import Stores, get_typestore
2✔
13
from rosbags.typesys.store import Typestore
2✔
14
from scipy.spatial.transform import Rotation as R
2✔
15
from typeguard import typechecked
2✔
16
from typing import Union, Any, Optional
2✔
17
import tqdm
2✔
18

19
@typechecked
2✔
20
class ImuData(Data):
2✔
21

22
    # Define IMU-specific data attributes
23
    lin_acc: NDArray
2✔
24
    ang_vel: NDArray
2✔
25
    orientations: Optional[NDArray] # quaternions (x, y, z, w)
2✔
26
    frame: CoordinateFrame
2✔
27

28
    @typechecked
2✔
29
    def __init__(self, frame_id: str, frame: CoordinateFrame, timestamps: Union[np.ndarray, list], 
2✔
30
                 lin_acc: Union[np.ndarray, list], ang_vel: Union[np.ndarray, list],
31
                 orientations: Optional[NDArray]):
32
        
33
        # Copy initial values into attributes
34
        super().__init__(frame_id, timestamps)
2✔
35
        self.frame = frame
2✔
36
        self.lin_acc = col_to_dec_arr(lin_acc)
2✔
37
        self.ang_vel = col_to_dec_arr(ang_vel)
2✔
38
        if orientations is not None:
2✔
39
            self.orientations = col_to_dec_arr(orientations)
2✔
40
        else:
41
            self.orientations = None
2✔
42

43
        # Check to ensure that all arrays have same length
44
        if len(self.timestamps) != len(self.lin_acc) or len(self.lin_acc) != len(self.ang_vel) \
2✔
45
            or (self.orientations is not None and len(self.ang_vel) != len(self.orientations)):
46
            raise ValueError("Lengths of timestamp, lin_acc, ang_vel, and orientation arrays are not equal!")
2✔
47

48
    # =========================================================================
49
    # ============================ Class Methods ============================== 
50
    # =========================================================================  
51

52
    @classmethod
2✔
53
    @typechecked
2✔
54
    def from_ros2_bag(cls, bag_path: Union[Path, str], imu_topic: str, frame_id: str):
2✔
55
        """
56
        Creates a class structure from a ROS2 bag file with an Imu topic.
57

58
        Args:
59
            bag_path (Path | str): Path to the ROS2 bag file.
60
            img_topic (str): Topic of the Imu messages.
61
            frame_id (str): The frame where this IMU data was collected.
62
        Returns:
63
            ImageData: Instance of this class.
64
        """
65

66
        print("WARNING: This code does not check the orientation covariance to determine if the orientation is valid; may use invalid orientations!")
2✔
67

68
        # Get topic message count and typestore
69
        bag_wrapper = Ros2BagWrapper(bag_path, None)
2✔
70
        typestore: Typestore = bag_wrapper.get_typestore()
2✔
71
        num_msgs: int = bag_wrapper.get_topic_count(imu_topic)
2✔
72

73
        # TODO: Load the frame id directly from the ROS2 bag.
74

75
        # Setup arrays to hold data
76
        timestamps = np.zeros((num_msgs), dtype=object)
2✔
77
        lin_acc = np.zeros((num_msgs, 3), dtype=np.double)
2✔
78
        ang_vel = np.zeros((num_msgs, 3), dtype=np.double)
2✔
79
        orientation = np.zeros((num_msgs, 4), dtype=np.double)
2✔
80

81
        # Extract the images/timestamps and save
82
        with Reader2(bag_path) as reader: 
2✔
83
            i = 0
2✔
84
            connections = [x for x in reader.connections if x.topic == imu_topic]
2✔
85
            for conn, _, rawdata in reader.messages(connections=connections):
2✔
86
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
2✔
87

88
                # Extract imu data 
89
                lin_acc[i] = np.array([msg.linear_acceleration.x, 
2✔
90
                                       msg.linear_acceleration.y, 
91
                                       msg.linear_acceleration.z], dtype=np.double)
92
                ang_vel[i] = np.array([msg.angular_velocity.x, 
2✔
93
                                       msg.angular_velocity.y, 
94
                                       msg.angular_velocity.z], dtype=np.double)
95
                orientation[i] = np.array([msg.orientation.x, 
2✔
96
                                       msg.orientation.y, 
97
                                       msg.orientation.z,
98
                                       msg.orientation.w], dtype=np.double)
99

100
                # Extract timestamps
101
                timestamps[i] = Ros2BagWrapper.extract_timestamp(msg)
2✔
102

103
                # Update the count
104
                i += 1
2✔
105

106
        # Create an ImageData class
107
        return cls(frame_id, CoordinateFrame.FLU, timestamps, lin_acc, ang_vel, orientation)
2✔
108
    
109
    @classmethod
2✔
110
    @typechecked
2✔
111
    def from_txt_file(cls, file_path: Union[Path, str], frame_id: str, frame: CoordinateFrame, nine_axis: bool = False):
2✔
112
        """
113
        Creates a class structure from the TartanAir dataset format, which includes
114
        various .txt files with IMU data. It expects (in order) the timestamp, the linear
115
        acceleration, and the angular velocity seperated by spaces. Will also expect 
116
        orientation (xyzw) at the end if nine_axis is set to True.
117

118
        Args:
119
            file_path (Path | str): Path to the file containing the IMU data.
120
            frame_id (str): The frame where this IMU data was collected.
121
            frame (CoordinateFrame): The coordinate system convention of this data.
122
            nine_axis (bool): If true, will also load orientations from txt file.
123
        Returns:
124
            ImuData: Instance of this class.
125
        """
126
        
127
        # Count the number of lines in the file
128
        line_count = 0
2✔
129
        with open(str(file_path), 'r') as file:
2✔
130
            for _ in file: 
2✔
131
                line_count += 1
2✔
132

133
        # Setup arrays to hold data
134
        timestamps = np.zeros((line_count), dtype=object)
2✔
135
        lin_acc = np.zeros((line_count, 3), dtype=object)
2✔
136
        ang_vel = np.zeros((line_count, 3), dtype=object)
2✔
137
        if nine_axis:
2✔
138
            orientations = np.zeros((line_count, 4), dtype=object)
2✔
139
        else:
140
            orientations = None
2✔
141

142
        # Open the txt file and read in the data
143
        with open(str(file_path), 'r') as file:
2✔
144
            for i, line in enumerate(file):
2✔
145
                line_split = line.split(' ')
2✔
146
                timestamps[i] = line_split[0]
2✔
147
                lin_acc[i] = line_split[1:4]
2✔
148
                ang_vel[i] = line_split[4:7]
2✔
149
                if nine_axis:
2✔
150
                    orientations[i] = line_split[7:11]
2✔
151
        
152
        # Create the ImuData class
153
        return cls(frame_id, frame, timestamps, lin_acc, ang_vel, orientations)
2✔
154
    
155
    # =========================================================================
156
    # ========================= Manipulation Methods ========================== 
157
    # =========================================================================  
158

159
    def crop_data(self, start: Decimal, end: Decimal):
2✔
160
        """ Will crop the data so only values within [start, end] inclusive are kept. """
161

162
        # Create boolean mask of data to keep
163
        mask = (self.timestamps >= start) & (self.timestamps <= end)
2✔
164

165
        # Apply mask
166
        self.timestamps = self.timestamps[mask]
2✔
167
        self.lin_acc = self.lin_acc[mask]
2✔
168
        self.ang_vel = self.ang_vel[mask]
2✔
169
        if self.orientations is not None:
2✔
170
            self.orientations = self.orientations[mask]
2✔
171
        
172
    # =========================================================================
173
    # ============================ Export Methods ============================= 
174
    # =========================================================================  
175

176
    def to_PathData(self, initial_pos: NDArray[float], initial_vel: NDArray[float], 
2✔
177
                    initial_ori: NDArray[float], use_ang_vel: bool) -> PathData:
178
        """
179
        Converts this IMUData class into OdometeryData by integrating the IMU data using
180
        Euler's method.
181

182
        Parameters:
183
            initial_pos: The initial position as a numpy array.
184
            initial_vel: The initial velocity as a numpy array.
185
            initial_ori: The initial orientation as a numpy array (quaternion x, y, z, w),
186
                only used if use_ang_vel is True.
187
            use_ang_vel: If True, will use angular velocity data to calculate orientation.
188
                If False, will use orientation data directly from the IMUData class.
189

190
        Returns:
191
            PathData: The resulting PathData class.
192
        """
193

194
        print("WARNING: This code has not been extensively tested yet!")
×
195

196
        # Setup arrays to hold integrated data 
197
        pos = np.zeros((self.len(), 3), dtype=float)
×
198
        pos[0] = initial_pos
×
199
        vel = np.zeros((self.len(), 3), dtype=float)
×
200
        vel[0] = initial_vel
×
201

202
        # Setup array to hold orientation data
203
        if use_ang_vel:
×
204
            ori = np.zeros((self.len(), 4), dtype=float)  
×
205
            ori[:, 3] = np.ones((self.len()), dtype=float)
×
NEW
206
            ori[0] = initial_ori
×
207
        else:
NEW
208
            if self.orientations is not None:
×
NEW
209
                ori = dec_arr_to_float_arr(self.orientations)
×
210
            else:
NEW
211
                raise ValueError("use_ang_vel is False, but this ImuData hs no orientation data to use!")
×
212

213
        # Setup a tqdm progress bar
214
        pbar = tqdm.tqdm(total=self.len()-1, desc="Integrating IMU Data", unit="steps")
×
215

216
        # Integrate the IMU data
217
        for i in range(1, self.len()):
×
218
            # Get time difference
NEW
219
            dt: float = dec_arr_to_float_arr(self.timestamps[i] - self.timestamps[i-1]).item()
×
220

221
            # Calculate orientation
222
            if use_ang_vel:
×
223
                cur_ori = R.from_quat(ori[i-1])
×
224
                delta_q = R.from_rotvec(dec_arr_to_float_arr(self.ang_vel[i-1]) * dt)
×
225
                new_ori = (cur_ori * delta_q).as_quat()
×
226
                ori[i] = new_ori / np.linalg.norm(new_ori)
×
227

228
            # Rotate linear acceleration into world frame
229
            r = R.from_quat(ori[i-1])
×
230
            lin_acc_world = r.apply(dec_arr_to_float_arr(self.lin_acc[i-1]))
×
231
            lin_acc_world = lin_acc_world
×
232

233
            # Subtract gravity
234
            GRAVITY_CONST = 9.80665
×
235
            if self.frame == CoordinateFrame.FLU:
×
236
                lin_acc_world[2] -= GRAVITY_CONST
×
237
            elif self.frame == CoordinateFrame.NED:
×
238
                lin_acc_world[2] += GRAVITY_CONST
×
239
            else:
240
                raise RuntimeError(f"to_PathData() doesn't currently support this frame: {self.frame}!")
×
241

242
            # Calculate velocity
243
            vel[i] = vel[i-1] + lin_acc_world * dt
×
244

245
            # Calculate position
246
            pos[i] = pos[i-1] + vel[i-1] * dt + 0.5 * lin_acc_world * dt * dt
×
247
            pbar.update(1)
×
248

249
        # Return the resulting PathData class
250
        return PathData(self.frame_id, self.timestamps, pos, ori, self.frame)
×
251

252
    # =========================================================================
253
    # =========================== Conversion to ROS =========================== 
254
    # ========================================================================= 
255

256
    @staticmethod
2✔
257
    def get_ros_msg_type(lib_type: ROSMsgLibType) -> Any:
2✔
258
        """ Return the __msgtype__ for an Imu msg. """
259

260
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
261
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
262
            return typestore.types['sensor_msgs/msg/Imu'].__msgtype__
2✔
263
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
2✔
264
            return ModuleImporter.get_module_attribute('sensor_msgs.msg', 'Imu')
2✔
265
        else:
NEW
266
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for ImuData.get_ros_msg_type()!")
×
267
            
268
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int):
2✔
269
        """
270
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
271
        
272
        Args:
273
            i (int): The index of the image message to convert.
274
        Raises:
275
            ValueError: If i is outside the data bounds.
276

277
        # NOTE: Assumes covariances of 0.
278
        """
279

280
        # Check to make sure index is within data bounds
281
        if i < 0 or i >= self.len():
2✔
282
            raise ValueError(f"Index {i} is out of bounds!")
×
283

284
        # Get the seconds and nanoseconds
285
        seconds = int(self.timestamps[i])
2✔
286
        nanoseconds = (self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN)
2✔
287

288
        # Write the data into the new msg
289
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
290
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
291
            Imu = typestore.types['sensor_msgs/msg/Imu']
2✔
292
            Header = typestore.types['std_msgs/msg/Header']
2✔
293
            Time = typestore.types['builtin_interfaces/msg/Time']
2✔
294
            Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
2✔
295
            Vector3 = typestore.types['geometry_msgs/msg/Vector3']
2✔
296

297
            if self.orientations is not None:
2✔
NEW
298
                ori = Quaternion(x=self.orientations[i][0], y=self.orientations[i][1],
×
299
                           z=self.orientations[i][2], w=self.orientations[i][3])
NEW
300
                ori_cov = np.zeros(9, dtype=np.float64)
×
301
            else:
302
                ori = Quaternion(x=0, y=0, z=0, w=1)
2✔
303
                ori_cov = np.zeros(9, dtype=np.float64)
2✔
304
                ori_cov[0] = -1
2✔
305

306
            return Imu(Header(stamp=Time(sec=int(seconds), 
2✔
307
                                        nanosec=int(nanoseconds)), 
308
                            frame_id=self.frame_id),
309
                        orientation=ori,
310
                        orientation_covariance=ori_cov,
311
                        angular_velocity=Vector3(x=self.ang_vel[i][0],
312
                                                y=self.ang_vel[i][1],
313
                                                z=self.ang_vel[i][2]),
314
                        angular_velocity_covariance=np.zeros(9),
315
                        linear_acceleration=Vector3(x=self.lin_acc[i][0],
316
                                                    y=self.lin_acc[i][1], 
317
                                                    z=self.lin_acc[i][2]),
318
                        linear_acceleration_covariance=np.zeros(9))
319
        
320
        elif lib_type == ROSMsgLibType.RCLPY or lib_type == ROSMsgLibType.ROSPY:
2✔
321
            Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
2✔
322
            Imu = ModuleImporter.get_module_attribute('sensor_msgs.msg', 'Imu')
2✔
323
            Quaternion = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Quaternion')
2✔
324
            Vector3 = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Vector3')
2✔
325

326
            # Create the message objects
327
            imu_msg = Imu()
2✔
328
            imu_msg.header = Header()
2✔
329
            if lib_type == ROSMsgLibType.RCLPY: 
2✔
330
                Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
2✔
331
                imu_msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
2✔
332
            else:
NEW
333
                rospy = ModuleImporter.get_module('rospy')
×
NEW
334
                imu_msg.header.stamp = rospy.Time(secs=seconds, nsecs=int(nanoseconds))
×
335

336
            # Populate the rest of the data
337
            imu_msg.header.frame_id = self.frame_id
2✔
338
            imu_msg.orientation = Quaternion()
2✔
339
            if self.orientations is not None:
2✔
340
                imu_msg.orientation.x = float(self.orientations[i][0])  
2✔
341
                imu_msg.orientation.y = float(self.orientations[i][1])
2✔
342
                imu_msg.orientation.z = float(self.orientations[i][2])
2✔
343
                imu_msg.orientation.w = float(self.orientations[i][3])
2✔
344
                imu_msg.orientation_covariance = np.zeros(9, dtype=np.float64)
2✔
345
            else:
NEW
346
                imu_msg.orientation.x = 0.0
×
NEW
347
                imu_msg.orientation.y = 0.0
×
NEW
348
                imu_msg.orientation.z = 0.0
×
NEW
349
                imu_msg.orientation.w = 1.0
×
NEW
350
                covariance = np.zeros(9, dtype=np.float64)
×
NEW
351
                covariance[0] = -1
×
NEW
352
                imu_msg.orientation_covariance = covariance
×
353

354
            imu_msg.angular_velocity = Vector3()
2✔
355
            imu_msg.angular_velocity.x = float(self.ang_vel[i][0])
2✔
356
            imu_msg.angular_velocity.y = float(self.ang_vel[i][1])
2✔
357
            imu_msg.angular_velocity.z = float(self.ang_vel[i][2])
2✔
358
            imu_msg.angular_velocity_covariance = np.zeros(9)
2✔
359
            imu_msg.linear_acceleration = Vector3()
2✔
360
            imu_msg.linear_acceleration.x = float(self.lin_acc[i][0])
2✔
361
            imu_msg.linear_acceleration.y = float(self.lin_acc[i][1])
2✔
362
            imu_msg.linear_acceleration.z = float(self.lin_acc[i][2])
2✔
363
            imu_msg.linear_acceleration_covariance = np.zeros(9)
2✔
364
            return imu_msg
2✔
365

366
        else:
367
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for ImuData.get_ros_msg()!")
2✔
368
    
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc