OpenCOMPES / sed / 12876831595

20 Jan 2025 10:55PM UTC coverage: 92.174% (+0.4%) from 91.801%

Pull Request #437: Upgrade to V1

Merge pull request #555 from OpenCOMPES/config_renaming:
use user platformdir also for user config

2235 of 2372 new or added lines in 53 files covered. (94.22%)
4 existing lines in 1 file now uncovered.
7703 of 8357 relevant lines covered (92.17%)
0.92 hits per line

Source File: /src/sed/loader/flash/dataframe.py (93.75% covered)

"""
2
This module creates pandas DataFrames from HDF5 files for different levels of data granularity
3
[per electron, per pulse, and per train]. It efficiently handles concatenation of data from
4
various channels within the HDF5 file, making use of the structured nature data to optimize
5
join operations. This approach significantly enhances performance compared to earlier.
6
"""
from __future__ import annotations

from pathlib import Path

import h5py
import numpy as np
import pandas as pd

from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import InvalidFileError
from sed.core.logging import setup_logging

logger = setup_logging("flash_dataframe_creator")


class DataFrameCreator:
    """
    A class for creating pandas DataFrames from an HDF5 file.

    Attributes:
        h5_file (h5py.File): The HDF5 file object.
        multi_index (pd.MultiIndex): The multi-index structure for the DataFrame.
        _config (dict): The configuration dictionary for the DataFrame.
    """

    def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_path (Path): Path to the h5 file.
        """
        logger.debug(f"Initializing DataFrameCreator for file: {h5_path}")
        self.h5_file = h5py.File(h5_path, "r")
        self.multi_index = get_channels(index=True)
        self._config = config_dataframe

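    # A minimal construction sketch, kept as a comment; the config access and
    # file name below are hypothetical placeholders, not taken from a real setup:
    #
    #   >>> creator = DataFrameCreator(config["dataframe"], Path("run_43878.h5"))
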
    def get_index_dataset_key(self, channel: str) -> tuple[str, str]:
        """
        Checks if 'index_key' and 'dataset_key' exist for the channel and returns them.

        Args:
            channel (str): The name of the channel.

        Returns:
            tuple[str, str]: A tuple of 'index_key' and 'dataset_key'.

        Raises:
            ValueError: If 'index_key' and 'dataset_key' are not both provided.
        """
        channel_config = self._config["channels"][channel]
        group_err = ""
        if "index_key" in channel_config and "dataset_key" in channel_config:
            return channel_config["index_key"], channel_config["dataset_key"]
        elif "group_name" in channel_config:
            group_err = "'group_name' is no longer supported."
        error = f"{group_err} For channel: {channel}, provide both 'index_key' and 'dataset_key'."
        raise ValueError(error)

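    # A sketch of the per-channel config shape consumed above; the HDF5 paths
    # and slice value are hypothetical placeholders, not real FLASH keys:
    #
    #   channels:
    #     pulseId:
    #       index_key: "/hypothetical/index/path"
    #       dataset_key: "/hypothetical/dataset/path"
    #       slice: 2
    #
    # A channel still carrying the legacy 'group_name' key raises a ValueError.
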
    def get_dataset_array(
        self,
        channel: str,
        slice_: bool = True,
    ) -> tuple[pd.Index, np.ndarray | h5py.Dataset]:
        """
        Returns the train ID index and the (optionally sliced) dataset for a given channel name.

        Args:
            channel (str): The name of the channel.
            slice_ (bool): Applies slicing on the dataset. Default is True.

        Returns:
            tuple[pd.Index, np.ndarray | h5py.Dataset]: A tuple containing the train ID
            pd.Index and the channel's data.
        """
        logger.debug(f"Getting dataset array for channel: {channel}")
        # Get the data from the necessary h5 file and channel
        index_key, dataset_key = self.get_index_dataset_key(channel)

        key = pd.Index(self.h5_file[index_key], name="trainId")  # macrobunch
        dataset = self.h5_file[dataset_key]

        if slice_:
            slice_index = self._config["channels"][channel].get("slice", None)
            if slice_index is not None:
                logger.debug(f"Slicing dataset with index: {slice_index}")
                dataset = np.take(dataset, slice_index, axis=1)
        # If the dataset is empty, fill it with NaN values of the same shape as the index
        if dataset.shape[0] == 0:
            dataset = np.full_like(key, np.nan, dtype=np.double)

        return key, dataset

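    # The 'slice' step above picks one detector channel out of a stacked array.
    # A toy illustration with invented shapes: for a dataset shaped
    # (trains, channels, pulses), np.take(dataset, 2, axis=1) returns the
    # (trains, pulses) block of channel 2:
    #
    #   >>> arr = np.arange(24).reshape(2, 3, 4)
    #   >>> np.take(arr, 2, axis=1).shape
    #   (2, 4)
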
    def pulse_index(self, offset: int) -> tuple[pd.MultiIndex, slice | np.ndarray]:
        """
        Creates a multi-level index that combines train IDs and pulse IDs, and handles
        sorting and electron counting within each pulse.

        Args:
            offset (int): The offset value.

        Returns:
            tuple[pd.MultiIndex, slice | np.ndarray]: A tuple containing the computed
            pd.MultiIndex and the indexer.
        """
        # Get the pulse_dataset and the train_index
        train_index, pulse_dataset = self.get_dataset_array("pulseId")
        # pulse_dataset comes as a 2D array, resolved per train, and is flattened here.
        # The DAQ has an offset so that no pulses are missed; this offset is subtracted here
        pulse_ravel = pulse_dataset.ravel() - offset
        # Here train_index is repeated to match the size of pulses
        train_index_repeated = np.repeat(train_index, pulse_dataset.shape[1])
        # A pulse-resolved multi-index is finally created.
        # Since there can be NaN pulses, those are dropped
        pulse_index = pd.MultiIndex.from_arrays((train_index_repeated, pulse_ravel)).dropna()

        # Sometimes the pulse_index is not monotonic, so we might need to sort it.
        # The indexer is also returned to sort the data in df_electron
        indexer = slice(None)
        if not pulse_index.is_monotonic_increasing:
            pulse_index, indexer = pulse_index.sort_values(return_indexer=True)

        # In the data, to signify different electrons, pulse_index is repeated by
        # the number of electrons in each pulse. Here the values are counted
        electron_counts = pulse_index.value_counts(sort=False).values
        # Now we resolve each pulse to its electrons
        electron_index = np.concatenate(
            [np.arange(count, dtype="uint16") for count in electron_counts],
        )

        # Final multi-index constructed here
        index = pd.MultiIndex.from_arrays(
            (
                pulse_index.get_level_values(0),
                pulse_index.get_level_values(1).astype(int),
                electron_index,
            ),
            names=self.multi_index,
        )
        return index, indexer

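    # A worked toy example of the electron counting above (values invented):
    # if the sorted pulse_index holds the (trainId, pulseId) pairs
    # [(1, 0), (1, 0), (1, 0), (1, 1)], then value_counts(sort=False) gives
    # counts [3, 1], so electron_index becomes [0, 1, 2, 0]: the three
    # electrons of pulse (1, 0) are enumerated before the lone electron of
    # pulse (1, 1).
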
    @property
    def df_electron(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for channel names of type [per electron].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_electron' channel's data.
        """
        # Get the relevant channels and their slice index
        channels = get_channels(self._config, "per_electron")
        if channels == []:
            return pd.DataFrame()
        slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels]

        offset = self._config.get("ubid_offset", 5)  # 5 is the default value
        # Here we get the multi-index and the indexer to sort the data
        index, indexer = self.pulse_index(offset)

        # First check if dataset keys are the same for all channels,
        # because the DLD at FLASH stores all channels in the same h5 dataset
        dataset_keys = [self.get_index_dataset_key(channel)[1] for channel in channels]
        # True if all dataset keys are the same
        all_keys_same = all(key == dataset_keys[0] for key in dataset_keys)

        # If all dataset keys are the same, we only need to load the dataset once and slice
        # the appropriate columns. This is much faster than loading the same dataset multiple times
        if all_keys_same:
            _, dataset = self.get_dataset_array(channels[0], slice_=False)
            data_dict = {
                channel: dataset[:, idx, :].ravel() for channel, idx in zip(channels, slice_index)
            }
            dataframe = pd.DataFrame(data_dict)
        # In case the channels do differ, we create a pd.Series for each channel and concatenate them
        else:
            series = {
                channel: pd.Series(self.get_dataset_array(channel)[1].ravel())
                for channel in channels
            }
            dataframe = pd.concat(series, axis=1)

        # NaN values dropped, data sorted with [indexer] if necessary, and the MultiIndex is set
        return dataframe.dropna().iloc[indexer].set_index(index)

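    # A shape illustration of the fast path above (numbers invented): with a
    # shared dataset of shape (trains, dld_channels, pulses) = (100, 4, 500),
    # dataset[:, 1, :].ravel() yields one flat array of 50000 values for the
    # second DLD channel, instead of opening the same HDF5 dataset once per
    # channel.
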
    @property
    def df_pulse(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per pulse].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_pulse' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_pulse")
        if channels == []:
            return pd.DataFrame()
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data are returned
            key, dataset = self.get_dataset_array(channel)
            # An electron-resolved MultiIndex is created. Since this is pulse data,
            # the electron index is always 0
            index = pd.MultiIndex.from_product(
                (key, np.arange(0, dataset.shape[1]), [0]),
                names=self.multi_index,
            )
            # The dataset is opened and converted to a numpy array by [()]
            # and flattened to resolve per pulse
            channel_series = pd.Series(dataset[()].ravel(), index=index, name=channel)
            # Sometimes pulse columns have more pulses than valid ones, such as with the bam
            # channel, so we remove all 0 values from the series
            series.append(channel_series[channel_series != 0])  # TODO: put this in metadata

        # All the channels are concatenated to a single DataFrame
        return pd.concat(
            series,
            axis=1,
        )

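    # A small illustration of the index construction above (invented values;
    # the actual level names come from get_channels(index=True)):
    #
    #   >>> pd.MultiIndex.from_product(
    #   ...     ([100, 101], np.arange(0, 2), [0]),
    #   ...     names=["trainId", "pulseId", "electronId"],
    #   ... )
    #
    # enumerates (100, 0, 0), (100, 1, 0), (101, 0, 0), (101, 1, 0): every
    # pulse of every train, with the electron level pinned to 0.
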
    @property
    def df_train(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per train].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_train' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_train")
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data are returned
            key, dataset = self.get_dataset_array(channel)
            # An electron- and pulse-resolved MultiIndex is created. Since this is train data,
            # the electron and pulse indices are always 0
            index = pd.MultiIndex.from_product(
                (key, [0], [0]),
                names=self.multi_index,
            )
            # The auxiliary dataset (which is stored in the same dataset as other DLD channels)
            # contains multiple channels inside. Even though they are resolved per train,
            # they come in pulse format, so the extra values are sliced and individual channels are
            # created and appended to the list
            aux_alias = self._config.get("aux_alias", "dldAux")
            if channel == aux_alias:
                try:
                    sub_channels = self._config["channels"][aux_alias]["sub_channels"]
                except KeyError as exc:
                    raise KeyError(
                        f"Provide 'sub_channels' for auxiliary channel '{aux_alias}'.",
                    ) from exc
                for name, values in sub_channels.items():
                    series.append(
                        pd.Series(
                            dataset[: key.size, values["slice"]],
                            index,
                            name=name,
                        ),
                    )
            else:
                series.append(pd.Series(dataset, index, name=channel))
        # All the channels are concatenated to a single DataFrame
        return pd.concat(series, axis=1)

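    # A sketch of the auxiliary-channel config consumed above; the sub-channel
    # names, paths, and slice numbers are hypothetical examples:
    #
    #   channels:
    #     dldAux:
    #       index_key: "/hypothetical/index/path"
    #       dataset_key: "/hypothetical/dataset/path"
    #       sub_channels:
    #         sampleBias:
    #           slice: 0
    #         crystalVoltage:
    #           slice: 1
    #
    # Each sub-channel becomes its own per-train column in the resulting frame.
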
    def validate_channel_keys(self) -> None:
        """
        Validates that the index and dataset keys for all channels in the config exist in the h5 file.

        Raises:
            InvalidFileError: If the index or dataset keys are missing in the h5 file.
        """
        invalid_channels = []
        for channel in self._config["channels"]:
            index_key, dataset_key = self.get_index_dataset_key(channel)
            if index_key not in self.h5_file or dataset_key not in self.h5_file:
                invalid_channels.append(channel)

        if invalid_channels:
            raise InvalidFileError(invalid_channels)

    @property
    def df(self) -> pd.DataFrame:
        """
        Joins the 'per_electron', 'per_pulse', and 'per_train' channels using a concat operation,
        returning a single dataframe.

        Returns:
            pd.DataFrame: The combined pandas DataFrame.
        """
        logger.debug("Creating combined DataFrame")
        self.validate_channel_keys()

        df = pd.concat((self.df_electron, self.df_pulse, self.df_train), axis=1).sort_index()
        logger.debug(f"Created DataFrame with shape: {df.shape}")

        # Filter out negative pulse values
        df = df[df.index.get_level_values("pulseId") >= 0]
        logger.debug(f"Filtered DataFrame shape: {df.shape}")

        return df
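

# A minimal end-to-end sketch of the df property, kept as a comment
# (continuing the hypothetical 'creator' from the __init__ example above):
#
#   >>> df = creator.df
#
# The result has channel keys validated, the three granularities concatenated
# and sorted on the index, and rows with negative pulseId dropped.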