• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 16376904896

18 Jul 2025 05:43PM UTC coverage: 70.588%. First build
16376904896

push

github

web-flow
(v0.1.0) Refactor to robotdataprocess, Major Data classes with Loading, Manipulation, and Saving. (#3)

* Coverage Functionality with coveralls

* Add Coverage Status Badge

* Remove clean action from github action

* Clarify assumed ROS2 and ROS1 versions

* New Message types and support for list attributes

* Optimizations to increase speed of ros2_to_ros1 conversion

* Fixed bug in downsample where requested downsample ratio wasn't matched perfectly and had undefined behaviour

* Add instructions for running unit tests, coverage, and profiling

* Increase to version 0.0.3

* Add support for rosgraph_msgs/msg/Clock

* Speedup downsample operation

* Initial code for IMU data viewer, not tested

* Major refactor to cleanly partition command line vs. rosbag vs. data functionality

* Remove hardcoded paths from tests

* Test for from_npy() method for ImageData

* Writing various message data types from non-ROS to rosbag

* Loading and Writing data from HERCULES v1.3

* Add missing test file

* Add missing dependency on OpenCV

* Load Odometry data from csv file and save to ROSbag

* Support to write Odometry Data to a nav_msgs/msg/Path message

* Support to convert IMU and Odometry data from NED to ROS frame

* Fix bugs in test cases

* Fixes to broken NED to ROS conversion for Odometry

* Refactor: Move rosbag manipulation to Ros2BagWrapper (#5)

* Refactor: Move rosbag manipulation to Ros2BagWrapper

- Moved hertz_analysis, view_imu_data, and downsample methods from rosbag_manip.py to Ros2BagWrapper.py.
- Renamed rosbag_manip.py to command_line.py.
- Renamed rosbag_manipulation class to CmdLineInterface.
- Updated command_line.py to use the new methods in Ros2BagWrapper.py.
- Updated __main__.py to reflect the file and class rename.

* Refactor name to robotdataprocess

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>

* Stereo rectifiation and save to raw image files, Upgrade to 0.1.0 to a... (continued)

619 of 895 new or added lines in 9 files covered. (69.16%)

816 of 1156 relevant lines covered (70.59%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.67
/src/robotdataprocess/data_types/OdometryData.py
1
from __future__ import annotations
1✔
2

3
from ..conversion_utils import convert_collection_into_decimal_array
1✔
4
import csv
1✔
5
from .Data import CoordinateFrame, Data
1✔
6
import decimal
1✔
7
from decimal import Decimal
1✔
8
import matplotlib.pyplot as plt
1✔
9
import numpy as np
1✔
10
import os
1✔
11
import pandas as pd
1✔
12
from pathlib import Path
1✔
13
from ..rosbag.Ros2BagWrapper import Ros2BagWrapper
1✔
14
from rosbags.rosbag2 import Reader as Reader2
1✔
15
from rosbags.typesys import Stores, get_typestore
1✔
16
from rosbags.typesys.store import Typestore
1✔
17
from scipy.spatial.transform import Rotation as R
1✔
18
from typeguard import typechecked
1✔
19
import tqdm
1✔
20

21
class OdometryData(Data):
1✔
22

23
    # Define odometry-specific data attributes
24
    child_frame_id: str
1✔
25
    frame: CoordinateFrame
1✔
26
    positions: np.ndarray[Decimal] # meters (x, y, z)
1✔
27
    orientations: np.ndarray[Decimal] # quaternions (x, y, z, w)
1✔
28
    poses = [] # Saved nav_msgs/msg/Pose
1✔
29

30
    @typechecked
1✔
31
    def __init__(self, frame_id: str, child_frame_id: str, frame: CoordinateFrame, 
1✔
32
                 timestamps: np.ndarray | list,  positions: np.ndarray | list, 
33
                 orientations: np.ndarray | list):
34
        
35
        # Copy initial values into attributes
36
        super().__init__(frame_id, timestamps)
1✔
37
        self.child_frame_id = child_frame_id
1✔
38
        self.frame = frame
1✔
39
        self.positions = convert_collection_into_decimal_array(positions)
1✔
40
        self.orientations = convert_collection_into_decimal_array(orientations)
1✔
41

42
        # Check to ensure that all arrays have same length
43
        if len(self.timestamps) != len(self.positions) or len(self.positions) != len(self.orientations):
1✔
44
            raise ValueError("Lengths of timestamp, position, and orientation arrays are not equal!")
1✔
45

46
    # =========================================================================
47
    # ============================ Class Methods ============================== 
48
    # =========================================================================  
49

50
    @classmethod
1✔
51
    @typechecked
1✔
52
    def from_ros2_bag(cls, bag_path: Path | str, odom_topic: str):
1✔
53
        """
54
        Creates a class structure from a ROS2 bag file with an Odometry topic.
55

56
        Args:
57
            bag_path (Path | str): Path to the ROS2 bag file.
58
            odom_topic (str): Topic of the Odometry messages.
59
        Returns:
60
            OdometryData: Instance of this class.
61
        """
62

63
        # Get topic message count and typestore
64
        bag_wrapper = Ros2BagWrapper(bag_path, None)
1✔
65
        typestore: Typestore = bag_wrapper.get_typestore()
1✔
66
        num_msgs: int = bag_wrapper.get_topic_count(odom_topic)
1✔
67
        
68
        # Pre-allocate arrays (memory-mapped or otherwise)
69
        timestamps_np = np.zeros(num_msgs, dtype=Decimal)
1✔
70
        positions_np = np.zeros((num_msgs, 3), dtype=Decimal)
1✔
71
        orientations_np = np.zeros((num_msgs, 4), dtype=Decimal)
1✔
72

73
        # Setup tqdm bar & counter
74
        pbar = tqdm.tqdm(total=num_msgs, desc="Extracting Odometry...", unit=" msgs")
1✔
75
        i = 0
1✔
76

77
        # Extract the odometry information
78
        frame_id, child_frame_id = None, None
1✔
79
        with Reader2(str(bag_path)) as reader:
1✔
80

81
            # Extract frames from first message
82
            connections = [x for x in reader.connections if x.topic == odom_topic]
1✔
83
            for conn, timestamp, rawdata in reader.messages(connections=connections):  
1✔
84
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
1✔
85
                frame_id = msg.header.frame_id
1✔
86
                child_frame_id = msg.child_frame_id
1✔
87
                break
1✔
88

89
            # Extract individual message data
90
            connections = [x for x in reader.connections if x.topic == odom_topic]
1✔
91
            for conn, timestamp, rawdata in reader.messages(connections=connections):
1✔
92
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
1✔
93
                
94
                timestamps_np[i] = bag_wrapper.extract_timestamp(msg)
1✔
95
                pos = msg.pose.pose.position
1✔
96
                positions_np[i] = np.array([Decimal(pos.x), Decimal(pos.y), Decimal(pos.z)])
1✔
97
                ori = msg.pose.pose.orientation
1✔
98
                orientations_np[i] = np.array([Decimal(ori.x), Decimal(ori.y), Decimal(ori.z), Decimal(ori.w)])
1✔
99

100
                # NOTE: Doesn't support Twist information currently
101

102
                # Increment the count
103
                i += 1
1✔
104
                pbar.update(1)
1✔
105

106
        # Create an OdometryData class
107
        return cls(frame_id, child_frame_id, CoordinateFrame.ROS, timestamps_np, positions_np, orientations_np)
1✔
108
    
109
    @classmethod
1✔
110
    @typechecked
1✔
111
    def from_csv(cls, csv_path: Path | str, frame_id: str, child_frame_id: str, frame: CoordinateFrame):
1✔
112
        """
113
        Creates a class structure from a csv file, where the order of values
114
        in the files follows ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz'].
115

116
        Args:
117
            csv_path (Path | str): Path to the CSV file.
118
            frame_id (str): The frame that this odometry is relative to.
119
            child_frame_id (str): The frame whose pose is represented by this odometry.
120
            frame (CoordinateFrame): The coordinate system convention of this data.
121
        Returns:
122
            OdometryData: Instance of this class.
123
        """
124

125
        # Read the csv file
NEW
126
        df1 = pd.read_csv(str(csv_path), header=0, names=['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz'])
×
127
        
128
        # Convert columns to np.ndarray[Decimal]
NEW
129
        timestamps_np = np.array([Decimal(str(ts)) for ts in df1['timestamp']], dtype=object)
×
NEW
130
        positions_np = np.array([[Decimal(str(x)), Decimal(str(y)), Decimal(str(z))] 
×
131
                                 for x, y, z in zip(df1['x'], df1['y'], df1['z'])], dtype=object)
NEW
132
        orientations_np = np.array([[Decimal(str(qx)), Decimal(str(qy)), Decimal(str(qz)), Decimal(str(qw))]
×
133
                                    for qx, qy, qz, qw in zip(df1['qx'], df1['qy'], df1['qz'], df1['qw'])], dtype=object)
134

135
        # Create an OdometryData class
NEW
136
        return cls(frame_id, child_frame_id, frame, timestamps_np, positions_np, orientations_np)
×
137
    
138
    @classmethod
1✔
139
    @typechecked
1✔
140
    def from_txt_file(cls, file_path: Path | str, frame_id: str, child_frame_id: str, frame: CoordinateFrame):
1✔
141
        """
142
        Creates a class structure from a text file, where the order of values
143
        in the files follows ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz'].
144

145
        Args:
146
            file_path (Path | str): Path to the file containing the odometry data.
147
            frame_id (str): The frame where this odometry is relative to.
148
            child_frame_id (str): The frame whose pose is represented by this odometry.
149
            frame (CoordinateFrame): The coordinate system convention of this data.
150
        Returns:
151
            OdometryData: Instance of this class.
152
        """
153
        
154
        # Count the number of lines in the file
155
        line_count = 0
1✔
156
        with open(str(file_path), 'r') as file:
1✔
157
            for _ in file: 
1✔
158
                line_count += 1
1✔
159

160
        # Setup arrays to hold data
161
        timestamps_np = np.zeros((line_count), dtype=object)
1✔
162
        positions_np = np.zeros((line_count, 3), dtype=object)
1✔
163
        orientations_np = np.zeros((line_count, 4), dtype=object)
1✔
164

165
        # Open the txt file and read in the data
166
        with open(str(file_path), 'r') as file:
1✔
167
            for i, line in enumerate(file):
1✔
168
                line_split = line.split(' ')
1✔
169
                timestamps_np[i] = line_split[0]
1✔
170
                positions_np[i] = line_split[1:4]
1✔
171
                orientations_np[i] =  line_split[5:8] + [line_split[4]]
1✔
172
        
173
        # Create an OdometryData class
174
        return cls(frame_id, child_frame_id, frame, timestamps_np, positions_np, orientations_np)
1✔
175
    
176
    # =========================================================================
177
    # ========================= Manipulation Methods ========================== 
178
    # =========================================================================  
179
    
180
    def add_folded_guassian_noise_to_position(self, xy_noise_std_per_frame: float,
1✔
181
            z_noise_std_per_frame: float):
182
        """
183
        This method simulates odometry drift by adding folded gaussian noise
184
        to the odometry positions on a per frame basis. It also accumulates
185
        it over time. NOTE: It completely ignores the timestamps, and the "folded
186
        guassian noise" distribution stds might not align with the stds of the 
187
        guassian used internally, so this is not a robust function at all.
188

189
        Args:
190
            xy_noise_std_per_frame (float): Standard deviation of the gaussian 
191
                distribution for xy, whose output is then run through abs().
192
            z_noise_std_per_frame (float): Same as above, but for z.
193
        """
194

195
        # Track cumulative noise for each field
NEW
196
        cumulative_noise_pos = {'x': 0.0, 'y': 0.0, 'z': 0.0}
×
197

198
        # For each position
NEW
199
        for i in range(len(self.timestamps)):
×
200

201
            # Sample noise and accumulate
NEW
202
            noise_pos = {'x': np.random.normal(0, xy_noise_std_per_frame),
×
203
                        'y': np.random.normal(0, xy_noise_std_per_frame),
204
                        'z': np.random.normal(0, z_noise_std_per_frame)}
NEW
205
            for key in cumulative_noise_pos:
×
NEW
206
                cumulative_noise_pos[key] += abs(noise_pos[key])
×
207

208
            # Update positions
NEW
209
            self.positions[i][0] += Decimal(cumulative_noise_pos['x'])
×
NEW
210
            self.positions[i][1] += Decimal(cumulative_noise_pos['y'])
×
NEW
211
            self.positions[i][2] += Decimal(cumulative_noise_pos['z'])
×
212

213
    def shift_position(self, x_shift: float, y_shift: float, z_shift: float):
1✔
214
        """
215
        Shifts the positions of the odometry.
216

217
        Args:
218
            x_shift (float): Shift in x-axis.
219
            y_shift (float): Shift in y_axis.
220
            z_shift (float): Shift in z_axis.
221
        """
NEW
222
        self.positions[:,0] += Decimal(x_shift)
×
NEW
223
        self.positions[:,1] += Decimal(y_shift)
×
NEW
224
        self.positions[:,2] += Decimal(z_shift)
×
225

226
    def shift_to_start_at_identity(self):
1✔
227
        """
228
        Alter the positions and orientations based so that the first pose 
229
        starts at Identity.
230
        """
231

232
        # Get pose of first robot position w.r.t world
233
        R_o = R.from_quat(self.orientations[0]).as_matrix()
1✔
234
        T_o = np.expand_dims(self.positions[0], axis=1)
1✔
235
        
236
        # Calculate the inverse (pose of world w.r.t first robot location)
237
        R_inv = R_o.T
1✔
238

239
        # Rotate positions and orientations
240
        self.positions = (R_inv @ (self.positions.T - T_o).astype(float)).T
1✔
241
        for i in range(self.len()):
1✔
242
            self.orientations[i] = R.from_matrix((R_inv @ R.from_quat(self.orientations[i]).as_matrix())).as_quat()
1✔
243

244
    # =========================================================================
245
    # ============================ Export Methods ============================= 
246
    # =========================================================================  
247

248
    def to_csv(self, csv_path: Path | str):
1✔
249
        """
250
        Writes the odometry data to a .csv file. Note that data will be
251
        saved in the following order: timestamp, pos.x, pos.y, pos.z,
252
        ori.w, ori.x, ori.y, ori.z. Timestamp is in seconds.
253

254
        Args:
255
            csv_path (Path | str): Path to the output csv file.
256
            odom_topic (str): Topic of the Odometry messages.
257
        Returns:
258
            OdometryData: Instance of this class.
259
        """
260

261
        # setup tqdm 
262
        pbar = tqdm.tqdm(total=None, desc="Saving to csv... ", unit=" frames")
1✔
263

264
        # Check that file path doesn't already exist
265
        file_path = Path(csv_path)
1✔
266
        if os.path.exists(file_path):
1✔
NEW
267
            raise ValueError(f"Output file already exists: {file_path}")
×
268
        
269
        # Open csv file
270
        with open(file_path, 'w', newline='') as csvfile:
1✔
271
            writer = csv.writer(csvfile, delimiter=',')
1✔
272

273
            # Write the first row
274
            writer.writerow(['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz'])
1✔
275
                
276
            # Write message data to the csv file
277
            for i in range(len(self.timestamps)):
1✔
278
                writer.writerow([str(self.timestamps[i]), 
1✔
279
                    str(self.positions[i][0]), str(self.positions[i][1]), str(self.positions[i][2]),
280
                    str(self.orientations[i][3]), str(self.orientations[i][0]), str(self.orientations[i][1]), 
281
                    str(self.orientations[i][2])])
282
                pbar.update(1)
1✔
283

284
    # =========================================================================
285
    # ============================ Visualization ============================== 
286
    # =========================================================================  
287

288
    @typechecked
1✔
289
    def visualize(self, otherList: list[OdometryData], titles: list[str]):
1✔
290
        """
291
        Visualizes this OdometryData (and all others included in otherList)
292
        on a single plot.
293

294
        Args:
295
            otherList (list[OdometryData]): All other OdometryData objects whose
296
                odometry should also be visualized on this plot.
297
            titles (list[str]): Titles for each OdometryData object, starting 
298
                with self.
299
        """
300

NEW
301
        def draw_axes(data: OdometryData, label_prefix=""):
×
302
            """Helper function that visualizes orientation along the trajectory path with axes."""
NEW
303
            axes_interval = 5000
×
NEW
304
            axes_length = 5
×
305

NEW
306
            for i in range(0, data.len(), axes_interval):
×
307
                # Extract data
NEW
308
                pos = data.positions[i].astype(np.float64)
×
NEW
309
                quat = data.orientations[i].astype(np.float64)
×
NEW
310
                rot = R.from_quat(quat, scalar_first=False)
×
311

312
                # Define unit vectors for X, Y, Z in local frame
NEW
313
                x_axis = rot.apply([1, 0, 0])
×
NEW
314
                y_axis = rot.apply([0, 1, 0])
×
NEW
315
                z_axis = rot.apply([0, 0, 1])
×
316

317
                # Plot axes
NEW
318
                ax.quiver(*pos, *x_axis, length=axes_length, color='r', normalize=True, linewidth=0.8)
×
NEW
319
                ax.quiver(*pos, *y_axis, length=axes_length, color='g', normalize=True, linewidth=0.8)
×
NEW
320
                ax.quiver(*pos, *z_axis, length=axes_length, color='b', normalize=True, linewidth=0.8)
×
321

322
        # Ensure that the lists are of the proper sizes
NEW
323
        if (len(otherList) + 1) != len(titles):
×
NEW
324
            raise ValueError("Length of titles should be one more than length of otherlist!")
×
325

326
        # Build a 3D plot
NEW
327
        fig = plt.figure()
×
NEW
328
        ax = fig.add_subplot(111, projection='3d')
×
329

NEW
330
        ax.plot(self.positions[:,0].astype(np.float64), 
×
331
                self.positions[:,1].astype(np.float64), 
332
                self.positions[:,2].astype(np.float64), label=titles[0])
NEW
333
        for i, other in enumerate(otherList):
×
NEW
334
            ax.plot(other.positions[:,0].astype(np.float64), 
×
335
                    other.positions[:,1].astype(np.float64), 
336
                    other.positions[:,2].astype(np.float64), 
337
                    label=titles[1+i])
338

339
        # Draw orientation axes (X = red, Y = green, Z = blue)
NEW
340
        draw_axes(self)
×
NEW
341
        for other in otherList:
×
NEW
342
            draw_axes(other)
×
343

344
        # Set labels
NEW
345
        ax.set_title("Trajectory Comparison with Full Orientation")
×
NEW
346
        ax.set_xlabel("X (m)")
×
NEW
347
        ax.set_ylabel("Y (m)")
×
NEW
348
        ax.set_zlabel("Z (m)")
×
NEW
349
        ax.legend()
×
350

351
        # Concatenate all x, y and z values together
NEW
352
        all_x = self.positions[:,0]
×
NEW
353
        all_y = self.positions[:,1]
×
NEW
354
        all_z = self.positions[:,2]
×
NEW
355
        for other in otherList:
×
NEW
356
            all_x = np.concatenate((all_x, other.positions[:,0]))
×
NEW
357
            all_y = np.concatenate((all_y, other.positions[:,1]))
×
NEW
358
            all_z = np.concatenate((all_z, other.positions[:,2]))
×
NEW
359
        all_x = all_x.astype(np.float64)
×
NEW
360
        all_y = all_y.astype(np.float64)
×
NEW
361
        all_z = all_z.astype(np.float64)
×
362

363
        # Set an equal scale for all axes
NEW
364
        x_center = (all_x.max() + all_x.min()) / 2
×
NEW
365
        y_center = (all_y.max() + all_y.min()) / 2
×
NEW
366
        z_center = (all_z.max() + all_z.min()) / 2
×
NEW
367
        max_range = max(all_x.max() - all_x.min(), all_y.max() - all_y.min(), all_z.max() - all_z.min()) / 2
×
NEW
368
        ax.set_xlim(x_center - max_range, x_center + max_range)
×
NEW
369
        ax.set_ylim(y_center - max_range, y_center + max_range)
×
NEW
370
        ax.set_zlim(z_center - max_range, z_center + max_range)
×
371

372
        # Show the plot
NEW
373
        plt.tight_layout()
×
NEW
374
        plt.show()
×
375

376
    # =========================================================================
377
    # =========================== Frame Conversions =========================== 
378
    # ========================================================================= 
379
    def to_ROS_frame(self):
1✔
380
        # If we are already in the ROS frame, return
381
        if self.frame == CoordinateFrame.ROS:
1✔
NEW
382
            print("Data already in ROS coordinate frame, returning...")
×
NEW
383
            return
×
384

385
        # If in NED, run the conversion
386
        elif self.frame == CoordinateFrame.NED:
1✔
387
            # Define the rotation matrix
388
            R_NED = np.array([[1,  0,  0],
1✔
389
                              [0, -1,  0],
390
                              [0,  0, -1]])
391
            R_NED_Q = R.from_matrix(R_NED)
1✔
392

393
            # Do a change of basis
394
            self.positions = (R_NED @ self.positions.T).T
1✔
395
            for i in range(self.len()):
1✔
396
                self.orientations[i] = (R_NED_Q * R.from_quat(self.orientations[i]) * R_NED_Q.inv()).as_quat()
1✔
397

398
            # Update frame
399
            self.frame = CoordinateFrame.ROS
1✔
400

401
        # Otherwise, throw an error
402
        else:
NEW
403
            raise RuntimeError(f"OdometryData class is in an unexpected frame: {self.frame}!")
×
404

405
    # =========================================================================
406
    # =========================== Conversion to ROS =========================== 
407
    # ========================================================================= 
408

409
    @typechecked
1✔
410
    @staticmethod
1✔
411
    def get_ros_msg_type(msg_type: str = "Odometry") -> str:
1✔
412
        """ Return the __msgtype__ for an Imu msg. """
413
        typestore = get_typestore(Stores.ROS2_HUMBLE)
1✔
414
        if msg_type == "Odometry":
1✔
415
            return typestore.types['nav_msgs/msg/Odometry'].__msgtype__
1✔
NEW
416
        elif msg_type == "Path":
×
NEW
417
            return typestore.types['nav_msgs/msg/Path'].__msgtype__
×
418
        else:
NEW
419
            raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type}")
×
420
    
421
    @typechecked
1✔
422
    def extract_seconds_and_nanoseconds(self, i: int):
1✔
423
        seconds = int(self.timestamps[i])
1✔
424
        nanoseconds = (self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) \
1✔
425
                        * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN)
426
        return seconds, nanoseconds
1✔
427
    
428
    def calculate_stamped_poses(self):
1✔
429
        """ Pre-calculates all stamped poses if they aren't calculated yet."""
430

431
        if len(self.poses) != self.len():
1✔
432
            # Get ROS2 message types
433
            typestore = get_typestore(Stores.ROS2_HUMBLE)
1✔
434
            Header = typestore.types['std_msgs/msg/Header']
1✔
435
            Time = typestore.types['builtin_interfaces/msg/Time']
1✔
436
            PoseStamped = typestore.types['geometry_msgs/msg/PoseStamped']
1✔
437
            Pose = typestore.types['geometry_msgs/msg/Pose']
1✔
438
            Point = typestore.types['geometry_msgs/msg/Point']
1✔
439
            Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
1✔
440

441
            # Pre-calculate all the poses
442
            for i in range(self.len()):
1✔
443
                seconds, nanoseconds = self.extract_seconds_and_nanoseconds(i)
1✔
444
                self.poses.append(PoseStamped(Header(stamp=Time(sec=int(seconds), 
1✔
445
                                                                nanosec=int(nanoseconds)),
446
                                                    frame_id=self.frame_id),
447
                                            pose=Pose(position=Point(x=self.positions[i][0],
448
                                                                    y=self.positions[i][1],
449
                                                                    z=self.positions[i][2]),
450
                            orientation=Quaternion(x=self.orientations[i][0],
451
                                                    y=self.orientations[i][1],
452
                                                    z=self.orientations[i][2],
453
                                                    w=self.orientations[i][3]))))
454

455
    @typechecked
1✔
456
    def get_ros_msg(self, i: int, msg_type: str = "Odometry"):
1✔
457
        """
458
        Gets an Image ROS2 Humble message corresponding to the odometry in index i.
459
        
460
        Args:
461
            i (int): The index of the odometry data to convert.
462
        Raises:
463
            ValueError: If i is outside the data bounds.
464
        """
465

466
        # Calculate the Stamped Poses
467
        self.calculate_stamped_poses()
1✔
468

469
        # Make sure our data is in the ROS frame, otherwise throw an error
470
        if self.frame != CoordinateFrame.ROS:
1✔
NEW
471
            raise RuntimeError("Convert this Odometry Data to a ROS frame before writing to a ROS2 bag!")
×
472

473
        # Check to make sure index is within data bounds
474
        if i < 0 or i >= self.len():
1✔
NEW
475
            raise IndexError(f"Index {i} is out of bounds!")
×
476

477
        # Get ROS2 message classes
478
        typestore = get_typestore(Stores.ROS2_HUMBLE)
1✔
479
        Odometry = typestore.types['nav_msgs/msg/Odometry']
1✔
480
        Header = typestore.types['std_msgs/msg/Header']
1✔
481
        Time = typestore.types['builtin_interfaces/msg/Time']
1✔
482
        PoseWithCovariance = typestore.types['geometry_msgs/msg/PoseWithCovariance']
1✔
483
        TwistWithCovariance = typestore.types['geometry_msgs/msg/TwistWithCovariance']
1✔
484
        Twist = typestore.types['geometry_msgs/msg/Twist']
1✔
485
        Vector3 = typestore.types['geometry_msgs/msg/Vector3']
1✔
486
        Path = typestore.types['nav_msgs/msg/Path']
1✔
487
        Pose = typestore.types['geometry_msgs/msg/Pose']
1✔
488
        Point = typestore.types['geometry_msgs/msg/Point']
1✔
489
        Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
1✔
490

491
        # Write the data into the new msg
492
        if msg_type == "Odometry":
1✔
493
            seconds, nanoseconds = self.extract_seconds_and_nanoseconds(i)
1✔
494
            return Odometry(Header(stamp=Time(sec=int(seconds), 
1✔
495
                                              nanosec=int(nanoseconds)), 
496
                            frame_id=self.frame_id),
497
                            child_frame_id=self.child_frame_id,
498
                            pose=PoseWithCovariance(pose=Pose(position=Point(x=self.positions[i][0],
499
                                                                 y=self.positions[i][1],
500
                                                                 z=self.positions[i][2]),
501
                                                    orientation=Quaternion(x=self.orientations[i][0],
502
                                                                            y=self.orientations[i][1],
503
                                                                            z=self.orientations[i][2],
504
                                                                            w=self.orientations[i][3])),
505
                                                    covariance=np.zeros(36)),
506
                            twist=TwistWithCovariance(twist=Twist(linear=Vector3(x=0, # Currently doesn't support Twist
507
                                                                                y=0,
508
                                                                                z=0,),
509
                                                                angular=Vector3(x=0,
510
                                                                                y=0,
511
                                                                                z=0,)),
512
                                                    covariance=np.zeros(36)))
NEW
513
        elif msg_type == "Path":
×
NEW
514
            seconds, nanoseconds = self.extract_seconds_and_nanoseconds(i)
×
NEW
515
            return Path(Header(stamp=Time(sec=int(seconds), 
×
516
                                          nanosec=int(nanoseconds)),
517
                               frame_id=self.frame_id),
518
                               poses=self.poses[0:i+1:10])
519
        else:
520
            raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type}")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc