• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 24098054854

07 Apr 2026 06:35PM UTC coverage: 80.413%. First build
24098054854

Pull #18

github

web-flow
Merge cf8c0401c into 7b9573aeb
Pull Request #18: Develop

378 of 598 new or added lines in 10 files covered. (63.21%)

2882 of 3584 relevant lines covered (80.41%)

1.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.33
/src/robotdataprocess/data_types/OdometryData.py
1
from __future__ import annotations
2✔
2

3
from ..conversion_utils import col_to_dec_arr, dec_arr_to_float_arr
2✔
4
from .Data import CoordinateFrame, ROSMsgLibType
2✔
5
import decimal
2✔
6
from decimal import Decimal
2✔
7
from evo.core import geometry
2✔
8
import matplotlib.pyplot as plt
2✔
9
from ..ModuleImporter import ModuleImporter
2✔
10
import numpy as np
2✔
11
from numpy.typing import NDArray
2✔
12
from .PathData import PathData
2✔
13
from pathlib import Path
2✔
14
from ..ros.Ros2BagWrapper import Ros2BagWrapper
2✔
15
from rosbags.rosbag2 import Reader as Reader2
2✔
16
from rosbags.typesys import Stores, get_typestore
2✔
17
from rosbags.typesys.store import Typestore
2✔
18
from scipy.spatial.transform import Rotation as R
2✔
19
from typeguard import typechecked
2✔
20
from typing import Union, List, Tuple, Any
2✔
21
import tqdm
2✔
22

23
PATH_SLICE_STEP = 40
2✔
24

25
@typechecked
2✔
26
class OdometryData(PathData):
2✔
27
    """
28
    Odometry data extending PathData with a child frame ID and ROS message caching.
29

30
    Supports loading from ROS2 bags, CSV files, and TXT files, and exporting
31
    to CSV or ROS messages (Odometry, Path, and maplab OdometryWithImuBiases).
32

33
    Attributes:
34
        child_frame_id: The frame whose pose is described by this odometry (e.g. ``"base_link"``).
35
        poses: Cached rosbags PoseStamped messages (rebuilt after any mutation).
36
        poses_rclpy: Cached rclpy/rospy PoseStamped messages (rebuilt after any mutation).
37
    """
38

39
    # Define odometry-specific data attributes
40
    child_frame_id: str
2✔
41
    poses: list # Saved nav_msgs/msg/Pose for rosbags
2✔
42
    poses_rclpy: list # Saved nav_msgs/msg/Pose for rclpy
2✔
43

44
    def __init__(self, frame_id: str, child_frame_id: str, timestamps: Union[np.ndarray, list], 
2✔
45
                 positions: Union[np.ndarray, list], orientations: Union[np.ndarray, list], frame: CoordinateFrame):
46
        
47
        # Copy initial values into attributes
48
        super().__init__(frame_id, timestamps, positions, orientations, frame)
2✔
49
        self.child_frame_id: str = child_frame_id
2✔
50
        self.poses: list = []
2✔
51
        self.poses_rclpy: list = []
2✔
52

53
        # Check to ensure that all arrays have same length
54
        if len(self.timestamps) != len(self.positions) or len(self.positions) != len(self.orientations):
2✔
55
            raise ValueError("Lengths of timestamp, position, and orientation arrays are not equal!")
2✔
56

57
    def _invalidate_cache(self):
2✔
58
        """ Clears cached ROS message data after mutations. """
59
        self.poses = []
2✔
60
        self.poses_rclpy = []
2✔
61

62
    # =========================================================================
63
    # ============================ Class Methods ==============================
64
    # =========================================================================
65

66
    @classmethod
2✔
67
    def from_ros2_bag(cls, bag_path: Union[Path, str], odom_topic: str, frame: CoordinateFrame):
2✔
68
        """
69
        Creates a class structure from a ROS2 bag file with an Odometry topic.
70

71
        Args:
72
            bag_path (Path | str): Path to the ROS2 bag file.
73
            odom_topic (str): Topic of the Odometry messages.
74
        Returns:
75
            OdometryData: Instance of this class.
76
        """
77

78
        # Get topic message count and typestore
79
        bag_wrapper = Ros2BagWrapper(bag_path, None)
2✔
80
        typestore: Typestore = bag_wrapper.get_typestore()
2✔
81
        num_msgs: int = bag_wrapper.get_topic_count(odom_topic)
2✔
82
        
83
        # Pre-allocate arrays (memory-mapped or otherwise)
84
        timestamps_np = np.zeros(num_msgs, dtype=Decimal)
2✔
85
        positions_np = np.zeros((num_msgs, 3), dtype=Decimal)
2✔
86
        orientations_np = np.zeros((num_msgs, 4), dtype=Decimal)
2✔
87

88
        # Setup tqdm bar & counter
89
        pbar = tqdm.tqdm(total=num_msgs, desc="Extracting Odometry...", unit=" msgs")
2✔
90
        i = 0
2✔
91

92
        # Extract the odometry information
93
        frame_id, child_frame_id = None, None
2✔
94
        with Reader2(str(bag_path)) as reader:
2✔
95

96
            # Extract frames from first message
97
            connections = [x for x in reader.connections if x.topic == odom_topic]
2✔
98
            for conn, timestamp, rawdata in reader.messages(connections=connections):  
2✔
99
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
2✔
100
                frame_id = msg.header.frame_id
2✔
101
                child_frame_id = msg.child_frame_id
2✔
102
                break
2✔
103

104
            # Extract individual message data
105
            connections = [x for x in reader.connections if x.topic == odom_topic]
2✔
106
            for conn, timestamp, rawdata in reader.messages(connections=connections):
2✔
107
                msg = typestore.deserialize_cdr(rawdata, conn.msgtype)
2✔
108
                
109
                timestamps_np[i] = bag_wrapper.extract_timestamp(msg)
2✔
110
                pos = msg.pose.pose.position
2✔
111
                positions_np[i] = np.array([Decimal(pos.x), Decimal(pos.y), Decimal(pos.z)])
2✔
112
                ori = msg.pose.pose.orientation
2✔
113
                orientations_np[i] = np.array([Decimal(ori.x), Decimal(ori.y), Decimal(ori.z), Decimal(ori.w)])
2✔
114

115
                # NOTE: Doesn't support Twist information currently
116

117
                # Increment the count
118
                i += 1
2✔
119
                pbar.update(1)
2✔
120

121
        # Create an OdometryData class
122
        return cls(frame_id, child_frame_id, timestamps_np, positions_np, orientations_np, frame)
2✔
123
    
124
    @classmethod
2✔
125
    def from_csv(cls, csv_path: Union[Path, str], frame_id: str, child_frame_id: str, frame: CoordinateFrame,
2✔
126
                 header_included: bool, column_to_data: Union[List[int], None] = None,
127
                 separator: Union[str, None] = None, filter: Union[Tuple[str, str], None] = None,
128
                 ts_in_ns: bool = False, reorder_data: bool = False):
129
        """
130
        Creates a class structure from a csv file.
131

132
        Args:
133
            csv_path (Path | str): Path to the CSV file.
134
            frame_id (str): The frame that this odometry is relative to.
135
            child_frame_id (str): The frame whose pose is represented by this odometry.
136
            frame (CoordinateFrame): The coordinate system convention of this data.
137
            header_included (bool): If this csv file has a header, so we can remove it.
138
            column_to_data (list[int]): Tells the algorithms which columns in the csv contain which
139
                of the following data: ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']. Thus,
140
                index 0 of column_to_data should be the column that timestamp data is found in the
141
                csv file. Set to None to use [0,1,2,3,4,5,6,7].
142
            separator (str | None): The separator used in the csv file. If None, will use a comma by default.
143
            filter: A tuple of (column_name, value), where only rows with column_name == value will be kept. If
144
                csv file has no headers, then `column_name` should be the index of the column as a string.
145
            ts_in_ns (bool): If True, assumes timestamps are in nanoseconds and converts to seconds. Otherwise,
146
                assumes timestamps are already in seconds.
147
            reorder_data (bool): If True, reorders the data to be in order of timestamps. If False,
148
                assumes data is already ordered by timestamp.
149

150
        Returns:
151
            OdometryData: Instance of this class.
152
        """
153

154
        path_data = super().from_csv(csv_path, frame_id, frame, header_included, column_to_data,
2✔
155
                                     separator, filter, ts_in_ns, reorder_data)
156
        return cls(frame_id, child_frame_id, path_data.timestamps, path_data.positions, path_data.orientations, frame)
2✔
157
    
158
    @classmethod
2✔
159
    def from_txt(cls, file_path: Union[Path, str], frame_id: str, child_frame_id: str, frame: CoordinateFrame,
2✔
160
                 header_included: bool, column_to_data: Union[List[int], None] = None):
161
        """
162
        Creates an OdometryData class from a text file.
163

164
        Args:
165
            file_path (Path | str): Path to the file containing the odometry data.
166
            frame_id (str): The frame where this odometry is relative to.
167
            child_frame_id (str): The frame whose pose is represented by this odometry.
168
            frame (CoordinateFrame): The coordinate system convention of this data.
169
            header_included (bool): If this text file has a header, so we can remove it.
170
            column_to_data (list[int]): Tells the algorithms which columns in the text file contain which
171
                of the following data: ['timestamp', 'x', 'y', 'z', 'qw', 'qx', 'qy', 'qz']. Thus,
172
                index 0 of column_to_data should be the column that timestamp data is found in the
173
                text file. Set to None to use [0,1,2,3,4,5,6,7].
174
        Returns:
175
            OdometryData: Instance of this class.
176
        """
177

178
        path_data = super().from_txt(file_path, frame_id, frame, header_included, column_to_data)
2✔
179
        return cls(frame_id, child_frame_id, path_data.timestamps, path_data.positions, path_data.orientations, frame)
2✔
180

181
    @classmethod
2✔
182
    def from_tum(cls, file_path: Union[Path, str], frame_id: str, child_frame_id: str, frame: CoordinateFrame):
2✔
183
        """
184
        Creates an OdometryData class from a TUM RGB-D dataset trajectory format text file.
185

186
        Each row must contain 8 space-separated values::
187

188
            timestamp x y z q_x q_y q_z q_w
189

190
        where ``timestamp`` is in seconds and the orientation quaternion is in
191
        ``(x, y, z, w)`` order.
192

193
        Args:
194
            file_path (Path | str): Path to the TUM trajectory file.
195
            frame_id (str): The frame where this odometry is relative to.
196
            child_frame_id (str): The frame whose pose is represented by this odometry.
197
            frame (CoordinateFrame): The coordinate system convention of this data.
198

199
        Returns:
200
            OdometryData: Instance of this class.
201
        """
202
        # TUM order: ts x y z qx qy qz qw
203
        # column_to_data: ts=0, x=1, y=2, z=3, qw=7, qx=4, qy=5, qz=6
204
        return cls.from_txt(file_path, frame_id, child_frame_id, frame,
2✔
205
                            header_included=False,
206
                            column_to_data=[0, 1, 2, 3, 7, 4, 5, 6])
207
    
208
    # =========================================================================
209
    # =========================== Conversion to ROS ===========================
210
    # =========================================================================
211

212
    @staticmethod
2✔
213
    def get_ros_msg_type(lib_type: ROSMsgLibType, msg_type: str = "Odometry") -> Any:
2✔
214
        """
215
        Return the ROS message type class for odometry-related messages.
216

217
        Args:
218
            lib_type: Which ROS message library to use.
219
            msg_type: The message type name. Supported values are ``"Odometry"``,
220
                ``"Path"``, ``"TFMessage"``, and ``"maplab_msg/OdometryWithImuBiases"``
221
                (ROSPY only).
222

223
        Returns:
224
            The ROS message type class.
225

226
        Raises:
227
            ValueError: If ``msg_type`` is not supported for the given ``lib_type``.
228
            NotImplementedError: If ``lib_type`` is not supported.
229
        """
230
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
231
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
232
            if msg_type == "Odometry":
2✔
233
                return typestore.types['nav_msgs/msg/Odometry'].__msgtype__
2✔
234
            elif msg_type == "Path":
2✔
235
                return typestore.types['nav_msgs/msg/Path'].__msgtype__
2✔
236
            elif msg_type == "TFMessage":
2✔
237
                return typestore.types['tf2_msgs/msg/TFMessage'].__msgtype__
2✔
238
            else:
239
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type}")
×
240
        elif lib_type == ROSMsgLibType.RCLPY:
2✔
241
            if msg_type == "Path":
2✔
242
                return ModuleImporter.get_module_attribute('nav_msgs.msg', 'Path')
2✔
NEW
243
            elif msg_type == "TFMessage":
×
NEW
244
                return ModuleImporter.get_module_attribute('tf2_msgs.msg', 'TFMessage')
×
245
            else:
246
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type}")
×
247
        elif lib_type == ROSMsgLibType.ROSPY:
×
248
            if msg_type == "Odometry":
×
249
                return ModuleImporter.get_module_attribute('nav_msgs.msg', 'Odometry')
×
250
            if msg_type == "maplab_msg/OdometryWithImuBiases":
×
251
                return ModuleImporter.get_module_attribute('maplab_msgs.msg', 'OdometryWithImuBiases')
×
252
            elif msg_type == "Path":
×
253
                return ModuleImporter.get_module_attribute('nav_msgs.msg', 'Path')
×
NEW
254
            elif msg_type == "TFMessage":
×
NEW
255
                return ModuleImporter.get_module_attribute('tf2_msgs.msg', 'TFMessage')
×
256
            else:
257
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type}")
×
258
        else:
259
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for OdometryData.get_ros_msg_type()!")
×
260
    
261
    def _extract_seconds_and_nanoseconds(self, i: int):
2✔
262
        seconds = int(self.timestamps[i])
2✔
263
        nanoseconds = (self.timestamps[i] - self.timestamps[i].to_integral_value(rounding=decimal.ROUND_DOWN)) \
2✔
264
                        * Decimal("1e9").to_integral_value(decimal.ROUND_HALF_EVEN)
265
        return seconds, nanoseconds
2✔
266
    
267
    def get_ros_msg(self, lib_type: ROSMsgLibType, i: int, msg_type: str = "Odometry"):
2✔
268
        """
269
        Gets an Odometry ROS message corresponding to the data at index i.
270

271
        Args:
272
            lib_type: Which ROS message library to use.
273
            i: The index of the odometry data to convert.
274
            msg_type: The message type name (``"Odometry"``, ``"Path"``,
275
                ``"TFMessage"``, or ``"maplab_msg/OdometryWithImuBiases"``).
276

277
        Raises:
278
            IndexError: If ``i`` is outside the data bounds.
279
            ValueError: If ``msg_type`` is not supported for the given ``lib_type``.
280
        """
281

282
        # Check to make sure index is within data bounds
283
        if i < 0 or i >= self.len():
2✔
284
            raise IndexError(f"Index {i} is out of bounds!")
×
285
        
286
        # Extract seconds and nanoseconds
287
        seconds, nanoseconds = self._extract_seconds_and_nanoseconds(i)
2✔
288

289
        # Write the data into the new msg
290
        if lib_type == ROSMsgLibType.ROSBAGS:
2✔
291
            typestore = get_typestore(Stores.ROS2_HUMBLE)
2✔
292

293
            Odometry = typestore.types['nav_msgs/msg/Odometry']
2✔
294
            Header = typestore.types['std_msgs/msg/Header']
2✔
295
            Time = typestore.types['builtin_interfaces/msg/Time']
2✔
296
            PoseWithCovariance = typestore.types['geometry_msgs/msg/PoseWithCovariance']
2✔
297
            TwistWithCovariance = typestore.types['geometry_msgs/msg/TwistWithCovariance']
2✔
298
            Twist = typestore.types['geometry_msgs/msg/Twist']
2✔
299
            Vector3 = typestore.types['geometry_msgs/msg/Vector3']
2✔
300
            Path = typestore.types['nav_msgs/msg/Path']
2✔
301
            Pose = typestore.types['geometry_msgs/msg/Pose']
2✔
302
            Point = typestore.types['geometry_msgs/msg/Point']
2✔
303
            Quaternion = typestore.types['geometry_msgs/msg/Quaternion']
2✔
304

305
            if msg_type == "Odometry":
2✔
306
            
307
                return Odometry(Header(stamp=Time(sec=int(seconds), 
2✔
308
                                                nanosec=int(nanoseconds)), 
309
                                frame_id=self.frame_id),
310
                                child_frame_id=self.child_frame_id,
311
                                pose=PoseWithCovariance(pose=Pose(position=Point(x=self.positions[i][0],
312
                                                                    y=self.positions[i][1],
313
                                                                    z=self.positions[i][2]),
314
                                                        orientation=Quaternion(x=self.orientations[i][0],
315
                                                                                y=self.orientations[i][1],
316
                                                                                z=self.orientations[i][2],
317
                                                                                w=self.orientations[i][3])),
318
                                                        covariance=np.zeros(36)),
319
                                twist=TwistWithCovariance(twist=Twist(linear=Vector3(x=0, # Currently doesn't support Twist
320
                                                                                    y=0,
321
                                                                                    z=0,),
322
                                                                    angular=Vector3(x=0,
323
                                                                                    y=0,
324
                                                                                    z=0,)),
325
                                                        covariance=np.zeros(36)))
326
            elif msg_type == "Path":
2✔
327

328
                # Pre-calculate all the poses
329
                if len(self.poses) != self.len():
2✔
330
                    PoseStamped = typestore.types['geometry_msgs/msg/PoseStamped']
2✔
331

332
                    for j in range(self.len()):
2✔
333
                        sec, nanosec = self._extract_seconds_and_nanoseconds(j)
2✔
334
                        self.poses.append(PoseStamped(Header(stamp=Time(sec=int(sec), 
2✔
335
                                                                        nanosec=int(nanosec)),
336
                                                            frame_id=self.frame_id),
337
                                                    pose=Pose(position=Point(x=self.positions[j][0],
338
                                                                            y=self.positions[j][1],
339
                                                                            z=self.positions[j][2]),
340
                                    orientation=Quaternion(x=self.orientations[j][0],
341
                                                            y=self.orientations[j][1],
342
                                                            z=self.orientations[j][2],
343
                                                            w=self.orientations[j][3]))))
344

345
                return Path(Header(stamp=Time(sec=int(seconds),
2✔
346
                                            nanosec=int(nanoseconds)),
347
                                frame_id=self.frame_id),
348
                                poses=self.poses[0:i+1:PATH_SLICE_STEP])
349
            elif msg_type == "TFMessage":
2✔
350

351
                TFMessage = typestore.types['tf2_msgs/msg/TFMessage']
2✔
352
                TransformStamped = typestore.types['geometry_msgs/msg/TransformStamped']
2✔
353
                Transform = typestore.types['geometry_msgs/msg/Transform']
2✔
354
                Vector3 = typestore.types['geometry_msgs/msg/Vector3']
2✔
355

356
                return TFMessage(transforms=[
2✔
357
                    TransformStamped(
358
                        header=Header(stamp=Time(sec=int(seconds), nanosec=int(nanoseconds)),
359
                                      frame_id=self.frame_id),
360
                        child_frame_id=self.child_frame_id,
361
                        transform=Transform(
362
                            translation=Vector3(x=self.positions[i][0],
363
                                                y=self.positions[i][1],
364
                                                z=self.positions[i][2]),
365
                            rotation=Quaternion(x=self.orientations[i][0],
366
                                                y=self.orientations[i][1],
367
                                                z=self.orientations[i][2],
368
                                                w=self.orientations[i][3])))])
369
            else:
370
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type} with ROSMsgLibType.ROSBAGS")
×
371
            
372
        elif lib_type == ROSMsgLibType.ROSPY or lib_type == ROSMsgLibType.RCLPY:
2✔
373
            if msg_type == "Odometry":
2✔
374

375
                Odometry = ModuleImporter.get_module_attribute('nav_msgs.msg', 'Odometry')
×
376
                Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
×
377
                PoseWithCovariance = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'PoseWithCovariance')
×
378
                TwistWithCovariance = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'TwistWithCovariance')
×
379
                Pose = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Pose')
×
380
                Point = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Point')
×
381
                Quaternion = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Quaternion')
×
382
                Vector3 = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Vector3')
×
383

384
                msg = Odometry()
×
385
                msg.header = Header()
×
386
                msg.header.frame_id = self.frame_id
×
387
                if lib_type == ROSMsgLibType.RCLPY: 
×
388
                    Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
×
389
                    msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
×
390
                else:
391
                    rospy = ModuleImporter.get_module('rospy')
×
392
                    msg.header.stamp = rospy.Time(secs=seconds, nsecs=int(nanoseconds))
×
393
                msg.child_frame_id = self.child_frame_id
×
394

395
                msg.pose = PoseWithCovariance()
×
396
                msg.pose.pose = Pose()
×
397
                msg.pose.pose.position = Point()
×
398
                msg.pose.pose.position.x = float(self.positions[i][0])
×
399
                msg.pose.pose.position.y = float(self.positions[i][1])
×
400
                msg.pose.pose.position.z = float(self.positions[i][2])
×
401
                msg.pose.pose.orientation = Quaternion()
×
402
                msg.pose.pose.orientation.x = float(self.orientations[i][0])
×
403
                msg.pose.pose.orientation.y = float(self.orientations[i][1])
×
404
                msg.pose.pose.orientation.z = float(self.orientations[i][2])
×
405
                msg.pose.pose.orientation.w = float(self.orientations[i][3])
×
406
                msg.pose.covariance = np.zeros(36) # NOTE: Assumes covariance of zero.
×
407
                msg.twist = TwistWithCovariance()
×
408
                msg.twist.twist.linear = Vector3()
×
409
                msg.twist.twist.linear.x = 0.0  # NOTE: Currently doesn't support Twist
×
410
                msg.twist.twist.linear.y = 0.0
×
411
                msg.twist.twist.linear.z = 0.0
×
412
                msg.twist.twist.angular = Vector3()
×
413
                msg.twist.twist.angular.x = 0.0
×
414
                msg.twist.twist.angular.y = 0.0
×
415
                msg.twist.twist.angular.z = 0.0
×
416
                msg.twist.covariance = np.zeros(36)
×
417
                return msg
×
418
            
419
            elif msg_type == "maplab_msg/OdometryWithImuBiases":
2✔
420
                
421
                if lib_type == ROSMsgLibType.RCLPY:
×
422
                    raise ValueError("maplab_msg/OdometryWithImuBiases is not supported for RCLPY!")
×
423

424
                rospy = ModuleImporter.get_module('rospy')
×
425
                OdometryWithImuBiases = ModuleImporter.get_module_attribute('maplab_msgs.msg', 'OdometryWithImuBiases')
×
426
                PoseWithCovariance = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'PoseWithCovariance')
×
427
                TwistWithCovariance = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'TwistWithCovariance')
×
428
                Point = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Point')
×
429
                Quaternion = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Quaternion')
×
430
                Vector3 = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Vector3')
×
431
                Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
×
432

433
                msg = OdometryWithImuBiases()
×
434
                msg.header = Header()
×
435
                msg.header.stamp = rospy.Time(secs=seconds, nsecs=int(nanoseconds))
×
436
                msg.header.frame_id = self.frame_id
×
437
                msg.child_frame_id = self.child_frame_id
×
438
                msg.pose = PoseWithCovariance()
×
439
                msg.pose.pose.position = Point()
×
440
                msg.pose.pose.position.x = float(self.positions[i][0])
×
441
                msg.pose.pose.position.y = float(self.positions[i][1])
×
442
                msg.pose.pose.position.z = float(self.positions[i][2])
×
443
                msg.pose.pose.orientation = Quaternion()
×
444
                msg.pose.pose.orientation.x = float(self.orientations[i][0])
×
445
                msg.pose.pose.orientation.y = float(self.orientations[i][1])
×
446
                msg.pose.pose.orientation.z = float(self.orientations[i][2])
×
447
                msg.pose.pose.orientation.w = float(self.orientations[i][3])
×
448
                msg.pose.covariance = np.zeros(36) # NOTE: Assumes covariance of zero.
×
449
                msg.twist = TwistWithCovariance()
×
450
                msg.twist.twist.linear = Vector3()
×
451
                msg.twist.twist.linear.x = 0.0  # NOTE: Currently doesn't support Twist
×
452
                msg.twist.twist.linear.y = 0.0
×
453
                msg.twist.twist.linear.z = 0.0
×
454
                msg.twist.twist.angular = Vector3()
×
455
                msg.twist.twist.angular.x = 0.0
×
456
                msg.twist.twist.angular.y = 0.0
×
457
                msg.twist.twist.angular.z = 0.0
×
458
                msg.twist.covariance = np.zeros(36)
×
459
                msg.accel_bias = Vector3() # NOTE: Assumes IMU biases are zero
×
460
                msg.accel_bias.x = 0.0
×
461
                msg.accel_bias.y = 0.0
×
462
                msg.accel_bias.z = 0.0
×
463
                msg.gyro_bias = Vector3()
×
464
                msg.gyro_bias.x = 0.0
×
465
                msg.gyro_bias.y = 0.0
×
466
                msg.gyro_bias.z = 0.0
×
467
                msg.odometry_state = 0 # NOTE: Assumes default state
×
468
                return msg
×
469

470
            elif msg_type == "Path":
2✔
471
                
472
                Path = ModuleImporter.get_module_attribute('nav_msgs.msg', 'Path')
2✔
473
                Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
2✔
474

475
                # Pre-calculate all the poses
476
                if len(self.poses_rclpy) != self.len():
2✔
477

478
                    PoseStamped = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'PoseStamped')
2✔
479
                    Pose = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Pose')
2✔
480
                    Point = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Point')
2✔
481
                    Quaternion = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Quaternion')
2✔
482

483
                    for j in range(self.len()):
2✔
484
                        sec, nanosec = self._extract_seconds_and_nanoseconds(j)
2✔
485
                        pose_msg = PoseStamped()
2✔
486
                        pose_msg.header = Header()
2✔
487
                        if lib_type == ROSMsgLibType.RCLPY:
2✔
488
                            Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
2✔
489
                            pose_msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
2✔
490
                        else:
491
                            rospy = ModuleImporter.get_module('rospy')
×
492
                            pose_msg.header.stamp = rospy.Time(secs=int(seconds), nsecs=int(nanoseconds))
×
493
                        pose_msg.header.frame_id = self.frame_id
2✔
494
                        pose_msg.pose = Pose()
2✔
495
                        pose_msg.pose.position = Point()
2✔
496
                        pose_msg.pose.position.x = float(self.positions[j][0])
2✔
497
                        pose_msg.pose.position.y = float(self.positions[j][1])
2✔
498
                        pose_msg.pose.position.z = float(self.positions[j][2])
2✔
499
                        pose_msg.pose.orientation = Quaternion()
2✔
500
                        pose_msg.pose.orientation.x = float(self.orientations[j][0])
2✔
501
                        pose_msg.pose.orientation.y = float(self.orientations[j][1])
2✔
502
                        pose_msg.pose.orientation.z = float(self.orientations[j][2])
2✔
503
                        pose_msg.pose.orientation.w = float(self.orientations[j][3])
2✔
504
                        self.poses_rclpy.append(pose_msg)
2✔
505

506
                msg = Path()
2✔
507
                msg.header = Header()
2✔
508
                if lib_type == ROSMsgLibType.RCLPY:
2✔
509
                    Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
2✔
510
                    msg.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
2✔
511
                else:
512
                    rospy = ModuleImporter.get_module('rospy')
×
513
                    msg.header.stamp = rospy.Time(secs=int(seconds), nsecs=int(nanoseconds))
×
514
                msg.header.frame_id = self.frame_id
2✔
515
                msg.poses = self.poses_rclpy[0:i+1:PATH_SLICE_STEP]
2✔
516
                return msg
2✔
517

NEW
518
            elif msg_type == "TFMessage":
×
519

NEW
520
                TFMessage = ModuleImporter.get_module_attribute('tf2_msgs.msg', 'TFMessage')
×
NEW
521
                TransformStamped = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'TransformStamped')
×
NEW
522
                Transform = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Transform')
×
NEW
523
                Vector3 = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Vector3')
×
NEW
524
                Quaternion = ModuleImporter.get_module_attribute('geometry_msgs.msg', 'Quaternion')
×
NEW
525
                Header = ModuleImporter.get_module_attribute('std_msgs.msg', 'Header')
×
526

NEW
527
                ts = TransformStamped()
×
NEW
528
                ts.header = Header()
×
NEW
529
                if lib_type == ROSMsgLibType.RCLPY:
×
NEW
530
                    Time = ModuleImporter.get_module_attribute('rclpy.time', 'Time')
×
NEW
531
                    ts.header.stamp = Time(seconds=seconds, nanoseconds=int(nanoseconds)).to_msg()
×
532
                else:
NEW
533
                    rospy = ModuleImporter.get_module('rospy')
×
NEW
534
                    ts.header.stamp = rospy.Time(secs=seconds, nsecs=int(nanoseconds))
×
NEW
535
                ts.header.frame_id = self.frame_id
×
NEW
536
                ts.child_frame_id = self.child_frame_id
×
NEW
537
                ts.transform = Transform()
×
NEW
538
                ts.transform.translation = Vector3()
×
NEW
539
                ts.transform.translation.x = float(self.positions[i][0])
×
NEW
540
                ts.transform.translation.y = float(self.positions[i][1])
×
NEW
541
                ts.transform.translation.z = float(self.positions[i][2])
×
NEW
542
                ts.transform.rotation = Quaternion()
×
NEW
543
                ts.transform.rotation.x = float(self.orientations[i][0])
×
NEW
544
                ts.transform.rotation.y = float(self.orientations[i][1])
×
NEW
545
                ts.transform.rotation.z = float(self.orientations[i][2])
×
NEW
546
                ts.transform.rotation.w = float(self.orientations[i][3])
×
547

NEW
548
                msg = TFMessage()
×
NEW
549
                msg.transforms = [ts]
×
NEW
550
                return msg
×
551

552
            else:
553
                raise ValueError(f"Unsupported msg_type for OdometryData: {msg_type} with ROSMsgLibType.ROSPY or ROSMsgLibType.RCLPY.")
×
554
        else:
555
            raise NotImplementedError(f"Unsupported ROSMsgLibType {lib_type} for OdometryData.get_ros_msg()!")
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc