• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 16376904896

18 Jul 2025 05:43PM UTC coverage: 70.588%. First build
16376904896

push

github

web-flow
(v0.1.0) Refactor to robotdataprocess, Major Data classes with Loading, Manipulation, and Saving. (#3)

* Coverage Functionality with coveralls

* Add Coverage Status Badge

* Remove clean action from github action

* Clarify assumed ROS2 and ROS1 versions

* New Message types and support for list attributes

* Optimizations to increase speed of ros2_to_ros1 conversion

* Fixed bug in downsample where requested downsample ratio wasn't matched perfectly and had undefined behaviour

* Add instructions for running unit tests, coverage, and profiling

* Increase to version 0.0.3

* Add support for rosgraph_msgs/msg/Clock

* Speedup downsample operation

* Initial code for IMU data viewer, not tested

* Major refactor to cleanly partition command line vs. rosbag vs. data functionality

* Remove hardcoded paths from tests

* Test for from_npy() method for ImageData

* Writing various message data types from non-ROS to rosbag

* Loading and Writing data from HERCULES v1.3

* Add missing test file

* Add missing dependency on OpenCV

* Load Odometry data from csv file and save to ROSbag

* Support to write Odometry Data to a nav_msgs/msg/Path message

* Support to convert IMU and Odometry data from NED to ROS frame

* Fix bugs in test cases

* Fixes to broken NED to ROS conversion for Odometry

* Refactor: Move rosbag manipulation to Ros2BagWrapper (#5)

* Refactor: Move rosbag manipulation to Ros2BagWrapper

- Moved hertz_analysis, view_imu_data, and downsample methods from rosbag_manip.py to Ros2BagWrapper.py.
- Renamed rosbag_manip.py to command_line.py.
- Renamed rosbag_manipulation class to CmdLineInterface.
- Updated command_line.py to use the new methods in Ros2BagWrapper.py.
- Updated __main__.py to reflect the file and class rename.

* Refactor name to robotdataprocess

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>

* Stereo rectifiation and save to raw image files, Upgrade to 0.1.0 to a... (continued)

619 of 895 new or added lines in 9 files covered. (69.16%)

816 of 1156 relevant lines covered (70.59%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.24
/src/robotdataprocess/data_types/ImuData.py
1
from ..conversion_utils import convert_collection_into_decimal_array
1✔
2
from .Data import Data, CoordinateFrame
1✔
3
import decimal
1✔
4
from decimal import Decimal
1✔
5
import numpy as np
1✔
6
from pathlib import Path
1✔
7
from ..rosbag.Ros2BagWrapper import Ros2BagWrapper
1✔
8
from rosbags.rosbag2 import Reader as Reader2
1✔
9
from rosbags.typesys import Stores, get_typestore
1✔
10
from rosbags.typesys.store import Typestore
1✔
11
from scipy.spatial.transform import Rotation as R
1✔
12
from typeguard import typechecked
1✔
13

14
class ImuData(Data):
1✔
15

16
    # Define IMU-specific data attributes
17
    lin_acc: np.ndarray[Decimal]
1✔
18
    ang_vel: np.ndarray[Decimal]
1✔
19
    orientation: np.ndarray[Decimal] # quaternions (x, y, z, w)
1✔
20
    frame: CoordinateFrame
1✔
21

22
    @typechecked
1✔
23
    def __init__(self, frame_id: str, frame: CoordinateFrame, timestamps: np.ndarray | list, 
1✔
24
                 lin_acc: np.ndarray | list, ang_vel: np.ndarray | list,
25
                 orientation: np.ndarray | list):
26
        
27
        # Copy initial values into attributes
28
        super().__init__(frame_id, timestamps)
1✔
29
        self.frame = frame
1✔
30
        self.lin_acc = convert_collection_into_decimal_array(lin_acc)
1✔
31
        self.ang_vel = convert_collection_into_decimal_array(ang_vel)
1✔
32
        self.orientation = convert_collection_into_decimal_array(orientation)
1✔
33

34
        # Check to ensure that all arrays have same length
35
        if len(self.timestamps) != len(self.lin_acc) or len(self.lin_acc) != len(self.ang_vel) \
1✔
36
            or len(self.ang_vel) != len(self.orientation):
37
            raise ValueError("Lengths of timestamp, lin_acc, ang_vel, and orientation arrays are not equal!")
1✔
38

39
    # =========================================================================
40
    # ============================ Class Methods ============================== 
41
    # =========================================================================  
42

43
    @classmethod
1✔
44
    @typechecked
1✔
45
    def from_ros2_bag(cls, bag_path: Path | str, imu_topic: str, frame_id: str):
1✔
46
        """
47
        Creates a class structure from a ROS2 bag file with an Imu topic.
48

49
        Args:
50
            bag_path (Path | str): Path to the ROS2 bag file.
51
            img_topic (str): Topic of the Imu messages.
52
            frame_id (str): The frame where this IMU data was collected.
53
        Returns:
54
            ImageData: Instance of this class.
55
        """
56

57
        # Get topic message count and typestore
58
        bag_wrapper = Ros2BagWrapper(bag_path, None)
1✔
59
        typestore: Typestore = bag_wrapper.get_typestore()
1✔
60
        num_msgs: int = bag_wrapper.get_topic_count(imu_topic)
1✔
61

62
        # TODO: Load the frame id directly from the ROS2 bag.
63

64
        # Setup arrays to hold data
65
        timestamps = np.zeros((num_msgs), dtype=object)
1✔
66
        lin_acc = np.zeros((num_msgs, 3), dtype=np.double)
1✔
67
        ang_vel = np.zeros((num_msgs, 3), dtype=np.double)
1✔
68
        orientation = np.zeros((num_msgs, 4), dtype=np.double)
1✔
69

70
        # Extract the images/timestamps and save
71
        with Reader2(bag_path) as reader: 
1✔
72
            i = 0
1✔
73
            connections = [x for x in reader.connections if x.topic == imu_topic]
1✔
74
            for conn, _, rawdata in reader.messages(connections=connections):
1✔
75
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
1✔
76

77
                # Extract imu data 
78
                lin_acc[i] = np.array([msg.linear_acceleration.x, 
1✔
79
                                       msg.linear_acceleration.y, 
80
                                       msg.linear_acceleration.z], dtype=np.double)
81
                ang_vel[i] = np.array([msg.angular_velocity.x, 
1✔
82
                                       msg.angular_velocity.y, 
83
                                       msg.angular_velocity.z], dtype=np.double)
84
                orientation[i] = np.array([msg.orientation.x, 
1✔
85
                                       msg.orientation.y, 
86
                                       msg.orientation.z,
87
                                       msg.orientation.w], dtype=np.double)
88

89
                # Extract timestamps
90
                timestamps[i] = Ros2BagWrapper.extract_timestamp(msg)
1✔
91

92
                # Update the count
93
                i += 1
1✔
94

95
        # Create an ImageData class
96
        return cls(frame_id, CoordinateFrame.ROS, timestamps, lin_acc, ang_vel, orientation)
1✔
97

98
    @classmethod
1✔
99
    @typechecked
1✔
100
    def from_TartanAir(cls, folder_path: Path | str, frame_id: str):
1✔
101
        """
102
        Creates a class structure from the TartanAir dataset format, which includes
103
        various .txt files with IMU data.
104

105
        Args:
106
            folder_path (Path | str): Path to the folder containing the IMU data.
107
            frame_id (str): The frame where this IMU data was collected.
108
        Returns:
109
            ImuData: Instance of this class.
110
        """
111

112
        # Get paths to all necessary files
NEW
113
        ts_folder_path = Path(folder_path) / 'imu_time.npy'
×
NEW
114
        lin_acc_folder_path = Path(folder_path) / 'acc_nograv_body.npy'
×
NEW
115
        ang_vel_folder_path =  Path(folder_path) / 'gyro.npy'
×
NEW
116
        orientation_folder_path = Path(folder_path) / 'ori_global.npy'
×
117

118
        # Load the data
NEW
119
        timestamps = convert_collection_into_decimal_array(np.load(ts_folder_path))
×
NEW
120
        lin_acc = np.load(lin_acc_folder_path)
×
NEW
121
        ang_vel = np.load(ang_vel_folder_path)
×
122

123
        # Currently unsure of format of TartanAir Orientation data
124
        # (whether it's extrinsic or intrinsic euler rotations, etc.)
125
        # Thus, for now fill with zeros.
NEW
126
        orientation = np.zeros_like(ang_vel)
×
127

128
        # Create the ImuData class
NEW
129
        raise NotImplementedError("Need to know coordiante frame of TartanAir.")
×
130
        frame = None
131
        return cls(frame_id, frame, timestamps, lin_acc, ang_vel, orientation)
132
    
133
    @classmethod
1✔
134
    @typechecked
1✔
135
    def from_txt_file(cls, file_path: Path | str, frame_id: str, frame: CoordinateFrame):
1✔
136
        """
137
        Creates a class structure from the TartanAir dataset format, which includes
138
        various .txt files with IMU data. It expects the timestamp, the linear
139
        acceleration, and the angular velocity, seperated by spaced in that order.
140

141
        Args:
142
            file_path (Path | str): Path to the file containing the IMU data.
143
            frame_id (str): The frame where this IMU data was collected.
144
            frame (CoordinateFrame): The coordinate system convention of this data.
145
        Returns:
146
            ImuData: Instance of this class.
147
        """
148
        
149
        # Count the number of lines in the file
150
        line_count = 0
1✔
151
        with open(str(file_path), 'r') as file:
1✔
152
            for _ in file: 
1✔
153
                line_count += 1
1✔
154

155
        # Setup arrays to hold data
156
        timestamps = np.zeros((line_count), dtype=object)
1✔
157
        lin_acc = np.zeros((line_count, 3), dtype=object)
1✔
158
        ang_vel = np.zeros((line_count, 3), dtype=object)
1✔
159

160
        # Open the txt file and read in the data
161
        with open(str(file_path), 'r') as file:
1✔
162
            for i, line in enumerate(file):
1✔
163
                line_split = line.split(' ')
1✔
164
                timestamps[i] = line_split[0]
1✔
165
                lin_acc[i] = line_split[1:4]
1✔
166
                ang_vel[i] = line_split[4:7]
1✔
167
        
168
        # Set orientation to identity, as we don't have orientation from HERCULES IMU
169
        orientation = np.zeros((lin_acc.shape[0], 4), dtype=int)
1✔
170
        orientation[:,3] = np.ones((lin_acc.shape[0]), dtype=int)
1✔
171

172
        # Create the ImuData class
173
        return cls(frame_id, frame, timestamps, lin_acc, ang_vel, orientation)
1✔
174

175
    # =========================================================================
176
    # =========================== Frame Conversions =========================== 
177
    # ========================================================================= 
178
    def to_ROS_frame(self):
1✔
179
        # If we are already in the ROS frame, return
NEW
180
        if self.frame == CoordinateFrame.ROS:
×
NEW
181
            print("Data already in ROS coordinate frame, returning...")
×
NEW
182
            return
×
183

184
        # If in NED, run the conversion
NEW
185
        elif self.frame == CoordinateFrame.NED:
×
186
            # Define the rotation matrix
NEW
187
            R_NED = np.array([[1,  0,  0],
×
188
                              [0, -1,  0],
189
                              [0,  0, -1]])
NEW
190
            R_NED_Q = R.from_matrix(R_NED)
×
191

192
            # Do a change of basis 
NEW
193
            raise NotImplementedError("Not sure if this should be a pose transformation or change of basis")
×
194
            self.lin_acc = (R_NED @ self.lin_acc.T).T
195
            self.ang_vel = (R_NED @ self.ang_vel.T).T
196
            for i in range(self.len()):
197
                self.orientation[i] = (R_NED_Q * R.from_quat(self.orientation[i]) * R_NED_Q.inv()).as_quat()
198

199
            # Update frame
200
            self.frame = CoordinateFrame.ROS
201

202
        # Otherwise, throw an error
203
        else:
NEW
204
            raise RuntimeError(f"ImuData class is in an unexpected frame: {self.frame}!")
×
205

206
    # =========================================================================
207
    # =========================== Conversion to ROS =========================== 
208
    # ========================================================================= 
209

210
    @typechecked
1✔
211
    @staticmethod
1✔
212
    def get_ros_msg_type() -> str:
1✔
213
        """ Return the __msgtype__ for an Imu msg. """
214
        typestore = get_typestore(Stores.ROS2_HUMBLE)
1✔
215
        return typestore.types['sensor_msgs/msg/Imu'].__msgtype__
1✔
216

217
    @typechecked
1✔
218
    def get_ros_msg(self, i: int):
1✔
219
        """
220
        Gets an Image ROS2 Humble message corresponding to the image represented by index i.
221
        
222
        Args:
223
            i (int): The index of the image message to convert.
224
        Raises:
225
            ValueError: If i is outside the data bounds.
226
        """
227

228
        # Check to make sure index is within data bounds
229
        if i < 0 or i >= self.len():
1✔
NEW
230
            raise ValueError(f"Index {i} is out of bounds!")
×
231

232
        # Make sure our data is in the ROS frame, otherwise throw an error
233
        if self.frame != CoordinateFrame.ROS:
1✔
NEW
234
            raise RuntimeError("Convert this IMU Data to a ROS coordinate frame before writing to a ROS2 bag!")
×
235

236
        # Get ROS2 message classes
237
        typestore = get_typestore(Stores.ROS2_HUMBLE)
1✔
238
        Imu = typestore.types['sensor_msgs/msg/Imu']
1✔
239
        Header = typestore.types['std_msgs/msg/Header']
1✔
240
        Time = typestore.types['builtin_interfaces/msg/Time']
1✔
241
        Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
1✔
242
        Vector3 = typestore.types['geometry_msgs/msg/Vector3']
1✔
243

244
        # Get the seconds and nanoseconds
245
        seconds = int(self.timestamps[i])
1✔
246
        nanoseconds = (self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN)
1✔
247

248
        # Write the data into the new msg
249
        return Imu(Header(stamp=Time(sec=int(seconds), 
1✔
250
                                     nanosec=int(nanoseconds)), 
251
                          frame_id=self.frame_id),
252
                    orientation=Quaternion(x=0,
253
                                           y=0,
254
                                           z=0,
255
                                           w=1), # Currently ignores data in orientation
256
                    orientation_covariance=np.zeros(9),
257
                    angular_velocity=Vector3(x=self.ang_vel[i][0],
258
                                             y=self.ang_vel[i][1],
259
                                             z=self.ang_vel[i][2]),
260
                    angular_velocity_covariance=np.zeros(9),
261
                    linear_acceleration=Vector3(x=self.lin_acc[i][0],
262
                                                y=self.lin_acc[i][1], 
263
                                                z=self.lin_acc[i][2]),
264
                    linear_acceleration_covariance=np.zeros(9))
265
                    
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc