• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 16376904896

18 Jul 2025 05:43PM UTC coverage: 70.588%. First build
16376904896

push

github

web-flow
(v0.1.0) Refactor to robotdataprocess, Major Data classes with Loading, Manipulation, and Saving. (#3)

* Coverage Functionality with coveralls

* Add Coverage Status Badge

* Remove clean action from github action

* Clarify assumed ROS2 and ROS1 versions

* New Message types and support for list attributes

* Optimizations to increase speed of ros2_to_ros1 conversion

* Fixed bug in downsample where requested downsample ratio wasn't matched perfectly and had undefined behaviour

* Add instructions for running unit tests, coverage, and profiling

* Increase to version 0.0.3

* Add support for rosgraph_msgs/msg/Clock

* Speedup downsample operation

* Initial code for IMU data viewer, not tested

* Major refactor to cleanly partition command line vs. rosbag vs. data functionality

* Remove hardcoded paths from tests

* Test for from_npy() method for ImageData

* Writing various message data types from non-ROS to rosbag

* Loading and Writing data from HERCULES v1.3

* Add missing test file

* Add missing dependency on OpenCV

* Load Odometry data from csv file and save to ROSbag

* Support to write Odometry Data to a nav_msgs/msg/Path message

* Support to convert IMU and Odometry data from NED to ROS frame

* Fix bugs in test cases

* Fixes to broken NED to ROS conversion for Odometry

* Refactor: Move rosbag manipulation to Ros2BagWrapper (#5)

* Refactor: Move rosbag manipulation to Ros2BagWrapper

- Moved hertz_analysis, view_imu_data, and downsample methods from rosbag_manip.py to Ros2BagWrapper.py.
- Renamed rosbag_manip.py to command_line.py.
- Renamed rosbag_manipulation class to CmdLineInterface.
- Updated command_line.py to use the new methods in Ros2BagWrapper.py.
- Updated __main__.py to reflect the file and class rename.

* Refactor name to robotdataprocess

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>

* Stereo rectifiation and save to raw image files, Upgrade to 0.1.0 to a... (continued)

619 of 895 new or added lines in 9 files covered. (69.16%)

816 of 1156 relevant lines covered (70.59%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.67
/src/robotdataprocess/CmdLineInterface.py
1
from collections import defaultdict
1✔
2
from .data_types.ImageData import ImageData
1✔
3
from .data_types.OdometryData import OdometryData
1✔
4
from decimal import Decimal
1✔
5
import matplotlib.pyplot as plt
1✔
6
import numpy as np
1✔
7
from pathlib import Path
1✔
8
from .rosbag.Ros2BagWrapper import Ros2BagWrapper
1✔
9
from rosbags.rosbag2 import Reader as Reader2
1✔
10
from rosbags.rosbag2 import Writer as Writer2
1✔
11
from scipy.spatial.transform import Rotation as R
1✔
12
import tqdm
1✔
13
import yaml
1✔
14

15
class CmdLineInterface():
1✔
16

17
    def __init__(self, **kwargs):
1✔
18
        """
19
        Initialize the interface with the provided keyword arguments,
20
        which is assumed to be a dictionary.
21

22
        Args:
23
            **kwargs: Keyword arguments containing the configuration, including:
24
                - 'input_bag': Path to the input ROS2 bag file.
25
                - 'output_bag': Path to the output ROS2 bag file, if necessary.
26
                - 'operation_params': Dictionary containing operation parameters for specific manipulations.
27
                - 'external_msgs_path_ros2': Path to the directory containing external message definitions.
28
                - 'operation_to_run': The name of the operation to run, which should be a method of this class.
29
        """
30

31
        # Assign attributes from input arguments
32
        for key, value in kwargs.items():
1✔
33
            setattr(self, key, value)
1✔
34

35
        # Create a ROS2 bag wrapper
36
        self.bag_wrapper = Ros2BagWrapper(self.input_bag, self.external_msgs_path_ros2)
1✔
37

38
        # Run desired operation
39
        function = getattr(self, self.operation_to_run)
1✔
40
        function()
1✔
41

42
    @classmethod
1✔
43
    def from_yaml(cls, yaml_path: str):
1✔
44
        """
45
        Initialize the CmdLineInterface from a YAML file.
46

47
        Args:
48
            yaml_path (str): Path to the configuration.
49
        """
NEW
50
        with open(yaml_path, "r") as yaml_file:
×
NEW
51
            yaml_dict = yaml.safe_load(yaml_file)
×
NEW
52
            return cls(**yaml_dict)
×
53

54
    # ===============================================================
55
    # ======================== Operations ===========================
56
    # ===============================================================
57

58
    def hertz_analysis(self):
1✔
59
        """ Analyze the hertz of various topics in a ROS2 bag. """
60

61
        # Extract operation specific parameters
NEW
62
        topic: str = self.operation_params['hertz_analysis']['topic']
×
NEW
63
        output_folder: str = self.operation_params['hertz_analysis']['output_folder']
×
NEW
64
        expected_msgs: int = self.operation_params['hertz_analysis']['expected_msgs']
×
NEW
65
        max_msgs: int = self.operation_params['hertz_analysis']['max_msgs']
×
NEW
66
        try: robot_name: str = self.operation_params['hertz_analysis']['robot_name']
×
NEW
67
        except: robot_name = None
×
68

NEW
69
        self.bag_wrapper.hertz_analysis(topic, output_folder, expected_msgs, max_msgs, robot_name)
×
70

71
    def view_imu_data(self):
1✔
72
        """ Plot IMU data contained in a ROS2 bag. """
73

74
        # Extract operation specific parameters
NEW
75
        topic: str = self.operation_params['view_imu_data']['topic']
×
NEW
76
        output_folder: str = self.operation_params['view_imu_data']['output_folder']
×
NEW
77
        try: expected_msgs: int = self.operation_params['view_imu_data']['expected_msgs']
×
NEW
78
        except: expected_msgs = None
×
NEW
79
        try:
×
NEW
80
            data_range = self.operation_params['view_imu_data']['data_range']
×
NEW
81
            assert len(data_range) == 2
×
NEW
82
            data_range = tuple(data_range)
×
NEW
83
        except: data_range = None
×
84

NEW
85
        self.bag_wrapper.view_imu_data(topic, output_folder, expected_msgs, data_range)
×
86

87
    def downsample(self):
1✔
88
        """ Downsample a ROS2 bag file. """
89

90
        # Extract operation specific parameters
91
        topic_downsample_ratios: dict = self.operation_params['downsample']['topics'].copy()
1✔
92
        include_unmentioned_topics: bool = self.operation_params['downsample']['include_unmentioned_topics']
1✔
93
        self.bag_wrapper.downsample(self.output_bag, topic_downsample_ratios, include_unmentioned_topics)
1✔
94

95
    def crop(self):
1✔
96
        """ Crop a ROS2 bag file. """
97

NEW
98
        start_ts: float = self.operation_params['crop']['start_ts']
×
NEW
99
        end_ts: float = self.operation_params['crop']['end_ts']
×
NEW
100
        self.bag_wrapper.crop(self.output_bag, start_ts, end_ts)
×
101

102
    def convert_ros2_to_ros1(self):
1✔
103
        """ Convert a ROS2 bag from the bag wrapper into a ROS1 bag. """
104

105
        self.bag_wrapper.export_as_ros1(self.output_bag, self.external_msgs_path_ros1)
1✔
106

107
    def extract_odometry_to_csv(self):
1✔
108
        """ Extract odometry from a ROS2 bag and save in a csv file. """
109

110
        topic: str = self.operation_params['extract_odometry_to_csv']['topic']
1✔
111
        output_folder: str = self.operation_params['extract_odometry_to_csv']['output_file']
1✔
112
        add_noise: str = self.operation_params['extract_odometry_to_csv']['add_noise']
1✔
113
        xy_noise_std_per_frame: float = self.operation_params['extract_odometry_to_csv']['xy_noise_std_per_frame']
1✔
114
        z_noise_std_per_frame: str = self.operation_params['extract_odometry_to_csv']['z_noise_std_per_frame']
1✔
115
        shift_position_xy: float = self.operation_params['extract_odometry_to_csv']['shift_position_xy']
1✔
116
        shift_position_z: float = self.operation_params['extract_odometry_to_csv']['shift_position_z']
1✔
117

118
        odom_data = OdometryData.from_ros2_bag(self.input_bag, topic)
1✔
119
        if add_noise:
1✔
NEW
120
            odom_data.add_folded_guassian_noise_to_position(xy_noise_std_per_frame, z_noise_std_per_frame)
×
NEW
121
            odom_data.shift_position(shift_position_xy, shift_position_xy, shift_position_z)
×
122
        odom_data.to_csv(output_folder)
1✔
123

124
    def extract_images_to_npy(self):
1✔
125
        """ Extract images from a ROS2 bag and saves them into .npy files. """
126

127
        topic: str = self.operation_params['extract_images_to_npy']['topic']
1✔
128
        output_folder: str = self.operation_params['extract_images_to_npy']['output_folder']
1✔
129
        ImageData.from_ros2_bag(self.input_bag, topic, output_folder)
1✔
130

131
    def compare_timestamps_two_image_data(self):
1✔
132
        """ Compare timestamps between two ImageData instances. """
133

NEW
134
        data0 = ImageData.from_npy(self.operation_params['compare_timestamps_two_image_data']['folder_0'])
×
NEW
135
        data1 = ImageData.from_npy(self.operation_params['compare_timestamps_two_image_data']['folder_1'])
×
NEW
136
        data0.compare_timestamps(data1)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc