• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 21483849472

29 Jan 2026 03:20PM UTC coverage: 74.142% (-0.5%) from 74.672%
21483849472

Pull #10

github

DanielChaseButterfield
Fix with dependency mismatch with typeguard in conversion_utils, ImageDataOnDisk for .npy files, linear interpolation for odometry data
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

881 of 1305 new or added lines in 14 files covered. (67.51%)

2 existing lines in 1 file now uncovered.

1749 of 2359 relevant lines covered (74.14%)

1.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.05
/src/robotdataprocess/data_types/OdometryData.py
1
from __future__ import annotations
2✔
2

3
from ..conversion_utils import col_to_dec_arr, dec_arr_to_float_arr
2✔
4
import csv
2✔
5
from .Data import CoordinateFrame, Data, ROSMsgLibType
2✔
6
import decimal
2✔
7
from decimal import Decimal
2✔
8
from evo.core import geometry
2✔
9
import matplotlib.pyplot as plt
2✔
10
from ..ModuleImporter import ModuleImporter
2✔
11
import numpy as np
2✔
12
from numpy.typing import NDArray
2✔
13
import os
2✔
14
import pandas as pd
2✔
15
from .PathData import PathData
2✔
16
from pathlib import Path
2✔
17
from ..ros.Ros2BagWrapper import Ros2BagWrapper
2✔
18
from rosbags.rosbag2 import Reader as Reader2
2✔
19
from rosbags.typesys import Stores, get_typestore
2✔
20
from rosbags.typesys.store import Typestore
2✔
21
from scipy.spatial.transform import Rotation as R
2✔
22
from scipy.spatial.transform import Slerp
2✔
23
from typeguard import typechecked
2✔
24
from typing import Union, List, Tuple, Any
2✔
25
import tqdm
2✔
26

27
PATH_SLICE_STEP = 40
2✔
28

29
@typechecked
2✔
30
class OdometryData(PathData):
2✔
31

32
    # Define odometry-specific data attributes
33
    child_frame_id: str
2✔
34
    poses: list # Saved nav_msgs/msg/Pose for rosbags
2✔
35
    poses_rclpy: list # Saved nav_msgs/msg/Pose for rclpy
2✔
36

37
    def __init__(self, frame_id: str, child_frame_id: str, timestamps: Union[np.ndarray, list], 
2✔
38
                 positions: Union[np.ndarray, list], orientations: Union[np.ndarray, list], frame: CoordinateFrame):
39
        
40
        # Copy initial values into attributes
41
        super().__init__(frame_id, timestamps, positions, orientations, frame)
2✔
42
        self.child_frame_id: str = child_frame_id
2✔
43
        self.poses: list = []
2✔
44
        self.poses_rclpy: list = []
2✔
45

46
        # Check to ensure that all arrays have same length
47
        if len(self.timestamps) != len(self.positions) or len(self.positions) != len(self.orientations):
2✔
48
            raise ValueError("Lengths of timestamp, position, and orientation arrays are not equal!")
2✔
49

50
    # =========================================================================
51
    # ============================ Class Methods ============================== 
52
    # =========================================================================  
53

54
    @classmethod
2✔
55
    def from_ros2_bag(cls, bag_path: Union[Path, str], odom_topic: str, frame: CoordinateFrame):
2✔
56
        """
57
        Creates a class structure from a ROS2 bag file with an Odometry topic.
58

59
        Args:
60
            bag_path (Path | str): Path to the ROS2 bag file.
61
            odom_topic (str): Topic of the Odometry messages.
62
        Returns:
63
            OdometryData: Instance of this class.
64
        """
65

66
        # Get topic message count and typestore
67
        bag_wrapper = Ros2BagWrapper(bag_path, None)
2✔
68
        typestore: Typestore = bag_wrapper.get_typestore()
2✔
69
        num_msgs: int = bag_wrapper.get_topic_count(odom_topic)
2✔
70
        
71
        # Pre-allocate arrays (memory-mapped or otherwise)
72
        timestamps_np = np.zeros(num_msgs, dtype=Decimal)
2✔
73
        positions_np = np.zeros((num_msgs, 3), dtype=Decimal)
2✔
74
        orientations_np = np.zeros((num_msgs, 4), dtype=Decimal)
2✔
75

76
        # Setup tqdm bar & counter
77
        pbar = tqdm.tqdm(total=num_msgs, desc="Extracting Odometry...", unit=" msgs")
2✔
78
        i = 0
2✔
79

80
        # Extract the odometry information
81
        frame_id, child_frame_id = None, None
2✔
82
        with Reader2(str(bag_path)) as reader:
2✔
83

84
            # Extract frames from first message
85
            connections = [x for x in reader.connections if x.topic == odom_topic]
2✔
86
            for conn, timestamp, rawdata in reader.messages(connections=connections):  
2✔
87
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
2✔
88
                frame_id = msg.header.frame_id
2✔
89
                child_frame_id = msg.child_frame_id
2✔
90
                break
2✔
91

92
            # Extract individual message data
93
            connections = [x for x in reader.connections if x.topic == odom_topic]
2✔
94
            for conn, timestamp, rawdata in reader.messages(connections=connections):
2✔
95
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
2✔
96
                
97
                timestamps_np[i] = bag_wrapper.extract_timestamp(msg)
2✔
98
                pos = msg.pose.pose.position
2✔
99
                positions_np[i] = np.array([Decimal(pos.x), Decimal(pos.y), Decimal(pos.z)])
2✔
100
                ori = msg.pose.pose.orientation
2✔
101
                orientations_np[i] = np.array([Decimal(ori.x), Decimal(ori.y), Decimal(ori.z), Decimal(ori.w)])
2✔
102

103
                # NOTE: Doesn't support Twist information currently
104

105
                # Increment the count
106
                i += 1
2✔
107
                pbar.update(1)
2✔
108

109
        # Create an OdometryData class
110
        return cls(frame_id, child_frame_id, timestamps_np, positions_np, orientations_np, frame)
2✔
111
    
112
    @classmethod
2✔
113
    def from_csv(cls, csv_path: Union[Path, str], frame_id: str, child_frame_id: str, frame: CoordinateFrame,          
2✔
114
                 header_included: bool, column_to_data: Union[List[int], None] = None, 
115
                 separator: Union[str, None] = None, filter: Union[Tuple[str, str], None] = None,
116
                 ts_in_ns: bool = False, reorder_data: bool = False):
117
        """
118
        Creates a class structure from a csv file.
119

120
        Args:
121
            csv_path (Path | str): Path to the CSV file.
122
            frame_id (str): The frame that this odometry is relative to.
123
            child_frame_id (str): The frame whose pose is represented by this odometry.
124
            frame (CoordinateFrame): The coordinate system convention of this data.
125
            header_included (bool): If this csv file has a header, so we can remove it.
126
            column_to_data (list[int]): Tells the algorithms which columns in the csv contain which
127
                of the following data: ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']. Thus, 
128
                index 0 of column_to_data should be the column that timestamp data is found in the 
129
                csv file. Set to None to use [0,1,2,3,4,5,6,7].
130
            separator (str | None): The separator used in the csv file. If None, will use a comma by default.
131
            filter: A tuple of (column_name, value), where only rows with column_name == value will be kept. If
132
                csv file has no headers, then `column_name` should be the index of the column as a string.
133
            ts_in_ns (bool): If True, assumes timestamps are in nanoseconds and converts to seconds. Otherwise,
134
                assumes timestamps are already in seconds.
135
            reorder_data (bool): If True, reorders the data to be in order of timestamps. If False, 
136
                assumes data is already ordered by timestamp.
137

138
        Returns:
139
            OdometryData: Instance of this class.
140
        """
141

142
        # If column_to_data is None, assume default
143
        if column_to_data is None:
2✔
144
            column_to_data = [0,1,2,3,4,5,6,7]
2✔
145
        else:
146
            # Check column_to_data values are valid
147
            assert np.all(np.array(column_to_data) >= 0)
2✔
148
            assert len(column_to_data) == 8
2✔
149

150
        # Read the csv file
151
        header = 0 if header_included else None
2✔
152
        df1 = pd.read_csv(str(csv_path), header=header, index_col=False, sep=separator, engine='python')
2✔
153

154
        # Rename columns to standard names
155
        rename_dict = {}
2✔
156
        desired_data = ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']
2✔
157
        for j, ind in enumerate(column_to_data):
2✔
158
            rename_dict[df1.columns[ind]] = desired_data[j]
2✔
159
        df1 = df1.rename(columns=rename_dict)
2✔
160

161
        # Using the filter if provided, remove unwanted rows
162
        if filter is not None:
2✔
163
            df1 = df1[df1[filter[0]] == filter[1]]
2✔
164

165
        # Convert columns to NDArray[Decimal]
166
        timestamps_np = np.array([Decimal(str(ts)) for ts in df1['timestamp']], dtype=object)
2✔
167
        positions_np = np.array([[Decimal(str(x)), Decimal(str(y)), Decimal(str(z))] 
2✔
168
                                 for x, y, z in zip(df1['x'], df1['y'], df1['z'])], dtype=object)
169
        orientations_np = np.array([[Decimal(str(qx)), Decimal(str(qy)), Decimal(str(qz)), Decimal(str(qw))]
2✔
170
                                    for qx, qy, qz, qw in zip(df1['qx'], df1['qy'], df1['qz'], df1['qw'])], dtype=object)
171
        
172
        # If timestamps are in ns, convert to s
173
        if ts_in_ns:
2✔
174
            timestamps_np = timestamps_np / Decimal('1e9')
2✔
175

176
        # Reorder the data if needed
177
        if reorder_data:
2✔
NEW
178
            print("Warning: This code is not tested yet!")
×
NEW
179
            sort_indices = np.argsort(timestamps_np)
×
NEW
180
            timestamps_np = timestamps_np[sort_indices]
×
NEW
181
            positions_np = positions_np[sort_indices]
×
NEW
182
            orientations_np = orientations_np[sort_indices]
×
183

184
        # Create an OdometryData class
185
        return cls(frame_id, child_frame_id, timestamps_np, positions_np, orientations_np, frame)
2✔
186
    
187
    @classmethod
2✔
188
    def from_txt_file(cls, file_path: Union[Path, str], frame_id: str, child_frame_id: str, frame: CoordinateFrame,
2✔
189
                      header_included: bool, column_to_data: Union[List[int], None] = None):
190
        """
191
        Creates and OdometryData class from a text file.
192

193
        Args:
194
            file_path (Path | str): Path to the file containing the odometry data.
195
            frame_id (str): The frame where this odometry is relative to.
196
            child_frame_id (str): The frame whose pose is represented by this odometry.
197
            frame (CoordinateFrame): The coordinate system convention of this data.
198
            header_included (bool): If this text file has a header, so we can remove it.
199
            column_to_data (list[int]): Tells the algorithms which columns in the text file contain which
200
                of the following data: ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']. Thus, 
201
                index 0 of column_to_data should be the column that timestamp data is found in the 
202
                text file. Set to None to use [0,1,2,3,4,5,6,7].
203
        Returns:
204
            OdometryData: Instance of this class.
205
        """
206
        
207
        # If column_to_data is None, assume default
208
        if column_to_data is None:
2✔
209
            column_to_data = [0,1,2,3,4,5,6,7]
2✔
210
        else:
211
            # Check column_to_data values are valid
212
            assert np.all(np.array(column_to_data) >= 0)
2✔
213
            assert len(column_to_data) == 8
2✔
214

215
        # Count the number of lines in the file
216
        line_count = 0
2✔
217
        with open(str(file_path), 'r') as file:
2✔
218
            for _ in file: 
2✔
219
                line_count += 1
2✔
220

221
        # Setup arrays to hold data
222
        timestamps_np = np.zeros((line_count), dtype=object)
2✔
223
        positions_np = np.zeros((line_count, 3), dtype=object)
2✔
224
        orientations_np = np.zeros((line_count, 4), dtype=object)
2✔
225

226
        # Open the txt file and read in the data
227
        with open(str(file_path), 'r') as file:
2✔
228
            for i, line in enumerate(file):
2✔
229
                line_split = line.split(' ')
2✔
230
                timestamps_np[i] = line_split[column_to_data[0]]
2✔
231
                positions_np[i] = np.array([line_split[column_to_data[1]], line_split[column_to_data[2]], line_split[column_to_data[3]]])
2✔
232
                orientations_np[i] =  np.array([line_split[column_to_data[5]], line_split[column_to_data[6]], line_split[column_to_data[7]], line_split[column_to_data[4]]])
2✔
233

234
        # Remove the header
235
        if header_included:
2✔
236
            timestamps_np = timestamps_np[1:]
2✔
237
            positions_np = positions_np[1:]
2✔
238
            orientations_np = orientations_np[1:]
2✔
239
        
240
        # Create an OdometryData class
241
        return cls(frame_id, child_frame_id, timestamps_np, positions_np, orientations_np, frame)
2✔
242
    
243
    # =========================================================================
244
    # ========================= Manipulation Methods ========================== 
245
    # =========================================================================  
246
    
247
    def add_folded_guassian_noise_to_position(self, xy_noise_std_per_frame: float,
2✔
248
            z_noise_std_per_frame: float):
249
        """
250
        This method simulates odometry drift by adding folded gaussian noise
251
        to the odometry positions on a per frame basis. It also accumulates
252
        it over time. NOTE: It completely ignores the timestamps, and the "folded
253
        guassian noise" distribution stds might not align with the stds of the 
254
        guassian used internally, so this is not a robust function at all.
255

256
        Args:
257
            xy_noise_std_per_frame (float): Standard deviation of the gaussian 
258
                distribution for xy, whose output is then run through abs().
259
            z_noise_std_per_frame (float): Same as above, but for z.
260
        """
261

262
        # Track cumulative noise for each field
263
        cumulative_noise_pos = {'x': 0.0, 'y': 0.0, 'z': 0.0}
×
264

265
        # For each position
266
        for i in range(len(self.timestamps)):
×
267

268
            # Sample noise and accumulate
269
            noise_pos = {'x': np.random.normal(0, xy_noise_std_per_frame),
×
270
                        'y': np.random.normal(0, xy_noise_std_per_frame),
271
                        'z': np.random.normal(0, z_noise_std_per_frame)}
272
            for key in cumulative_noise_pos:
×
273
                cumulative_noise_pos[key] += abs(noise_pos[key])
×
274

275
            # Update positions
276
            self.positions[i][0] += Decimal(cumulative_noise_pos['x'])
×
277
            self.positions[i][1] += Decimal(cumulative_noise_pos['y'])
×
278
            self.positions[i][2] += Decimal(cumulative_noise_pos['z'])
×
279

280
        # Empty poses as they might need to be recalculated
NEW
281
        self.poses = []
×
NEW
282
        self.poses_rclpy = []
×
283

284
    def shift_position(self, x_shift: float, y_shift: float, z_shift: float):
2✔
285
        """
286
        Shifts the positions of the odometry.
287

288
        Args:
289
            x_shift (float): Shift in x-axis.
290
            y_shift (float): Shift in y_axis.
291
            z_shift (float): Shift in z_axis.
292
        """
293
        self.positions[:,0] += Decimal(x_shift)
×
294
        self.positions[:,1] += Decimal(y_shift)
×
295
        self.positions[:,2] += Decimal(z_shift)
×
296

297
        # Empty poses as they might need to be recalculated
NEW
298
        self.poses = []
×
NEW
299
        self.poses_rclpy = []
×
300

301
    def shift_to_start_at_identity(self):
2✔
302
        """
303
        Alter the positions and orientations based so that the first pose 
304
        starts at Identity.
305
        """
306

307
        # Get pose of first robot position w.r.t world
308
        R_o = R.from_quat(self.orientations[0]).as_matrix()
2✔
309
        T_o = np.expand_dims(self.positions[0], axis=1)
2✔
310
        
311
        # Calculate the inverse (pose of world w.r.t first robot location)
312
        R_inv = R_o.T
2✔
313

314
        # Rotate positions and orientations
315
        self.positions = (R_inv @ (self.positions.T - T_o).astype(float)).T
2✔
316
        for i in range(self.len()):
2✔
317
            self.orientations[i] = R.from_matrix((R_inv @ R.from_quat(self.orientations[i]).as_matrix())).as_quat()
2✔
318

319
        # Convert back to decimal array
320
        self.positions = col_to_dec_arr(self.positions)
2✔
321
        self.orientations = col_to_dec_arr(self.orientations)
2✔
322

323
        # Empty poses as they might need to be recalculated
324
        self.poses = []
2✔
325
        self.poses_rclpy = []
2✔
326

327
    def crop_data(self, start: Decimal, end: Decimal):
2✔
328
        """ Will crop the data so only values within [start, end] inclusive are kept. """
329

330
        # Create boolean mask of data to keep
331
        mask = (self.timestamps >= start) & (self.timestamps <= end)
2✔
332

333
        # Apply mask
334
        self.timestamps = self.timestamps[mask]
2✔
335
        self.positions = self.positions[mask]
2✔
336
        self.orientations = self.orientations[mask]
2✔
337

338
        # Empty poses as they might need to be recalculated
339
        self.poses = []
2✔
340
        self.poses_rclpy = []
2✔
341

342
    def interpolate_to_hz(self, target_hz: float):
2✔
343
        """
344
        Linearly interpolates position and SLERPs orientation so that odometry
345
        is output at the desired frequency.
346

347
        Args:
348
            target_hz (float): Desired output frequency in Hertz (e.g. 6.0)
349
        """
350

351
        # Check that target_hz is valid
NEW
352
        if target_hz <= 0: raise ValueError("target_hz must be positive")
×
353

354
        # Convert timestamps to float seconds
NEW
355
        ts = dec_arr_to_float_arr(self.timestamps)
×
NEW
356
        pos = dec_arr_to_float_arr(self.positions)
×
NEW
357
        quat = dec_arr_to_float_arr(self.orientations)
×
358

359
        # Create new evenly spaced timestamps
NEW
360
        duration = ts[-1] - ts[0]
×
NEW
361
        num_samples = int(np.ceil(duration * target_hz)) + 1
×
NEW
362
        new_ts = np.linspace(ts[0], ts[-1], num_samples)
×
363

364
        # Linear interpolation for position
NEW
365
        new_pos = np.zeros((len(new_ts), 3))
×
NEW
366
        for i in range(3):
×
NEW
367
            new_pos[:, i] = np.interp(new_ts, ts, pos[:, i])
×
368

369
        # Spherical Linear Interpolation of Rotations (SLERP)
NEW
370
        key_rots = R.from_quat(quat)
×
NEW
371
        slerp = Slerp(ts, key_rots)
×
NEW
372
        new_rots = slerp(new_ts)
×
NEW
373
        new_quat = new_rots.as_quat()
×
374

375
        # Convert back to Decimal arrays
NEW
376
        self.timestamps = col_to_dec_arr(new_ts)
×
NEW
377
        self.positions = col_to_dec_arr(new_pos)
×
NEW
378
        self.orientations = col_to_dec_arr(new_quat)
×
379

380
        # Clear cached ROS messages
NEW
381
        self.poses = []
×
NEW
382
        self.poses_rclpy = []
×
383

384
    # =========================================================================
385
    # ============================ Export Methods ============================= 
386
    # =========================================================================  
387

388
    def to_csv(self, csv_path: Union[Path, str], write_header: bool = True):
2✔
389
        """
390
        Writes the odometry data to a .csv file. Note that data will be
391
        saved in the following order: timestamp, pos.x, pos.y, pos.z,
392
        ori.w, ori.x, ori.y, ori.z. Timestamp is in seconds.
393

394
        Args:
395
            csv_path (Path | str): Path to the output csv file.
396
            odom_topic (str): Topic of the Odometry messages.
397
            write_header (bool): If false, skip the header row.
398
        Returns:
399
            OdometryData: Instance of this class.
400
        """
401

402
        # setup tqdm 
403
        pbar = tqdm.tqdm(total=None, desc="Saving to csv... ", unit=" frames")
2✔
404

405
        # Check that file path doesn't already exist
406
        file_path = Path(csv_path)
2✔
407
        if os.path.exists(file_path):
2✔
408
            raise ValueError(f"Output file already exists: {file_path}")
×
409
        
410
        # Open csv file
411
        with open(file_path, 'w', newline='') as csvfile:
2✔
412
            writer = csv.writer(csvfile, delimiter=',')
2✔
413

414
            # Write the first row
415
            if write_header:
2✔
416
                writer.writerow(['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz'])
2✔
417
                
418
            # Write message data to the csv file
419
            for i in range(len(self.timestamps)):
2✔
420
                writer.writerow([str(self.timestamps[i]), 
2✔
421
                    str(self.positions[i][0]), str(self.positions[i][1]), str(self.positions[i][2]),
422
                    str(self.orientations[i][3]), str(self.orientations[i][0]), str(self.orientations[i][1]), 
423
                    str(self.orientations[i][2])])
424
                pbar.update(1)
2✔
425

426
    # =========================================================================
427
    # =========================== Frame Conversions =========================== 
428
    # ========================================================================= 
429
    def to_FLU_frame(self):
2✔
430
        # If we are already in the FLU frame, return
431
        if self.frame == CoordinateFrame.FLU:
2✔
432
            print("Data already in FLU coordinate frame, returning...")
2✔
433
            return
2✔
434

435
        # If in NED, run the conversion
436
        elif self.frame == CoordinateFrame.NED:
2✔
437
            # Define the rotation matrix
438
            R_NED = np.array([[1,  0,  0],
2✔
439
                              [0, -1,  0],
440
                              [0,  0, -1]])
441

442
            # Do a change of basis to update the frame
443
            self._convert_frame(R_NED)
2✔
444

445
            # Update frame
446
            self.frame = CoordinateFrame.FLU
2✔
447

448
            # Empty poses as they might need to be recalculated
449
            self.poses = []
2✔
450
            self.poses_rclpy = []
2✔
451

452
        # Otherwise, throw an error
453
        else:
454
            raise RuntimeError(f"OdometryData class is in an unexpected frame: {self.frame}!")
2✔
455
    
456
    def apply_transformation_left_side(self, H: np.ndarray):
2✔
457
        """
458
        Applies a rigid-body transformation to the entire path.
459
        In terms of transfomration matricies, this multiplies this odometry
460
        on the left side.
461

462
        Args:
463
            H: The 4x4 transformation matrix
464
        """ 
465

466
        # Apply the transformation
467
        H_self = np.eye(4).reshape(1, 4, 4).repeat(self.len(), axis=0)  # shape (N,4,4)
2✔
468
        H_self[:, :3, :3] = R.from_quat(self.orientations).as_matrix()    
2✔
469
        H_self[:, :3, 3] = self.positions
2✔
470
        H_output = H @ H_self
2✔
471

472
        # Extract results and save
473
        self.positions = H_output [:, :3, 3]
2✔
474
        self.orientations = R.from_matrix(H_output[:, :3, :3]).as_quat()      
2✔
475

476
        # Empty poses as they might need to be recalculated
477
        self.poses = []
2✔
478
        self.poses_rclpy = []
2✔
479

480
    def apply_transformation_right_side(self, H: np.ndarray):
2✔
481
        """
482
        Applies a rigid-body transformation to the entire path.
483
        In terms of transformation matrices, this multiplies this odometry
484
        on the right side (row-vector convention).
485

486
        Args:
487
            H: The 4x4 transformation matrix
488
        """ 
489

490
        # Apply the transformation
491
        H_self = np.eye(4).reshape(1, 4, 4).repeat(self.len(), axis=0)  # shape (N,4,4)
2✔
492
        H_self[:, :3, :3] = R.from_quat(self.orientations).as_matrix()    
2✔
493
        H_self[:, :3, 3] = self.positions
2✔
494
        H_output = H_self @ H
2✔
495

496
        # Extract results and save
497
        self.positions = H_output [:, :3, 3]
2✔
498
        self.orientations = R.from_matrix(H_output[:, :3, :3]).as_quat()      
2✔
499

500
        # Empty poses as they might need to be recalculated
501
        self.poses = []
2✔
502
        self.poses_rclpy = []
2✔
503

504
    def _convert_frame(self, R_frame: np.ndarray):
2✔
505
        """ Uses a change of basis to update the positions and orientations. """
506
        R_frame_Q = R.from_matrix(R_frame)
2✔
507
        self.positions = (R_frame @ self.positions.T).T
2✔
508
        self._ori_change_of_basis(R_frame_Q)
2✔
509

510
        # Empty poses as they might need to be recalculated
511
        self.poses = []
2✔
512
        self.poses_rclpy = []
2✔
513

514
    def _ori_apply_rotation(self, R_i: R):
2✔
515
        """ Applies a rotation (not a change of basis) to orientations, thus stays in the same frame. """
516
        for i in range(self.len()):
2✔
517
            self.orientations[i] = (R_i * R.from_quat(self.orientations[i])).as_quat()
2✔
518

519
        # Empty poses as they might need to be recalculated
520
        self.poses = []
2✔
521
        self.poses_rclpy = []
2✔
522

523
    def _ori_change_of_basis(self, R_i: R):
2✔
524
        """ Applies a change of basis to orientations """
525
        for i in range(self.len()):
2✔
526
            self.orientations[i] = (R_i * R.from_quat(self.orientations[i]) * R_i.inv()).as_quat()
2✔
527
    
528
        # Empty poses as they might need to be recalculated
529
        self.poses = []
2✔
530
        self.poses_rclpy = []
2✔
531

532
    # =========================================================================
533
    # =========================== Conversion to ROS =========================== 
534
    # ========================================================================= 
535

536
    @staticmethod
2✔
537
    def get_ros_msg_type(lib_type: ROSMsgLibType, msg_type: str = "Odometry") -> Any:
2✔
538
        """ Return the __msgtype__ for an Odometry msg. """
539
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
540
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
541
            if msg_type == "Odometry":
2✔
542
                return typestore.types['nav_msgs/msg/Odometry'].__msgtype__
2✔
543
            elif msg_type == "Path":
2✔
544
                return typestore.types['nav_msgs/msg/Path'].__msgtype__
2✔
545
            else:
NEW
546
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type}")
×
547
        elif lib_type == ROSMsgLibType.RCLPY:
2✔
548
            if msg_type == "Path":
2✔
549
                return ModuleImporter.get_module_attribute('nav_msgs.msg', 'Path')
2✔
550
            else:
NEW
551
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type}")
×
NEW
552
        elif lib_type == ROSMsgLibType.ROSPY:
×
NEW
553
            if msg_type == "Odometry":
×
NEW
554
                return ModuleImporter.get_module_attribute('nav_msgs.msg', 'Odometry')
×
NEW
555
            if msg_type == "maplab_msg/OdometryWithImuBiases":
×
NEW
556
                return ModuleImporter.get_module_attribute('maplab_msgs.msg', 'OdometryWithImuBiases')
×
NEW
557
            elif msg_type == "Path":
×
NEW
558
                return ModuleImporter.get_module_attribute('nav_msgs.msg', 'Path')
×
559
            else:
NEW
560
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type}")
×
561
        else:
NEW
562
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for OdometryData.get_ros_msg_type()!")
×
563
    
564
    def _extract_seconds_and_nanoseconds(self, i: int):
2✔
565
        seconds = int(self.timestamps[i])
2✔
566
        nanoseconds = (self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) \
2✔
567
                        * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN)
568
        return seconds, nanoseconds
2✔
569
    
570
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int, msg_type: str = "Odometry"):
2✔
571
        """
572
        Gets an Image ROS2 Humble message corresponding to the odometry in index i.
573
        
574
        Args:
575
            i (int): The index of the odometry data to convert.
576
        Raises:
577
            ValueError: If i is outside the data bounds.
578

579
        NOTE: Currently doesn't support Twist information.
580
        NOTE: Assumes zero covariances.
581
        """
582

583
        # Check to make sure index is within data bounds
584
        if i < 0 or i >= self.len():
2✔
585
            raise IndexError(f"Index {i} is out of bounds!")
×
586
        
587
        # Extract seconds and nanoseconds
588
        seconds, nanoseconds = self._extract_seconds_and_nanoseconds(i)
2✔
589

590
        # Write the data into the new msg
591
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
592
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
593

594
            Odometry = typestore.types['nav_msgs/msg/Odometry']
2✔
595
            Header = typestore.types['std_msgs/msg/Header']
2✔
596
            Time = typestore.types['builtin_interfaces/msg/Time']
2✔
597
            PoseWithCovariance = typestore.types['geometry_msgs/msg/PoseWithCovariance']
2✔
598
            TwistWithCovariance = typestore.types['geometry_msgs/msg/TwistWithCovariance']
2✔
599
            Twist = typestore.types['geometry_msgs/msg/Twist']
2✔
600
            Vector3 = typestore.types['geometry_msgs/msg/Vector3']
2✔
601
            Path = typestore.types['nav_msgs/msg/Path']
2✔
602
            Pose = typestore.types['geometry_msgs/msg/Pose']
2✔
603
            Point = typestore.types['geometry_msgs/msg/Point']
2✔
604
            Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
2✔
605

606
            if msg_type == "Odometry":
2✔
607
            
608
                return Odometry(Header(stamp=Time(sec=int(seconds), 
2✔
609
                                                nanosec=int(nanoseconds)), 
610
                                frame_id=self.frame_id),
611
                                child_frame_id=self.child_frame_id,
612
                                pose=PoseWithCovariance(pose=Pose(position=Point(x=self.positions[i][0],
613
                                                                    y=self.positions[i][1],
614
                                                                    z=self.positions[i][2]),
615
                                                        orientation=Quaternion(x=self.orientations[i][0],
616
                                                                                y=self.orientations[i][1],
617
                                                                                z=self.orientations[i][2],
618
                                                                                w=self.orientations[i][3])),
619
                                                        covariance=np.zeros(36)),
620
                                twist=TwistWithCovariance(twist=Twist(linear=Vector3(x=0, # Currently doesn't support Twist
621
                                                                                    y=0,
622
                                                                                    z=0,),
623
                                                                    angular=Vector3(x=0,
624
                                                                                    y=0,
625
                                                                                    z=0,)),
626
                                                        covariance=np.zeros(36)))
627
            elif msg_type == "Path":
2✔
628

629
                # Pre-calculate all the poses
630
                if len(self.poses) != self.len():
2✔
631
                    PoseStamped = typestore.types['geometry_msgs/msg/PoseStamped']
2✔
632

633
                    for j in range(self.len()):
2✔
634
                        sec, nanosec = self._extract_seconds_and_nanoseconds(j)
2✔
635
                        self.poses.append(PoseStamped(Header(stamp=Time(sec=int(sec), 
2✔
636
                                                                        nanosec=int(nanosec)),
637
                                                            frame_id=self.frame_id),
638
                                                    pose=Pose(position=Point(x=self.positions[j][0],
639
                                                                            y=self.positions[j][1],
640
                                                                            z=self.positions[j][2]),
641
                                    orientation=Quaternion(x=self.orientations[j][0],
642
                                                            y=self.orientations[j][1],
643
                                                            z=self.orientations[j][2],
644
                                                            w=self.orientations[j][3]))))
645

646
                return Path(Header(stamp=Time(sec=int(seconds), 
2✔
647
                                            nanosec=int(nanoseconds)),
648
                                frame_id=self.frame_id),
649
                                poses=self.poses[0:i+1:PATH_SLICE_STEP])
650
            else:
NEW
651
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type} with ROSMsgLibType.ROSBAGS")
×
652
            
653
        elif lib_type == ROSMsgLibType.ROSPY or lib_type == ROSMsgLibType.RCLPY:
2✔
654
            if msg_type == "Odometry":
2✔
655

NEW
656
                Odometry = ModuleImporter.get_module_attribute('nav_msgs.msg', 'Odometry')
×
NEW
657
                Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
×
NEW
658
                PoseWithCovariance = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'PoseWithCovariance')
×
NEW
659
                TwistWithCovariance = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'TwistWithCovariance')
×
NEW
660
                Pose = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Pose')
×
NEW
661
                Point = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Point')
×
NEW
662
                Quaternion = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Quaternion')
×
NEW
663
                Vector3 = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Vector3')
×
664

NEW
665
                msg = Odometry()
×
NEW
666
                msg.header = Header()
×
NEW
667
                msg.header.frame_id = self.frame_id
×
NEW
668
                if lib_type == ROSMsgLibType.RCLPY: 
×
NEW
669
                    Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
×
NEW
670
                    msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
×
671
                else:
NEW
672
                    rospy = ModuleImporter.get_module('rospy')
×
NEW
673
                    msg.header.stamp = rospy.Time(secs=seconds, nsecs=int(nanoseconds))
×
NEW
674
                msg.child_frame_id = self.child_frame_id
×
675

NEW
676
                msg.pose = PoseWithCovariance()
×
NEW
677
                msg.pose.pose = Pose()
×
NEW
678
                msg.pose.pose.position = Point()
×
NEW
679
                msg.pose.pose.position.x = float(self.positions[i][0])
×
NEW
680
                msg.pose.pose.position.y = float(self.positions[i][1])
×
NEW
681
                msg.pose.pose.position.z = float(self.positions[i][2])
×
NEW
682
                msg.pose.pose.orientation = Quaternion()
×
NEW
683
                msg.pose.pose.orientation.x = float(self.orientations[i][0])
×
NEW
684
                msg.pose.pose.orientation.y = float(self.orientations[i][1])
×
NEW
685
                msg.pose.pose.orientation.z = float(self.orientations[i][2])
×
NEW
686
                msg.pose.pose.orientation.w = float(self.orientations[i][3])
×
NEW
687
                msg.pose.covariance = np.zeros(36) # NOTE: Assumes covariance of zero.
×
NEW
688
                msg.twist = TwistWithCovariance()
×
NEW
689
                msg.twist.twist.linear = Vector3()
×
NEW
690
                msg.twist.twist.linear.x = 0.0  # NOTE: Currently doesn't support Twist
×
NEW
691
                msg.twist.twist.linear.y = 0.0
×
NEW
692
                msg.twist.twist.linear.z = 0.0
×
NEW
693
                msg.twist.twist.angular = Vector3()
×
NEW
694
                msg.twist.twist.angular.x = 0.0
×
NEW
695
                msg.twist.twist.angular.y = 0.0
×
NEW
696
                msg.twist.twist.angular.z = 0.0
×
NEW
697
                msg.twist.covariance = np.zeros(36)
×
NEW
698
                return msg
×
699
            
700
            elif msg_type == "maplab_msg/OdometryWithImuBiases":
2✔
701
                
NEW
702
                if lib_type == ROSMsgLibType.RCLPY:
×
NEW
703
                    raise ValueError("maplab_msg/OdometryWithImuBiases is not supported for RCLPY!")
×
704

NEW
705
                rospy = ModuleImporter.get_module('rospy')
×
NEW
706
                OdometryWithImuBiases = ModuleImporter.get_module_attribute('maplab_msgs.msg', 'OdometryWithImuBiases')
×
NEW
707
                PoseWithCovariance = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'PoseWithCovariance')
×
NEW
708
                TwistWithCovariance = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'TwistWithCovariance')
×
NEW
709
                Point = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Point')
×
NEW
710
                Quaternion = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Quaternion')
×
NEW
711
                Vector3 = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Vector3')
×
NEW
712
                Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
×
713

NEW
714
                msg = OdometryWithImuBiases()
×
NEW
715
                msg.header = Header()
×
NEW
716
                msg.header.stamp = rospy.Time(secs=seconds, nsecs=int(nanoseconds))
×
NEW
717
                msg.header.frame_id = self.frame_id
×
NEW
718
                msg.child_frame_id = self.child_frame_id
×
NEW
719
                msg.pose = PoseWithCovariance()
×
NEW
720
                msg.pose.pose.position = Point()
×
NEW
721
                msg.pose.pose.position.x = float(self.positions[i][0])
×
NEW
722
                msg.pose.pose.position.y = float(self.positions[i][1])
×
NEW
723
                msg.pose.pose.position.z = float(self.positions[i][2])
×
NEW
724
                msg.pose.pose.orientation = Quaternion()
×
NEW
725
                msg.pose.pose.orientation.x = float(self.orientations[i][0])
×
NEW
726
                msg.pose.pose.orientation.y = float(self.orientations[i][1])
×
NEW
727
                msg.pose.pose.orientation.z = float(self.orientations[i][2])
×
NEW
728
                msg.pose.pose.orientation.w = float(self.orientations[i][3])
×
NEW
729
                msg.pose.covariance = np.zeros(36) # NOTE: Assumes covariance of zero.
×
NEW
730
                msg.twist = TwistWithCovariance()
×
NEW
731
                msg.twist.twist.linear = Vector3()
×
NEW
732
                msg.twist.twist.linear.x = 0.0  # NOTE: Currently doesn't support Twist
×
NEW
733
                msg.twist.twist.linear.y = 0.0
×
NEW
734
                msg.twist.twist.linear.z = 0.0
×
NEW
735
                msg.twist.twist.angular = Vector3()
×
NEW
736
                msg.twist.twist.angular.x = 0.0
×
NEW
737
                msg.twist.twist.angular.y = 0.0
×
NEW
738
                msg.twist.twist.angular.z = 0.0
×
NEW
739
                msg.twist.covariance = np.zeros(36)
×
NEW
740
                msg.accel_bias = Vector3() # NOTE: Assumes IMU biases are zero
×
NEW
741
                msg.accel_bias.x = 0.0
×
NEW
742
                msg.accel_bias.y = 0.0
×
NEW
743
                msg.accel_bias.z = 0.0
×
NEW
744
                msg.gyro_bias = Vector3()
×
NEW
745
                msg.gyro_bias.x = 0.0
×
NEW
746
                msg.gyro_bias.y = 0.0
×
NEW
747
                msg.gyro_bias.z = 0.0
×
NEW
748
                msg.odometry_state = 0 # NOTE: Assumes default state
×
NEW
749
                return msg
×
750

751
            elif msg_type == "Path":
2✔
752
                
753
                Path = ModuleImporter.get_module_attribute('nav_msgs.msg', 'Path')
2✔
754
                Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
2✔
755

756
                # Pre-calculate all the poses
757
                if len(self.poses_rclpy) != self.len():
2✔
758

759
                    PoseStamped = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'PoseStamped')
2✔
760
                    Pose = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Pose')
2✔
761
                    Point = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Point')
2✔
762
                    Quaternion = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Quaternion')
2✔
763

764
                    for j in range(self.len()):
2✔
765
                        sec, nanosec = self._extract_seconds_and_nanoseconds(j)
2✔
766
                        pose_msg = PoseStamped()
2✔
767
                        pose_msg.header = Header()
2✔
768
                        if lib_type == ROSMsgLibType.RCLPY:
2✔
769
                            Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
2✔
770
                            pose_msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
2✔
771
                        else:
NEW
772
                            rospy = ModuleImporter.get_module('rospy')
×
NEW
773
                            pose_msg.header.stamp = rospy.Time(secs=int(seconds), nsecs=int(nanoseconds))
×
774
                        pose_msg.header.frame_id = self.frame_id
2✔
775
                        pose_msg.pose = Pose()
2✔
776
                        pose_msg.pose.position = Point()
2✔
777
                        pose_msg.pose.position.x = float(self.positions[j][0])
2✔
778
                        pose_msg.pose.position.y = float(self.positions[j][1])
2✔
779
                        pose_msg.pose.position.z = float(self.positions[j][2])
2✔
780
                        pose_msg.pose.orientation = Quaternion()
2✔
781
                        pose_msg.pose.orientation.x = float(self.orientations[j][0])
2✔
782
                        pose_msg.pose.orientation.y = float(self.orientations[j][1])
2✔
783
                        pose_msg.pose.orientation.z = float(self.orientations[j][2])
2✔
784
                        pose_msg.pose.orientation.w = float(self.orientations[j][3])
2✔
785
                        self.poses_rclpy.append(pose_msg)
2✔
786

787
                msg = Path()
2✔
788
                msg.header = Header()
2✔
789
                if lib_type == ROSMsgLibType.RCLPY:
2✔
790
                    Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
2✔
791
                    msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
2✔
792
                else:
NEW
793
                    rospy = ModuleImporter.get_module('rospy')
×
NEW
794
                    msg.header.stamp = rospy.Time(secs=int(seconds), nsecs=int(nanoseconds))
×
795
                msg.header.frame_id = self.frame_id
2✔
796
                msg.poses = self.poses_rclpy[0:i+1:PATH_SLICE_STEP]
2✔
797
                return msg
2✔
798
            
799
            else:
NEW
800
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type} with ROSMsgLibType.ROSPY or ROSMsgLibType.RCLPY.")
×
801
        else:
802
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for OdometryData.get_ros_msg()!")
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc