• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 20820881882

08 Jan 2026 02:50PM UTC coverage: 62.482% (-12.2%) from 74.672%
20820881882

Pull #10

github

DanielChaseButterfield
Add publish script for OpenVINS
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

181 of 277 branches covered (65.34%)

Branch coverage included in aggregate %.

212 of 546 new or added lines in 11 files covered. (38.83%)

3 existing lines in 2 files now uncovered.

1118 of 1802 relevant lines covered (62.04%)

1.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.1
/src/robotdataprocess/CmdLineInterface.py
1
from collections import defaultdict
2✔
2
from .data_types.Data import CoordinateFrame
2✔
3
from .data_types.ImageData.ImageDataInMemory import ImageDataInMemory
2✔
4
from .data_types.OdometryData import OdometryData
2✔
5
from decimal import Decimal
2✔
6
import matplotlib.pyplot as plt
2✔
7
import numpy as np
2✔
8
from pathlib import Path
2✔
9
from .ros.Ros2BagWrapper import Ros2BagWrapper
2✔
10
from rosbags.rosbag2 import Reader as Reader2
2✔
11
from rosbags.rosbag2 import Writer as Writer2
2✔
12
from scipy.spatial.transform import Rotation as R
2✔
13
import tqdm
2✔
14
import yaml
2✔
15

16
class CmdLineInterface():
2✔
17

18
    def __init__(self, **kwargs):
2✔
19
        """
20
        Initialize the interface with the provided keyword arguments,
21
        which is assumed to be a dictionary.
22

23
        Args:
24
            **kwargs: Keyword arguments containing the configuration, including:
25
                - 'input_bag': Path to the input ROS2 bag file.
26
                - 'output_bag': Path to the output ROS2 bag file, if necessary.
27
                - 'operation_params': Dictionary containing operation parameters for specific manipulations.
28
                - 'external_msgs_path_ros2': Path to the directory containing external message definitions.
29
                - 'operation_to_run': The name of the operation to run, which should be a method of this class.
30
        """
31

32
        # Assign attributes from input arguments
33
        for key, value in kwargs.items():
2✔
34
            setattr(self, key, value)
2✔
35

36
        # Create a ROS2 bag wrapper
37
        self.bag_wrapper = Ros2BagWrapper(self.input_bag, self.external_msgs_path_ros2)
2✔
38

39
        # Run desired operation
40
        function = getattr(self, self.operation_to_run)
2✔
41
        function()
2✔
42

43
    @classmethod
2✔
44
    def from_yaml(cls, yaml_path: str):
2✔
45
        """
46
        Initialize the CmdLineInterface from a YAML file.
47

48
        Args:
49
            yaml_path (str): Path to the configuration.
50
        """
51
        with open(yaml_path, "r") as yaml_file:
×
52
            yaml_dict = yaml.safe_load(yaml_file)
×
53
            return cls(**yaml_dict)
×
54

55
    # ===============================================================
56
    # ======================== Operations ===========================
57
    # ===============================================================
58

59
    def hertz_analysis(self):
2✔
60
        """ Analyze the hertz of various topics in a ROS2 bag. """
61

62
        # Extract operation specific parameters
63
        topic: str = self.operation_params['hertz_analysis']['topic']
×
64
        output_folder: str = self.operation_params['hertz_analysis']['output_folder']
×
65
        expected_msgs: int = self.operation_params['hertz_analysis']['expected_msgs']
×
66
        max_msgs: int = self.operation_params['hertz_analysis']['max_msgs']
×
67
        try: robot_name: str = self.operation_params['hertz_analysis']['robot_name']
×
68
        except: robot_name = None
×
69

70
        self.bag_wrapper.hertz_analysis(topic, output_folder, expected_msgs, max_msgs, robot_name)
×
71

72
    def view_imu_data(self):
2✔
73
        """ Plot IMU data contained in a ROS2 bag. """
74

75
        # Extract operation specific parameters
76
        topic: str = self.operation_params['view_imu_data']['topic']
×
77
        output_folder: str = self.operation_params['view_imu_data']['output_folder']
×
78
        try: expected_msgs: int = self.operation_params['view_imu_data']['expected_msgs']
×
79
        except: expected_msgs = None
×
80
        try:
×
81
            data_range = self.operation_params['view_imu_data']['data_range']
×
82
            assert len(data_range) == 2
×
83
            data_range = tuple(data_range)
×
84
        except: data_range = None
×
85

86
        self.bag_wrapper.view_imu_data(topic, output_folder, expected_msgs, data_range)
×
87

88
    def downsample(self):
2✔
89
        """ Downsample a ROS2 bag file. """
90

91
        # Extract operation specific parameters
92
        topic_downsample_ratios: dict = self.operation_params['downsample']['topics'].copy()
2✔
93
        include_unmentioned_topics: bool = self.operation_params['downsample']['include_unmentioned_topics']
2✔
94
        self.bag_wrapper.downsample(self.output_bag, topic_downsample_ratios, include_unmentioned_topics)
2✔
95

96
    def crop(self):
2✔
97
        """ Crop a ROS2 bag file. """
98

99
        start_ts: float = self.operation_params['crop']['start_ts']
×
100
        end_ts: float = self.operation_params['crop']['end_ts']
×
101
        self.bag_wrapper.crop(self.output_bag, start_ts, end_ts)
×
102

103
    def convert_ros2_to_ros1(self):
2✔
104
        """ Convert a ROS2 bag from the bag wrapper into a ROS1 bag. """
105

106
        self.bag_wrapper.export_as_ros1(self.output_bag, self.external_msgs_path_ros1)
2✔
107

108
    def extract_odometry_to_csv(self):
2✔
109
        """ Extract odometry from a ROS2 bag and save in a csv file. """
110

111
        topic: str = self.operation_params['extract_odometry_to_csv']['topic']
2✔
112
        output_folder: str = self.operation_params['extract_odometry_to_csv']['output_file']
2✔
113
        add_noise: str = self.operation_params['extract_odometry_to_csv']['add_noise']
2✔
114
        xy_noise_std_per_frame: float = self.operation_params['extract_odometry_to_csv']['xy_noise_std_per_frame']
2✔
115
        z_noise_std_per_frame: str = self.operation_params['extract_odometry_to_csv']['z_noise_std_per_frame']
2✔
116
        shift_position_xy: float = self.operation_params['extract_odometry_to_csv']['shift_position_xy']
2✔
117
        shift_position_z: float = self.operation_params['extract_odometry_to_csv']['shift_position_z']
2✔
118

119
        odom_data = OdometryData.from_ros2_bag(self.input_bag, topic, CoordinateFrame.NONE)
2✔
120
        if add_noise:
2✔
121
            odom_data.add_folded_guassian_noise_to_position(xy_noise_std_per_frame, z_noise_std_per_frame)
×
122
            odom_data.shift_position(shift_position_xy, shift_position_xy, shift_position_z)
×
123
        odom_data.to_csv(output_folder)
2✔
124

125
    def extract_images_to_npy(self):
2✔
126
        """ Extract images from a ROS2 bag and saves them into .npy files. """
127

128
        topic: str = self.operation_params['extract_images_to_npy']['topic']
2✔
129
        output_folder: str = self.operation_params['extract_images_to_npy']['output_folder']
2✔
130
        ImageDataInMemory.from_ros2_bag(self.input_bag, topic, output_folder)
2✔
131

132
    def compare_timestamps_two_image_data(self):
2✔
133
        """ Compare timestamps between two ImageData instances. """
134

NEW
135
        data0 = ImageDataInMemory.from_npy(self.operation_params['compare_timestamps_two_image_data']['folder_0'])
×
NEW
136
        data1 = ImageDataInMemory.from_npy(self.operation_params['compare_timestamps_two_image_data']['folder_1'])
×
UNCOV
137
        data0.compare_timestamps(data1)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc