• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 20172884498

12 Dec 2025 04:18PM UTC coverage: 74.672% (+1.2%) from 73.457%
20172884498

push

github

web-flow
(v0.1.2) Path Metrics & Transformations, Python >=3.8 Support

269 of 364 new or added lines in 8 files covered. (73.9%)

6 existing lines in 4 files now uncovered.

1082 of 1449 relevant lines covered (74.67%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.81
/src/robotdataprocess/data_types/ImuData.py
1
from ..conversion_utils import col_to_dec_arr, dec_arr_to_float_arr
1✔
2
from .Data import Data, CoordinateFrame
1✔
3
import decimal
1✔
4
from decimal import Decimal
1✔
5
import numpy as np
1✔
6
from numpy.typing import NDArray
1✔
7
from pathlib import Path
1✔
8
from robotdataprocess.data_types.PathData import PathData
1✔
9
from ..rosbag.Ros2BagWrapper import Ros2BagWrapper
1✔
10
from rosbags.rosbag2 import Reader as Reader2
1✔
11
from rosbags.typesys import Stores, get_typestore
1✔
12
from rosbags.typesys.store import Typestore
1✔
13
from scipy.spatial.transform import Rotation as R
1✔
14
from typeguard import typechecked
1✔
15
from typing import Union
1✔
16
import tqdm
1✔
17

18
class ImuData(Data):
1✔
19

20
    # Define IMU-specific data attributes
21
    lin_acc: NDArray[Decimal]
1✔
22
    ang_vel: NDArray[Decimal]
1✔
23
    orientations: NDArray[Decimal] # quaternions (x, y, z, w)
1✔
24
    frame: CoordinateFrame
1✔
25

26
    @typechecked
1✔
27
    def __init__(self, frame_id: str, frame: CoordinateFrame, timestamps: Union[np.ndarray, list], 
1✔
28
                 lin_acc: Union[np.ndarray, list], ang_vel: Union[np.ndarray, list],
29
                 orientations: Union[np.ndarray, list]):
30
        
31
        # Copy initial values into attributes
32
        super().__init__(frame_id, timestamps)
1✔
33
        self.frame = frame
1✔
34
        self.lin_acc = col_to_dec_arr(lin_acc)
1✔
35
        self.ang_vel = col_to_dec_arr(ang_vel)
1✔
36
        self.orientations = col_to_dec_arr(orientations)
1✔
37

38
        # Check to ensure that all arrays have same length
39
        if len(self.timestamps) != len(self.lin_acc) or len(self.lin_acc) != len(self.ang_vel) \
1✔
40
            or len(self.ang_vel) != len(self.orientations):
UNCOV
41
            raise ValueError("Lengths of timestamp, lin_acc, ang_vel, and orientation arrays are not equal!")
×
42

43
    # =========================================================================
44
    # ============================ Class Methods ============================== 
45
    # =========================================================================  
46

47
    @classmethod
1✔
48
    @typechecked
1✔
49
    def from_ros2_bag(cls, bag_path: Union[Path, str], imu_topic: str, frame_id: str):
1✔
50
        """
51
        Creates a class structure from a ROS2 bag file with an Imu topic.
52

53
        Args:
54
            bag_path (Path | str): Path to the ROS2 bag file.
55
            img_topic (str): Topic of the Imu messages.
56
            frame_id (str): The frame where this IMU data was collected.
57
        Returns:
58
            ImageData: Instance of this class.
59
        """
60

61
        # Get topic message count and typestore
62
        bag_wrapper = Ros2BagWrapper(bag_path, None)
1✔
63
        typestore: Typestore = bag_wrapper.get_typestore()
1✔
64
        num_msgs: int = bag_wrapper.get_topic_count(imu_topic)
1✔
65

66
        # TODO: Load the frame id directly from the ROS2 bag.
67

68
        # Setup arrays to hold data
69
        timestamps = np.zeros((num_msgs), dtype=object)
1✔
70
        lin_acc = np.zeros((num_msgs, 3), dtype=np.double)
1✔
71
        ang_vel = np.zeros((num_msgs, 3), dtype=np.double)
1✔
72
        orientation = np.zeros((num_msgs, 4), dtype=np.double)
1✔
73

74
        # Extract the images/timestamps and save
75
        with Reader2(bag_path) as reader: 
1✔
76
            i = 0
1✔
77
            connections = [x for x in reader.connections if x.topic == imu_topic]
1✔
78
            for conn, _, rawdata in reader.messages(connections=connections):
1✔
79
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
1✔
80

81
                # Extract imu data 
82
                lin_acc[i] = np.array([msg.linear_acceleration.x, 
1✔
83
                                       msg.linear_acceleration.y, 
84
                                       msg.linear_acceleration.z], dtype=np.double)
85
                ang_vel[i] = np.array([msg.angular_velocity.x, 
1✔
86
                                       msg.angular_velocity.y, 
87
                                       msg.angular_velocity.z], dtype=np.double)
88
                orientation[i] = np.array([msg.orientation.x, 
1✔
89
                                       msg.orientation.y, 
90
                                       msg.orientation.z,
91
                                       msg.orientation.w], dtype=np.double)
92

93
                # Extract timestamps
94
                timestamps[i] = Ros2BagWrapper.extract_timestamp(msg)
1✔
95

96
                # Update the count
97
                i += 1
1✔
98

99
        # Create an ImageData class
100
        return cls(frame_id, CoordinateFrame.FLU, timestamps, lin_acc, ang_vel, orientation)
1✔
101
    
102
    @classmethod
1✔
103
    @typechecked
1✔
104
    def from_txt_file(cls, file_path: Union[Path, str], frame_id: str, frame: CoordinateFrame):
1✔
105
        """
106
        Creates a class structure from the TartanAir dataset format, which includes
107
        various .txt files with IMU data. It expects the timestamp, the linear
108
        acceleration, and the angular velocity, seperated by spaced in that order.
109

110
        Args:
111
            file_path (Path | str): Path to the file containing the IMU data.
112
            frame_id (str): The frame where this IMU data was collected.
113
            frame (CoordinateFrame): The coordinate system convention of this data.
114
        Returns:
115
            ImuData: Instance of this class.
116

117
        NOTE: Sets orientation to identity! 
118
        """
119
        
120
        # Count the number of lines in the file
121
        line_count = 0
1✔
122
        with open(str(file_path), 'r') as file:
1✔
123
            for _ in file: 
1✔
124
                line_count += 1
1✔
125

126
        # Setup arrays to hold data
127
        timestamps = np.zeros((line_count), dtype=object)
1✔
128
        lin_acc = np.zeros((line_count, 3), dtype=object)
1✔
129
        ang_vel = np.zeros((line_count, 3), dtype=object)
1✔
130

131
        # Open the txt file and read in the data
132
        with open(str(file_path), 'r') as file:
1✔
133
            for i, line in enumerate(file):
1✔
134
                line_split = line.split(' ')
1✔
135
                timestamps[i] = line_split[0]
1✔
136
                lin_acc[i] = line_split[1:4]
1✔
137
                ang_vel[i] = line_split[4:7]
1✔
138
        
139
        # Set orientation to identity, as it is assumed this text file doesn't have it
140
        orientation = np.zeros((lin_acc.shape[0], 4), dtype=int)
1✔
141
        orientation[:,3] = np.ones((lin_acc.shape[0]), dtype=int)
1✔
142

143
        # Create the ImuData class
144
        return cls(frame_id, frame, timestamps, lin_acc, ang_vel, orientation)
1✔
145
    
146
    # =========================================================================
147
    # ========================= Manipulation Methods ========================== 
148
    # =========================================================================  
149

150
    def crop_data(self, start: Decimal, end: Decimal):
1✔
151
        """ Will crop the data so only values within [start, end] inclusive are kept. """
152

153
        # Create boolean mask of data to keep
154
        mask = (self.timestamps >= start) & (self.timestamps <= end)
1✔
155

156
        # Apply mask
157
        self.timestamps = self.timestamps[mask]
1✔
158
        self.lin_acc = self.lin_acc[mask]
1✔
159
        self.ang_vel = self.ang_vel[mask]
1✔
160
        self.orientations = self.orientations[mask]
1✔
161
        
162
    # =========================================================================
163
    # ============================ Export Methods ============================= 
164
    # =========================================================================  
165

166
    def to_PathData(self, initial_pos: NDArray[float], initial_vel: NDArray[float], 
1✔
167
                    initial_ori: NDArray[float], use_ang_vel: bool) -> PathData:
168
        """
169
        Converts this IMUData class into OdometeryData by integrating the IMU data using
170
        Euler's method.
171

172
        Parameters:
173
            initial_pos: The initial position as a numpy array.
174
            initial_vel: The initial velocity as a numpy array.
175
            initial_ori: The initial orientation as a numpy array (quaternion x, y, z, w).
176
            use_ang_vel: If True, will use angular velocity data to calculate orientation.
177
                If False, will use orientation data directly from the IMUData class.
178

179
        Returns:
180
            PathData: The resulting PathData class.
181
        """
182

NEW
183
        print("WARNING: This code has not been extensively tested yet!")
×
184

185
        # Setup arrays to hold integrated data 
NEW
186
        pos = np.zeros((self.len(), 3), dtype=float)
×
NEW
187
        pos[0] = initial_pos
×
NEW
188
        vel = np.zeros((self.len(), 3), dtype=float)
×
NEW
189
        vel[0] = initial_vel
×
190

191
        # Setup array to hold orientation data
NEW
192
        if use_ang_vel:
×
NEW
193
            ori = np.zeros((self.len(), 4), dtype=float)  
×
NEW
194
            ori[:, 3] = np.ones((self.len()), dtype=float)
×
195
        else:
NEW
196
            ori = dec_arr_to_float_arr(self.orientations)
×
NEW
197
        ori[0] = initial_ori
×
198

199
        # Setup a tqdm progress bar
NEW
200
        pbar = tqdm.tqdm(total=self.len()-1, desc="Integrating IMU Data", unit="steps")
×
201

202
        # Integrate the IMU data
NEW
203
        for i in range(1, self.len()):
×
204
            # Get time difference
NEW
205
            dt: float = dec_arr_to_float_arr(self.timestamps[i] - self.timestamps[i-1])
×
206

207
            # Calculate orientation
NEW
208
            if use_ang_vel:
×
NEW
209
                cur_ori = R.from_quat(ori[i-1])
×
NEW
210
                delta_q = R.from_rotvec(dec_arr_to_float_arr(self.ang_vel[i-1]) * dt)
×
NEW
211
                new_ori = (cur_ori * delta_q).as_quat()
×
NEW
212
                ori[i] = new_ori / np.linalg.norm(new_ori)
×
213

214
            # Rotate linear acceleration into world frame
NEW
215
            r = R.from_quat(ori[i-1])
×
NEW
216
            lin_acc_world = r.apply(dec_arr_to_float_arr(self.lin_acc[i-1]))
×
NEW
217
            lin_acc_world = lin_acc_world
×
218

219
            # Subtract gravity
NEW
220
            GRAVITY_CONST = 9.80665
×
NEW
221
            if self.frame == CoordinateFrame.FLU:
×
NEW
222
                lin_acc_world[2] -= GRAVITY_CONST
×
NEW
223
            elif self.frame == CoordinateFrame.NED:
×
NEW
224
                lin_acc_world[2] += GRAVITY_CONST
×
225
            else:
NEW
226
                raise RuntimeError(f"to_PathData() doesn't currently support this frame: {self.frame}!")
×
227

228
            # Calculate velocity
NEW
229
            vel[i] = vel[i-1] + lin_acc_world * dt
×
230

231
            # Calculate position
NEW
232
            pos[i] = pos[i-1] + vel[i-1] * dt + 0.5 * lin_acc_world * dt * dt
×
NEW
233
            pbar.update(1)
×
234

235
        # Return the resulting PathData class
NEW
236
        return PathData(self.frame_id, self.timestamps, pos, ori, self.frame)
×
237

238
    # =========================================================================
239
    # =========================== Conversion to ROS =========================== 
240
    # ========================================================================= 
241

242
    @typechecked
1✔
243
    @staticmethod
1✔
244
    def get_ros_msg_type() -> str:
1✔
245
        """ Return the __msgtype__ for an Imu msg. """
246
        typestore = get_typestore(Stores.ROS2_HUMBLE)
1✔
247
        return typestore.types['sensor_msgs/msg/Imu'].__msgtype__
1✔
248

249
    @typechecked
1✔
250
    def get_ros_msg(self, i: int):
1✔
251
        """
252
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
253
        
254
        Args:
255
            i (int): The index of the image message to convert.
256
        Raises:
257
            ValueError: If i is outside the data bounds.
258
        """
259

260
        # Check to make sure index is within data bounds
261
        if i < 0 or i >= self.len():
1✔
262
            raise ValueError(f"Index {i} is out of bounds!")
×
263

264
        # Get ROS2 message classes
265
        typestore = get_typestore(Stores.ROS2_HUMBLE)
1✔
266
        Imu = typestore.types['sensor_msgs/msg/Imu']
1✔
267
        Header = typestore.types['std_msgs/msg/Header']
1✔
268
        Time = typestore.types['builtin_interfaces/msg/Time']
1✔
269
        Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
1✔
270
        Vector3 = typestore.types['geometry_msgs/msg/Vector3']
1✔
271

272
        # Get the seconds and nanoseconds
273
        seconds = int(self.timestamps[i])
1✔
274
        nanoseconds = (self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN)
1✔
275

276
        # Write the data into the new msg
277
        return Imu(Header(stamp=Time(sec=int(seconds), 
1✔
278
                                     nanosec=int(nanoseconds)), 
279
                          frame_id=self.frame_id),
280
                    orientation=Quaternion(x=0,
281
                                           y=0,
282
                                           z=0,
283
                                           w=1), # Currently ignores data in orientation
284
                    orientation_covariance=np.zeros(9),
285
                    angular_velocity=Vector3(x=self.ang_vel[i][0],
286
                                             y=self.ang_vel[i][1],
287
                                             z=self.ang_vel[i][2]),
288
                    angular_velocity_covariance=np.zeros(9),
289
                    linear_acceleration=Vector3(x=self.lin_acc[i][0],
290
                                                y=self.lin_acc[i][1], 
291
                                                z=self.lin_acc[i][2]),
292
                    linear_acceleration_covariance=np.zeros(9))
293
                    
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc