• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarlab-gatech / robotdataprocess / 21222003393

21 Jan 2026 06:58PM UTC coverage: 78.695% (+4.0%) from 74.672%
21222003393

Pull #10

github

DanielChaseButterfield
test_RosPublisher.py test case for OdometryData
Pull Request #10: (v0.2) Prototype code for ROS2 Publishing, and new ImageDataOnDisk class

843 of 1123 new or added lines in 13 files covered. (75.07%)

3 existing lines in 2 files now uncovered.

1725 of 2192 relevant lines covered (78.7%)

1.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.7
/src/robotdataprocess/CmdLineInterface.py
1
from collections import defaultdict
2✔
2
from .data_types.Data import CoordinateFrame
2✔
3
from .data_types.ImageData.ImageDataInMemory import ImageDataInMemory
2✔
4
from .data_types.OdometryData import OdometryData
2✔
5
from decimal import Decimal
2✔
6
import matplotlib.pyplot as plt
2✔
7
import numpy as np
2✔
8
from pathlib import Path
2✔
9
from .ros.Ros2BagWrapper import Ros2BagWrapper
2✔
10
from rosbags.rosbag2 import Reader as Reader2
2✔
11
from rosbags.rosbag2 import Writer as Writer2
2✔
12
from scipy.spatial.transform import Rotation as R
2✔
13
import tqdm
2✔
14
import yaml
2✔
15

16
class CmdLineInterface():
2✔
17

18
    def __init__(self, **kwargs):
2✔
19
        """
20
        Initialize the interface with the provided keyword arguments,
21
        which is assumed to be a dictionary.
22

23
        Args:
24
            **kwargs: Keyword arguments containing the configuration, including:
25
                - 'input_bag': Path to the input ROS2 bag file.
26
                - 'output_bag': Path to the output ROS2 bag file, if necessary.
27
                - 'operation_params': Dictionary containing operation parameters for specific manipulations.
28
                - 'external_msgs_path_ros2': Path to the directory containing external message definitions.
29
                - 'operation_to_run': The name of the operation to run, which should be a method of this class.
30
        """
31

32
        # Assign attributes from input arguments
33
        for key, value in kwargs.items():
2✔
34
            setattr(self, key, value)
2✔
35

36
        # Create a ROS2 bag wrapper
37
        self.bag_wrapper = Ros2BagWrapper(self.input_bag, self.external_msgs_path_ros2)
2✔
38

39
        # Run desired operation
40
        function = getattr(self, self.operation_to_run)
2✔
41
        function()
2✔
42

43
    @classmethod
2✔
44
    def from_yaml(cls, yaml_path: str):
2✔
45
        """
46
        Initialize the CmdLineInterface from a YAML file.
47

48
        Args:
49
            yaml_path (str): Path to the configuration.
50
        """
51
        with open(yaml_path, "r") as yaml_file:
×
52
            yaml_dict = yaml.safe_load(yaml_file)
×
53
            return cls(**yaml_dict)
×
54

55
    # ===============================================================
56
    # ======================== Operations ===========================
57
    # ===============================================================
58

59
    def downsample(self):
2✔
60
        """ Downsample a ROS2 bag file. """
61

62
        # Extract operation specific parameters
63
        topic_downsample_ratios: dict = self.operation_params['downsample']['topics'].copy()
2✔
64
        include_unmentioned_topics: bool = self.operation_params['downsample']['include_unmentioned_topics']
2✔
65
        self.bag_wrapper.downsample(self.output_bag, topic_downsample_ratios, include_unmentioned_topics)
2✔
66

67
    def crop(self):
2✔
68
        """ Crop a ROS2 bag file. """
69

70
        start_ts: float = self.operation_params['crop']['start_ts']
×
71
        end_ts: float = self.operation_params['crop']['end_ts']
×
72
        self.bag_wrapper.crop(self.output_bag, start_ts, end_ts)
×
73

74
    def convert_ros2_to_ros1(self):
2✔
75
        """ Convert a ROS2 bag from the bag wrapper into a ROS1 bag. """
76

77
        self.bag_wrapper.export_as_ros1(self.output_bag, self.external_msgs_path_ros1)
2✔
78

79
    def extract_odometry_to_csv(self):
2✔
80
        """ Extract odometry from a ROS2 bag and save in a csv file. """
81

82
        topic: str = self.operation_params['extract_odometry_to_csv']['topic']
2✔
83
        output_folder: str = self.operation_params['extract_odometry_to_csv']['output_file']
2✔
84
        add_noise: str = self.operation_params['extract_odometry_to_csv']['add_noise']
2✔
85
        xy_noise_std_per_frame: float = self.operation_params['extract_odometry_to_csv']['xy_noise_std_per_frame']
2✔
86
        z_noise_std_per_frame: str = self.operation_params['extract_odometry_to_csv']['z_noise_std_per_frame']
2✔
87
        shift_position_xy: float = self.operation_params['extract_odometry_to_csv']['shift_position_xy']
2✔
88
        shift_position_z: float = self.operation_params['extract_odometry_to_csv']['shift_position_z']
2✔
89

90
        odom_data = OdometryData.from_ros2_bag(self.input_bag, topic, CoordinateFrame.NONE)
2✔
91
        if add_noise:
2✔
92
            odom_data.add_folded_guassian_noise_to_position(xy_noise_std_per_frame, z_noise_std_per_frame)
×
93
            odom_data.shift_position(shift_position_xy, shift_position_xy, shift_position_z)
×
94
        odom_data.to_csv(output_folder)
2✔
95

96
    def extract_images_to_npy(self):
2✔
97
        """ Extract images from a ROS2 bag and saves them into .npy files. """
98

99
        topic: str = self.operation_params['extract_images_to_npy']['topic']
2✔
100
        output_folder: str = self.operation_params['extract_images_to_npy']['output_folder']
2✔
101
        ImageDataInMemory.from_ros2_bag(self.input_bag, topic, output_folder)
2✔
102

103
    def compare_timestamps_two_image_data(self):
2✔
104
        """ Compare timestamps between two ImageData instances. """
105

NEW
106
        data0 = ImageDataInMemory.from_npy(self.operation_params['compare_timestamps_two_image_data']['folder_0'])
×
NEW
107
        data1 = ImageDataInMemory.from_npy(self.operation_params['compare_timestamps_two_image_data']['folder_1'])
×
UNCOV
108
        data0.compare_timestamps(data1)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc