• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 21218149827

21 Jan 2026 04:52PM UTC coverage: 75.402% (+0.7%) from 74.672%
21218149827

Pull #10

github

DanielChaseButterfield
Refactor hertz_analysis to make it part of Data
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

804 of 1098 new or added lines in 13 files covered. (73.22%)

3 existing lines in 2 files now uncovered.

1689 of 2240 relevant lines covered (75.4%)

1.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.12
/src/robotdataprocess/CmdLineInterface.py
1
from collections import defaultdict
2✔
2
from .data_types.Data import CoordinateFrame
2✔
3
from .data_types.ImageData.ImageDataInMemory import ImageDataInMemory
2✔
4
from .data_types.OdometryData import OdometryData
2✔
5
from decimal import Decimal
2✔
6
import matplotlib.pyplot as plt
2✔
7
import numpy as np
2✔
8
from pathlib import Path
2✔
9
from .ros.Ros2BagWrapper import Ros2BagWrapper
2✔
10
from rosbags.rosbag2 import Reader as Reader2
2✔
11
from rosbags.rosbag2 import Writer as Writer2
2✔
12
from scipy.spatial.transform import Rotation as R
2✔
13
import tqdm
2✔
14
import yaml
2✔
15

16
class CmdLineInterface():
2✔
17

18
    def __init__(self, **kwargs):
2✔
19
        """
20
        Initialize the interface with the provided keyword arguments,
21
        which is assumed to be a dictionary.
22

23
        Args:
24
            **kwargs: Keyword arguments containing the configuration, including:
25
                - 'input_bag': Path to the input ROS2 bag file.
26
                - 'output_bag': Path to the output ROS2 bag file, if necessary.
27
                - 'operation_params': Dictionary containing operation parameters for specific manipulations.
28
                - 'external_msgs_path_ros2': Path to the directory containing external message definitions.
29
                - 'operation_to_run': The name of the operation to run, which should be a method of this class.
30
        """
31

32
        # Assign attributes from input arguments
33
        for key, value in kwargs.items():
2✔
34
            setattr(self, key, value)
2✔
35

36
        # Create a ROS2 bag wrapper
37
        self.bag_wrapper = Ros2BagWrapper(self.input_bag, self.external_msgs_path_ros2)
2✔
38

39
        # Run desired operation
40
        function = getattr(self, self.operation_to_run)
2✔
41
        function()
2✔
42

43
    @classmethod
2✔
44
    def from_yaml(cls, yaml_path: str):
2✔
45
        """
46
        Initialize the CmdLineInterface from a YAML file.
47

48
        Args:
49
            yaml_path (str): Path to the configuration.
50
        """
51
        with open(yaml_path, "r") as yaml_file:
×
52
            yaml_dict = yaml.safe_load(yaml_file)
×
53
            return cls(**yaml_dict)
×
54

55
    # ===============================================================
56
    # ======================== Operations ===========================
57
    # ===============================================================
58

59
    def view_imu_data(self):
2✔
60
        """ Plot IMU data contained in a ROS2 bag. """
61

62
        # Extract operation specific parameters
63
        topic: str = self.operation_params['view_imu_data']['topic']
×
64
        output_folder: str = self.operation_params['view_imu_data']['output_folder']
×
65
        try: expected_msgs: int = self.operation_params['view_imu_data']['expected_msgs']
×
66
        except: expected_msgs = None
×
67
        try:
×
68
            data_range = self.operation_params['view_imu_data']['data_range']
×
69
            assert len(data_range) == 2
×
70
            data_range = tuple(data_range)
×
71
        except: data_range = None
×
72

73
        self.bag_wrapper.view_imu_data(topic, output_folder, expected_msgs, data_range)
×
74

75
    def downsample(self):
2✔
76
        """ Downsample a ROS2 bag file. """
77

78
        # Extract operation specific parameters
79
        topic_downsample_ratios: dict = self.operation_params['downsample']['topics'].copy()
2✔
80
        include_unmentioned_topics: bool = self.operation_params['downsample']['include_unmentioned_topics']
2✔
81
        self.bag_wrapper.downsample(self.output_bag, topic_downsample_ratios, include_unmentioned_topics)
2✔
82

83
    def crop(self):
2✔
84
        """ Crop a ROS2 bag file. """
85

86
        start_ts: float = self.operation_params['crop']['start_ts']
×
87
        end_ts: float = self.operation_params['crop']['end_ts']
×
88
        self.bag_wrapper.crop(self.output_bag, start_ts, end_ts)
×
89

90
    def convert_ros2_to_ros1(self):
2✔
91
        """ Convert a ROS2 bag from the bag wrapper into a ROS1 bag. """
92

93
        self.bag_wrapper.export_as_ros1(self.output_bag, self.external_msgs_path_ros1)
2✔
94

95
    def extract_odometry_to_csv(self):
2✔
96
        """ Extract odometry from a ROS2 bag and save in a csv file. """
97

98
        topic: str = self.operation_params['extract_odometry_to_csv']['topic']
2✔
99
        output_folder: str = self.operation_params['extract_odometry_to_csv']['output_file']
2✔
100
        add_noise: str = self.operation_params['extract_odometry_to_csv']['add_noise']
2✔
101
        xy_noise_std_per_frame: float = self.operation_params['extract_odometry_to_csv']['xy_noise_std_per_frame']
2✔
102
        z_noise_std_per_frame: str = self.operation_params['extract_odometry_to_csv']['z_noise_std_per_frame']
2✔
103
        shift_position_xy: float = self.operation_params['extract_odometry_to_csv']['shift_position_xy']
2✔
104
        shift_position_z: float = self.operation_params['extract_odometry_to_csv']['shift_position_z']
2✔
105

106
        odom_data = OdometryData.from_ros2_bag(self.input_bag, topic, CoordinateFrame.NONE)
2✔
107
        if add_noise:
2✔
108
            odom_data.add_folded_guassian_noise_to_position(xy_noise_std_per_frame, z_noise_std_per_frame)
×
109
            odom_data.shift_position(shift_position_xy, shift_position_xy, shift_position_z)
×
110
        odom_data.to_csv(output_folder)
2✔
111

112
    def extract_images_to_npy(self):
2✔
113
        """ Extract images from a ROS2 bag and saves them into .npy files. """
114

115
        topic: str = self.operation_params['extract_images_to_npy']['topic']
2✔
116
        output_folder: str = self.operation_params['extract_images_to_npy']['output_folder']
2✔
117
        ImageDataInMemory.from_ros2_bag(self.input_bag, topic, output_folder)
2✔
118

119
    def compare_timestamps_two_image_data(self):
2✔
120
        """ Compare timestamps between two ImageData instances. """
121

NEW
122
        data0 = ImageDataInMemory.from_npy(self.operation_params['compare_timestamps_two_image_data']['folder_0'])
×
NEW
123
        data1 = ImageDataInMemory.from_npy(self.operation_params['compare_timestamps_two_image_data']['folder_1'])
×
UNCOV
124
        data0.compare_timestamps(data1)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc