OpenCOMPES / sed / build 9853004019

09 Jul 2024 07:18AM UTC coverage: 92.453% (-0.02%) from 92.472%

Pull Request #469: Update to the BufferHandler ("update paths", by rettigl, via github)

163 of 171 new or added lines in 10 files covered. (95.32%)

5 existing lines in 3 files now uncovered.

6872 of 7433 relevant lines covered (92.45%)

0.92 hits per line

Source file: /sed/loader/flash/dataframe.py (93.68% covered)
"""
This module creates pandas DataFrames from HDF5 files for different levels of data granularity
[per electron, per pulse, and per train]. It efficiently handles concatenation of data from
various channels within the HDF5 file, making use of the structured nature of the data to
optimize join operations. This approach significantly enhances performance compared to earlier
implementations.
"""
from __future__ import annotations

from pathlib import Path

import h5py
import numpy as np
import pandas as pd

from sed.loader.flash.utils import get_channels


class DataFrameCreator:
    """
    A class for creating pandas DataFrames from an HDF5 file.

    Attributes:
        h5_file (h5py.File): The HDF5 file object.
        multi_index (pd.MultiIndex): The multi-index structure for the DataFrame.
        _config (dict): The configuration dictionary for the DataFrame.
    """

    def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_path (Path): Path to the h5 file.
        """
        self.h5_file = h5py.File(h5_path, "r")
        self.multi_index = get_channels(index=True)
        self._config = config_dataframe

    def get_index_dataset_key(self, channel: str) -> tuple[str, str]:
        """
        Checks that 'index_key' and 'dataset_key' exist for the channel and returns them.

        Args:
            channel (str): The name of the channel.

        Returns:
            tuple[str, str]: A tuple of 'index_key' and 'dataset_key'.

        Raises:
            ValueError: If 'index_key' and 'dataset_key' are not provided.
        """
        channel_config = self._config["channels"][channel]
        group_err = ""
        if "index_key" in channel_config and "dataset_key" in channel_config:
            return channel_config["index_key"], channel_config["dataset_key"]
        elif "group_name" in channel_config:
            group_err = "'group_name' is no longer supported."
        error = f"{group_err} For channel: {channel}, provide both 'index_key' and 'dataset_key'."
        raise ValueError(error)
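
    # For illustration, a channel entry that passes the check above provides both keys.
    # The channel name and h5 paths here are hypothetical placeholders, not taken from
    # any real configuration:
    #
    #   "dldPosX": {
    #       "format": "per_electron",
    #       "index_key": "/hypothetical/index/path",
    #       "dataset_key": "/hypothetical/dataset/path",
    #       "slice": 1,
    #   }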

    def get_dataset_array(
        self,
        channel: str,
        slice_: bool = False,
    ) -> tuple[pd.Index, h5py.Dataset]:
        """
        Returns the train ID index and the (optionally sliced) dataset for a given
        channel name.

        Args:
            channel (str): The name of the channel.
            slice_ (bool): If True, applies slicing on the dataset.

        Returns:
            tuple[pd.Index, h5py.Dataset]: A tuple containing the train ID
            pd.Index and the numpy array for the channel's data.
        """
        # Get the data from the necessary h5 file and channel
        index_key, dataset_key = self.get_index_dataset_key(channel)

        key = pd.Index(self.h5_file[index_key], name="trainId")  # macrobunch
        dataset = self.h5_file[dataset_key]

        if slice_:
            slice_index = self._config["channels"][channel].get("slice", None)
            if slice_index is not None:
                dataset = np.take(dataset, slice_index, axis=1)
        # If the dataset is empty, fill it with NaNs
        if dataset.shape[0] == 0:
            # Fill the array with NaN values of the same shape as the train_id index
            dataset = np.full_like(key, np.nan, dtype=np.double)

        return key, dataset

    def pulse_index(self, offset: int) -> tuple[pd.MultiIndex, slice | np.ndarray]:
        """
        Creates a multi-level index that combines train IDs and pulse IDs, and handles
        sorting and electron counting within each pulse.

        Args:
            offset (int): The offset value.

        Returns:
            tuple[pd.MultiIndex, slice | np.ndarray]: A tuple containing the computed
            pd.MultiIndex and the indexer.
        """
        # Get the pulse_dataset and the train_index
        train_index, pulse_dataset = self.get_dataset_array("pulseId", slice_=True)
        # pulse_dataset comes as a 2D array, resolved per train, and is flattened here.
        # The DAQ adds an offset so that no pulses are missed; that offset is subtracted here
        pulse_ravel = pulse_dataset.ravel() - offset
        # train_index is repeated to match the size of the flattened pulses
        train_index_repeated = np.repeat(train_index, pulse_dataset.shape[1])
        # A pulse-resolved multi-index is created.
        # Since there can be NaN pulses, those are dropped
        pulse_index = pd.MultiIndex.from_arrays((train_index_repeated, pulse_ravel)).dropna()

        # Sometimes the pulse_index is not monotonic, so it might need to be sorted.
        # The indexer is also returned, to sort the data in df_electron
        indexer = slice(None)
        if not pulse_index.is_monotonic_increasing:
            pulse_index, indexer = pulse_index.sort_values(return_indexer=True)

        # In the data, to signify different electrons, pulse_index is repeated by
        # the number of electrons in each pulse. Here those repetitions are counted
        electron_counts = pulse_index.value_counts(sort=False).values
        # Now each pulse is resolved to its electrons
        electron_index = np.concatenate([np.arange(count) for count in electron_counts])

        # The final multi-index is constructed here
        index = pd.MultiIndex.from_arrays(
            (
                pulse_index.get_level_values(0),
                pulse_index.get_level_values(1).astype(int),
                electron_index,
            ),
            names=self.multi_index,
        )
        return index, indexer
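
    # For illustration only: the index built above has one row per detected electron.
    # With made-up values (the level names are taken from self.multi_index), two
    # electrons in the first pulse and one in the second pulse of a train would give:
    #
    #   trainId  pulseId  electron
    #   1000     0        0
    #   1000     0        1
    #   1000     1        0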

    @property
    def df_electron(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for channel names of type [per electron].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_electron' channels' data.
        """
        offset = self._config.get("ubid_offset", 5)  # 5 is the default value
        # Get the multi-index and the indexer to sort the data
        index, indexer = self.pulse_index(offset)

        # Get the relevant channels and their slice index
        channels = get_channels(self._config["channels"], "per_electron")
        slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels]

        # First check whether the dataset keys are the same for all channels,
        # because the DLD at FLASH stores all channels in the same h5 dataset
        dataset_keys = [self.get_index_dataset_key(channel)[1] for channel in channels]
        # True if all keys are the same
        all_keys_same = all(key == dataset_keys[0] for key in dataset_keys)

        # If all dataset keys are the same, the dataset only needs to be loaded once, and the
        # appropriate columns sliced. This is much faster than loading the same dataset repeatedly
        if all_keys_same:
            _, dataset = self.get_dataset_array(channels[0])
            data_dict = {
                channel: dataset[:, slice_, :].ravel()
                for channel, slice_ in zip(channels, slice_index)
            }
            dataframe = pd.DataFrame(data_dict)
        # In case the channels do differ, a pd.Series is created for each channel,
        # and they are concatenated
        else:
            series = {
                channel: pd.Series(self.get_dataset_array(channel, slice_=True)[1].ravel())
                for channel in channels
            }
            dataframe = pd.concat(series, axis=1)

        # After the offset subtraction, negative pulse values are dropped, as they are not valid
        drop_vals = np.arange(-offset, 0)

        # A few things happen here:
        # all NaN values are dropped, as when creating the multi-index;
        # if necessary, the data is sorted with [indexer];
        # the pd.MultiIndex is set;
        # and finally, the offset values are dropped
        return (
            dataframe.dropna()
            .iloc[indexer]
            .set_index(index)
            .drop(index=drop_vals, level="pulseId", errors="ignore")
        )

    @property
    def df_pulse(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per pulse].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_pulse' channels' data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config["channels"], "per_pulse")
        # Check if dldAux is in the channels and raise an error if so
        if "dldAux" in channels:
            raise ValueError(
                "dldAux is a 'per_train' channel. "
                "Please choose 'per_train' as the format for dldAux.",
            )
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # the train_index and the (sliced) data are returned
            key, dataset = self.get_dataset_array(channel, slice_=True)
            # An electron-resolved MultiIndex is created. Since this is pulse data,
            # the electron index is always 0
            index = pd.MultiIndex.from_product(
                (key, np.arange(0, dataset.shape[1]), [0]),
                names=self.multi_index,
            )
            # The dataset is opened and converted to a numpy array by [()],
            # then flattened to resolve per pulse
            channel_series = pd.Series(dataset[()].ravel(), index=index, name=channel)
            # Sometimes pulse columns have more pulses than valid ones, such as with the
            # bam channel, so all 0 values are removed from the series
            series.append(channel_series[channel_series != 0])  # TODO: put this in metadata

        # All the channels are concatenated to a single DataFrame
        return pd.concat(
            series,
            axis=1,
        )

    @property
    def df_train(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per train].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_train' channels' data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config["channels"], "per_train")
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # the train_index and the (sliced) data are returned
            key, dataset = self.get_dataset_array(channel, slice_=True)
            # An electron- and pulse-resolved MultiIndex is created. Since this is train
            # data, the electron and pulse indices are always 0
            index = pd.MultiIndex.from_product(
                (key, [0], [0]),
                names=self.multi_index,
            )
            # The auxiliary dataset (which is stored in the same dataset as other DLD
            # channels) contains multiple channels. Even though they are resolved per
            # train, they come in pulse format, so the extra values are sliced, and
            # individual channels are created and appended to the list
            if channel == "dldAux":
                aux_channels = self._config["channels"]["dldAux"]["dldAuxChannels"].items()
                for name, slice_aux in aux_channels:
                    series.append(pd.Series(dataset[: key.size, slice_aux], index, name=name))
            else:
                series.append(pd.Series(dataset, index, name=channel))
        # All the channels are concatenated to a single DataFrame
        return pd.concat(series, axis=1)

    def validate_channel_keys(self) -> None:
        """
        Validates that the index and dataset keys for all channels in the config exist
        in the h5 file.

        Raises:
            KeyError: If the index or dataset keys do not exist in the file.
        """
        for channel in self._config["channels"]:
            index_key, dataset_key = self.get_index_dataset_key(channel)
            if index_key not in self.h5_file:
                raise KeyError(f"pd.Index key '{index_key}' doesn't exist in the file.")
            if dataset_key not in self.h5_file:
                raise KeyError(f"Dataset key '{dataset_key}' doesn't exist in the file.")

    @property
    def df(self) -> pd.DataFrame:
        """
        Joins the 'per_electron', 'per_pulse', and 'per_train' DataFrames using a concat
        operation, returning a single dataframe.

        Returns:
            pd.DataFrame: The combined pandas DataFrame.
        """

        self.validate_channel_keys()
        # This has been tested with merge, join, and concat;
        # concat offers the best performance, almost 3 times faster
        return pd.concat((self.df_electron, self.df_pulse, self.df_train), axis=1).sort_index()
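
For reference, a minimal usage sketch of DataFrameCreator follows. The h5 paths and file
name are hypothetical placeholders; in practice, the dataframe section of a sed config and
a real FLASH HDF5 file are required, and get_channels determines which configured channels
belong to each granularity.

    from pathlib import Path

    config_dataframe = {
        "ubid_offset": 5,  # default offset used by df_electron
        "channels": {
            "pulseId": {
                "format": "per_electron",
                "index_key": "/hypothetical/index/path",  # must exist in the h5 file
                "dataset_key": "/hypothetical/dataset/path",
                "slice": 2,
            },
            # ... further per_electron, per_pulse, and per_train channels
        },
    }

    creator = DataFrameCreator(config_dataframe, Path("run_001.h5"))
    df = creator.df  # validates channel keys, then concatenates all three granularities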