• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenCOMPES / sed / 9651726532

24 Jun 2024 08:07PM UTC coverage: 92.421% (+0.6%) from 91.857%
9651726532

Pull #329

github

zain-sohail
use index and dataset keys
Pull Request #329: Refactor flashloader

684 of 709 new or added lines in 15 files covered. (96.47%)

3 existing lines in 1 file now uncovered.

6853 of 7415 relevant lines covered (92.42%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.62
/sed/loader/flash/dataframe.py
1
from __future__ import annotations
1✔
2

3
import h5py
1✔
4
import numpy as np
1✔
5
from pandas import concat
1✔
6
from pandas import DataFrame
1✔
7
from pandas import Index
1✔
8
from pandas import MultiIndex
1✔
9
from pandas import Series
1✔
10

11
from sed.loader.flash.utils import get_channels
1✔
12

13

14
class DataFrameCreator:
    """
    Utility class for creating pandas DataFrames from HDF5 files with multiple channels.

    Channels are grouped by format ('per_electron', 'per_pulse', 'per_train'); each
    group is built into its own DataFrame indexed by a (trainId, pulseId, electronId)
    MultiIndex, and ``df`` joins them into a single frame.
    """

    def __init__(self, config_dataframe: dict, h5_file: h5py.File) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_file (h5py.File): The open h5 file.
        """
        self.h5_file: h5py.File = h5_file
        self.failed_files_error: list[str] = []
        # Names of the index levels shared by all created frames
        # (trainId, pulseId, electronId).
        self.multi_index = get_channels(index=True)
        self._config = config_dataframe

    def get_index_dataset_key(self, channel: str) -> tuple[str, str]:
        """
        Checks if 'group_name' and converts to 'index_key' and 'dataset_key' if so.

        Args:
            channel (str): The name of the channel.

        Returns:
            tuple[str, str]: Outputs a tuple of 'index_key' and 'dataset_key'.

        Raises:
            ValueError: If 'index_key' and 'dataset_key' are not provided.
        """
        channel_config = self._config["channels"][channel]

        if "index_key" in channel_config and "dataset_key" in channel_config:
            return channel_config["index_key"], channel_config["dataset_key"]
        else:
            # Legacy configs used a single 'group_name' entry; that scheme is gone.
            print("'group_name' is no longer supported.")

        # Single formatted message: a multi-argument ValueError would stringify
        # as a tuple instead of a readable error.
        raise ValueError(
            f"For channel: {channel}. Provide both 'index_key' and 'dataset_key'.",
        )

    def get_dataset_array(
        self,
        channel: str,
        slice_: bool = False,
    ) -> tuple[Index, h5py.Dataset]:
        """
        Returns a numpy array for a given channel name.

        Args:
            channel (str): The name of the channel.
            slice_ (bool): If True, applies slicing on the dataset.

        Returns:
            tuple[Index, h5py.Dataset]: A tuple containing the train ID Index and the numpy array
            for the channel's data.
        """
        # Get the data from the necessary h5 file and channel
        index_key, dataset_key = self.get_index_dataset_key(channel)

        key = Index(self.h5_file[index_key], name="trainId")  # macrobunch
        dataset = self.h5_file[dataset_key]

        if slice_:
            # Optional per-channel column selection along axis 1.
            slice_index = self._config["channels"][channel].get("slice", None)
            if slice_index is not None:
                dataset = np.take(dataset, slice_index, axis=1)
        # If np_array is size zero, fill with NaNs
        if dataset.shape[0] == 0:
            # Fill the np_array with NaN values of the same shape as train_id
            dataset = np.full_like(key, np.nan, dtype=np.double)

        return key, dataset

    def pulse_index(self, offset: int) -> tuple[MultiIndex, slice | np.ndarray]:
        """
        Computes the index for the 'per_electron' data.

        Args:
            offset (int): The offset value (ubid_offset) subtracted from the pulse IDs.

        Returns:
            tuple[MultiIndex, np.ndarray]: A tuple containing the computed MultiIndex and
            the indexer (``slice(None)`` when no sorting was required).
        """
        # Get the pulseId and the index_train
        index_train, dataset_pulse = self.get_dataset_array("pulseId", slice_=True)
        # Repeat the index_train by the number of pulses
        index_train_repeat = np.repeat(index_train, dataset_pulse.shape[1])
        # Explode the pulse dataset and subtract by the ubid_offset
        pulse_ravel = dataset_pulse.ravel() - offset
        # Create a MultiIndex with the index_train and the pulse
        microbunches = MultiIndex.from_arrays((index_train_repeat, pulse_ravel)).dropna()

        # Only sort if necessary; sorting returns the permutation needed to
        # reorder the data rows to match the sorted index.
        indexer = slice(None)
        if not microbunches.is_monotonic_increasing:
            microbunches, indexer = microbunches.sort_values(return_indexer=True)

        # Count the number of electrons per microbunch and create an array of electrons
        electron_counts = microbunches.value_counts(sort=False).values
        electrons = np.concatenate([np.arange(count) for count in electron_counts])

        # Final index constructed here
        index = MultiIndex.from_arrays(
            (
                microbunches.get_level_values(0),
                microbunches.get_level_values(1).astype(int),
                electrons,
            ),
            names=self.multi_index,
        )
        return index, indexer

    @property
    def df_electron(self) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per electron].

        Returns:
            DataFrame: The pandas DataFrame for the 'per_electron' channel's data.
        """
        offset = self._config["ubid_offset"]
        # Index
        index, indexer = self.pulse_index(offset)

        # Data logic
        channels = get_channels(self._config["channels"], "per_electron")
        slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels]

        # First checking if dataset keys are the same for all channels
        dataset_keys = [self.get_index_dataset_key(channel)[1] for channel in channels]
        all_keys_same = all(key == dataset_keys[0] for key in dataset_keys)

        # If all dataset keys are the same, we can directly use the ndarray to create frame
        if all_keys_same:
            _, dataset = self.get_dataset_array(channels[0])
            data_dict = {
                channel: dataset[:, slice_, :].ravel()
                for channel, slice_ in zip(channels, slice_index)
            }
            dataframe = DataFrame(data_dict)
        # Otherwise, we need to create a Series for each channel and concatenate them
        else:
            series = {
                channel: Series(self.get_dataset_array(channel, slice_=True)[1].ravel())
                for channel in channels
            }
            dataframe = concat(series, axis=1)

        # Pulse IDs in [-offset, 0) are artifacts of the offset subtraction.
        drop_vals = np.arange(-offset, 0)

        # Few things happen here:
        # Drop all NaN values like while creating the multiindex
        # if necessary, the data is sorted with [indexer]
        # MultiIndex is set
        # Finally, the offset values are dropped
        return (
            dataframe.dropna()
            .iloc[indexer]
            .set_index(index)
            .drop(index=drop_vals, level="pulseId", errors="ignore")
        )

    @property
    def df_pulse(self) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per pulse].

        Returns:
            DataFrame: The pandas DataFrame for the 'per_pulse' channel's data.
        """
        series = []
        channels = get_channels(self._config["channels"], "per_pulse")
        for channel in channels:
            # get slice
            key, dataset = self.get_dataset_array(channel, slice_=True)
            # One row per (train, pulse); electronId is fixed at 0.
            index = MultiIndex.from_product(
                (key, np.arange(0, dataset.shape[1]), [0]),
                names=self.multi_index,
            )
            series.append(Series(dataset[()].ravel(), index=index, name=channel))

        return concat(series, axis=1)  # much faster when concatenating similarly indexed data first

    @property
    def df_train(self) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per train].

        Returns:
            DataFrame: The pandas DataFrame for the 'per_train' channel's data.
        """
        series = []

        channels = get_channels(self._config["channels"], "per_train")

        for channel in channels:
            key, dataset = self.get_dataset_array(channel, slice_=True)
            # One row per train; pulseId and electronId are fixed at 0.
            index = MultiIndex.from_product(
                (key, [0], [0]),
                names=self.multi_index,
            )
            if channel == "dldAux":
                # dldAux multiplexes several auxiliary channels into columns;
                # expand each configured sub-channel into its own Series.
                aux_channels = self._config["channels"]["dldAux"]["dldAuxChannels"].items()
                for name, slice_aux in aux_channels:
                    series.append(Series(dataset[: key.size, slice_aux], index, name=name))
            else:
                series.append(Series(dataset, index, name=channel))

        return concat(series, axis=1)

    def validate_channel_keys(self) -> None:
        """
        Validates if the index and dataset keys for all channels in config exist in the h5 file.

        Raises:
            KeyError: If the index or dataset keys do not exist in the file.
        """
        for channel in self._config["channels"]:
            index_key, dataset_key = self.get_index_dataset_key(channel)
            if index_key not in self.h5_file:
                raise KeyError(f"Index key '{index_key}' doesn't exist in the file.")
            if dataset_key not in self.h5_file:
                raise KeyError(f"Dataset key '{dataset_key}' doesn't exist in the file.")

    @property
    def df(self) -> DataFrame:
        """
        Joins the 'per_electron', 'per_pulse', and 'per_train' using join operation,
        returning a single dataframe.

        Returns:
            DataFrame: The combined pandas DataFrame.
        """

        self.validate_channel_keys()
        return (
            self.df_electron.join(self.df_pulse, on=self.multi_index, how="outer")
            .join(self.df_train, on=self.multi_index, how="outer")
            .sort_index()
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc