
OpenCOMPES / sed · build 12735296384

12 Jan 2025 04:37PM UTC · coverage: 92.185% (+0.4%) from 91.801%

Pull Request #437: Upgrade to V1 (github · web-flow)
Merge commit: Merge pull request #541 from OpenCOMPES/flash_normalization_fixes
("remove empty pulses from timed dataframe, and bring back old behavior")

2093 of 2215 new or added lines in 53 files covered (94.49%).
4 existing lines in 1 file now uncovered.
7573 of 8215 relevant lines covered (92.19%), 0.92 hits per line.

Source file: /src/sed/loader/flash/dataframe.py (93.2% covered; lines not
covered by tests are marked "# uncovered" in the listing below)

"""
This module creates pandas DataFrames from HDF5 files for different levels of data granularity
[per electron, per pulse, and per train]. It efficiently handles concatenation of data from
various channels within the HDF5 file, making use of the structured nature of the data to
optimize join operations. This approach significantly enhances performance compared to earlier
implementations.
"""
from __future__ import annotations

from pathlib import Path

import h5py
import numpy as np
import pandas as pd

from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import InvalidFileError


class DataFrameCreator:
    """
    A class for creating pandas DataFrames from an HDF5 file.

    Attributes:
        h5_file (h5py.File): The HDF5 file object.
        multi_index (pd.MultiIndex): The multi-index structure for the DataFrame.
        _config (dict): The configuration dictionary for the DataFrame.
    """
    def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_path (Path): Path to the h5 file.
        """
        self.h5_file = h5py.File(h5_path, "r")
        self.multi_index = get_channels(index=True)
        self._config = config_dataframe

    def get_index_dataset_key(self, channel: str) -> tuple[str, str]:
        """
        Checks if 'index_key' and 'dataset_key' exist and returns them.

        Args:
            channel (str): The name of the channel.

        Returns:
            tuple[str, str]: A tuple of 'index_key' and 'dataset_key'.

        Raises:
            ValueError: If 'index_key' and 'dataset_key' are not provided.
        """
        channel_config = self._config["channels"][channel]
        group_err = ""
        if "index_key" in channel_config and "dataset_key" in channel_config:
            return channel_config["index_key"], channel_config["dataset_key"]
        elif "group_name" in channel_config:
            group_err = "'group_name' is no longer supported."  # uncovered
        error = f"{group_err} For channel: {channel}, provide both 'index_key' and 'dataset_key'."
        raise ValueError(error)

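    # A minimal sketch of the channel configuration this method expects; the
    # HDF5 paths below are hypothetical placeholders, not real FLASH keys:
    #
    #   config_dataframe = {
    #       "channels": {
    #           "pulseId": {
    #               "index_key": "/hypothetical/index/path",
    #               "dataset_key": "/hypothetical/dataset/path",
    #               "slice": 2,
    #           },
    #       },
    #   }
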
    def get_dataset_array(
        self,
        channel: str,
        slice_: bool = True,
    ) -> tuple[pd.Index, np.ndarray | h5py.Dataset]:
        """
        Returns a numpy array for a given channel name.

        Args:
            channel (str): The name of the channel.
            slice_ (bool): Applies slicing on the dataset. Default is True.

        Returns:
            tuple[pd.Index, np.ndarray | h5py.Dataset]: A tuple containing the train ID
            pd.Index and the channel's data.
        """
        # Get the data from the necessary h5 file and channel
        index_key, dataset_key = self.get_index_dataset_key(channel)

        key = pd.Index(self.h5_file[index_key], name="trainId")  # macrobunch
        dataset = self.h5_file[dataset_key]

        if slice_:
            slice_index = self._config["channels"][channel].get("slice", None)
            if slice_index is not None:
                dataset = np.take(dataset, slice_index, axis=1)
        # If the dataset is empty, fill it with NaN values of the same shape as the index
        if dataset.shape[0] == 0:
            dataset = np.full_like(key, np.nan, dtype=np.double)

        return key, dataset

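    # Illustration of the slicing step above (shapes are made up): np.take
    # along axis 1 picks one sector out of a per-train 3D dataset.
    #
    #   data = np.arange(24).reshape(2, 3, 4)  # (trains, sectors, pulses)
    #   np.take(data, 1, axis=1).shape         # -> (2, 4)
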
    def pulse_index(self, offset: int) -> tuple[pd.MultiIndex, slice | np.ndarray]:
        """
        Creates a multi-level index that combines train IDs and pulse IDs, and handles
        sorting and electron counting within each pulse.

        Args:
            offset (int): The offset value.

        Returns:
            tuple[pd.MultiIndex, slice | np.ndarray]: A tuple containing the computed
            pd.MultiIndex and the indexer.
        """
        # Get the pulse_dataset and the train_index
        train_index, pulse_dataset = self.get_dataset_array("pulseId")
        # pulse_dataset comes as a 2D array, resolved per train, so it is flattened here.
        # The DAQ has an offset so no pulses are missed; this offset is subtracted here
        pulse_ravel = pulse_dataset.ravel() - offset
        # Here train_index is repeated to match the size of pulses
        train_index_repeated = np.repeat(train_index, pulse_dataset.shape[1])
        # A pulse-resolved multi-index is finally created.
        # Since there can be NaN pulses, those are dropped
        pulse_index = pd.MultiIndex.from_arrays((train_index_repeated, pulse_ravel)).dropna()

        # Sometimes the pulse_index is not monotonic, so we might need to sort it.
        # The indexer is also returned to sort the data in df_electron
        indexer = slice(None)
        if not pulse_index.is_monotonic_increasing:
            pulse_index, indexer = pulse_index.sort_values(return_indexer=True)  # uncovered

        # In the data, to signify different electrons, pulse_index is repeated by
        # the number of electrons in each pulse. Here the values are counted
        electron_counts = pulse_index.value_counts(sort=False).values
        # Now we resolve each pulse to its electrons
        electron_index = np.concatenate(
            [np.arange(count, dtype="uint16") for count in electron_counts],
        )

        # Final multi-index constructed here
        index = pd.MultiIndex.from_arrays(
            (
                pulse_index.get_level_values(0),
                pulse_index.get_level_values(1).astype(int),
                electron_index,
            ),
            names=self.multi_index,
        )
        return index, indexer
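    # Toy illustration of the electron-counting step above: a sorted pulse
    # index [(t0, p0), (t0, p0), (t0, p1)] has counts [2, 1], so concatenating
    # per-pulse aranges yields the electron index [0, 1, 0].
    #
    #   counts = np.array([2, 1])
    #   np.concatenate([np.arange(c, dtype="uint16") for c in counts])
    #   # -> array([0, 1, 0], dtype=uint16)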

    @property
    def df_electron(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for channel names of type [per electron].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_electron' channel's data.
        """
        # Get the relevant channels and their slice index
        channels = get_channels(self._config, "per_electron")
        if channels == []:
            return pd.DataFrame()  # uncovered
        slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels]

        offset = self._config.get("ubid_offset", 5)  # 5 is the default value
        # Here we get the multi-index and the indexer to sort the data
        index, indexer = self.pulse_index(offset)

        # First check if the dataset keys are the same for all channels,
        # because the DLD at FLASH stores all channels in the same h5 dataset
        dataset_keys = [self.get_index_dataset_key(channel)[1] for channel in channels]
        # True if all dataset keys are the same
        all_keys_same = all(key == dataset_keys[0] for key in dataset_keys)

        # If all dataset keys are the same, we only need to load the dataset once and slice
        # the appropriate columns. This is much faster than loading the same dataset multiple times
        if all_keys_same:
            _, dataset = self.get_dataset_array(channels[0], slice_=False)
            data_dict = {
                channel: dataset[:, idx, :].ravel() for channel, idx in zip(channels, slice_index)
            }
            dataframe = pd.DataFrame(data_dict)
        # In case the channels do differ, we create a pd.Series for each channel and concatenate them
        else:
            series = {
                channel: pd.Series(self.get_dataset_array(channel)[1].ravel())
                for channel in channels
            }
            dataframe = pd.concat(series, axis=1)  # uncovered

        # NaN values dropped, data sorted with [indexer] if necessary, and the MultiIndex is set
        return dataframe.dropna().iloc[indexer].set_index(index)
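    # Sketch of the shared-dataset fast path above (shapes and names are
    # illustrative): all DLD channels live in one (trains, columns, pulses)
    # array, so one read plus per-channel column slices replaces repeated reads.
    #
    #   shared = np.zeros((10, 3, 500))        # hypothetical shared dataset
    #   cols = {"dldPosX": 0, "dldPosY": 1}    # hypothetical slice indices
    #   data = {ch: shared[:, idx, :].ravel() for ch, idx in cols.items()}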

    @property
    def df_pulse(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per pulse].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_pulse' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_pulse")
        if channels == []:
            return pd.DataFrame()  # uncovered
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data are returned
            key, dataset = self.get_dataset_array(channel)
            # A pulse-resolved MultiIndex is created. Since this is pulse data,
            # the electron index is always 0
            index = pd.MultiIndex.from_product(
                (key, np.arange(0, dataset.shape[1]), [0]),
                names=self.multi_index,
            )
            # The dataset is opened and converted to a numpy array by [()]
            # and flattened to resolve per pulse
            channel_series = pd.Series(dataset[()].ravel(), index=index, name=channel)
            # Sometimes pulse columns have more pulses than valid ones, such as with the bam
            # channel, so we remove all 0 values from the series
            series.append(channel_series[channel_series != 0])  # TODO: put this in metadata

        # All the channels are concatenated to a single DataFrame
        return pd.concat(series, axis=1)
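    # Toy illustration of the per-pulse MultiIndex built above (values made up):
    # two trains with three pulses each, and the electron level pinned to 0.
    #
    #   pd.MultiIndex.from_product(
    #       (pd.Index([100, 101], name="trainId"), np.arange(3), [0]),
    #   )
    #   # -> (100, 0, 0), (100, 1, 0), (100, 2, 0), (101, 0, 0), ...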

    @property
    def df_train(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per train].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_train' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_train")
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data are returned
            key, dataset = self.get_dataset_array(channel)
            # An electron- and pulse-resolved MultiIndex is created. Since this is train data,
            # the electron and pulse indices are always 0
            index = pd.MultiIndex.from_product(
                (key, [0], [0]),
                names=self.multi_index,
            )
            # The auxiliary dataset (which is stored in the same dataset as the other DLD
            # channels) contains multiple channels inside. Even though they are resolved per
            # train, they come in pulse format, so the extra values are sliced and individual
            # channels are created and appended to the list
            aux_alias = self._config.get("aux_alias", "dldAux")
            if channel == aux_alias:
                try:
                    sub_channels = self._config["channels"][aux_alias]["sub_channels"]
                except KeyError:  # uncovered
                    raise KeyError(  # uncovered
                        f"Provide 'sub_channels' for auxiliary channel '{aux_alias}'.",
                    )
                for name, values in sub_channels.items():
                    series.append(
                        pd.Series(
                            dataset[: key.size, values["slice"]],
                            index,
                            name=name,
                        ),
                    )
            else:
                series.append(pd.Series(dataset, index, name=channel))
        # All the channels are concatenated to a single DataFrame
        return pd.concat(series, axis=1)
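    # Hypothetical 'sub_channels' configuration for the auxiliary channel,
    # mapping named per-train monitors to column slices of the shared dataset
    # (channel names, paths, and slice values are illustrative, not real FLASH config):
    #
    #   "dldAux": {
    #       "index_key": "/hypothetical/index/path",
    #       "dataset_key": "/hypothetical/dataset/path",
    #       "sub_channels": {
    #           "sampleBias": {"slice": 0},
    #           "tofVoltage": {"slice": 1},
    #       },
    #   }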

    def validate_channel_keys(self) -> None:
        """
        Validates that the index and dataset keys for all channels in the config exist
        in the h5 file.

        Raises:
            InvalidFileError: If the index or dataset keys are missing in the h5 file.
        """
        invalid_channels = []
        for channel in self._config["channels"]:
            index_key, dataset_key = self.get_index_dataset_key(channel)
            if index_key not in self.h5_file or dataset_key not in self.h5_file:
                invalid_channels.append(channel)

        if invalid_channels:
            raise InvalidFileError(invalid_channels)

    @property
    def df(self) -> pd.DataFrame:
        """
        Joins the 'per_electron', 'per_pulse', and 'per_train' data using a concat operation,
        returning a single dataframe.

        Returns:
            pd.DataFrame: The combined pandas DataFrame.
        """
        self.validate_channel_keys()
        # This has been tested with merge, join, and concat;
        # concat offers the best performance, almost 3 times faster
        df = pd.concat((self.df_electron, self.df_pulse, self.df_train), axis=1).sort_index()
        # All the negative pulse values are dropped as they are invalid
        return df[df.index.get_level_values("pulseId") >= 0]
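
# Minimal usage sketch (the file name and config contents are hypothetical;
# a real run needs a FLASH HDF5 file and a matching dataframe configuration):
#
#   creator = DataFrameCreator(config_dataframe=config["dataframe"], h5_path=Path("run_00123.h5"))
#   df = creator.df  # combined per-electron, per-pulse, and per-train data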