OpenCOMPES / sed / build 9991515592

18 Jul 2024 12:29PM UTC coverage: 92.722% (+0.04%) from 92.682%

Pull Request #479: Flash minor changes (Merge to #469)
via github, zain-sohail: "roll back to iterations"

92 of 93 new or added lines in 9 files covered. (98.92%)
5 existing lines in 3 files now uncovered.
7071 of 7626 relevant lines covered (92.72%)
0.93 hits per line

Source File: /sed/loader/flash/dataframe.py (93.81% covered)

"""
This module creates pandas DataFrames from HDF5 files for different levels of data granularity
[per electron, per pulse, and per train]. It efficiently handles concatenation of data from
various channels within the HDF5 file, making use of the structured nature of the data to
optimize join operations. This approach significantly enhances performance compared to earlier
approaches.
"""
from __future__ import annotations

from pathlib import Path

import h5py
import numpy as np
import pandas as pd

from sed.loader.flash.utils import get_channels


class DataFrameCreator:
    """
    A class for creating pandas DataFrames from an HDF5 file.

    Attributes:
        h5_file (h5py.File): The HDF5 file object.
        multi_index (pd.MultiIndex): The multi-index structure for the DataFrame.
        _config (dict): The configuration dictionary for the DataFrame.
    """

    def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_path (Path): Path to the h5 file.
        """
        self.h5_file = h5py.File(h5_path, "r")
        self.multi_index = get_channels(index=True)
        self._config = config_dataframe
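
    # Illustration (not part of the original module): a minimal sketch of the
    # constructor's inputs. The config keys and HDF5 paths below are hypothetical,
    # modeled on what the methods in this class read from `self._config`.
    #
    #     >>> from pathlib import Path
    #     >>> config = {
    #     ...     "ubid_offset": 5,
    #     ...     "channels": {
    #     ...         "pulseId": {
    #     ...             "format": "per_electron",
    #     ...             "index_key": "/path/to/index",      # hypothetical paths
    #     ...             "dataset_key": "/path/to/dataset",
    #     ...             "slice": 0,
    #     ...         },
    #     ...     },
    #     ... }
    #     >>> creator = DataFrameCreator(config, Path("run.h5"))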

    def get_index_dataset_key(self, channel: str) -> tuple[str, str]:
        """
        Checks if 'index_key' and 'dataset_key' exist for the channel and returns them.

        Args:
            channel (str): The name of the channel.

        Returns:
            tuple[str, str]: A tuple of 'index_key' and 'dataset_key'.

        Raises:
            ValueError: If 'index_key' and 'dataset_key' are not provided.
        """
        channel_config = self._config["channels"][channel]
        group_err = ""
        if "index_key" in channel_config and "dataset_key" in channel_config:
            return channel_config["index_key"], channel_config["dataset_key"]
        elif "group_name" in channel_config:
            group_err = "'group_name' is no longer supported."
        error = f"{group_err} For channel: {channel}, provide both 'index_key' and 'dataset_key'."
        raise ValueError(error)
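
    # Illustration (not part of the original module): the two per-channel config
    # shapes this method accepts and rejects. The HDF5 paths are hypothetical.
    #
    #     >>> ok = {"index_key": "/path/to/index", "dataset_key": "/path/to/data"}
    #     >>> legacy = {"group_name": "/path/to/group"}
    #
    # A channel configured like `ok` returns ('/path/to/index', '/path/to/data');
    # one configured like `legacy` raises ValueError("'group_name' is no longer
    # supported. For channel: ..., provide both 'index_key' and 'dataset_key'.").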

    def get_dataset_array(
        self,
        channel: str,
        slice_: bool = False,
    ) -> tuple[pd.Index, h5py.Dataset]:
        """
        Returns the train ID index and the dataset for a given channel name.

        Args:
            channel (str): The name of the channel.
            slice_ (bool): If True, applies slicing on the dataset.

        Returns:
            tuple[pd.Index, h5py.Dataset]: A tuple containing the train ID
            pd.Index and the numpy array for the channel's data.
        """
        # Get the data from the necessary h5 file and channel
        index_key, dataset_key = self.get_index_dataset_key(channel)

        key = pd.Index(self.h5_file[index_key], name="trainId")  # macrobunch
        dataset = self.h5_file[dataset_key]

        if slice_:
            slice_index = self._config["channels"][channel].get("slice", None)
            if slice_index is not None:
                dataset = np.take(dataset, slice_index, axis=1)
        # If the dataset has size zero, fill it with NaNs
        if dataset.shape[0] == 0:
            # Fill the array with NaN values of the same shape as the train ID index
            dataset = np.full_like(key, np.nan, dtype=np.double)

        return key, dataset
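
    # Illustration (not part of the original module): the slicing step above uses
    # `np.take(dataset, slice_index, axis=1)` to pick one entry along the second
    # axis. A toy 2D example:
    #
    #     >>> import numpy as np
    #     >>> data = np.arange(12).reshape(3, 4)  # e.g. 3 trains x 4 columns
    #     >>> np.take(data, 1, axis=1)
    #     array([1, 5, 9])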

    def pulse_index(self, offset: int) -> tuple[pd.MultiIndex, slice | np.ndarray]:
        """
        Creates a multi-level index that combines train IDs and pulse IDs, and handles
        sorting and electron counting within each pulse.

        Args:
            offset (int): The offset value.

        Returns:
            tuple[pd.MultiIndex, slice | np.ndarray]: A tuple containing the computed
            pd.MultiIndex and the indexer.
        """
        # Get the pulse_dataset and the train_index
        train_index, pulse_dataset = self.get_dataset_array("pulseId", slice_=True)
        # pulse_dataset comes as a 2D array, resolved per train, and is flattened here.
        # The DAQ uses an offset so that no pulses are missed; this offset is subtracted here
        pulse_ravel = pulse_dataset.ravel() - offset
        # Here train_index is repeated to match the size of pulses
        train_index_repeated = np.repeat(train_index, pulse_dataset.shape[1])
        # A pulse-resolved multi-index is finally created.
        # Since there can be NaN pulses, those are dropped
        pulse_index = pd.MultiIndex.from_arrays((train_index_repeated, pulse_ravel)).dropna()

        # Sometimes the pulse_index is not monotonic, so we might need to sort it.
        # The indexer is also returned to sort the data in df_electron
        indexer = slice(None)
        if not pulse_index.is_monotonic_increasing:
            pulse_index, indexer = pulse_index.sort_values(return_indexer=True)

        # In the data, to signify different electrons, pulse_index is repeated by
        # the number of electrons in each pulse. Here the values are counted
        electron_counts = pulse_index.value_counts(sort=False).values
        # Now we resolve each pulse to its electrons
        electron_index = np.concatenate(
            [np.arange(count, dtype="uint16") for count in electron_counts],
        )

        # The final multi-index is constructed here
        index = pd.MultiIndex.from_arrays(
            (
                pulse_index.get_level_values(0),
                pulse_index.get_level_values(1).astype(int),
                electron_index,
            ),
            names=self.multi_index,
        )
        return index, indexer
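
    # Illustration (not part of the original module): a toy version of the electron
    # counting above. For a sorted (trainId, pulseId) index, `value_counts(sort=False)`
    # counts the electrons in each pulse, and the concatenated aranges enumerate them:
    #
    #     >>> import numpy as np, pandas as pd
    #     >>> idx = pd.MultiIndex.from_arrays(([10, 10, 10, 11], [0, 0, 1, 0]))
    #     >>> counts = idx.value_counts(sort=False).values
    #     >>> counts
    #     array([2, 1, 1])
    #     >>> np.concatenate([np.arange(c, dtype="uint16") for c in counts])
    #     array([0, 1, 0, 0], dtype=uint16)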

    @property
    def df_electron(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for channel names of type [per electron].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_electron' channel's data.
        """
        offset = self._config.get("ubid_offset", 5)  # 5 is the default value
        # Here we get the multi-index and the indexer to sort the data
        index, indexer = self.pulse_index(offset)

        # Get the relevant channels and their slice index
        channels = get_channels(self._config, "per_electron")
        slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels]

        # First check whether the dataset keys are the same for all channels,
        # because the DLD at FLASH stores all channels in the same h5 dataset
        dataset_keys = [self.get_index_dataset_key(channel)[1] for channel in channels]
        # True if all keys are the same
        all_keys_same = all(key == dataset_keys[0] for key in dataset_keys)

        # If all dataset keys are the same, we only need to load the dataset once and slice
        # the appropriate columns. This is much faster than loading the same dataset multiple times
        if all_keys_same:
            _, dataset = self.get_dataset_array(channels[0])
            data_dict = {
                channel: dataset[:, slice_, :].ravel()
                for channel, slice_ in zip(channels, slice_index)
            }
            dataframe = pd.DataFrame(data_dict)
        # In case the channels do differ, we create a pd.Series for each channel and concatenate them
        else:
            series = {
                channel: pd.Series(self.get_dataset_array(channel, slice_=True)[1].ravel())
                for channel in channels
            }
            dataframe = pd.concat(series, axis=1)

        # After the offset subtraction, negative pulse values are dropped as they are not valid
        drop_vals = np.arange(-offset, 0)

        # A few things happen here:
        # all NaN values are dropped, as when creating the multiindex;
        # if necessary, the data is sorted with [indexer];
        # the pd.MultiIndex is set;
        # finally, the offset values are dropped
        return (
            dataframe.dropna()
            .iloc[indexer]
            .set_index(index)
            .drop(index=drop_vals, level="pulseId", errors="ignore")
        )
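
    # Illustration (not part of the original module): the final `.drop` above removes
    # rows whose pulseId became negative after the offset subtraction. Toy data, with
    # the level names assumed to match the multi_index used throughout this class:
    #
    #     >>> import numpy as np, pandas as pd
    #     >>> mi = pd.MultiIndex.from_tuples(
    #     ...     [(10, -1, 0), (10, 0, 0), (10, 1, 0)],
    #     ...     names=["trainId", "pulseId", "electronId"],
    #     ... )
    #     >>> df = pd.DataFrame({"x": [1.0, 2.0, 3.0]}, index=mi)
    #     >>> df.drop(index=np.arange(-5, 0), level="pulseId", errors="ignore")
    #     # only the rows with pulseId >= 0 remain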

    @property
    def df_pulse(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per pulse].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_pulse' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_pulse")
        # Check if dldAux is in the channels and raise an error if so
        if self._config.get("aux_alias", "dldAux") in channels:
            raise ValueError(
                "dldAux is a 'per_train' channel. "
                "Please choose 'per_train' as the format for dldAux.",
            )
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data are returned
            key, dataset = self.get_dataset_array(channel, slice_=True)
            # A pulse-resolved MultiIndex is created. Since this is pulse data,
            # the electron index is always 0
            index = pd.MultiIndex.from_product(
                (key, np.arange(0, dataset.shape[1]), [0]),
                names=self.multi_index,
            )
            # The dataset is opened and converted to a numpy array by [()]
            # and flattened to resolve per pulse
            channel_series = pd.Series(dataset[()].ravel(), index=index, name=channel)
            # Sometimes pulse columns have more pulses than valid ones, such as with the
            # bam channel, so we remove all 0 values from the series
            series.append(channel_series[channel_series != 0])  # TODO: put this in metadata

        # All the channels are concatenated to a single DataFrame
        return pd.concat(
            series,
            axis=1,
        )
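
    # Illustration (not part of the original module): the `from_product` call above
    # expands every train ID over all pulse positions, pinning the electron level to 0.
    # Level names assumed as elsewhere in this class:
    #
    #     >>> import numpy as np, pandas as pd
    #     >>> pd.MultiIndex.from_product(
    #     ...     ([100, 101], np.arange(2), [0]),
    #     ...     names=["trainId", "pulseId", "electronId"],
    #     ... ).to_list()
    #     [(100, 0, 0), (100, 1, 0), (101, 0, 0), (101, 1, 0)]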

    @property
    def df_train(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per train].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_train' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_train")
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data are returned
            key, dataset = self.get_dataset_array(channel, slice_=True)
            # An electron- and pulse-resolved MultiIndex is created. Since this is train
            # data, the electron and pulse indices are always 0
            index = pd.MultiIndex.from_product(
                (key, [0], [0]),
                names=self.multi_index,
            )
            # The auxiliary dataset (which is stored in the same dataset as other DLD channels)
            # contains multiple channels inside. Even though they are resolved per train,
            # they come in pulse format, so the extra values are sliced and individual channels are
            # created and appended to the list
            aux_alias = self._config.get("aux_alias", "dldAux")
            sub_channels = self._config.get("aux_subchannels_alias", "dldAuxChannels")
            if channel == "dldAux":
                aux_channels = self._config["channels"][aux_alias][sub_channels].items()
                for name, values in aux_channels:
                    series.append(
                        pd.Series(
                            dataset[: key.size, values["slice"]],
                            index,
                            name=name,
                        ),
                    )
            else:
                series.append(pd.Series(dataset, index, name=channel))
        # All the channels are concatenated to a single DataFrame
        return pd.concat(series, axis=1)
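
    # Illustration (not part of the original module): how the aux slicing above pulls
    # named sub-channels out of one shared 2D dataset. Sub-channel names are hypothetical:
    #
    #     >>> import numpy as np
    #     >>> aux = np.arange(6).reshape(3, 2)  # 3 trains x 2 aux columns
    #     >>> subchannels = {"sampleBias": {"slice": 0}, "tofVoltage": {"slice": 1}}
    #     >>> {name: aux[:3, cfg["slice"]] for name, cfg in subchannels.items()}
    #     {'sampleBias': array([0, 2, 4]), 'tofVoltage': array([1, 3, 5])}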

    def validate_channel_keys(self) -> None:
        """
        Validates that the index and dataset keys for all channels in the config exist
        in the h5 file.

        Raises:
            KeyError: If the index or dataset keys do not exist in the file.
        """
        for channel in self._config["channels"]:
            index_key, dataset_key = self.get_index_dataset_key(channel)
            if index_key not in self.h5_file:
                raise KeyError(f"pd.Index key '{index_key}' doesn't exist in the file.")
            if dataset_key not in self.h5_file:
                raise KeyError(f"Dataset key '{dataset_key}' doesn't exist in the file.")

    @property
    def df(self) -> pd.DataFrame:
        """
        Joins the 'per_electron', 'per_pulse', and 'per_train' data using a concat
        operation, returning a single dataframe.

        Returns:
            pd.DataFrame: The combined pandas DataFrame.
        """

        self.validate_channel_keys()
        # This has been tested with merge, join, and concat;
        # concat offers the best performance, almost 3 times faster
        return pd.concat((self.df_electron, self.df_pulse, self.df_train), axis=1).sort_index()
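
# Illustration (not part of the original module): an end-to-end sketch of how the
# pieces above compose. The config and file path are hypothetical; the index level
# names are those used throughout this class.
#
#     >>> creator = DataFrameCreator(config_dataframe=config, h5_path=Path("run.h5"))
#     >>> creator.validate_channel_keys()  # raises KeyError on missing h5 keys
#     >>> df = creator.df                  # electron, pulse, and train data combined
#     >>> df.index.names
#     FrozenList(['trainId', 'pulseId', 'electronId'])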