
OpenCOMPES / sed / 12876831595

20 Jan 2025 10:55PM UTC coverage: 92.174% (+0.4%) from 91.801%
Pull Request #437: Upgrade to V1

Merge pull request #555 from OpenCOMPES/config_renaming

use user platformdir also for user config

2235 of 2372 new or added lines in 53 files covered. (94.22%)

4 existing lines in 1 file now uncovered.

7703 of 8357 relevant lines covered (92.17%)

0.92 hits per line

Source File: /src/sed/loader/sxp/loader.py (83.44% covered)
# pylint: disable=duplicate-code
"""
This module implements the SXP data loader.
This loader currently supports the SXP momentum microscope instrument.
The raw hdf5 data is combined and saved into buffer files and loaded as a dask dataframe.
The dataframe is an amalgamation of all h5 files for a combination of runs, where the NaNs are
automatically forward filled across different files.
This can then be saved as a parquet for out-of-sed processing and reread back to access other
sed functionality.
Most of the structure is identical to the FLASH loader.
"""
from __future__ import annotations

import time
from collections.abc import Sequence
from functools import reduce
from pathlib import Path

import dask.dataframe as dd
import h5py
import numpy as np
import pyarrow.parquet as pq
from joblib import delayed
from joblib import Parallel
from natsort import natsorted
from pandas import DataFrame
from pandas import MultiIndex
from pandas import Series

from sed.core import dfops
from sed.core.logging import set_verbosity
from sed.core.logging import setup_logging
from sed.loader.base.loader import BaseLoader
from sed.loader.utils import parse_h5_keys
from sed.loader.utils import split_dld_time_from_sector_id

# Configure logging
logger = setup_logging("sxp_loader")

class SXPLoader(BaseLoader):
    """
    The class generates multiindexed multidimensional pandas dataframes from the new SXP
    dataformat resolved by both macro and microbunches alongside electrons.
    Only the read_dataframe (inherited and implemented) method is accessed by other modules.

    Args:
        config (dict): Config dictionary.
        verbose (bool, optional): Option to print out diagnostic information.
    """

    __name__ = "sxp"

    supported_file_types = ["h5"]

    def __init__(self, config: dict, verbose: bool = True) -> None:
        super().__init__(config=config, verbose=verbose)

        set_verbosity(logger, self._verbose)

        self.multi_index = ["trainId", "pulseId", "electronId"]
        self.index_per_electron: MultiIndex = None
        self.index_per_pulse: MultiIndex = None
        self.failed_files_error: list[str] = []
        self.array_indices: list[list[slice]] = None
        self.raw_dir: str = None
        self.processed_dir: str = None

    @property
    def verbose(self) -> bool:
        """Accessor to the verbosity flag.

        Returns:
            bool: Verbosity flag.
        """
        return self._verbose

    @verbose.setter
    def verbose(self, verbose: bool):
        """Setter for the verbosity.

        Args:
            verbose (bool): Option to turn on verbose output. Sets loglevel to INFO.
        """
        self._verbose = verbose
        set_verbosity(logger, self._verbose)

    def _initialize_dirs(self):
        """
        Initializes the paths based on the configuration.

        Raises:
            ValueError: If required values are missing from the configuration.
            FileNotFoundError: If the raw data directories are not found.
        """
        # Parses to locate the raw beamtime directory from config file
        if (
            "paths" in self._config["core"]
            and self._config["core"]["paths"].get("raw", "")
            and self._config["core"]["paths"].get("processed", "")
        ):
            data_raw_dir = [
                Path(self._config["core"]["paths"].get("raw", "")),
            ]
            data_parquet_dir = Path(
                self._config["core"]["paths"].get("processed", ""),
            )

        else:
            try:
                beamtime_id = self._config["core"]["beamtime_id"]
                year = self._config["core"]["year"]
            except KeyError as exc:
                raise ValueError(
                    "The beamtime_id and year are required.",
                ) from exc

            beamtime_dir = Path(
                self._config["core"]["beamtime_dir"][self._config["core"]["beamline"]],
            )
            beamtime_dir = beamtime_dir.joinpath(f"{year}/{beamtime_id}/")

            if not beamtime_dir.joinpath("raw").is_dir():
                raise FileNotFoundError("Raw data directory not found.")

            data_raw_dir = [beamtime_dir.joinpath("raw")]

            parquet_path = "processed/parquet"
            data_parquet_dir = beamtime_dir.joinpath(parquet_path)

        data_parquet_dir.mkdir(parents=True, exist_ok=True)

        self.raw_dir = data_raw_dir
        self.processed_dir = data_parquet_dir

    def get_files_from_run_id(
1✔
137
        self,
138
        run_id: str,
139
        folders: str | Sequence[str] = None,
140
        extension: str = "h5",
141
        **kwds,
142
    ) -> list[str]:
143
        """Returns a list of filenames for a given run located in the specified directory
144
        for the specified data acquisition (daq).
145

146
        Args:
147
            run_id (str): The run identifier to locate.
148
            folders (str | Sequence[str], optional): The directory(ies) where the raw
149
                data is located. Defaults to config["core"]["base_folder"].
150
            extension (str, optional): The file extension. Defaults to "h5".
151
            kwds: Keyword arguments:
152
                - daq (str): The data acquisition identifier.
153

154
        Returns:
155
            list[str]: A list of path strings representing the collected file names.
156

157
        Raises:
158
            FileNotFoundError: If no files are found for the given run in the directory.
159
        """
160
        # Define the stream name prefixes based on the data acquisition identifier
161
        stream_name_prefixes = self._config["core"]["stream_name_prefixes"]
1✔
162
        stream_name_postfixes = self._config["core"].get("stream_name_postfixes", {})
1✔
163

164
        if isinstance(run_id, (int, np.integer)):
1✔
165
            run_id = str(run_id).zfill(4)
×
166

167
        if folders is None:
1✔
168
            folders = self._config["core"]["base_folder"]
×
169

170
        if isinstance(folders, str):
1✔
171
            folders = [folders]
×
172

173
        daq = kwds.pop("daq", self._config.get("dataframe", {}).get("daq"))
1✔
174

175
        if len(kwds) > 0:
1✔
NEW
176
            raise TypeError(
×
177
                f"get_files_from_run_id() got unexpected keyword arguments {kwds.keys()}.",
178
            )
179

180
        stream_name_postfix = stream_name_postfixes.get(daq, "")
1✔
181
        # Generate the file patterns to search for in the directory
182
        file_pattern = f"**/{stream_name_prefixes[daq]}{run_id}{stream_name_postfix}*." + extension
1✔
183

184
        files: list[Path] = []
1✔
185
        # Use pathlib to search for matching files in each directory
186
        for folder in folders:
1✔
187
            files.extend(
1✔
188
                natsorted(
189
                    Path(folder).glob(file_pattern),
190
                    key=lambda filename: str(filename).rsplit("_", maxsplit=1)[-1],
191
                ),
192
            )
193

194
        # Check if any files are found
195
        if not files:
1✔
196
            raise FileNotFoundError(
×
197
                f"No files found for run {run_id} in directory {str(folders)}",
198
            )
199

200
        # Return the list of found files
201
        return [str(file.resolve()) for file in files]
1✔
202

    @property
    def available_channels(self) -> list:
        """Returns the channel names that are available for use,
        excluding pulseId and trainId, defined by the json file"""
        available_channels = list(self._config["dataframe"]["channels"].keys())
        available_channels.remove("pulseId")
        available_channels.remove("trainId")
        return available_channels

    def get_channels(self, formats: str | list[str] = "", index: bool = False) -> list[str]:
        """
        Returns a list of channels associated with the specified format(s).

        Args:
            formats (str | list[str]): The desired format(s)
                ('per_pulse', 'per_electron', 'per_train', 'all').
            index (bool): If True, includes channels from the multi_index.

        Returns:
            List[str]: A list of channels with the specified format(s).
        """
        # If 'formats' is a single string, convert it to a list for uniform processing.
        if isinstance(formats, str):
            formats = [formats]

        # If 'formats' is a string "all", gather all possible formats.
        if formats == ["all"]:
            channels = self.get_channels(["per_pulse", "per_train", "per_electron"], index)
            return channels

        channels = []
        for format_ in formats:
            # Gather channels based on the specified format(s).
            channels.extend(
                key
                for key in self.available_channels
                if self._config["dataframe"]["channels"][key]["format"] == format_
                and key != "dldAux"
            )
            # Include 'dldAuxChannels' if the format is 'per_pulse'.
            if format_ == "per_pulse" and "dldAux" in self._config["dataframe"]["channels"]:
                channels.extend(
                    self._config["dataframe"]["channels"]["dldAux"]["dldAuxChannels"].keys(),
                )

        # Include channels from multi_index if 'index' is True.
        if index:
            channels.extend(self.multi_index)

        return channels

    def reset_multi_index(self) -> None:
        """Resets the index per pulse and electron"""
        self.index_per_electron = None
        self.index_per_pulse = None
        self.array_indices = None

    def create_multi_index_per_electron(self, h5_file: h5py.File) -> None:
        """
        Creates an index per electron using pulseId for usage with the electron
            resolved pandas DataFrame.

        Args:
            h5_file (h5py.File): The HDF5 file object.

        Notes:
            - This method relies on the 'pulseId' channel to determine
                the macrobunch IDs.
            - It creates a MultiIndex with trainId, pulseId, and electronId
                as the index levels.
        """

        # relative macrobunch IDs obtained from the trainId channel
        train_id, mab_array = self.create_numpy_array_per_channel(
            h5_file,
            "trainId",
        )
        # Internal microbunch IDs obtained from the pulseId channel
        train_id, mib_array = self.create_numpy_array_per_channel(
            h5_file,
            "pulseId",
        )

        # Chopping data into trains
        macrobunch_index = []
        microbunch_ids = []
        macrobunch_indices = []
        for i in train_id.index:
            # removing broken trailing hit copies
            num_trains = self._config["dataframe"].get("num_trains", 0)
            num_pulses = self._config["dataframe"].get("num_pulses", 0)
            if num_trains:
                try:
                    num_valid_hits = np.where(np.diff(mib_array[i].astype(np.int32)) < 0)[0][
                        num_trains - 1
                    ]
                    mab_array[i, num_valid_hits:] = 0
                    mib_array[i, num_valid_hits:] = 0
                except IndexError:
                    pass
            train_ends = np.where(np.diff(mib_array[i].astype(np.int32)) < -1)[0]
            indices = []
            index = 0
            for train, train_end in enumerate(train_ends):
                macrobunch_index.append(train_id[i] + np.uint(train))
                if num_pulses:
                    microbunch_ids.append(mib_array[i, index:train_end] % num_pulses)
                else:
                    microbunch_ids.append(mib_array[i, index:train_end])
                indices.append(slice(index, train_end))
                index = train_end + 1
            macrobunch_indices.append(indices)
        self.array_indices = macrobunch_indices
        # Create a series with the macrobunches as index and
        # microbunches as values
        macrobunches = (
            Series(
                (microbunch_ids[i] for i in range(len(macrobunch_index))),
                name="pulseId",
                index=macrobunch_index,
            )
            - self._config["dataframe"]["ubid_offset"]
        )

        # Explode dataframe to get all microbunch values per macrobunch,
        # remove NaN values and convert to type int
        microbunches = macrobunches.explode().dropna().astype(int)

        # Create temporary index values
        index_temp = MultiIndex.from_arrays(
            (microbunches.index, microbunches.values),
            names=["trainId", "pulseId"],
        )

        # Calculate the electron counts per pulseId; unique preserves the order of appearance
        electron_counts = index_temp.value_counts()[index_temp.unique()].values

        # Series object for indexing with electrons
        electrons = (
            Series(
                [np.arange(electron_counts[i]) for i in range(electron_counts.size)],
            )
            .explode()
            .astype(int)
        )

        # Create a pandas MultiIndex using the exploded datasets
        self.index_per_electron = MultiIndex.from_arrays(
            (microbunches.index, microbunches.values, electrons),
            names=self.multi_index,
        )

    def create_multi_index_per_pulse(
        self,
        train_id: Series,
        np_array: np.ndarray,
    ) -> None:
        """
        Creates an index per pulse using a pulse resolved channel's macrobunch ID, for usage with
        the pulse resolved pandas DataFrame.

        Args:
            train_id (Series): The train ID Series.
            np_array (np.ndarray): The numpy array containing the pulse resolved data.

        Notes:
            - This method creates a MultiIndex with trainId and pulseId as the index levels.
        """

        # Create a pandas MultiIndex, useful for comparing electron and
        # pulse resolved dataframes
        self.index_per_pulse = MultiIndex.from_product(
            (train_id, np.arange(0, np_array.shape[1])),
            names=["trainId", "pulseId"],
        )

    def create_numpy_array_per_channel(
        self,
        h5_file: h5py.File,
        channel: str,
    ) -> tuple[Series, np.ndarray]:
        """
        Returns a numpy array for a given channel name for a given file.

        Args:
            h5_file (h5py.File): The h5py file object.
            channel (str): The name of the channel.

        Returns:
            tuple[Series, np.ndarray]: A tuple containing the train ID Series and the numpy array
            for the channel's data.

        """
        # Get the data from the necessary h5 file and channel
        dataset = h5_file[self._config["dataframe"]["channels"][channel]["dataset_key"]]
        index = h5_file[self._config["dataframe"]["channels"][channel]["index_key"]]

        channel_dict = self._config["dataframe"]["channels"][channel]  # channel parameters

        train_id = Series(index, name="trainId")  # macrobunch

        # unpacks the data into np.ndarray
        np_array = dataset[()]
        if len(np_array.shape) == 2 and self._config["dataframe"]["channels"][channel].get(
            "max_hits",
            0,
        ):
            np_array = np_array[:, : self._config["dataframe"]["channels"][channel]["max_hits"]]

        # Use predefined axis and slice from the json file
        # to choose correct dimension for necessary channel
        if "slice" in channel_dict:
            np_array = np.take(
                np_array,
                channel_dict["slice"],
                axis=1,
            )

        if "scale" in channel_dict:
            np_array = np_array / float(channel_dict["scale"])

        return train_id, np_array

    def create_dataframe_per_electron(
        self,
        np_array: np.ndarray,
        channel: str,
    ) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per electron].

        Args:
            np_array (np.ndarray): The numpy array containing the channel data.
            channel (str): The name of the channel.

        Returns:
            DataFrame: The pandas DataFrame for the channel's data.

        Notes:
            The microbunch resolved data is exploded and converted to a DataFrame. The MultiIndex
            is set, and the NaN values are dropped, alongside the pulseId = 0 (meaningless).

        """
        if self.array_indices is None or len(self.array_indices) != np_array.shape[0]:
            raise RuntimeError(
                "macrobunch_indices not set correctly, internal inconsistency detected.",
            )
        train_data = []
        for i, _ in enumerate(self.array_indices):
            for indices in self.array_indices[i]:
                train_data.append(np_array[i, indices])
        return (
            Series((train for train in train_data), name=channel)
            .explode()
            .dropna()
            .to_frame()
            .set_index(self.index_per_electron)
            .drop(
                index=np.arange(-self._config["dataframe"]["ubid_offset"], 0),
                level=1,
                errors="ignore",
            )
        )

    def create_dataframe_per_pulse(
        self,
        np_array: np.ndarray,
        train_id: Series,
        channel: str,
        channel_dict: dict,
    ) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per pulse].

        Args:
            np_array (np.ndarray): The numpy array containing the channel data.
            train_id (Series): The train ID Series.
            channel (str): The name of the channel.
            channel_dict (dict): The dictionary containing channel parameters.

        Returns:
            DataFrame: The pandas DataFrame for the channel's data.

        Notes:
            - For auxiliary channels, the macrobunch resolved data is repeated 499 times to be
              compared to electron resolved data for each auxiliary channel. The data is then
              converted to a multicolumn DataFrame.
            - For all other pulse resolved channels, the macrobunch resolved data is exploded
              to a DataFrame and the MultiIndex is set.

        """

        # Special case for auxiliary channels
        if channel == "dldAux":
            # Checks the channel dictionary for correct slices and creates a multicolumn DataFrame
            data_frames = (
                Series(
                    (np_array[i, value] for i in train_id.index),
                    name=key,
                    index=train_id,
                ).to_frame()
                for key, value in channel_dict["dldAuxChannels"].items()
            )

            # Multiindex set and combined dataframe returned
            data = reduce(DataFrame.combine_first, data_frames)

        # For all other pulse resolved channels
        else:
            # Macrobunch resolved data is exploded to a DataFrame and the MultiIndex is set

            # Creates the index_per_pulse for the given channel
            self.create_multi_index_per_pulse(train_id, np_array)
            data = (
                Series((np_array[i] for i in train_id.index), name=channel)
                .explode()
                .to_frame()
                .set_index(self.index_per_pulse)
            )

        return data

    def create_dataframe_per_train(
        self,
        np_array: np.ndarray,
        train_id: Series,
        channel: str,
    ) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per train].

        Args:
            np_array (np.ndarray): The numpy array containing the channel data.
            train_id (Series): The train ID Series.
            channel (str): The name of the channel.

        Returns:
            DataFrame: The pandas DataFrame for the channel's data.
        """
        return (
            Series((np_array[i] for i in train_id.index), name=channel)
            .to_frame()
            .set_index(train_id)
        )

    def create_dataframe_per_channel(
        self,
        file_path: Path,
        channel: str,
    ) -> Series | DataFrame:
        """
        Returns a pandas DataFrame for a given channel name from a given file.

        This method takes the path `file_path` to the HDF5 file and a channel name `channel`, and
        returns a pandas DataFrame containing the data for that channel from the file. The format
        of the DataFrame depends on the channel's format specified in the configuration.

        Args:
            file_path (Path): The path to the main HDF5 file.
            channel (str): The name of the channel.

        Returns:
            Series | DataFrame: A pandas Series or DataFrame representing the channel's data.

        Raises:
            ValueError: If the channel has an undefined format.

        """
        channel_dict = self._config["dataframe"]["channels"][channel]  # channel parameters
        main_daq = self._config["dataframe"]["daq"]
        channel_daq = self._config["dataframe"]["channels"][channel].get("daq", main_daq)
        # load file corresponding to daq
        h5_file = h5py.File(Path(str(file_path).replace(main_daq, channel_daq)))

        [train_id, np_array] = self.create_numpy_array_per_channel(
            h5_file,
            channel,
        )  # numpy Array created

        # If np_array is size zero, fill with NaNs
        if np_array.size == 0:
            # Fill the np_array with NaN values of the same shape as train_id
            np_array = np.full_like(train_id, np.nan, dtype=np.double)
            # Create a Series using np_array, with train_id as the index
            data = Series(
                (np_array[i] for i in train_id.index),
                name=channel,
                index=train_id,
            )

        # Electron resolved data is treated here
        if channel_dict["format"] == "per_electron":
            # If index_per_electron is None, create it for the given file
            if self.index_per_electron is None:
                self.create_multi_index_per_electron(h5_file)

            # Create a DataFrame for electron-resolved data
            data = self.create_dataframe_per_electron(
                np_array,
                channel,
            )

        # Pulse resolved data is treated here
        elif channel_dict["format"] == "per_pulse":
            # Create a DataFrame for pulse-resolved data
            data = self.create_dataframe_per_pulse(
                np_array,
                train_id,
                channel,
                channel_dict,
            )

        # Train resolved data is treated here
        elif channel_dict["format"] == "per_train":
            # Create a DataFrame for train-resolved data
            data = self.create_dataframe_per_train(np_array, train_id, channel)

        else:
            raise ValueError(
                channel
                + " has an undefined format. Available formats are \
                per_pulse, per_electron and per_train",
            )

        return data

    def concatenate_channels(
        self,
        file_path: Path,
    ) -> DataFrame:
        """
        Concatenates the channels from the provided HDF5 file into a pandas DataFrame.

        This method takes the path `file_path` to an HDF5 file and concatenates the channels
        present in the file into a single pandas DataFrame. The concatenation is performed based
        on the available channels specified in the configuration.

        Args:
            file_path (Path): The path to the main HDF5 file.

        Returns:
            DataFrame: A concatenated pandas DataFrame containing the channels.

        Raises:
            ValueError: If the group_name for any channel does not exist in the file.

        """
        # Check if the provided dataset_keys and index_keys actually exist in the file
        for channel in self._config["dataframe"]["channels"]:
            dataset_key = self._config["dataframe"]["channels"][channel]["dataset_key"]
            daq = self._config["dataframe"]["channels"][channel].get("daq", "DA03")
            # load file corresponding to daq
            h5_file = h5py.File(Path(str(file_path).replace("DA03", daq)))
            all_keys = parse_h5_keys(h5_file)  # Parses all channels present
            if dataset_key not in all_keys:
                raise ValueError(
                    f"The dataset_key for channel {channel} does not exist.",
                )
            index_key = self._config["dataframe"]["channels"][channel]["index_key"]
            if index_key not in all_keys:
                raise ValueError(
                    f"The index_key for channel {channel} does not exist.",
                )

        # Create a generator expression to generate data frames for each channel
        data_frames = (
            self.create_dataframe_per_channel(file_path, each) for each in self.available_channels
        )

        # Use the reduce function to join the data frames into a single DataFrame
        return reduce(
            lambda left, right: left.join(right, how="outer"),
            data_frames,
        )

    def create_dataframe_per_file(
        self,
        file_path: Path,
    ) -> DataFrame:
        """
        Create pandas DataFrames for the given file.

        This method loads an HDF5 file specified by `file_path` and constructs a pandas DataFrame
        from the datasets within the file. The order of datasets in the DataFrames is the opposite
        of the order specified by channel names.

        Args:
            file_path (Path): Path to the input HDF5 file.

        Returns:
            DataFrame: pandas DataFrame

        """
        # Loads h5 file and creates a dataframe
        self.reset_multi_index()  # Reset MultiIndexes for next file
        df = self.concatenate_channels(file_path)
        df = df.dropna(subset=self._config["dataframe"]["columns"].get("tof", "dldTimeSteps"))
        # correct the 3 bit shift which encodes the detector ID in the 8s time
        if self._config["dataframe"].get("split_sector_id_from_dld_time", False):
            df, _ = split_dld_time_from_sector_id(df, config=self._config)
        return df

    def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> bool | Exception:
        """
        Converts an HDF5 file to Parquet format to create a buffer file.

        This method uses `create_dataframe_per_file` method to create dataframes from individual
        files within an HDF5 file. The resulting dataframe is then saved to a Parquet file.

        Args:
            h5_path (Path): Path to the input HDF5 file.
            parquet_path (Path): Path to the output Parquet file.

        Returns:
            bool | Exception: Collected exceptions if any.

        Raises:
            ValueError: If an error occurs during the conversion process.

        """
        try:
            (
                self.create_dataframe_per_file(h5_path)
                .reset_index(level=self.multi_index)
                .to_parquet(parquet_path, index=False)
            )
        except Exception as exc:  # pylint: disable=broad-except
            self.failed_files_error.append(f"{parquet_path}: {type(exc)} {exc}")
            return exc
        return None

    def buffer_file_handler(
        self,
        data_parquet_dir: Path,
        detector: str,
        force_recreate: bool,
    ) -> tuple[list[Path], list, list]:
        """
        Handles the conversion of buffer files (h5 to parquet) and returns the filenames.

        Args:
            data_parquet_dir (Path): Directory where the parquet files will be stored.
            detector (str): Detector name.
            force_recreate (bool): Forces recreation of buffer files

        Returns:
            tuple[list[Path], list, list]: Three lists, one for
            parquet file paths, one for metadata and one for schema.

        Raises:
            FileNotFoundError: If the conversion fails for any files or no data is available.
        """

        # Create the directory for buffer parquet files
        buffer_file_dir = data_parquet_dir.joinpath("buffer")
        buffer_file_dir.mkdir(parents=True, exist_ok=True)

        # Create two separate lists for h5 and parquet file paths
        h5_filenames = [Path(file) for file in self.files]
        parquet_filenames = [
            buffer_file_dir.joinpath(Path(file).stem + detector) for file in self.files
        ]
        existing_parquet_filenames = [file for file in parquet_filenames if file.exists()]

        # Raise a value error if no data is available after the conversion
        if len(h5_filenames) == 0:
            raise ValueError("No data available. Probably failed reading all h5 files")

        if not force_recreate:
            # Check if the available channels match the schema of the existing parquet files
            parquet_schemas = [pq.read_schema(file) for file in existing_parquet_filenames]
            config_schema = set(self.get_channels(formats="all", index=True))
            if self._config["dataframe"].get("split_sector_id_from_dld_time", False):
                config_schema.add(self._config["dataframe"]["columns"].get("sector_id", False))

            for i, schema in enumerate(parquet_schemas):
                schema_set = set(schema.names)
                if schema_set != config_schema:
                    missing_in_parquet = config_schema - schema_set
                    missing_in_config = schema_set - config_schema

                    missing_in_parquet_str = (
                        f"Missing in parquet: {missing_in_parquet}" if missing_in_parquet else ""
                    )
                    missing_in_config_str = (
                        f"Missing in config: {missing_in_config}" if missing_in_config else ""
                    )

                    raise ValueError(
                        "The available channels do not match the schema of file",
                        f"{existing_parquet_filenames[i]}",
                        f"{missing_in_parquet_str}",
                        f"{missing_in_config_str}",
                        "Please check the configuration file or set force_recreate to True.",
                    )

        # Choose files to read
        files_to_read = [
            (h5_path, parquet_path)
            for h5_path, parquet_path in zip(h5_filenames, parquet_filenames)
            if force_recreate or not parquet_path.exists()
        ]

        print(f"Reading files: {len(files_to_read)} new files of {len(h5_filenames)} total.")

        # Initialize the indices for create_buffer_file conversion
        self.reset_multi_index()

        # Convert the remaining h5 files to parquet in parallel if there are any
        if len(files_to_read) > 0:
            error = Parallel(n_jobs=len(files_to_read), verbose=10)(
                delayed(self.create_buffer_file)(h5_path, parquet_path)
                for h5_path, parquet_path in files_to_read
            )
            if any(error):
                raise RuntimeError(f"Conversion failed for some files. {error}")
        # for h5_path, parquet_path in files_to_read:
        #     self.create_buffer_file(h5_path, parquet_path)

        # Raise an error if the conversion failed for any files
        # TODO: merge this and the previous error trackings
        if self.failed_files_error:
            raise FileNotFoundError(
                "Conversion failed for the following files:\n" + "\n".join(self.failed_files_error),
            )

        print("All files converted successfully!")

        # read all parquet metadata and schema
        metadata = [pq.read_metadata(file) for file in parquet_filenames]
        schema = [pq.read_schema(file) for file in parquet_filenames]

        return parquet_filenames, metadata, schema

    def parquet_handler(
        self,
        data_parquet_dir: Path,
        detector: str = "",
        parquet_path: Path = None,
        converted: bool = False,
        load_parquet: bool = False,
        save_parquet: bool = False,
        force_recreate: bool = False,
    ) -> tuple[dd.DataFrame, dd.DataFrame]:
        """
        Handles loading and saving of parquet files based on the provided parameters.

        Args:
            data_parquet_dir (Path): Directory where the parquet files are located.
            detector (str, optional): Adds an identifier for parquets to distinguish multidetector
                systems.
            parquet_path (str, optional): Path to the combined parquet file.
            converted (bool, optional): True if data is augmented by adding additional columns
                externally and saved into converted folder.
            load_parquet (bool, optional): Loads the entire parquet into the dd dataframe.
            save_parquet (bool, optional): Saves the entire dataframe into a parquet.
            force_recreate (bool, optional): Forces recreation of buffer file.
        Returns:
            tuple[dd.DataFrame, dd.DataFrame]: A tuple containing two dataframes:
            - dataframe_electron: Dataframe containing the loaded/augmented electron data.
            - dataframe_pulse: Dataframe containing the loaded/augmented timed data.

        Raises:
            FileNotFoundError: If the requested parquet file is not found.

        """

        # Construct the parquet path if not provided
        if parquet_path is None:
            parquet_name = "_".join(str(run) for run in self.runs)
            parquet_dir = data_parquet_dir.joinpath("converted") if converted else data_parquet_dir

            parquet_path = parquet_dir.joinpath(
                "run_" + parquet_name + detector,
            ).with_suffix(".parquet")

        # Check if load_parquet is flagged and then load the file if it exists
        if load_parquet:
            try:
                dataframe = dd.read_parquet(parquet_path)
            except Exception as exc:
                raise FileNotFoundError(
                    "The final parquet for this run(s) does not exist yet. "
                    "If it is in another location, please provide the path as parquet_path.",
                ) from exc

        else:
            # Obtain the parquet filenames, metadata and schema from the method
            # which handles buffer file creation/reading
            filenames, metadata, _ = self.buffer_file_handler(
                data_parquet_dir,
                detector,
                force_recreate,
            )

            # Read all parquet files into one dataframe using dask
            dataframe = dd.read_parquet(filenames, calculate_divisions=True)

            # Channels to fill NaN values
            channels: list[str] = self.get_channels(["per_pulse", "per_train"])

            overlap = min(file.num_rows for file in metadata)

            print("Filling nan values...")
            dataframe = dfops.forward_fill_lazy(
                df=dataframe,
                columns=channels,
                before=overlap,
                iterations=self._config["dataframe"].get("forward_fill_iterations", 2),
            )
            # Remove the NaNs from per_electron channels
            dataframe_electron = dataframe.dropna(
                subset=self.get_channels(["per_electron"]),
            )
            dataframe_pulse = dataframe[
                self.multi_index + self.get_channels(["per_pulse", "per_train"])
            ]
            dataframe_pulse = dataframe_pulse[
                (dataframe_pulse["electronId"] == 0) | (np.isnan(dataframe_pulse["electronId"]))
            ]

        # Save the dataframe as parquet if requested
        if save_parquet:
            dataframe_electron.compute().reset_index(drop=True).to_parquet(parquet_path)
            print("Combined parquet file saved.")

        return dataframe_electron, dataframe_pulse

    def gather_metadata(self, metadata: dict = None) -> dict:
        """Dummy function returning empty metadata dictionary for now.

        Args:
            metadata (dict, optional): Manual meta data dictionary. Auto-generated
                meta data are added to it. Defaults to None.

        Returns:
            dict: Metadata dictionary
        """
        if metadata is None:
            metadata = {}

        return metadata

    def get_count_rate(
        self,
        fids: Sequence[int] = None,  # noqa: ARG002
        **kwds,  # noqa: ARG002
    ):
        return None, None

    def get_elapsed_time(self, fids=None, **kwds):  # noqa: ARG002
        return None

    def read_dataframe(
        self,
        files: str | Sequence[str] = None,
        folders: str | Sequence[str] = None,
        runs: str | Sequence[str] = None,
        ftype: str = "h5",
        metadata: dict = None,
        collect_metadata: bool = False,
        **kwds,
    ) -> tuple[dd.DataFrame, dd.DataFrame, dict]:
        """
        Read express data from the DAQ, generating a parquet in between.

        Args:
            files (str | Sequence[str], optional): File path(s) to process. Defaults to None.
            folders (str | Sequence[str], optional): Path to folder(s) where files are stored.
                Path has priority such that if it's specified, the specified files will be ignored.
                Defaults to None.
            runs (str | Sequence[str], optional): Run identifier(s). Corresponding files will
                be located in the location provided by ``folders``. Takes precedence over
                ``files`` and ``folders``. Defaults to None.
            ftype (str, optional): The file extension type. Defaults to "h5".
            metadata (dict, optional): Additional metadata. Defaults to None.
            collect_metadata (bool, optional): Whether to collect metadata. Defaults to False.
            **kwds: Keyword arguments passed to ``parquet_handler``.

        Returns:
            tuple[dd.DataFrame, dd.DataFrame, dict]: A tuple containing the concatenated DataFrame,
            timed DataFrame, and metadata.

        Raises:
            ValueError: If neither 'runs' nor 'files'/'data_raw_dir' is provided.
            FileNotFoundError: If the conversion fails for some files or no data is available.
        """
        t0 = time.time()

        self._initialize_dirs()

        # Prepare a list of names for the runs to read and parquets to write
        if runs is not None:
            files = []
            if isinstance(runs, (str, int)):
                runs = [runs]
            for run in runs:
                run_files = self.get_files_from_run_id(
                    run_id=run,
                    folders=[str(Path(folder).resolve()) for folder in self.raw_dir],
                    extension=ftype,
                    daq=self._config["dataframe"]["daq"],
                )
                files.extend(run_files)
            self.runs = list(runs)
            super().read_dataframe(files=files, ftype=ftype)

        else:
            # This call takes care of files and folders. As we have converted runs into files
            # already, they are just stored in the class by this call.
            super().read_dataframe(
                files=files,
                folders=folders,
                ftype=ftype,
                metadata=metadata,
            )

        df, df_timed = self.parquet_handler(Path(self.processed_dir), **kwds)

        if collect_metadata:
            metadata = self.gather_metadata(
                metadata=self.metadata,
            )
        else:
            metadata = self.metadata
        print(f"loading complete in {time.time() - t0: .2f} s")

        return df, df_timed, metadata


LOADER = SXPLoader
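
For orientation, here is a minimal usage sketch of the loader listed above; it is not part of the covered source file. The config keys mirror those the code reads (core.paths, core.stream_name_prefixes, dataframe.daq, dataframe.columns, dataframe.channels, dataframe.ubid_offset), but the concrete paths, stream-name prefix, run number, and channel entries are hypothetical placeholders. In practice the configuration would come from a full sed config file, and the loader is usually driven through sed's higher-level processing interface rather than instantiated directly.

from sed.loader.sxp.loader import SXPLoader

# Hypothetical configuration sketch: only the keys accessed by the loader above,
# filled with placeholder values. The dataset_key/index_key entries ("...") must
# point at real HDF5 paths in an actual beamtime configuration.
config = {
    "core": {
        "paths": {"raw": "/path/to/raw", "processed": "/path/to/processed"},
        "stream_name_prefixes": {"DA03": "RAW-R"},  # assumed prefix for daq "DA03"
    },
    "dataframe": {
        "daq": "DA03",
        "ubid_offset": 0,
        "columns": {"tof": "dldTimeSteps"},
        "channels": {
            "trainId": {"format": "per_electron", "dataset_key": "...", "index_key": "..."},
            "pulseId": {"format": "per_electron", "dataset_key": "...", "index_key": "..."},
            "dldTimeSteps": {"format": "per_electron", "dataset_key": "...", "index_key": "..."},
        },
    },
}

loader = SXPLoader(config=config, verbose=True)
# read_dataframe locates the h5 files for the run, converts them to buffer parquet
# files, and returns the electron-resolved and timed dask dataframes plus metadata.
df, df_timed, metadata = loader.read_dataframe(runs=["0123"], ftype="h5")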