• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 20823988045

08 Jan 2026 04:29PM UTC coverage: 62.089% (-12.6%) from 74.672%
20823988045

Pull #10

github

DanielChaseButterfield
Path message type for RCLPY and OdometryData
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

181 of 281 branches covered (64.41%)

Branch coverage included in aggregate %.

216 of 561 new or added lines in 11 files covered. (38.5%)

3 existing lines in 2 files now uncovered.

1121 of 1816 relevant lines covered (61.73%)

1.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.93
/src/robotdataprocess/data_types/OdometryData.py
1
from __future__ import annotations
2✔
2

3
from ..conversion_utils import col_to_dec_arr
2✔
4
import csv
2✔
5
from .Data import CoordinateFrame, Data, ROSMsgLibType
2✔
6
import decimal
2✔
7
from decimal import Decimal
2✔
8
from evo.core import geometry
2✔
9
import matplotlib.pyplot as plt
2✔
10
import numpy as np
2✔
11
from numpy.typing import NDArray
2✔
12
import os
2✔
13
import pandas as pd
2✔
14
from .PathData import PathData
2✔
15
from pathlib import Path
2✔
16
from ..ros.Ros2BagWrapper import Ros2BagWrapper
2✔
17
from rosbags.rosbag2 import Reader as Reader2
2✔
18
from rosbags.typesys import Stores, get_typestore
2✔
19
from rosbags.typesys.store import Typestore
2✔
20
from scipy.spatial.transform import Rotation as R
2✔
21
from typeguard import typechecked
2✔
22
from typing import Union, List, Tuple
2✔
23
import tqdm
2✔
24

25
@typechecked
2✔
26
class OdometryData(PathData):
2✔
27

28
    # Define odometry-specific data attributes
29
    child_frame_id: str
2✔
30
    poses: list # Saved nav_msgs/msg/Pose for rosbags
2✔
31
    poses_rclpy: list # Saved nav_msgs/msg/Pose for rclpy
2✔
32

33
    @typechecked
2✔
34
    def __init__(self, frame_id: str, child_frame_id: str, timestamps: Union[np.ndarray, list], 
2✔
35
                 positions: Union[np.ndarray, list], orientations: Union[np.ndarray, list], frame: CoordinateFrame):
36
        
37
        # Copy initial values into attributes
38
        super().__init__(frame_id, timestamps, positions, orientations, frame)
2✔
39
        self.child_frame_id: str = child_frame_id
2✔
40
        self.poses: list = []
2✔
41
        self.poses_rclpy: list = []
2✔
42

43
        # Check to ensure that all arrays have same length
44
        if len(self.timestamps) != len(self.positions) or len(self.positions) != len(self.orientations):
2✔
45
            raise ValueError("Lengths of timestamp, position, and orientation arrays are not equal!")
×
46

47
    # =========================================================================
48
    # ============================ Class Methods ============================== 
49
    # =========================================================================  
50

51
    @classmethod
2✔
52
    @typechecked
2✔
53
    def from_ros2_bag(cls, bag_path: Union[Path, str], odom_topic: str, frame: CoordinateFrame):
2✔
54
        """
55
        Creates a class structure from a ROS2 bag file with an Odometry topic.
56

57
        Args:
58
            bag_path (Path | str): Path to the ROS2 bag file.
59
            odom_topic (str): Topic of the Odometry messages.
60
        Returns:
61
            OdometryData: Instance of this class.
62
        """
63

64
        # Get topic message count and typestore
65
        bag_wrapper = Ros2BagWrapper(bag_path, None)
2✔
66
        typestore: Typestore = bag_wrapper.get_typestore()
2✔
67
        num_msgs: int = bag_wrapper.get_topic_count(odom_topic)
2✔
68
        
69
        # Pre-allocate arrays (memory-mapped or otherwise)
70
        timestamps_np = np.zeros(num_msgs, dtype=Decimal)
2✔
71
        positions_np = np.zeros((num_msgs, 3), dtype=Decimal)
2✔
72
        orientations_np = np.zeros((num_msgs, 4), dtype=Decimal)
2✔
73

74
        # Setup tqdm bar & counter
75
        pbar = tqdm.tqdm(total=num_msgs, desc="Extracting Odometry...", unit=" msgs")
2✔
76
        i = 0
2✔
77

78
        # Extract the odometry information
79
        frame_id, child_frame_id = None, None
2✔
80
        with Reader2(str(bag_path)) as reader:
2✔
81

82
            # Extract frames from first message
83
            connections = [x for x in reader.connections if x.topic == odom_topic]
2✔
84
            for conn, timestamp, rawdata in reader.messages(connections=connections):  
2✔
85
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
2✔
86
                frame_id = msg.header.frame_id
2✔
87
                child_frame_id = msg.child_frame_id
2✔
88
                break
2✔
89

90
            # Extract individual message data
91
            connections = [x for x in reader.connections if x.topic == odom_topic]
2✔
92
            for conn, timestamp, rawdata in reader.messages(connections=connections):
2✔
93
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
2✔
94
                
95
                timestamps_np[i] = bag_wrapper.extract_timestamp(msg)
2✔
96
                pos = msg.pose.pose.position
2✔
97
                positions_np[i] = np.array([Decimal(pos.x), Decimal(pos.y), Decimal(pos.z)])
2✔
98
                ori = msg.pose.pose.orientation
2✔
99
                orientations_np[i] = np.array([Decimal(ori.x), Decimal(ori.y), Decimal(ori.z), Decimal(ori.w)])
2✔
100

101
                # NOTE: Doesn't support Twist information currently
102

103
                # Increment the count
104
                i += 1
2✔
105
                pbar.update(1)
2✔
106

107
        # Create an OdometryData class
108
        return cls(frame_id, child_frame_id, timestamps_np, positions_np, orientations_np, frame)
2✔
109
    
110
    @classmethod
2✔
111
    @typechecked
2✔
112
    def from_csv(cls, csv_path: Union[Path, str], frame_id: str, child_frame_id: str, frame: CoordinateFrame,          
2✔
113
                 header_included: bool, column_to_data: Union[List[int], None] = None, 
114
                 separator: Union[str, None] = None, filter: Union[Tuple[str, str], None] = None,
115
                 ts_in_ns: bool = False, reorder_data: bool = False):
116
        """
117
        Creates a class structure from a csv file.
118

119
        Args:
120
            csv_path (Path | str): Path to the CSV file.
121
            frame_id (str): The frame that this odometry is relative to.
122
            child_frame_id (str): The frame whose pose is represented by this odometry.
123
            frame (CoordinateFrame): The coordinate system convention of this data.
124
            header_included (bool): If this csv file has a header, so we can remove it.
125
            column_to_data (list[int]): Tells the algorithms which columns in the csv contain which
126
                of the following data: ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']. Thus, 
127
                index 0 of column_to_data should be the column that timestamp data is found in the 
128
                csv file. Set to None to use [0,1,2,3,4,5,6,7].
129
            separator (str | None): The separator used in the csv file. If None, will use a comma by default.
130
            filter: A tuple of (column_name, value), where only rows with column_name == value will be kept. If
131
                csv file has no headers, then `column_name` should be the index of the column as a string.
132
            ts_in_ns (bool): If True, assumes timestamps are in nanoseconds and converts to seconds. Otherwise,
133
                assumes timestamps are already in seconds.
134
            reorder_data (bool): If True, reorders the data to be in order of timestamps. If False, 
135
                assumes data is already ordered by timestamp.
136

137
        Returns:
138
            OdometryData: Instance of this class.
139
        """
140

141
        # If column_to_data is None, assume default
142
        if column_to_data is None:
2✔
143
            column_to_data = [0,1,2,3,4,5,6,7]
2✔
144
        else:
145
            # Check column_to_data values are valid
146
            assert np.all(np.array(column_to_data) >= 0)
2✔
147
            assert len(column_to_data) == 8
2✔
148

149
        # Read the csv file
150
        header = 0 if header_included else None
2✔
151
        df1 = pd.read_csv(str(csv_path), header=header, index_col=False, sep=separator, engine='python')
2✔
152

153
        # Rename columns to standard names
154
        rename_dict = {}
2✔
155
        desired_data = ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']
2✔
156
        for j, ind in enumerate(column_to_data):
2✔
157
            rename_dict[df1.columns[ind]] = desired_data[j]
2✔
158
        df1 = df1.rename(columns=rename_dict)
2✔
159

160
        # Using the filter if provided, remove unwanted rows
161
        if filter is not None:
2✔
162
            df1 = df1[df1[filter[0]] == filter[1]]
2✔
163

164
        # Convert columns to NDArray[Decimal]
165
        timestamps_np = np.array([Decimal(str(ts)) for ts in df1['timestamp']], dtype=object)
2✔
166
        positions_np = np.array([[Decimal(str(x)), Decimal(str(y)), Decimal(str(z))] 
2✔
167
                                 for x, y, z in zip(df1['x'], df1['y'], df1['z'])], dtype=object)
168
        orientations_np = np.array([[Decimal(str(qx)), Decimal(str(qy)), Decimal(str(qz)), Decimal(str(qw))]
2✔
169
                                    for qx, qy, qz, qw in zip(df1['qx'], df1['qy'], df1['qz'], df1['qw'])], dtype=object)
170
        
171
        # If timestamps are in ns, convert to s
172
        if ts_in_ns:
2✔
173
            timestamps_np = timestamps_np / Decimal('1e9')
2✔
174

175
        # Reorder the data if needed
176
        if reorder_data:
2✔
NEW
177
            print("Warning: This code is not tested yet!")
×
NEW
178
            sort_indices = np.argsort(timestamps_np)
×
NEW
179
            timestamps_np = timestamps_np[sort_indices]
×
NEW
180
            positions_np = positions_np[sort_indices]
×
NEW
181
            orientations_np = orientations_np[sort_indices]
×
182

183
        # Create an OdometryData class
184
        return cls(frame_id, child_frame_id, timestamps_np, positions_np, orientations_np, frame)
2✔
185
    
186
    @classmethod
2✔
187
    @typechecked
2✔
188
    def from_txt_file(cls, file_path: Union[Path, str], frame_id: str, child_frame_id: str, frame: CoordinateFrame,
2✔
189
                      header_included: bool, column_to_data: Union[List[int], None] = None):
190
        """
191
        Creates a class structure from a text file, where the order of values
192
        in the files follows ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz'].
193

194
        Args:
195
            file_path (Path | str): Path to the file containing the odometry data.
196
            frame_id (str): The frame where this odometry is relative to.
197
            child_frame_id (str): The frame whose pose is represented by this odometry.
198
            frame (CoordinateFrame): The coordinate system convention of this data.
199
            header_included (bool): If this text file has a header, so we can remove it.
200
            column_to_data (list[int]): Tells the algorithms which columns in the text file contain which
201
                of the following data: ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']. Thus, 
202
                index 0 of column_to_data should be the column that timestamp data is found in the 
203
                text file. Set to None to use [0,1,2,3,4,5,6,7].
204
        Returns:
205
            OdometryData: Instance of this class.
206
        """
207
        
208
        # If column_to_data is None, assume default
209
        if column_to_data is None:
2✔
210
            column_to_data = [0,1,2,3,4,5,6,7]
2✔
211
        else:
212
            # Check column_to_data values are valid
213
            assert np.all(np.array(column_to_data) >= 0)
2✔
214
            assert len(column_to_data) == 8
2✔
215

216
        # Count the number of lines in the file
217
        line_count = 0
2✔
218
        with open(str(file_path), 'r') as file:
2✔
219
            for _ in file: 
2✔
220
                line_count += 1
2✔
221

222
        # Setup arrays to hold data
223
        timestamps_np = np.zeros((line_count), dtype=object)
2✔
224
        positions_np = np.zeros((line_count, 3), dtype=object)
2✔
225
        orientations_np = np.zeros((line_count, 4), dtype=object)
2✔
226

227
        # Open the txt file and read in the data
228
        with open(str(file_path), 'r') as file:
2✔
229
            for i, line in enumerate(file):
2✔
230
                line_split = line.split(' ')
2✔
231
                timestamps_np[i] = line_split[column_to_data[0]]
2✔
232
                positions_np[i] = np.array([line_split[column_to_data[1]], line_split[column_to_data[2]], line_split[column_to_data[3]]])
2✔
233
                orientations_np[i] =  np.array([line_split[column_to_data[5]], line_split[column_to_data[6]], line_split[column_to_data[7]], line_split[column_to_data[4]]])
2✔
234

235
        # Remove the header
236
        if header_included:
2✔
237
            timestamps_np = timestamps_np[1:]
2✔
238
            positions_np = positions_np[1:]
2✔
239
            orientations_np = orientations_np[1:]
2✔
240
        
241
        # Create an OdometryData class
242
        return cls(frame_id, child_frame_id, timestamps_np, positions_np, orientations_np, frame)
2✔
243
    
244
    # =========================================================================
245
    # ========================= Manipulation Methods ========================== 
246
    # =========================================================================  
247
    
248
    def add_folded_guassian_noise_to_position(self, xy_noise_std_per_frame: float,
2✔
249
            z_noise_std_per_frame: float):
250
        """
251
        This method simulates odometry drift by adding folded gaussian noise
252
        to the odometry positions on a per frame basis. It also accumulates
253
        it over time. NOTE: It completely ignores the timestamps, and the "folded
254
        guassian noise" distribution stds might not align with the stds of the 
255
        guassian used internally, so this is not a robust function at all.
256

257
        Args:
258
            xy_noise_std_per_frame (float): Standard deviation of the gaussian 
259
                distribution for xy, whose output is then run through abs().
260
            z_noise_std_per_frame (float): Same as above, but for z.
261
        """
262

263
        # Track cumulative noise for each field
264
        cumulative_noise_pos = {'x': 0.0, 'y': 0.0, 'z': 0.0}
×
265

266
        # For each position
267
        for i in range(len(self.timestamps)):
×
268

269
            # Sample noise and accumulate
270
            noise_pos = {'x': np.random.normal(0, xy_noise_std_per_frame),
×
271
                        'y': np.random.normal(0, xy_noise_std_per_frame),
272
                        'z': np.random.normal(0, z_noise_std_per_frame)}
273
            for key in cumulative_noise_pos:
×
274
                cumulative_noise_pos[key] += abs(noise_pos[key])
×
275

276
            # Update positions
277
            self.positions[i][0] += Decimal(cumulative_noise_pos['x'])
×
278
            self.positions[i][1] += Decimal(cumulative_noise_pos['y'])
×
279
            self.positions[i][2] += Decimal(cumulative_noise_pos['z'])
×
280

281
    @typechecked
2✔
282
    def shift_position(self, x_shift: float, y_shift: float, z_shift: float):
2✔
283
        """
284
        Shifts the positions of the odometry.
285

286
        Args:
287
            x_shift (float): Shift in x-axis.
288
            y_shift (float): Shift in y_axis.
289
            z_shift (float): Shift in z_axis.
290
        """
291
        self.positions[:,0] += Decimal(x_shift)
×
292
        self.positions[:,1] += Decimal(y_shift)
×
293
        self.positions[:,2] += Decimal(z_shift)
×
294

295
    def shift_to_start_at_identity(self):
2✔
296
        """
297
        Alter the positions and orientations based so that the first pose 
298
        starts at Identity.
299
        """
300

301
        # Get pose of first robot position w.r.t world
302
        R_o = R.from_quat(self.orientations[0]).as_matrix()
2✔
303
        T_o = np.expand_dims(self.positions[0], axis=1)
2✔
304
        
305
        # Calculate the inverse (pose of world w.r.t first robot location)
306
        R_inv = R_o.T
2✔
307

308
        # Rotate positions and orientations
309
        self.positions = (R_inv @ (self.positions.T - T_o).astype(float)).T
2✔
310
        for i in range(self.len()):
2✔
311
            self.orientations[i] = R.from_matrix((R_inv @ R.from_quat(self.orientations[i]).as_matrix())).as_quat()
2✔
312

313
        # Convert back to decimal array
314
        self.positions = col_to_dec_arr(self.positions)
2✔
315
        self.orientations = col_to_dec_arr(self.orientations)
2✔
316

317
    def crop_data(self, start: Decimal, end: Decimal):
2✔
318
        """ Will crop the data so only values within [start, end] inclusive are kept. """
319

320
        # Create boolean mask of data to keep
321
        mask = (self.timestamps >= start) & (self.timestamps <= end)
2✔
322

323
        # Apply mask
324
        self.timestamps = self.timestamps[mask]
2✔
325
        self.positions = self.positions[mask]
2✔
326
        self.orientations = self.orientations[mask]
2✔
327

328
        # Empty poses as they might need to be recalculated
329
        self.poses = []
2✔
330
        self.poses_rclpy = []
2✔
331

332
    # =========================================================================
333
    # ============================ Export Methods ============================= 
334
    # =========================================================================  
335

336
    @typechecked
2✔
337
    def to_csv(self, csv_path: Union[Path, str]):
2✔
338
        """
339
        Writes the odometry data to a .csv file. Note that data will be
340
        saved in the following order: timestamp, pos.x, pos.y, pos.z,
341
        ori.w, ori.x, ori.y, ori.z. Timestamp is in seconds.
342

343
        Args:
344
            csv_path (Path | str): Path to the output csv file.
345
            odom_topic (str): Topic of the Odometry messages.
346
        Returns:
347
            OdometryData: Instance of this class.
348
        """
349

350
        # setup tqdm 
351
        pbar = tqdm.tqdm(total=None, desc="Saving to csv... ", unit=" frames")
2✔
352

353
        # Check that file path doesn't already exist
354
        file_path = Path(csv_path)
2✔
355
        if os.path.exists(file_path):
2✔
356
            raise ValueError(f"Output file already exists: {file_path}")
×
357
        
358
        # Open csv file
359
        with open(file_path, 'w', newline='') as csvfile:
2✔
360
            writer = csv.writer(csvfile, delimiter=',')
2✔
361

362
            # Write the first row
363
            writer.writerow(['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz'])
2✔
364
                
365
            # Write message data to the csv file
366
            for i in range(len(self.timestamps)):
2✔
367
                writer.writerow([str(self.timestamps[i]), 
2✔
368
                    str(self.positions[i][0]), str(self.positions[i][1]), str(self.positions[i][2]),
369
                    str(self.orientations[i][3]), str(self.orientations[i][0]), str(self.orientations[i][1]), 
370
                    str(self.orientations[i][2])])
371
                pbar.update(1)
2✔
372

373
    # =========================================================================
374
    # =========================== Frame Conversions =========================== 
375
    # ========================================================================= 
376
    def to_FLU_frame(self):
2✔
377
        # If we are already in the FLU frame, return
378
        if self.frame == CoordinateFrame.FLU:
2✔
379
            print("Data already in FLU coordinate frame, returning...")
2✔
380
            return
2✔
381

382
        # If in NED, run the conversion
383
        elif self.frame == CoordinateFrame.NED:
2✔
384
            # Define the rotation matrix
385
            R_NED = np.array([[1,  0,  0],
2✔
386
                              [0, -1,  0],
387
                              [0,  0, -1]])
388

389
            # Do a change of basis to update the frame
390
            self._convert_frame(R_NED)
2✔
391

392
            # Update frame
393
            self.frame = CoordinateFrame.FLU
2✔
394

395
        # Otherwise, throw an error
396
        else:
397
            raise RuntimeError(f"OdometryData class is in an unexpected frame: {self.frame}!")
2✔
398
    
399
    @typechecked
2✔
400
    def _convert_frame(self, R_frame: np.ndarray):
2✔
401
        """ Uses a change of basis to update the positions and orientations. """
402
        R_frame_Q = R.from_matrix(R_frame)
2✔
403
        self.positions = (R_frame @ self.positions.T).T
2✔
404
        self._ori_change_of_basis(R_frame_Q)
2✔
405

406
    @typechecked
2✔
407
    def _ori_apply_rotation(self, R_i: R):
2✔
408
        """ Applies a rotation (not a change of basis) to orientations, thus stays in the same frame. """
409
        for i in range(self.len()):
2✔
410
            self.orientations[i] = (R_i * R.from_quat(self.orientations[i])).as_quat()
2✔
411

412
    @typechecked
2✔
413
    def _ori_change_of_basis(self, R_i: R):
2✔
414
        """ Applies a change of basis to orientations """
415
        for i in range(self.len()):
2✔
416
            self.orientations[i] = (R_i * R.from_quat(self.orientations[i]) * R_i.inv()).as_quat()
2✔
417

418
    # =========================================================================
419
    # =========================== Conversion to ROS =========================== 
420
    # ========================================================================= 
421

422
    @staticmethod
2✔
423
    def get_ros_msg_type(lib_type: ROSMsgLibType, msg_type: str = "Odometry") -> str:
2✔
424
        """ Return the __msgtype__ for an Imu msg. """
425
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
426
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
427
            if msg_type == "Odometry":
2✔
428
                return typestore.types['nav_msgs/msg/Odometry'].__msgtype__
2✔
429
            elif msg_type == "Path":
2✔
430
                return typestore.types['nav_msgs/msg/Path'].__msgtype__
2✔
431
            else:
NEW
432
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type}")
×
433
        else:
NEW
434
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for OdometryData.get_ros_msg_type()!")
×
435
    
436
    def _extract_seconds_and_nanoseconds(self, i: int):
2✔
437
        seconds = int(self.timestamps[i])
2✔
438
        nanoseconds = (self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) \
2✔
439
                        * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN)
440
        return seconds, nanoseconds
2✔
441
    
442
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int, msg_type: str = "Odometry"):
2✔
443
        """
444
        Gets an Image ROS2 Humble message corresponding to the odometry in index i.
445
        
446
        Args:
447
            i (int): The index of the odometry data to convert.
448
        Raises:
449
            ValueError: If i is outside the data bounds.
450
        """
451

452
        # Check to make sure index is within data bounds
453
        if i < 0 or i >= self.len():
2✔
454
            raise IndexError(f"Index {i} is out of bounds!")
×
455
        
456
        # Extract seconds and nanoseconds
457
        seconds, nanoseconds = self._extract_seconds_and_nanoseconds(i)
2✔
458

459
        # Write the data into the new msg
460
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
461
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
462

463
            Odometry = typestore.types['nav_msgs/msg/Odometry']
2✔
464
            Header = typestore.types['std_msgs/msg/Header']
2✔
465
            Time = typestore.types['builtin_interfaces/msg/Time']
2✔
466
            PoseWithCovariance = typestore.types['geometry_msgs/msg/PoseWithCovariance']
2✔
467
            TwistWithCovariance = typestore.types['geometry_msgs/msg/TwistWithCovariance']
2✔
468
            Twist = typestore.types['geometry_msgs/msg/Twist']
2✔
469
            Vector3 = typestore.types['geometry_msgs/msg/Vector3']
2✔
470
            Path = typestore.types['nav_msgs/msg/Path']
2✔
471
            Pose = typestore.types['geometry_msgs/msg/Pose']
2✔
472
            Point = typestore.types['geometry_msgs/msg/Point']
2✔
473
            Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
2✔
474

475
            if msg_type == "Odometry":
2✔
476
            
477
                return Odometry(Header(stamp=Time(sec=int(seconds), 
2✔
478
                                                nanosec=int(nanoseconds)), 
479
                                frame_id=self.frame_id),
480
                                child_frame_id=self.child_frame_id,
481
                                pose=PoseWithCovariance(pose=Pose(position=Point(x=self.positions[i][0],
482
                                                                    y=self.positions[i][1],
483
                                                                    z=self.positions[i][2]),
484
                                                        orientation=Quaternion(x=self.orientations[i][0],
485
                                                                                y=self.orientations[i][1],
486
                                                                                z=self.orientations[i][2],
487
                                                                                w=self.orientations[i][3])),
488
                                                        covariance=np.zeros(36)),
489
                                twist=TwistWithCovariance(twist=Twist(linear=Vector3(x=0, # Currently doesn't support Twist
490
                                                                                    y=0,
491
                                                                                    z=0,),
492
                                                                    angular=Vector3(x=0,
493
                                                                                    y=0,
494
                                                                                    z=0,)),
495
                                                        covariance=np.zeros(36)))
496
            elif msg_type == "Path":
2✔
497

498
                # Pre-calculate all the poses
499
                if len(self.poses) != self.len():
2✔
500
                    PoseStamped = typestore.types['geometry_msgs/msg/PoseStamped']
2✔
501

502
                    for j in range(self.len()):
2✔
503
                        seconds, nanoseconds = self._extract_seconds_and_nanoseconds(j)
2✔
504
                        self.poses.append(PoseStamped(Header(stamp=Time(sec=int(seconds), 
2✔
505
                                                                        nanosec=int(nanoseconds)),
506
                                                            frame_id=self.frame_id),
507
                                                    pose=Pose(position=Point(x=self.positions[j][0],
508
                                                                            y=self.positions[j][1],
509
                                                                            z=self.positions[j][2]),
510
                                    orientation=Quaternion(x=self.orientations[j][0],
511
                                                            y=self.orientations[j][1],
512
                                                            z=self.orientations[j][2],
513
                                                            w=self.orientations[j][3]))))
514

515
                return Path(Header(stamp=Time(sec=int(seconds), 
2✔
516
                                            nanosec=int(nanoseconds)),
517
                                frame_id=self.frame_id),
518
                                poses=self.poses[0:i+1:10])
519
            else:
NEW
520
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type} with ROSMsgLibType.ROSBAGS")
×
521
            
NEW
522
        elif lib_type == ROSMsgLibType.ROSPY:
×
NEW
523
            if msg_type == "maplab_msg/OdometryWithImuBiases":
×
524

NEW
525
                import rospy
×
NEW
526
                from maplab_msg.msg import OdometryWithImuBiases
×
NEW
527
                from geometry_msgs.msg import PoseWithCovariance, TwistWithCovariance, Point, Quaternion, Vector3
×
NEW
528
                from std_msgs.msg import Header
×
529

NEW
530
                msg = OdometryWithImuBiases()
×
NEW
531
                msg.header = Header()
×
NEW
532
                msg.header.stamp = rospy.Time(secs=seconds, nsecs=int(nanoseconds))
×
NEW
533
                msg.header.frame_id = self.frame_id
×
NEW
534
                msg.child_frame_id = self.child_frame_id
×
NEW
535
                msg.pose = PoseWithCovariance()
×
NEW
536
                msg.pose.pose.position = Point()
×
NEW
537
                msg.pose.pose.position.x = float(self.positions[i][0])
×
NEW
538
                msg.pose.pose.position.y = float(self.positions[i][1])
×
NEW
539
                msg.pose.pose.position.z = float(self.positions[i][2])
×
NEW
540
                msg.pose.pose.orientation = Quaternion()
×
NEW
541
                msg.pose.pose.orientation.x = float(self.orientations[i][0])
×
NEW
542
                msg.pose.pose.orientation.y = float(self.orientations[i][1])
×
NEW
543
                msg.pose.pose.orientation.z = float(self.orientations[i][2])
×
NEW
544
                msg.pose.pose.orientation.w = float(self.orientations[i][3])
×
NEW
545
                msg.pose.covariance = np.zeros(36)
×
NEW
546
                msg.twist = TwistWithCovariance()
×
NEW
547
                msg.twist.twist.linear = Vector3()
×
NEW
548
                msg.twist.twist.linear.x = 0.0  # NOTE: Currently doesn't support Twist
×
NEW
549
                msg.twist.twist.linear.y = 0.0
×
NEW
550
                msg.twist.twist.linear.z = 0.0
×
NEW
551
                msg.twist.twist.angular = Vector3()
×
NEW
552
                msg.twist.twist.angular.x = 0.0
×
NEW
553
                msg.twist.twist.angular.y = 0.0
×
NEW
554
                msg.twist.twist.angular.z = 0.0
×
NEW
555
                msg.twist.covariance = np.zeros(36)
×
NEW
556
                msg.accel_bias = Vector3() # NOTE: Assumes IMU biases are zero
×
NEW
557
                msg.accel_bias.x = 0.0
×
NEW
558
                msg.accel_bias.y = 0.0
×
NEW
559
                msg.accel_bias.z = 0.0
×
NEW
560
                msg.gyro_bias = Vector3()
×
NEW
561
                msg.gyro_bias.x = 0.0
×
NEW
562
                msg.gyro_bias.y = 0.0
×
NEW
563
                msg.gyro_bias.z = 0.0
×
NEW
564
                msg.odometry_state = 0 # Assumes default state
×
NEW
565
                return msg
×
566

567
            else:
NEW
568
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type} with ROSMsgLibType.ROSPY")
×
569
        
NEW
570
        elif lib_type == ROSMsgLibType.RCLPY:
×
NEW
571
            if msg_type == "Path":
×
572
                
NEW
573
                from builtin_interfaces.msg import Time
×
NEW
574
                from std_msgs.msg import Header
×
575

576
                # Pre-calculate all the poses
NEW
577
                if len(self.poses_rclpy) != self.len():
×
578

NEW
579
                    from geometry_msgs.msg import PoseStamped, Pose, Point, Quaternion
×
580

NEW
581
                    for j in range(self.len()):
×
NEW
582
                        seconds, nanoseconds = self._extract_seconds_and_nanoseconds(j)
×
NEW
583
                        self.poses_rclpy.append(PoseStamped(Header(stamp=Time(sec=int(seconds), 
×
584
                                                                        nanosec=int(nanoseconds)),
585
                                                            frame_id=self.frame_id),
586
                                                    pose=Pose(position=Point(x=self.positions[j][0],
587
                                                                            y=self.positions[j][1],
588
                                                                            z=self.positions[j][2]),
589
                                    orientation=Quaternion(x=self.orientations[j][0],
590
                                                            y=self.orientations[j][1],
591
                                                            z=self.orientations[j][2],
592
                                                            w=self.orientations[j][3]))))
593

NEW
594
                return Path(Header(stamp=Time(sec=int(seconds), 
×
595
                                            nanosec=int(nanoseconds)),
596
                                frame_id=self.frame_id),
597
                                poses=self.poses_rclpy[0:i+1:10])
598
            else:
NEW
599
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type} with ROSMsgLibType.RCLPY")
×
600
        else:
601
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for OdometryData.get_ros_msg()!")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc