OpenCOMPES / sed / build 9481831759

12 Jun 2024 11:33AM UTC coverage: 92.363% (+0.7%) from 91.71%

Pull Request #329: Refactor flashloader (via github)
zain-sohail: add available runs property

699 of 724 new or added lines in 15 files covered (96.55%).
10 existing lines in 2 files are now uncovered.
6555 of 7097 relevant lines covered (92.36%).
0.92 hits per line.

Source File: /sed/loader/flash/dataframe.py (94.9% covered)
from __future__ import annotations

import h5py
import numpy as np
from pandas import concat
from pandas import DataFrame
from pandas import Index
from pandas import MultiIndex
from pandas import Series

from sed.loader.flash.utils import get_channels


class DataFrameCreator:
    """
    Utility class for creating pandas DataFrames from HDF5 files with multiple channels.
    """

    def __init__(self, config_dataframe: dict, h5_file: h5py.File) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_file (h5py.File): The open h5 file.
        """
        self.h5_file: h5py.File = h5_file
        self.failed_files_error: list[str] = []
        self.multi_index = get_channels(index=True)
        self._config = config_dataframe

    def get_index_dataset_key(self, channel: str) -> tuple[str, str]:
        """
        Checks for 'group_name' in the channel config and, if present, converts it
        into an 'index_key' and a 'dataset_key'.

        Args:
            channel (str): The name of the channel.

        Returns:
            tuple[str, str]: A tuple of 'index_key' and 'dataset_key'.

        Raises:
            ValueError: If neither 'group_name' nor both 'index_key' and 'dataset_key' are provided.
        """
        channel_config = self._config["channels"][channel]

        if "group_name" in channel_config:
            index_key = channel_config["group_name"] + "index"
            if channel == "timeStamp":
                dataset_key = channel_config["group_name"] + "time"
            else:
                dataset_key = channel_config["group_name"] + "value"
            return index_key, dataset_key
        if "index_key" in channel_config and "dataset_key" in channel_config:
            return channel_config["index_key"], channel_config["dataset_key"]

        raise ValueError(
            f"For channel: {channel}, provide either both 'index_key' and 'dataset_key' "
            "or 'group_name' (parses only 'index' and 'value' or 'time' keys).",
        )

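    # A sketch of the two accepted channel-config forms. The group and dataset
    # paths below are hypothetical, not taken from a real config:
    #
    #   {"group_name": "/zraw/SOMEDAQ/dGroup/"}
    #       -> index_key   = "/zraw/SOMEDAQ/dGroup/index"
    #       -> dataset_key = "/zraw/SOMEDAQ/dGroup/value"  ("...time" for "timeStamp")
    #
    #   {"index_key": "/zraw/SOMEDAQ/dGroup/index",
    #    "dataset_key": "/zraw/SOMEDAQ/dGroup/value"}
    #       -> both keys are used verbatim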

    def get_dataset_array(
        self,
        channel: str,
        slice_: bool = False,
    ) -> tuple[Index, h5py.Dataset]:
        """
        Returns the train ID Index and the dataset for a given channel name.

        Args:
            channel (str): The name of the channel.
            slice_ (bool): If True, applies slicing on the dataset.

        Returns:
            tuple[Index, h5py.Dataset]: A tuple containing the train ID Index and the dataset
            for the channel's data.
        """
        # Get the data from the necessary h5 file and channel
        index_key, dataset_key = self.get_index_dataset_key(channel)

        key = Index(self.h5_file[index_key], name="trainId")  # macrobunch
        dataset = self.h5_file[dataset_key]

        if slice_:
            slice_index = self._config["channels"][channel].get("slice", None)
            if slice_index is not None:
                dataset = np.take(dataset, slice_index, axis=1)
        # If the dataset is empty, fill it with NaNs
        if dataset.shape[0] == 0:
            # Fill with NaN values of the same shape as the train ID index
            dataset = np.full_like(key, np.nan, dtype=np.double)

        return key, dataset

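    # Illustrative note (hypothetical shapes): for a channel whose dataset has
    # shape (n_trains, n_slices, n_pulses) and a config entry {"slice": 1},
    # slice_=True reduces the dataset to dataset[:, 1, :], i.e. shape
    # (n_trains, n_pulses); np.take along axis=1 picks that sub-array.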

    def pulse_index(self, offset: int) -> tuple[MultiIndex, slice | np.ndarray]:
        """
        Computes the index for the 'per_electron' data.

        Args:
            offset (int): The ubid offset subtracted from the pulse IDs.

        Returns:
            tuple[MultiIndex, slice | np.ndarray]: A tuple containing the computed MultiIndex
            and the indexer.
        """
        # Get the pulseId and the index_train
        index_train, dataset_pulse = self.get_dataset_array("pulseId", slice_=True)
        # Repeat the index_train by the number of pulses
        index_train_repeat = np.repeat(index_train, dataset_pulse.shape[1])
        # Explode the pulse dataset and subtract the ubid_offset
        pulse_ravel = dataset_pulse.ravel() - offset
        # Create a MultiIndex with the index_train and the pulse
        microbunches = MultiIndex.from_arrays((index_train_repeat, pulse_ravel)).dropna()

        # Only sort if necessary
        indexer = slice(None)
        if not microbunches.is_monotonic_increasing:
            microbunches, indexer = microbunches.sort_values(return_indexer=True)

        # Count the number of electrons per microbunch and create an array of electrons
        electron_counts = microbunches.value_counts(sort=False).values
        electrons = np.concatenate([np.arange(count) for count in electron_counts])

        # The final index is constructed here
        index = MultiIndex.from_arrays(
            (
                microbunches.get_level_values(0),
                microbunches.get_level_values(1).astype(int),
                electrons,
            ),
            names=self.multi_index,
        )
        return index, indexer

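    # A worked toy illustration (hypothetical values; level names assumed to be
    # trainId, pulseId, electronId per self.multi_index): trains [10, 11] with
    # two pulse entries each and offset = 5:
    #   index_train_repeat = [10, 10, 11, 11]
    #   pulse_ravel        = [5, 5, 6, 7] - 5 = [0, 0, 1, 2]
    # The (train, pulse) pair (10, 0) occurs twice, i.e. two electrons in that
    # microbunch, so the resulting MultiIndex is
    #   (10, 0, 0), (10, 0, 1), (11, 1, 0), (11, 2, 0)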

    @property
    def df_electron(self) -> DataFrame:
        """
        Returns a pandas DataFrame for the channels of type [per electron].

        Returns:
            DataFrame: The pandas DataFrame for the 'per_electron' channel data.
        """
        offset = self._config["ubid_offset"]
        # Index
        index, indexer = self.pulse_index(offset)

        # Data logic
        channels = get_channels(self._config["channels"], "per_electron")
        slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels]

        # First check whether the dataset keys are the same for all channels
        dataset_keys = [self.get_index_dataset_key(channel)[1] for channel in channels]
        all_keys_same = all(key == dataset_keys[0] for key in dataset_keys)

        # If all dataset keys are the same, we can directly use the ndarray to create the frame
        if all_keys_same:
            _, dataset = self.get_dataset_array(channels[0])
            data_dict = {
                channel: dataset[:, slice_, :].ravel()
                for channel, slice_ in zip(channels, slice_index)
            }
            dataframe = DataFrame(data_dict)
        # Otherwise, we need to create a Series for each channel and concatenate them
        else:
            series = {
                channel: Series(self.get_dataset_array(channel, slice_=True)[1].ravel())
                for channel in channels
            }
            dataframe = concat(series, axis=1)

        drop_vals = np.arange(-offset, 0)

        # Several things happen here:
        # - all NaN values are dropped, as was done when creating the MultiIndex
        # - if necessary, the data is sorted with the indexer
        # - the MultiIndex is set
        # - finally, the negative pulse values introduced by the ubid offset are dropped
        return (
            dataframe.dropna()
            .iloc[indexer]
            .set_index(index)
            .drop(index=drop_vals, level="pulseId", errors="ignore")
        )

    @property
    def df_pulse(self) -> DataFrame:
        """
        Returns a pandas DataFrame for the channels of type [per pulse].

        Returns:
            DataFrame: The pandas DataFrame for the 'per_pulse' channel data.
        """
        series = []
        channels = get_channels(self._config["channels"], "per_pulse")
        for channel in channels:
            # get the sliced dataset
            key, dataset = self.get_dataset_array(channel, slice_=True)
            index = MultiIndex.from_product(
                (key, np.arange(0, dataset.shape[1]), [0]),
                names=self.multi_index,
            )
            series.append(Series(dataset[()].ravel(), index=index, name=channel))

        return concat(series, axis=1)  # much faster when concatenating similarly indexed data first

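    # Illustration (hypothetical values): trains [10, 11] and a dataset with
    # two pulse columns produce, via MultiIndex.from_product, the index
    #   (10, 0, 0), (10, 1, 0), (11, 0, 0), (11, 1, 0)
    # i.e. every pulse of every train, with the electron level pinned to 0.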

    @property
    def df_train(self) -> DataFrame:
        """
        Returns a pandas DataFrame for the channels of type [per train].

        Returns:
            DataFrame: The pandas DataFrame for the 'per_train' channel data.
        """
        series = []

        channels = get_channels(self._config["channels"], "per_train")

        for channel in channels:
            key, dataset = self.get_dataset_array(channel, slice_=True)
            index = MultiIndex.from_product(
                (key, [0], [0]),
                names=self.multi_index,
            )
            if channel == "dldAux":
                aux_channels = self._config["channels"]["dldAux"]["dldAuxChannels"].items()
                for name, slice_aux in aux_channels:
                    series.append(Series(dataset[: key.size, slice_aux], index, name=name))
            else:
                series.append(Series(dataset, index, name=channel))

        return concat(series, axis=1)

    def validate_channel_keys(self) -> None:
        """
        Validates that the index and dataset keys for all channels in the config exist
        in the h5 file.

        Raises:
            KeyError: If the index or dataset keys do not exist in the file.
        """
        for channel in self._config["channels"]:
            index_key, dataset_key = self.get_index_dataset_key(channel)
            if index_key not in self.h5_file:
                raise KeyError(f"Index key '{index_key}' doesn't exist in the file.")
            if dataset_key not in self.h5_file:
                raise KeyError(f"Dataset key '{dataset_key}' doesn't exist in the file.")

    @property
    def df(self) -> DataFrame:
        """
        Joins the 'per_electron', 'per_pulse', and 'per_train' dataframes using outer joins,
        returning a single dataframe.

        Returns:
            DataFrame: The combined pandas DataFrame.
        """

        self.validate_channel_keys()
        return (
            self.df_electron.join(self.df_pulse, on=self.multi_index, how="outer")
            .join(self.df_train, on=self.multi_index, how="outer")
            .sort_index()
        )

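For orientation, here is a minimal usage sketch of the class above. The file name, group names, channel entries, and config values are hypothetical placeholders, not taken from this repository; a real loader config defines many more channels.

import h5py

from sed.loader.flash.dataframe import DataFrameCreator

# Abbreviated, hypothetical dataframe config: one per-electron channel plus
# the pulseId channel that pulse_index() relies on.
config_dataframe = {
    "ubid_offset": 5,
    "channels": {
        "pulseId": {
            "format": "per_electron",
            "group_name": "/some/daq/group/",
            "slice": 2,
        },
        "dldPosX": {
            "format": "per_electron",
            "group_name": "/some/daq/group/",
            "slice": 1,
        },
    },
}

with h5py.File("some_run.h5", "r") as h5_file:  # hypothetical file
    creator = DataFrameCreator(config_dataframe, h5_file)
    creator.validate_channel_keys()  # raises KeyError if any key is missing
    df = creator.df  # per-electron, per-pulse, and per-train data, outer-joined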