OpenCOMPES / sed / build 10064447099

23 Jul 2024 06:34PM UTC coverage: 92.688% (-0.01%) from 92.701%
Pull Request #469: Update to the BufferHandler
Merge pull request #479 from OpenCOMPES/flash-minor-changes (Flash minor changes, merged to #469)

234 of 245 new or added lines in 13 files covered. (95.51%)
1 existing line in 1 file now uncovered.
7073 of 7631 relevant lines covered (92.69%)
0.93 hits per line

Source file: /sed/loader/flash/dataframe.py (91.09% covered)
"""
This module creates pandas DataFrames from HDF5 files for different levels of data granularity
[per electron, per pulse, and per train]. It efficiently handles concatenation of data from
various channels within the HDF5 file, making use of the structured nature of the data to
optimize join operations. This approach significantly enhances performance compared to earlier
implementations.
"""
from __future__ import annotations

from pathlib import Path

import h5py
import numpy as np
import pandas as pd

from sed.loader.flash.utils import get_channels


class DataFrameCreator:
    """
    A class for creating pandas DataFrames from an HDF5 file.

    Attributes:
        h5_file (h5py.File): The HDF5 file object.
        multi_index (pd.MultiIndex): The multi-index structure for the DataFrame.
        _config (dict): The configuration dictionary for the DataFrame.
    """

    def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_path (Path): Path to the h5 file.
        """
        self.h5_file = h5py.File(h5_path, "r")
        self.multi_index = get_channels(index=True)
        self._config = config_dataframe

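    # A minimal sketch (assumed, not from the original file) of the expected
    # ``config_dataframe`` layout; the channel name "dldPosX" and the HDF5 keys
    # are illustrative:
    #
    #     {
    #         "ubid_offset": 5,
    #         "channels": {
    #             "dldPosX": {
    #                 "format": "per_electron",
    #                 "index_key": "/path/to/index",
    #                 "dataset_key": "/path/to/dataset",
    #                 "slice": 1,
    #             },
    #         },
    #     }
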
    def get_index_dataset_key(self, channel: str) -> tuple[str, str]:
        """
        Checks if 'index_key' and 'dataset_key' exist and returns them.

        Args:
            channel (str): The name of the channel.

        Returns:
            tuple[str, str]: Outputs a tuple of 'index_key' and 'dataset_key'.

        Raises:
            ValueError: If 'index_key' and 'dataset_key' are not provided.
        """
        channel_config = self._config["channels"][channel]
        group_err = ""
        if "index_key" in channel_config and "dataset_key" in channel_config:
            return channel_config["index_key"], channel_config["dataset_key"]
        elif "group_name" in channel_config:
            group_err = "'group_name' is no longer supported."
        error = f"{group_err} For channel: {channel}, provide both 'index_key' and 'dataset_key'."
        raise ValueError(error)

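    # Illustrative behavior, assuming the config sketch above:
    #
    #     >>> creator.get_index_dataset_key("dldPosX")
    #     ('/path/to/index', '/path/to/dataset')
    #
    # A channel that only carries the deprecated 'group_name' raises a
    # ValueError pointing the user to 'index_key' and 'dataset_key'.
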
    def get_dataset_array(
        self,
        channel: str,
        slice_: bool = True,
    ) -> tuple[pd.Index, np.ndarray | h5py.Dataset]:
        """
        Returns the train ID index and the data array for a given channel name.

        Args:
            channel (str): The name of the channel.
            slice_ (bool): Applies slicing on the dataset. Default is True.

        Returns:
            tuple[pd.Index, np.ndarray | h5py.Dataset]: A tuple containing the train ID
            pd.Index and the channel's data.
        """
        # Get the data from the necessary h5 file and channel
        index_key, dataset_key = self.get_index_dataset_key(channel)

        key = pd.Index(self.h5_file[index_key], name="trainId")  # macrobunch
        dataset = self.h5_file[dataset_key]

        if slice_:
            slice_index = self._config["channels"][channel].get("slice", None)
            if slice_index is not None:
                dataset = np.take(dataset, slice_index, axis=1)
        # If the dataset has size zero, fill it with NaN values
        # of the same shape as the index
        if dataset.shape[0] == 0:
            dataset = np.full_like(key, np.nan, dtype=np.double)

        return key, dataset

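    # Sketch of the slicing step above: for a per-electron channel stored as a
    # 3D dataset of shape (trains, columns, pulses), ``slice`` selects one
    # column along axis 1 (the shapes here are illustrative assumptions):
    #
    #     >>> import numpy as np
    #     >>> dataset = np.arange(24).reshape(2, 3, 4)  # 2 trains, 3 columns, 4 pulses
    #     >>> np.take(dataset, 1, axis=1).shape
    #     (2, 4)
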
    def pulse_index(self, offset: int) -> tuple[pd.MultiIndex, slice | np.ndarray]:
        """
        Creates a multi-level index that combines train IDs and pulse IDs, and handles
        sorting and electron counting within each pulse.

        Args:
            offset (int): The offset value.

        Returns:
            tuple[pd.MultiIndex, slice | np.ndarray]: A tuple containing the computed
            pd.MultiIndex and the indexer.
        """
        # Get the pulse_dataset and the train_index
        train_index, pulse_dataset = self.get_dataset_array("pulseId")
        # pulse_dataset comes as a 2D array, resolved per train; here it is flattened.
        # The DAQ uses an offset so that no pulses are missed; it is subtracted here
        pulse_ravel = pulse_dataset.ravel() - offset
        # Here train_index is repeated to match the size of pulses
        train_index_repeated = np.repeat(train_index, pulse_dataset.shape[1])
        # A pulse-resolved multi-index is finally created.
        # Since there can be NaN pulses, those are dropped
        pulse_index = pd.MultiIndex.from_arrays((train_index_repeated, pulse_ravel)).dropna()

        # Sometimes the pulse_index is not monotonic, so we might need to sort it.
        # The indexer is also returned to sort the data in df_electron
        indexer = slice(None)
        if not pulse_index.is_monotonic_increasing:
            pulse_index, indexer = pulse_index.sort_values(return_indexer=True)

        # In the data, to signify different electrons, pulse_index is repeated by
        # the number of electrons in each pulse. Here the values are counted
        electron_counts = pulse_index.value_counts(sort=False).values
        # Now we resolve each pulse to its electrons
        electron_index = np.concatenate(
            [np.arange(count, dtype="uint16") for count in electron_counts],
        )

        # Final multi-index constructed here
        index = pd.MultiIndex.from_arrays(
            (
                pulse_index.get_level_values(0),
                pulse_index.get_level_values(1).astype(int),
                electron_index,
            ),
            names=self.multi_index,
        )
        return index, indexer

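    # A worked sketch of the index construction with toy values (assumed, not
    # real data): after flattening and offset subtraction, a pulse that holds
    # two electrons appears twice, so value_counts() yields its electron count
    # and the electrons are enumerated within it:
    #
    #     trainId:     100  100  101  101
    #     pulseId:       7    7    3    3   ->  electron_counts = [2, 2]
    #     electronId:    0    1    0    1
    #
    # giving the MultiIndex entries (100, 7, 0), (100, 7, 1), (101, 3, 0),
    # (101, 3, 1).
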
    @property
    def df_electron(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for channel names of type [per electron].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_electron' channel's data.
        """
        offset = self._config.get("ubid_offset", 5)  # 5 is the default value
        # Here we get the multi-index and the indexer to sort the data
        index, indexer = self.pulse_index(offset)

        # Get the relevant channels and their slice index
        channels = get_channels(self._config, "per_electron")
        if channels == []:
            return pd.DataFrame()
        slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels]

        # First check if the dataset keys are the same for all channels,
        # because the DLD at FLASH stores all channels in the same h5 dataset
        dataset_keys = [self.get_index_dataset_key(channel)[1] for channel in channels]
        # True if all keys are the same
        all_keys_same = all(key == dataset_keys[0] for key in dataset_keys)

        # If all dataset keys are the same, we only need to load the dataset once and slice
        # the appropriate columns. This is much faster than loading the same dataset multiple times
        if all_keys_same:
            _, dataset = self.get_dataset_array(channels[0], slice_=False)
            data_dict = {
                channel: dataset[:, idx, :].ravel() for channel, idx in zip(channels, slice_index)
            }
            dataframe = pd.DataFrame(data_dict)
        # In case channels do differ, we create a pd.Series for each channel and concatenate them
        else:
            series = {
                channel: pd.Series(self.get_dataset_array(channel)[1].ravel())
                for channel in channels
            }
            dataframe = pd.concat(series, axis=1)

        # After the offset, negative pulse values are dropped as they are not valid
        drop_vals = np.arange(-offset, 0)

        # A few things happen here:
        # all NaN values are dropped, as when creating the multi-index;
        # if necessary, the data is sorted with [indexer];
        # the pd.MultiIndex is set;
        # finally, the offset values are dropped
        return (
            dataframe.dropna()
            .iloc[indexer]
            .set_index(index)
            .drop(index=drop_vals, level="pulseId", errors="ignore")
        )

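    # Sketch of the fast path above (names and shapes are illustrative): if
    # "dldPosX" and "dldPosY" both point at the same dataset_key with slices
    # 0 and 1, the (trains, columns, pulses) dataset is read once, and
    # dataset[:, 0, :].ravel() / dataset[:, 1, :].ravel() become the two
    # per-electron columns, avoiding a second pass over the HDF5 file.
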
    @property
    def df_pulse(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per pulse].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_pulse' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_pulse")
        if channels == []:
            return pd.DataFrame()
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data is returned
            key, dataset = self.get_dataset_array(channel)
            # Electron-resolved MultiIndex is created. Since this is pulse data,
            # the electron index is always 0
            index = pd.MultiIndex.from_product(
                (key, np.arange(0, dataset.shape[1]), [0]),
                names=self.multi_index,
            )
            # The dataset is opened and converted to a numpy array by [()]
            # and flattened to resolve per pulse
            channel_series = pd.Series(dataset[()].ravel(), index=index, name=channel)
            # Sometimes pulse columns contain more pulses than valid ones, as with the
            # bam channel, so all 0 values are removed from the series
            series.append(channel_series[channel_series != 0])  # TODO: put this in metadata

        # All the channels are concatenated to a single DataFrame
        return pd.concat(
            series,
            axis=1,
        )

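    # Sketch of the per-pulse index (toy values): for trains [100, 101] and a
    # dataset with 3 pulses per train, pd.MultiIndex.from_product yields
    # (100, 0, 0), (100, 1, 0), (100, 2, 0), (101, 0, 0), ... with the
    # electronId level pinned to 0.
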
    @property
    def df_train(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per train].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_train' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_train")
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data is returned
            key, dataset = self.get_dataset_array(channel)
            # Electron- and pulse-resolved MultiIndex is created. Since this is train data,
            # the electron and pulse index is always 0
            index = pd.MultiIndex.from_product(
                (key, [0], [0]),
                names=self.multi_index,
            )
            # The auxiliary dataset (which is stored in the same dataset as other DLD channels)
            # contains multiple channels inside. Even though they are resolved per train,
            # they come in pulse format, so the extra values are sliced and individual channels
            # are created and appended to the list
            aux_alias = self._config.get("aux_alias", "dldAux")
            if channel == aux_alias:
                try:
                    sub_channels = self._config["channels"][aux_alias]["subChannels"]
                except KeyError:
                    raise KeyError(
                        f"Provide 'subChannels' for auxiliary channel '{aux_alias}'.",
                    )
                for name, values in sub_channels.items():
                    series.append(
                        pd.Series(
                            dataset[: key.size, values["slice"]],
                            index,
                            name=name,
                        ),
                    )
            else:
                series.append(pd.Series(dataset, index, name=channel))
        # All the channels are concatenated to a single DataFrame
        return pd.concat(series, axis=1)

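    # A minimal sketch (assumed) of an auxiliary-channel config entry; the
    # sub-channel names and slice positions are illustrative:
    #
    #     "dldAux": {
    #         "format": "per_train",
    #         "index_key": "/path/to/aux/index",
    #         "dataset_key": "/path/to/aux/dataset",
    #         "subChannels": {
    #             "sampleBias": {"slice": 0},
    #             "tofVoltage": {"slice": 1},
    #         },
    #     }
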
    def validate_channel_keys(self) -> None:
        """
        Validates if the index and dataset keys for all channels in config exist in the h5 file.

        Raises:
            KeyError: If the index or dataset keys do not exist in the file.
        """
        for channel in self._config["channels"]:
            index_key, dataset_key = self.get_index_dataset_key(channel)
            if index_key not in self.h5_file:
                raise KeyError(f"pd.Index key '{index_key}' doesn't exist in the file.")
            if dataset_key not in self.h5_file:
                raise KeyError(f"Dataset key '{dataset_key}' doesn't exist in the file.")
294
    @property
1✔
295
    def df(self) -> pd.DataFrame:
1✔
296
        """
297
        Joins the 'per_electron', 'per_pulse', and 'per_train' using concat operation,
298
        returning a single dataframe.
299

300
        Returns:
301
            pd.DataFrame: The combined pandas DataFrame.
302
        """
303

304
        self.validate_channel_keys()
1✔
305
        # been tested with merge, join and concat
306
        # concat offers best performance, almost 3 times faster
307
        return pd.concat((self.df_electron, self.df_pulse, self.df_train), axis=1).sort_index()
1✔