OpenCOMPES / sed / build 13419398366

19 Feb 2025 06:09PM CUT. Coverage: 91.6% (-0.6%) from 92.174%

Pull Request #534: Hextof lab loader (merge df78f6964 into 6b927a2db, via GitHub web-flow)

71 of 124 new or added lines in 7 files covered (57.26%).
3 existing lines in 1 file are now uncovered.
7731 of 8440 relevant lines covered (91.6%).
0.92 hits per line.


Source file: /src/sed/loader/flash/dataframe.py (69.66% of lines covered)
"""
This module creates pandas DataFrames from HDF5 files for different levels of data granularity
[per electron, per pulse, and per train]. It efficiently handles concatenation of data from
various channels within the HDF5 file, making use of the structured nature of the data to
optimize join operations. This approach significantly enhances performance compared to earlier
implementations.
"""
from __future__ import annotations

from abc import ABC
from abc import abstractmethod
from pathlib import Path

import h5py
import numpy as np
import pandas as pd

from sed.core.logging import setup_logging
from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import InvalidFileError

logger = setup_logging("flash_dataframe_creator")


class BaseDataFrameCreator(ABC):
    """Abstract base class for creating pandas DataFrames from an HDF5 file."""

    def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_path (Path): Path to the h5 file.
        """
        pass

    @property
    @abstractmethod
    def df(self) -> pd.DataFrame:
        """DataFrame property that must be implemented by subclasses."""
        pass

    def validate_channel_keys(self) -> None:
        """
        Validates if the index and dataset keys for all channels in the config exist in the h5 file.

        Raises:
            InvalidFileError: If the index or dataset keys are missing in the h5 file.
        """
        pass


class DataFrameCreator(BaseDataFrameCreator):
    """
    A class for creating pandas DataFrames from an HDF5 file for FLASH data.

    Attributes:
        h5_file (h5py.File): The HDF5 file object.
        multi_index (pd.MultiIndex): The multi-index structure for the DataFrame.
        _config (dict): The configuration dictionary for the DataFrame.
    """

    def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
        """
        Initializes the DataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_path (Path): Path to the h5 file.
        """
        logger.debug(f"Initializing DataFrameCreator for file: {h5_path}")
        self.h5_file = h5py.File(h5_path, "r")
        self._config = config_dataframe
        self.multi_index = get_channels(config_dataframe, index=True)

    def get_index_dataset_key(self, channel: str) -> tuple[str, str]:
        """
        Checks if 'index_key' and 'dataset_key' exist for the channel and returns them.

        Args:
            channel (str): The name of the channel.

        Returns:
            tuple[str, str]: A tuple of 'index_key' and 'dataset_key'.

        Raises:
            ValueError: If 'index_key' and 'dataset_key' are not provided.
        """
        channel_config = self._config["channels"][channel]
        group_err = ""
        if "index_key" in channel_config and "dataset_key" in channel_config:
            return channel_config["index_key"], channel_config["dataset_key"]
        elif "group_name" in channel_config:
            group_err = "'group_name' is no longer supported."
        error = f"{group_err} For channel: {channel}, provide both 'index_key' and 'dataset_key'."
        raise ValueError(error)

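    # A minimal sketch of a channel entry consumed above (the HDF5 paths are
    # hypothetical, for illustration only):
    #
    #     "dldPosX": {
    #         "index_key": "/zraw/dld/index",    # hypothetical path
    #         "dataset_key": "/zraw/dld/value",  # hypothetical path
    #         "slice": 1,
    #     }
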
    def get_dataset_array(
        self,
        channel: str,
        slice_: bool = True,
    ) -> tuple[pd.Index, np.ndarray | h5py.Dataset]:
        """
        Returns a numpy array for a given channel name.

        Args:
            channel (str): The name of the channel.
            slice_ (bool): Applies slicing on the dataset. Default is True.

        Returns:
            tuple[pd.Index, np.ndarray | h5py.Dataset]: A tuple containing the train ID
            pd.Index and the channel's data.
        """
        logger.debug(f"Getting dataset array for channel: {channel}")
        # Get the data from the necessary h5 file and channel
        index_key, dataset_key = self.get_index_dataset_key(channel)

        key = pd.Index(self.h5_file[index_key], name="trainId")  # macrobunch
        dataset = self.h5_file[dataset_key]

        if slice_:
            slice_index = self._config["channels"][channel].get("slice", None)
            if slice_index is not None:
                logger.debug(f"Slicing dataset with index: {slice_index}")
                dataset = np.take(dataset, slice_index, axis=1)
        # If the dataset is empty, fill it with NaN values of the same shape as the index
        if dataset.shape[0] == 0:
            dataset = np.full_like(key, np.nan, dtype=np.double)

        return key, dataset

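    # Shape note for the slicing above (illustrative assumption): the DLD data used by
    # df_electron is a 3D dataset of roughly (train, channel slice, electron) shape, so
    # np.take(dataset, slice_index, axis=1) selects one channel along the second axis
    # before the data is flattened downstream.
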
    def pulse_index(self, offset: int) -> tuple[pd.MultiIndex, slice | np.ndarray]:
        """
        Creates a multi-level index that combines train IDs and pulse IDs, and handles
        sorting and electron counting within each pulse.

        Args:
            offset (int): The offset value.

        Returns:
            tuple[pd.MultiIndex, slice | np.ndarray]: A tuple containing the computed
            pd.MultiIndex and the indexer.
        """
        # Get the pulse_dataset and the train_index
        train_index, pulse_dataset = self.get_dataset_array("pulseId")
        # pulse_dataset comes as a 2D array, resolved per train, and is flattened here.
        # The DAQ records with an offset so that no pulses are missed; that offset is
        # subtracted here.
        pulse_ravel = pulse_dataset.ravel() - offset
        # Here train_index is repeated to match the size of pulses
        train_index_repeated = np.repeat(train_index, pulse_dataset.shape[1])
        # A pulse resolved multi-index is finally created.
        # Since there can be NaN pulses, those are dropped
        pulse_index = pd.MultiIndex.from_arrays((train_index_repeated, pulse_ravel)).dropna()

        # Sometimes the pulse_index is not monotonic, so we might need to sort it.
        # The indexer is also returned to sort the data in df_electron
        indexer = slice(None)
        if not pulse_index.is_monotonic_increasing:
            pulse_index, indexer = pulse_index.sort_values(return_indexer=True)

        # In the data, to signify different electrons, pulse_index is repeated by
        # the number of electrons in each pulse. Here the values are counted
        electron_counts = pulse_index.value_counts(sort=False).values
        # Now we resolve each pulse to its electrons
        electron_index = np.concatenate(
            [np.arange(count, dtype="uint16") for count in electron_counts],
        )

        # Final multi-index constructed here
        index = pd.MultiIndex.from_arrays(
            (
                pulse_index.get_level_values(0),
                pulse_index.get_level_values(1).astype(int),
                electron_index,
            ),
            names=self.multi_index,
        )
        return index, indexer

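    # A tiny worked example of the index construction above (hypothetical values):
    # a pulse labelled (trainId=1600123, pulseId=5) that occurs three times in
    # pulse_index gets an electron count of 3 and is resolved into the final rows
    # (1600123, 5, 0), (1600123, 5, 1), (1600123, 5, 2), giving each electron
    # within the pulse its own label.
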
    @property
    def df_electron(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for channel names of type [per electron].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_electron' channel's data.
        """
        # Get the relevant channels and their slice index
        channels = get_channels(self._config, "per_electron")
        if channels == []:
            return pd.DataFrame()
        slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels]

        offset = self._config.get("ubid_offset", 5)  # 5 is the default value
        # Here we get the multi-index and the indexer to sort the data
        index, indexer = self.pulse_index(offset)

        # First checking if dataset keys are the same for all channels
        # because DLD at FLASH stores all channels in the same h5 dataset
        dataset_keys = [self.get_index_dataset_key(channel)[1] for channel in channels]
        # Gives True if all keys are the same
        all_keys_same = all(key == dataset_keys[0] for key in dataset_keys)

        # If all dataset keys are the same, we only need to load the dataset once and slice
        # the appropriate columns. This is much faster than loading the same dataset multiple times
        if all_keys_same:
            _, dataset = self.get_dataset_array(channels[0], slice_=False)
            data_dict = {
                channel: dataset[:, idx, :].ravel() for channel, idx in zip(channels, slice_index)
            }
            dataframe = pd.DataFrame(data_dict)
        # In case channels do differ, we create a pd.Series for each channel and concatenate them
        else:
            series = {
                channel: pd.Series(self.get_dataset_array(channel)[1].ravel())
                for channel in channels
            }
            dataframe = pd.concat(series, axis=1)

        # NaN values dropped, data sorted with [indexer] if necessary, and the MultiIndex is set
        return dataframe.dropna().iloc[indexer].set_index(index)

    @property
    def df_pulse(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per pulse].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_pulse' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_pulse")
        if channels == []:
            return pd.DataFrame()
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data is returned
            key, dataset = self.get_dataset_array(channel)
            # Electron resolved MultiIndex is created. Since this is pulse data,
            # the electron index is always 0
            index = pd.MultiIndex.from_product(
                (key, np.arange(0, dataset.shape[1]), [0]),
                names=self.multi_index,
            )
            # The dataset is opened and converted to numpy array by [()]
            # and flattened to resolve per pulse
            channel_series = pd.Series(dataset[()].ravel(), index=index, name=channel)
            # sometimes pulse columns have more pulses than valid ones such as with bam channel
            # so we remove all 0 values from the series
            series.append(channel_series[channel_series != 0])  # TODO: put this in metadata

        # All the channels are concatenated to a single DataFrame
        return pd.concat(
            series,
            axis=1,
        )

    @property
    def df_train(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per train].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_train' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_train")
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # train_index and (sliced) data is returned
            key, dataset = self.get_dataset_array(channel)
            # Electron and pulse resolved MultiIndex is created. Since this is train data,
            # the electron and pulse index is always 0
            index = pd.MultiIndex.from_product(
                (key, [0], [0]),
                names=self.multi_index,
            )
            # Auxiliary dataset (which is stored in the same dataset as other DLD channels)
            # contains multiple channels inside. Even though they are resolved per train,
            # they come in pulse format, so the extra values are sliced and individual channels are
            # created and appended to the list
            aux_alias = self._config.get("aux_alias", "dldAux")
            if channel == aux_alias:
                try:
                    sub_channels = self._config["channels"][aux_alias]["sub_channels"]
                except KeyError:
                    raise KeyError(
                        f"Provide 'sub_channels' for auxiliary channel '{aux_alias}'.",
                    )
                for name, values in sub_channels.items():
                    series.append(
                        pd.Series(
                            dataset[: key.size, values["slice"]],
                            index,
                            name=name,
                        ),
                    )
            else:
                series.append(pd.Series(dataset, index, name=channel))
        # All the channels are concatenated to a single DataFrame
        return pd.concat(series, axis=1)

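    # A minimal sketch of the auxiliary channel entry consumed above (keys, sub-channel
    # names, and slice values are hypothetical, for illustration only):
    #
    #     "dldAux": {
    #         "index_key": "/zraw/aux/index",    # hypothetical path
    #         "dataset_key": "/zraw/aux/value",  # hypothetical path
    #         "sub_channels": {
    #             "sampleBias": {"slice": 0},
    #             "tofVoltage": {"slice": 1},
    #         },
    #     }
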
    def validate_channel_keys(self) -> None:
        """
        Validates if the index and dataset keys for all channels in the config exist in the h5 file.

        Raises:
            InvalidFileError: If the index or dataset keys are missing in the h5 file.
        """
        invalid_channels = []
        for channel in self._config["channels"]:
            index_key, dataset_key = self.get_index_dataset_key(channel)
            if index_key not in self.h5_file or dataset_key not in self.h5_file:
                invalid_channels.append(channel)

        if invalid_channels:
            raise InvalidFileError(invalid_channels)

    @property
    def df(self) -> pd.DataFrame:
        """
        Joins the 'per_electron', 'per_pulse', and 'per_train' data using a concat operation,
        returning a single dataframe.

        Returns:
            pd.DataFrame: The combined pandas DataFrame.
        """
        logger.debug("Creating combined DataFrame")
        self.validate_channel_keys()

        df = pd.concat((self.df_electron, self.df_pulse, self.df_train), axis=1).sort_index()
        logger.debug(f"Created DataFrame with shape: {df.shape}")

        # all the negative pulse values are dropped as they are invalid
        df = df[df.index.get_level_values("pulseId") >= 0]
        logger.debug(f"Filtered DataFrame shape: {df.shape}")

        return df


class CFELDataFrameCreator(BaseDataFrameCreator):
    """
    A class for creating pandas DataFrames from an HDF5 file for HEXTOF lab data at CFEL.

    Attributes:
        h5_file (h5py.File): The HDF5 file object.
        _config (dict): The configuration dictionary for the DataFrame.
    """

    def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
        """
        Initializes the CFELDataFrameCreator class.

        Args:
            config_dataframe (dict): The configuration dictionary with only the dataframe key.
            h5_path (Path): Path to the h5 file.
        """
        self.h5_file = h5py.File(h5_path, "r")
        self._config = config_dataframe

    def get_dataset_key(self, channel: str) -> str:
        """
        Checks if 'dataset_key' exists for the channel and returns it.

        Args:
            channel (str): The name of the channel.

        Returns:
            str: The 'dataset_key'.

        Raises:
            ValueError: If 'dataset_key' is not provided.
        """
        channel_config = self._config["channels"][channel]
        if "dataset_key" in channel_config:
            return channel_config["dataset_key"]
        error = f"For channel: {channel}, provide 'dataset_key'."
        raise ValueError(error)

    def get_dataset_array(
        self,
        channel: str,
    ) -> h5py.Dataset:
        """
        Returns the h5py dataset for a given channel name.

        Args:
            channel (str): The name of the channel.

        Returns:
            h5py.Dataset: The channel's data.
        """
        # Get the data from the necessary h5 file and channel
        dataset_key = self.get_dataset_key(channel)
        dataset = self.h5_file[dataset_key]

        return dataset

    @property
    def df_electron(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for channel names of type [per electron].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_electron' channel's data.
        """
        # Get the relevant channels
        channels = get_channels(self._config, "per_electron")
        if channels == []:
            return pd.DataFrame()

        series = {channel: pd.Series(self.get_dataset_array(channel)) for channel in channels}
        dataframe = pd.concat(series, axis=1)
        return dataframe.dropna()

    @property
    def df_train(self) -> pd.DataFrame:
        """
        Returns a pandas DataFrame for given channel names of type [per train].

        Returns:
            pd.DataFrame: The pandas DataFrame for the 'per_train' channel's data.
        """
        series = []
        # Get the relevant channel names
        channels = get_channels(self._config, "per_train")
        # For each channel, a pd.Series is created and appended to the list
        for channel in channels:
            # The channel's dataset is returned
            dataset = self.get_dataset_array(channel)
            # The index marks the first per-electron row of each train: a cumulative
            # sum of the counts per train, so each train's value is anchored at its
            # first electron
            index_alias = self._config.get("index", ["countId"])[0]
            index = np.cumsum([0, *self.get_dataset_array(index_alias)[:-1]])
            # Auxiliary dataset (which is stored in the same dataset as other DLD channels)
            # contains multiple channels inside. Even though they are resolved per train,
            # they come in pulse format, so the extra values are sliced and individual channels are
            # created and appended to the list
            aux_alias = self._config.get("aux_alias", "dldAux")
            if channel == aux_alias:
                try:
                    sub_channels = self._config["channels"][aux_alias]["sub_channels"]
                except KeyError:
                    raise KeyError(
                        f"Provide 'sub_channels' for auxiliary channel '{aux_alias}'.",
                    )
                for name, values in sub_channels.items():
                    series.append(
                        pd.Series(
                            dataset[:, values["slice"]],
                            index,
                            name=name,
                        ),
                    )
            else:
                series.append(pd.Series(dataset, index, name=channel))
        # All the channels are concatenated to a single DataFrame
        return pd.concat(series, axis=1)

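    # A small worked example of the index computation above (hypothetical counts):
    # if the countId dataset holds [3, 2, 4] electrons for three consecutive trains,
    # np.cumsum([0, 3, 2]) gives [0, 3, 5], i.e. the per-electron row at which each
    # train's values are anchored before df forward-fills the rows in between.
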
    def validate_channel_keys(self) -> None:
        """
        Validates if the dataset keys for all channels in the config exist in the h5 file.

        Raises:
            InvalidFileError: If a dataset key is missing in the h5 file.
        """
        invalid_channels = []
        for channel in self._config["channels"]:
            dataset_key = self.get_dataset_key(channel)
            if dataset_key not in self.h5_file:
                invalid_channels.append(channel)

        if invalid_channels:
            raise InvalidFileError(invalid_channels)

    @property
    def df(self) -> pd.DataFrame:
        """
        Joins the 'per_electron' and 'per_train' data using a concat operation,
        returning a single dataframe.

        Returns:
            pd.DataFrame: The combined pandas DataFrame.
        """
        self.validate_channel_keys()
        # Tested with merge, join and concat; concat offers the best performance,
        # almost 3 times faster
        df = pd.concat((self.df_electron, self.df_train), axis=1)
        df[self.df_train.columns] = df[self.df_train.columns].ffill()
        df.index.name = self._config.get("index", ["countId"])[0]
        return df
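
A minimal usage sketch, not part of the original module: the config dictionary below is an
illustrative assumption about the schema consumed by get_channels and the creators (the
channel entry, schema fields, HDF5 paths, and file name are hypothetical).

    from pathlib import Path

    config_dataframe = {
        "ubid_offset": 5,
        "channels": {
            "pulseId": {
                "format": "per_electron",            # hypothetical schema field
                "index_key": "/zraw/pulse/index",    # hypothetical HDF5 path
                "dataset_key": "/zraw/pulse/value",  # hypothetical HDF5 path
                "slice": 2,
            },
        },
    }
    # Builds the combined DataFrame: validates channel keys, then concatenates the
    # per-electron, per-pulse, and per-train data on the train/pulse/electron MultiIndex.
    creator = DataFrameCreator(config_dataframe, Path("run.h5"))  # hypothetical file
    df = creator.df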