OpenCOMPES / sed / 9820936107

06 Jul 2024 05:22PM UTC coverage: 92.469% (+0.6%) from 91.888%
Pull Request #437: Upgrade to V1

Commit: Merge pull request #465 from OpenCOMPES/flash_test_fixes (Flash test fixes)

899 of 926 new or added lines in 49 files covered. (97.08%)

2 existing lines in 2 files now uncovered.

6876 of 7436 relevant lines covered (92.47%)

0.92 hits per line

Source File: /sed/loader/flash/dataframe.py (94.44% covered)
"""
This module creates pandas DataFrames from HDF5 files for different levels of data granularity
[per electron, per pulse, and per train]. It efficiently handles concatenation of data from
various channels within the HDF5 file, making use of the structured nature of the data to
optimize join operations. This approach significantly enhances performance compared to earlier
implementations.
"""
from __future__ import annotations

from pathlib import Path

import h5py
import numpy as np
import pandas as pd

from sed.loader.flash.utils import get_channels


class DataFrameCreator:
    """
    A class for creating pandas DataFrames from an HDF5 file.

    Attributes:
        h5_file (h5py.File): The HDF5 file object.
        multi_index (pd.MultiIndex): The multi-index structure for the DataFrame.
        _config (dict): The configuration dictionary for the DataFrame.
    """

    def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_path (Path): Path to the h5 file.
        """
        self.h5_file = h5py.File(h5_path, "r")
        self.multi_index = get_channels(index=True)
        self._config = config_dataframe

    def get_index_dataset_key(self, channel: str) -> tuple[str, str]:
        """
        Checks if 'index_key' and 'dataset_key' exist for the channel and returns them.

        Args:
            channel (str): The name of the channel.

        Returns:
            tuple[str, str]: A tuple of 'index_key' and 'dataset_key'.

        Raises:
            ValueError: If 'index_key' and 'dataset_key' are not provided.
        """
        channel_config = self._config["channels"][channel]

        if "index_key" in channel_config and "dataset_key" in channel_config:
            return channel_config["index_key"], channel_config["dataset_key"]
        elif "group_name" in channel_config:
            print("'group_name' is no longer supported.")

        raise ValueError(
            f"For channel: {channel}, provide both 'index_key' and 'dataset_key'.",
        )
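
    # A minimal sketch of the channel configuration shape this method expects;
    # the channel name and HDF5 paths below are hypothetical examples:
    #
    # config_dataframe = {
    #     "channels": {
    #         "dldPosX": {
    #             "index_key": "/some/hdf5/index/path",      # hypothetical path
    #             "dataset_key": "/some/hdf5/dataset/path",  # hypothetical path
    #             "slice": 1,
    #         },
    #     },
    # }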

    def get_dataset_array(
        self,
        channel: str,
        slice_: bool = False,
    ) -> tuple[pd.Index, h5py.Dataset]:
        """
        Returns the train ID index and the dataset for a given channel name.

        Args:
            channel (str): The name of the channel.
            slice_ (bool): If True, applies slicing on the dataset.

        Returns:
            tuple[pd.Index, h5py.Dataset]: A tuple containing the train ID
            pd.Index and the (sliced) dataset for the channel's data.
        """
        # Get the data from the necessary h5 file and channel
        index_key, dataset_key = self.get_index_dataset_key(channel)

        key = pd.Index(self.h5_file[index_key], name="trainId")  # macrobunch
        dataset = self.h5_file[dataset_key]

        if slice_:
            slice_index = self._config["channels"][channel].get("slice", None)
            if slice_index is not None:
                dataset = np.take(dataset, slice_index, axis=1)
        # If the dataset is size zero, fill it with NaNs
        if dataset.shape[0] == 0:
            # Fill the array with NaN values of the same shape as the train ID index
            dataset = np.full_like(key, np.nan, dtype=np.double)

        return key, dataset
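
    # Hypothetical illustration of the slicing behaviour: for a dataset of shape
    # (n_trains, n_columns, n_samples) and a channel config with "slice": 1,
    # np.take(dataset, 1, axis=1) selects column 1, giving shape (n_trains, n_samples).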

    def pulse_index(self, offset: int) -> tuple[pd.MultiIndex, slice | np.ndarray]:
        """
        Creates a multi-level index that combines train IDs and pulse IDs, and handles
        sorting and electron counting within each pulse.

        Args:
            offset (int): The offset value.

        Returns:
            tuple[pd.MultiIndex, slice | np.ndarray]: A tuple containing the computed
            pd.MultiIndex and the indexer.
        """
        # Get the pulse_dataset and the train_index
        train_index, pulse_dataset = self.get_dataset_array("pulseId", slice_=True)
        # pulse_dataset comes as a 2D array, resolved per train, and is flattened here.
        # The DAQ records pulses with an offset so that no pulses are missed;
        # this offset is subtracted here.
        pulse_ravel = pulse_dataset.ravel() - offset
        # Here train_index is repeated to match the size of pulses
        train_index_repeated = np.repeat(train_index, pulse_dataset.shape[1])
        # A pulse-resolved multi-index is finally created.
        # Since there can be NaN pulses, those are dropped
        pulse_index = pd.MultiIndex.from_arrays((train_index_repeated, pulse_ravel)).dropna()

        # Sometimes the pulse_index is not monotonic, so it might need to be sorted.
        # The indexer is also returned to sort the data in df_electron
        indexer = slice(None)
        if not pulse_index.is_monotonic_increasing:
            pulse_index, indexer = pulse_index.sort_values(return_indexer=True)

        # In the data, to signify different electrons, pulse_index is repeated by
        # the number of electrons in each pulse. Here the values are counted
        electron_counts = pulse_index.value_counts(sort=False).values
        # Now we resolve each pulse to its electrons
        electron_index = np.concatenate([np.arange(count) for count in electron_counts])

        # Final multi-index constructed here
        index = pd.MultiIndex.from_arrays(
            (
                pulse_index.get_level_values(0),
                pulse_index.get_level_values(1).astype(int),
                electron_index,
            ),
            names=self.multi_index,
        )
        return index, indexer
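
    # A small worked illustration (hypothetical numbers): with trains [100, 101],
    # two pulses per train, and offset-corrected pulse IDs [[0, 1], [0, 0]], the
    # pulse-resolved pairs are (100, 0), (100, 1), (101, 0), (101, 0). The repeated
    # (101, 0) pair encodes two electrons in that pulse, so the electron counter
    # becomes [0, 0, 0, 1] and the final index is
    # (100, 0, 0), (100, 1, 0), (101, 0, 0), (101, 0, 1).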

    @property
    def df_electron(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for channel names of type [per electron].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_electron' channel's data.
        """
        offset = self._config.get("ubid_offset", 5)  # 5 is the default value
        # Here we get the multi-index and the indexer to sort the data
        index, indexer = self.pulse_index(offset)

        # Get the relevant channels and their slice index
        channels = get_channels(self._config["channels"], "per_electron")
        slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels]

        # First check if the dataset keys are the same for all channels,
        # because the DLD at FLASH stores all channels in the same h5 dataset
        dataset_keys = [self.get_index_dataset_key(channel)[1] for channel in channels]
        # True if all dataset keys are the same
        all_keys_same = all(key == dataset_keys[0] for key in dataset_keys)

        # If all dataset keys are the same, we only need to load the dataset once and slice
        # the appropriate columns. This is much faster than loading the same dataset multiple times
        if all_keys_same:
            _, dataset = self.get_dataset_array(channels[0])
            data_dict = {
                channel: dataset[:, slice_, :].ravel()
                for channel, slice_ in zip(channels, slice_index)
            }
            dataframe = pd.DataFrame(data_dict)
        # In case the channels do differ, we create a pd.Series for each channel and concatenate them
        else:
            series = {
                channel: pd.Series(self.get_dataset_array(channel, slice_=True)[1].ravel())
                for channel in channels
            }
            dataframe = pd.concat(series, axis=1)

        # After the offset, negative pulse values are dropped as they are not valid
        # (e.g. for offset=5, the values -5 through -1 are dropped)
        drop_vals = np.arange(-offset, 0)

        # A few things happen here:
        # all NaN values are dropped, as was done while creating the multi-index;
        # if necessary, the data is sorted with [indexer];
        # the pd.MultiIndex is set;
        # finally, the offset values are dropped
        return (
            dataframe.dropna()
            .iloc[indexer]
            .set_index(index)
            .drop(index=drop_vals, level="pulseId", errors="ignore")
        )

    @property
    def df_pulse(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per pulse].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_pulse' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config["channels"], "per_pulse")
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data are returned
            key, dataset = self.get_dataset_array(channel, slice_=True)
            # A MultiIndex with all three levels is created. Since this is pulse data,
            # the electron index is always 0
            index = pd.MultiIndex.from_product(
                (key, np.arange(0, dataset.shape[1]), [0]),
                names=self.multi_index,
            )
            # The dataset is opened and converted to a numpy array by [()]
            # and flattened to resolve per pulse.
            # The pd.Series is created with the MultiIndex and appended to the list
            series.append(pd.Series(dataset[()].ravel(), index=index, name=channel))

        # All the channels are concatenated to a single DataFrame
        return pd.concat(
            series,
            axis=1,
        )
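
    # Illustration with hypothetical values: from_product of trains [100, 101],
    # pulses arange(2), and [0] yields the index
    # (100, 0, 0), (100, 1, 0), (101, 0, 0), (101, 1, 0).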

    @property
    def df_train(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per train].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_train' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config["channels"], "per_train")
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data are returned
            key, dataset = self.get_dataset_array(channel, slice_=True)
            # An electron- and pulse-resolved MultiIndex is created. Since this is train data,
            # the electron and pulse indices are always 0
            index = pd.MultiIndex.from_product(
                (key, [0], [0]),
                names=self.multi_index,
            )
            # The auxiliary dataset (which is stored in the same dataset as other DLD channels)
            # contains multiple channels inside. Even though they are resolved per train,
            # they come in pulse format, so the extra values are sliced and individual channels
            # are created and appended to the list
            if channel == "dldAux":
                aux_channels = self._config["channels"]["dldAux"]["dldAuxChannels"].items()
                for name, slice_aux in aux_channels:
                    series.append(pd.Series(dataset[: key.size, slice_aux], index, name=name))
            else:
                series.append(pd.Series(dataset, index, name=channel))
        # All the channels are concatenated to a single DataFrame
        return pd.concat(series, axis=1)
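
    # A hypothetical sketch of the dldAux channel configuration this branch assumes,
    # mapping sub-channel names to column indices within the auxiliary dataset:
    #
    # "dldAux": {
    #     "index_key": "/some/hdf5/index/path",      # hypothetical path
    #     "dataset_key": "/some/hdf5/dataset/path",  # hypothetical path
    #     "dldAuxChannels": {"sampleBias": 0, "tofVoltage": 1},  # hypothetical names
    # }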

    def validate_channel_keys(self) -> None:
        """
        Validates that the index and dataset keys for all channels in the config exist
        in the h5 file.

        Raises:
            KeyError: If the index or dataset keys do not exist in the file.
        """
        for channel in self._config["channels"]:
            index_key, dataset_key = self.get_index_dataset_key(channel)
            if index_key not in self.h5_file:
                raise KeyError(f"pd.Index key '{index_key}' doesn't exist in the file.")
            if dataset_key not in self.h5_file:
                raise KeyError(f"Dataset key '{dataset_key}' doesn't exist in the file.")

    @property
    def df(self) -> pd.DataFrame:
        """
        Joins the 'per_electron', 'per_pulse', and 'per_train' DataFrames using outer
        join operations, returning a single combined dataframe.

        Returns:
            pd.DataFrame: The combined pandas DataFrame.
        """
        self.validate_channel_keys()
        return (
            self.df_electron.join(self.df_pulse, on=self.multi_index, how="outer")
            .join(self.df_train, on=self.multi_index, how="outer")
            .sort_index()
        )
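
A minimal usage sketch (assuming "config" is a loaded sed configuration dict with a
"dataframe" section; the file path is hypothetical):

    from pathlib import Path

    from sed.loader.flash.dataframe import DataFrameCreator

    creator = DataFrameCreator(config_dataframe=config["dataframe"], h5_path=Path("run_12345.h5"))
    df = creator.df  # combined per-electron, per-pulse, and per-train DataFrame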