
OpenCOMPES / sed, build 8740934984 (push via github, committed by web-flow)
18 Apr 2024 03:49PM UTC. Coverage: 91.562% (-0.03%) from 91.591%
Commit: FlashLoader relevant: fix reference before assignment for wespe (#385)

0 of 2 new or added lines in 1 file covered (0.0%)
1 existing line in 1 file now uncovered
6044 of 6601 relevant lines covered (91.56%)
0.92 hits per line

Source File: /sed/loader/flash/loader.py (90.4% covered)
1
"""
2
This module implements the flash data loader.
3
This loader currently supports hextof, wespe and instruments with similar structure.
4
The raw hdf5 data is combined and saved into buffer files and loaded as a dask dataframe.
5
The dataframe is an amalgamation of all h5 files for a combination of runs, where the NaNs are
6
automatically forward filled across different files.
7
This can then be saved as a parquet for out-of-sed processing and reread back to access other
8
sed functionality.
9
"""
10
import time
1✔
11
from functools import reduce
1✔
12
from pathlib import Path
1✔
13
from typing import List
1✔
14
from typing import Sequence
1✔
15
from typing import Tuple
1✔
16
from typing import Union
1✔
17

18
import dask.dataframe as dd
1✔
19
import h5py
1✔
20
import numpy as np
1✔
21
import pyarrow.parquet as pq
1✔
22
from joblib import delayed
1✔
23
from joblib import Parallel
1✔
24
from natsort import natsorted
1✔
25
from pandas import DataFrame
1✔
26
from pandas import MultiIndex
1✔
27
from pandas import Series
1✔
28

29
from sed.core import dfops
1✔
30
from sed.loader.base.loader import BaseLoader
1✔
31
from sed.loader.flash.metadata import MetadataRetriever
1✔
32
from sed.loader.utils import parse_h5_keys
1✔
33
from sed.loader.utils import split_dld_time_from_sector_id
1✔
34

35

36
class FlashLoader(BaseLoader):
1✔
37
    """
38
    This class generates multiindexed, multidimensional pandas dataframes from the new FLASH
39
    data format, resolved by both macro- and microbunches, alongside electrons.
40
    Only the read_dataframe (inherited and implemented) method is accessed by other modules.
41
    """
42

43
    __name__ = "flash"
1✔
44

45
    supported_file_types = ["h5"]
1✔
46

47
    def __init__(self, config: dict) -> None:
1✔
48
        super().__init__(config=config)
1✔
49
        self.multi_index = ["trainId", "pulseId", "electronId"]
1✔
50
        self.index_per_electron: MultiIndex = None
1✔
51
        self.index_per_pulse: MultiIndex = None
1✔
52
        self.failed_files_error: List[str] = []
1✔
53

54
    def initialize_paths(self) -> Tuple[List[Path], Path]:
1✔
55
        """
56
        Initializes the paths based on the configuration.
57

58
        Returns:
59
            Tuple[List[Path], Path]: A tuple containing a list of raw data directories
60
            paths and the parquet data directory path.
61

62
        Raises:
63
            ValueError: If required values are missing from the configuration.
64
            FileNotFoundError: If the raw data directories are not found.
65
        """
66
        # Parse the config file to locate the raw beamtime directory
67
        if "paths" in self._config["core"]:
1✔
68
            data_raw_dir = [
1✔
69
                Path(self._config["core"]["paths"].get("data_raw_dir", "")),
70
            ]
71
            data_parquet_dir = Path(
1✔
72
                self._config["core"]["paths"].get("data_parquet_dir", ""),
73
            )
74

75
        else:
76
            try:
1✔
77
                beamtime_id = self._config["core"]["beamtime_id"]
1✔
78
                year = self._config["core"]["year"]
1✔
79
                daq = self._config["dataframe"]["daq"]
1✔
80
            except KeyError as exc:
×
81
                raise ValueError(
×
82
                    "The beamtime_id, year and daq are required.",
83
                ) from exc
84

85
            beamtime_dir = Path(
1✔
86
                self._config["dataframe"]["beamtime_dir"][self._config["core"]["beamline"]],
87
            )
88
            beamtime_dir = beamtime_dir.joinpath(f"{year}/data/{beamtime_id}/")
1✔
89

90
            # Use pathlib glob to locate the raw data directories
91
            data_raw_dir = []
1✔
92
            raw_path = beamtime_dir.joinpath("raw")
1✔
93

94
            for path in raw_path.glob("**/*"):
1✔
95
                if path.is_dir():
1✔
96
                    dir_name = path.name
1✔
97
                    if dir_name.startswith("express-") or dir_name.startswith(
1✔
98
                        "online-",
99
                    ):
100
                        data_raw_dir.append(path.joinpath(daq))
1✔
101
                    elif dir_name == daq.upper():
1✔
102
                        data_raw_dir.append(path)
1✔
103

104
            if not data_raw_dir:
1✔
105
                raise FileNotFoundError("Raw data directories not found.")
1✔
106

107
            parquet_path = "processed/parquet"
1✔
108
            data_parquet_dir = beamtime_dir.joinpath(parquet_path)
1✔
109

110
        data_parquet_dir.mkdir(parents=True, exist_ok=True)
1✔
111

112
        return data_raw_dir, data_parquet_dir
1✔
113
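# Illustrative sketch (editor-added, not part of loader.py): the two config
# layouts that initialize_paths() accepts. The keys mirror the lookups in the
# method above; all concrete values are placeholders.
config_explicit = {
    "core": {
        "paths": {
            "data_raw_dir": "/path/to/raw",          # used directly
            "data_parquet_dir": "/path/to/parquet",  # created if missing
        },
    },
}
config_derived = {
    "core": {"beamtime_id": 11012345, "year": 2023, "beamline": "pg2"},
    "dataframe": {
        "daq": "fl1user3",
        # beamtime_dir maps the beamline name to its base directory
        "beamtime_dir": {"pg2": "/asap3/flash/gpfs/pg2/"},
    },
}
# With config_derived, raw data is searched under
# /asap3/flash/gpfs/pg2/2023/data/11012345/raw/ in subdirectories named
# "express-*", "online-*", or "FL1USER3", and parquet buffers are written to
# .../11012345/processed/parquet.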

114
    def get_files_from_run_id(
1✔
115
        self,
116
        run_id: str,
117
        folders: Union[str, Sequence[str]] = None,
118
        extension: str = "h5",
119
        **kwds,
120
    ) -> List[str]:
121
        """Returns a list of filenames for a given run located in the specified directory
122
        for the specified data acquisition (daq).
123

124
        Args:
125
            run_id (str): The run identifier to locate.
126
            folders (Union[str, Sequence[str]], optional): The directory(ies) where the raw
127
                data is located. Defaults to config["core"]["base_folder"].
128
            extension (str, optional): The file extension. Defaults to "h5".
129
            kwds: Keyword arguments:
130
                - daq (str): The data acquisition identifier.
131

132
        Returns:
133
            List[str]: A list of path strings representing the collected file names.
134

135
        Raises:
136
            FileNotFoundError: If no files are found for the given run in the directory.
137
        """
138
        # Define the stream name prefixes based on the data acquisition identifier
139
        stream_name_prefixes = self._config["dataframe"]["stream_name_prefixes"]
1✔
140

141
        if folders is None:
1✔
142
            folders = self._config["core"]["base_folder"]
×
143

144
        if isinstance(folders, str):
1✔
145
            folders = [folders]
×
146

147
        daq = kwds.pop("daq", self._config.get("dataframe", {}).get("daq"))
1✔
148

149
        # Generate the file patterns to search for in the directory
150
        file_pattern = f"{stream_name_prefixes[daq]}_run{run_id}_*." + extension
1✔
151

152
        files: List[Path] = []
1✔
153
        # Use pathlib to search for matching files in each directory
154
        for folder in folders:
1✔
155
            files.extend(
1✔
156
                natsorted(
157
                    Path(folder).glob(file_pattern),
158
                    key=lambda filename: str(filename).rsplit("_", maxsplit=1)[-1],
159
                ),
160
            )
161

162
        # Check if any files are found
163
        if not files:
1✔
164
            raise FileNotFoundError(
×
165
                f"No files found for run {run_id} in directory {str(folders)}",
166
            )
167

168
        # Return the list of found files
169
        return [str(file.resolve()) for file in files]
1✔
170
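# Illustrative sketch (editor-added, not part of loader.py): how the glob
# pattern above is built and how matches are ordered. The stream name prefix
# and run number are placeholders.
from natsort import natsorted

stream_name_prefixes = {"fl1user3": "FLASH1_USER3"}
daq, run_id, extension = "fl1user3", "43878", "h5"
file_pattern = f"{stream_name_prefixes[daq]}_run{run_id}_*." + extension
print(file_pattern)  # FLASH1_USER3_run43878_*.h5

# natsorted orders by the trailing chunk token, so _file2 comes before _file10
names = ["FLASH1_USER3_run43878_file10.h5", "FLASH1_USER3_run43878_file2.h5"]
print(natsorted(names, key=lambda filename: str(filename).rsplit("_", maxsplit=1)[-1]))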

171
    @property
1✔
172
    def available_channels(self) -> List:
1✔
173
        """Returns the channel names that are available for use,
174
        excluding pulseId, as defined by the json file"""
175
        available_channels = list(self._config["dataframe"]["channels"].keys())
1✔
176
        available_channels.remove("pulseId")
1✔
177
        return available_channels
1✔
178

179
    def get_channels(self, formats: Union[str, List[str]] = "", index: bool = False) -> List[str]:
1✔
180
        """
181
        Returns a list of channels associated with the specified format(s).
182

183
        Args:
184
            formats (Union[str, List[str]]): The desired format(s)
185
                                ('per_pulse', 'per_electron', 'per_train', 'all').
186
            index (bool): If True, includes channels from the multi_index.
187

188
        Returns:
189
            List[str]: A list of channels with the specified format(s).
190
        """
191
        # If 'formats' is a single string, convert it to a list for uniform processing.
192
        if isinstance(formats, str):
1✔
193
            formats = [formats]
1✔
194

195
        # If 'formats' is a string "all", gather all possible formats.
196
        if formats == ["all"]:
1✔
197
            channels = self.get_channels(["per_pulse", "per_train", "per_electron"], index)
1✔
198
            return channels
1✔
199

200
        channels = []
1✔
201
        for format_ in formats:
1✔
202
            # Gather channels based on the specified format(s).
203
            channels.extend(
1✔
204
                key
205
                for key in self.available_channels
206
                if self._config["dataframe"]["channels"][key]["format"] == format_
207
                and key != "dldAux"
208
            )
209
            # Include 'dldAuxChannels' if the format is 'per_pulse'.
210
            if format_ == "per_pulse":
1✔
211
                channels.extend(
1✔
212
                    self._config["dataframe"]["channels"]["dldAux"]["dldAuxChannels"].keys(),
213
                )
214

215
        # Include channels from multi_index if 'index' is True.
216
        if index:
1✔
217
            channels.extend(self.multi_index)
1✔
218

219
        return channels
1✔
220
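# Illustrative sketch (editor-added, not part of loader.py): the filtering rule
# implemented above, applied to a toy "channels" config section. dldAux is
# expanded into its sub-channels only for the per_pulse format; the channel
# names are placeholders.
channels_cfg = {
    "pulseId": {"format": "per_electron"},
    "dldPosX": {"format": "per_electron"},
    "bam": {"format": "per_pulse"},
    "dldAux": {"format": "per_pulse", "dldAuxChannels": {"sampleBias": 0, "tofVoltage": 1}},
    "delayStage": {"format": "per_train"},
}
available = [key for key in channels_cfg if key != "pulseId"]
per_pulse = [key for key in available if channels_cfg[key]["format"] == "per_pulse" and key != "dldAux"]
per_pulse += list(channels_cfg["dldAux"]["dldAuxChannels"].keys())
print(per_pulse)  # ['bam', 'sampleBias', 'tofVoltage']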

221
    def reset_multi_index(self) -> None:
1✔
222
        """Resets the index per pulse and electron"""
223
        self.index_per_electron = None
1✔
224
        self.index_per_pulse = None
1✔
225

226
    def create_multi_index_per_electron(self, h5_file: h5py.File) -> None:
1✔
227
        """
228
        Creates an index per electron using pulseId for usage with the electron
229
            resolved pandas DataFrame.
230

231
        Args:
232
            h5_file (h5py.File): The HDF5 file object.
233

234
        Notes:
235
            - This method relies on the 'pulseId' channel to determine
236
                the macrobunch IDs.
237
            - It creates a MultiIndex with trainId, pulseId, and electronId
238
                as the index levels.
239
        """
240

241
        # Macrobunch IDs obtained from the pulseId channel
242
        [train_id, np_array] = self.create_numpy_array_per_channel(
1✔
243
            h5_file,
244
            "pulseId",
245
        )
246

247
        # Create a series with the macrobunches as index and
248
        # microbunches as values
249
        macrobunches = (
1✔
250
            Series(
251
                (np_array[i] for i in train_id.index),
252
                name="pulseId",
253
                index=train_id,
254
            )
255
            - self._config["dataframe"]["ubid_offset"]
256
        )
257

258
        # Explode dataframe to get all microbunch values per macrobunch,
259
        # remove NaN values and convert to type int
260
        microbunches = macrobunches.explode().dropna().astype(int)
1✔
261

262
        # Create temporary index values
263
        index_temp = MultiIndex.from_arrays(
1✔
264
            (microbunches.index, microbunches.values),
265
            names=["trainId", "pulseId"],
266
        )
267

268
        # Calculate the electron counts per pulseId; unique() preserves the order of appearance
269
        electron_counts = index_temp.value_counts()[index_temp.unique()].values
1✔
270

271
        # Series object for indexing with electrons
272
        electrons = (
1✔
273
            Series(
274
                [np.arange(electron_counts[i]) for i in range(electron_counts.size)],
275
            )
276
            .explode()
277
            .astype(int)
278
        )
279

280
        # Create a pandas MultiIndex using the exploded datasets
281
        self.index_per_electron = MultiIndex.from_arrays(
1✔
282
            (microbunches.index, microbunches.values, electrons),
283
            names=self.multi_index,
284
        )
285
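# Illustrative sketch (editor-added, not part of loader.py): the index
# construction above, run on a toy pulseId array with two trains and the
# ubid_offset taken as 0.
import numpy as np
from pandas import MultiIndex, Series

train_id = Series([100, 101], name="trainId")
np_array = np.array([[1, 1, 2], [1, np.nan, np.nan]])  # pulseId per electron, NaN-padded

macrobunches = Series((np_array[i] for i in train_id.index), name="pulseId", index=train_id)
microbunches = macrobunches.explode().dropna().astype(int)
index_temp = MultiIndex.from_arrays(
    (microbunches.index, microbunches.values),
    names=["trainId", "pulseId"],
)
electron_counts = index_temp.value_counts()[index_temp.unique()].values  # [2, 1, 1]
electrons = Series([np.arange(count) for count in electron_counts]).explode().astype(int)
index_per_electron = MultiIndex.from_arrays(
    (microbunches.index, microbunches.values, electrons),
    names=["trainId", "pulseId", "electronId"],
)
print(index_per_electron.to_list())
# [(100, 1, 0), (100, 1, 1), (100, 2, 0), (101, 1, 0)]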

286
    def create_multi_index_per_pulse(
1✔
287
        self,
288
        train_id: Series,
289
        np_array: np.ndarray,
290
    ) -> None:
291
        """
292
        Creates an index per pulse using a pulse resolved channel's macrobunch ID, for usage with
293
        the pulse resolved pandas DataFrame.
294

295
        Args:
296
            train_id (Series): The train ID Series.
297
            np_array (np.ndarray): The numpy array containing the pulse resolved data.
298

299
        Notes:
300
            - This method creates a MultiIndex with trainId and pulseId as the index levels.
301
        """
302

303
        # Create a pandas MultiIndex, useful for comparing electron and
304
        # pulse resolved dataframes
305
        self.index_per_pulse = MultiIndex.from_product(
1✔
306
            (train_id, np.arange(0, np_array.shape[1])),
307
            names=["trainId", "pulseId"],
308
        )
309

310
    def create_numpy_array_per_channel(
1✔
311
        self,
312
        h5_file: h5py.File,
313
        channel: str,
314
    ) -> Tuple[Series, np.ndarray]:
315
        """
316
        Returns a numpy array for a given channel name for a given file.
317

318
        Args:
319
            h5_file (h5py.File): The h5py file object.
320
            channel (str): The name of the channel.
321

322
        Returns:
323
            Tuple[Series, np.ndarray]: A tuple containing the train ID Series and the numpy array
324
            for the channel's data.
325

326
        """
327
        # Get the data from the necessary h5 file and channel
328
        group = h5_file[self._config["dataframe"]["channels"][channel]["group_name"]]
1✔
329

330
        channel_dict = self._config["dataframe"]["channels"][channel]  # channel parameters
1✔
331

332
        train_id = Series(group["index"], name="trainId")  # macrobunch
1✔
333

334
        # unpacks the timeStamp or value
335
        if channel == "timeStamp":
1✔
336
            np_array = group["time"][()]
1✔
337
        else:
338
            np_array = group["value"][()]
1✔
339

340
        # Use predefined axis and slice from the json file
341
        # to choose correct dimension for necessary channel
342
        if "slice" in channel_dict:
1✔
343
            np_array = np.take(
1✔
344
                np_array,
345
                channel_dict["slice"],
346
                axis=1,
347
            )
348
        return train_id, np_array
1✔
349

350
    def create_dataframe_per_electron(
1✔
351
        self,
352
        np_array: np.ndarray,
353
        train_id: Series,
354
        channel: str,
355
    ) -> DataFrame:
356
        """
357
        Returns a pandas DataFrame for a given channel name of type [per electron].
358

359
        Args:
360
            np_array (np.ndarray): The numpy array containing the channel data.
361
            train_id (Series): The train ID Series.
362
            channel (str): The name of the channel.
363

364
        Returns:
365
            DataFrame: The pandas DataFrame for the channel's data.
366

367
        Notes:
368
            The microbunch resolved data is exploded and converted to a DataFrame. The MultiIndex
369
            is set, and the NaN values are dropped, alongside the pulseId = 0 (meaningless).
370

371
        """
372
        return (
1✔
373
            Series((np_array[i] for i in train_id.index), name=channel)
374
            .explode()
375
            .dropna()
376
            .to_frame()
377
            .set_index(self.index_per_electron)
378
            .drop(
379
                index=np.arange(-self._config["dataframe"]["ubid_offset"], 0),
380
                level=1,
381
                errors="ignore",
382
            )
383
        )
384

385
    def create_dataframe_per_pulse(
1✔
386
        self,
387
        np_array: np.ndarray,
388
        train_id: Series,
389
        channel: str,
390
        channel_dict: dict,
391
    ) -> DataFrame:
392
        """
393
        Returns a pandas DataFrame for a given channel name of type [per pulse].
394

395
        Args:
396
            np_array (np.ndarray): The numpy array containing the channel data.
397
            train_id (Series): The train ID Series.
398
            channel (str): The name of the channel.
399
            channel_dict (dict): The dictionary containing channel parameters.
400

401
        Returns:
402
            DataFrame: The pandas DataFrame for the channel's data.
403

404
        Notes:
405
            - For auxiliary channels, the macrobunch resolved data is repeated 499 times to be
406
              compared to electron resolved data for each auxiliary channel. The data is then
407
              converted to a multicolumn DataFrame.
408
            - For all other pulse resolved channels, the macrobunch resolved data is exploded
409
              to a DataFrame and the MultiIndex is set.
410

411
        """
412

413
        # Special case for auxiliary channels
414
        if channel == "dldAux":
1✔
415
            # Checks the channel dictionary for correct slices and creates a multicolumn DataFrame
416
            data_frames = (
1✔
417
                Series(
418
                    (np_array[i, value] for i in train_id.index),
419
                    name=key,
420
                    index=train_id,
421
                ).to_frame()
422
                for key, value in channel_dict["dldAuxChannels"].items()
423
            )
424

425
            # Multiindex set and combined dataframe returned
426
            data = reduce(DataFrame.combine_first, data_frames)
1✔
427

428
        # For all other pulse resolved channels
429
        else:
430
            # Macrobunch resolved data is exploded to a DataFrame and the MultiIndex is set
431

432
            # Creates the index_per_pulse for the given channel
433
            self.create_multi_index_per_pulse(train_id, np_array)
1✔
434
            data = (
1✔
435
                Series((np_array[i] for i in train_id.index), name=channel)
436
                .explode()
437
                .to_frame()
438
                .set_index(self.index_per_pulse)
439
            )
440

441
        return data
1✔
442
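# Illustrative sketch (editor-added, not part of loader.py): how the dldAux
# branch above turns one pulse-resolved array into a multicolumn, train-indexed
# frame. Channel names and slice indices are placeholders.
from functools import reduce

import numpy as np
from pandas import DataFrame, Series

train_id = Series([100, 101], name="trainId")
np_array = np.array([[17.0, 0.5, 9.0], [18.0, 0.6, 9.0]])  # one row per train
dld_aux_channels = {"sampleBias": 0, "tofVoltage": 1}

data_frames = (
    Series((np_array[i, value] for i in train_id.index), name=key, index=train_id).to_frame()
    for key, value in dld_aux_channels.items()
)
data = reduce(DataFrame.combine_first, data_frames)
print(data)
#          sampleBias  tofVoltage
# trainId
# 100            17.0         0.5
# 101            18.0         0.6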

443
    def create_dataframe_per_train(
1✔
444
        self,
445
        np_array: np.ndarray,
446
        train_id: Series,
447
        channel: str,
448
    ) -> DataFrame:
449
        """
450
        Returns a pandas DataFrame for a given channel name of type [per train].
451

452
        Args:
453
            np_array (np.ndarray): The numpy array containing the channel data.
454
            train_id (Series): The train ID Series.
455
            channel (str): The name of the channel.
456

457
        Returns:
458
            DataFrame: The pandas DataFrame for the channel's data.
459
        """
460
        return (
1✔
461
            Series((np_array[i] for i in train_id.index), name=channel)
462
            .to_frame()
463
            .set_index(train_id)
464
        )
465

466
    def create_dataframe_per_channel(
1✔
467
        self,
468
        h5_file: h5py.File,
469
        channel: str,
470
    ) -> Union[Series, DataFrame]:
471
        """
472
        Returns a pandas DataFrame for a given channel name from a given file.
473

474
        This method takes an h5py.File object `h5_file` and a channel name `channel`, and returns
475
        a pandas DataFrame containing the data for that channel from the file. The format of the
476
        DataFrame depends on the channel's format specified in the configuration.
477

478
        Args:
479
            h5_file (h5py.File): The h5py.File object representing the HDF5 file.
480
            channel (str): The name of the channel.
481

482
        Returns:
483
            Union[Series, DataFrame]: A pandas Series or DataFrame representing the channel's data.
484

485
        Raises:
486
            ValueError: If the channel has an undefined format.
487

488
        """
489
        [train_id, np_array] = self.create_numpy_array_per_channel(
1✔
490
            h5_file,
491
            channel,
492
        )  # numpy Array created
493
        channel_dict = self._config["dataframe"]["channels"][channel]  # channel parameters
1✔
494

495
        # If np_array is size zero, fill with NaNs
496
        if np_array.size == 0:
1✔
497
            # Fill the np_array with NaN values of the same shape as train_id
498
            np_array = np.full_like(train_id, np.nan, dtype=np.double)
×
499
            # Create a Series using np_array, with train_id as the index
500
            data = Series(
×
501
                (np_array[i] for i in train_id.index),
502
                name=channel,
503
                index=train_id,
504
            )
505

506
        # Electron resolved data is treated here
507
        if channel_dict["format"] == "per_electron":
1✔
508
            # If index_per_electron is None, create it for the given file
509
            if self.index_per_electron is None:
1✔
510
                self.create_multi_index_per_electron(h5_file)
1✔
511

512
            # Create a DataFrame for electron-resolved data
513
            data = self.create_dataframe_per_electron(
1✔
514
                np_array,
515
                train_id,
516
                channel,
517
            )
518

519
        # Pulse resolved data is treated here
520
        elif channel_dict["format"] == "per_pulse":
1✔
521
            # Create a DataFrame for pulse-resolved data
522
            data = self.create_dataframe_per_pulse(
1✔
523
                np_array,
524
                train_id,
525
                channel,
526
                channel_dict,
527
            )
528

529
        # Train resolved data is treated here
530
        elif channel_dict["format"] == "per_train":
1✔
531
            # Create a DataFrame for train-resolved data
532
            data = self.create_dataframe_per_train(np_array, train_id, channel)
1✔
533

534
        else:
535
            raise ValueError(
×
536
                channel
537
                + "has an undefined format. Available formats are \
538
                per_pulse, per_electron and per_train",
539
            )
540

541
        return data
1✔
542

543
    def concatenate_channels(
1✔
544
        self,
545
        h5_file: h5py.File,
546
    ) -> DataFrame:
547
        """
548
        Concatenates the channels from the provided h5py.File into a pandas DataFrame.
549

550
        This method takes an h5py.File object `h5_file` and concatenates the channels present in
551
        the file into a single pandas DataFrame. The concatenation is performed based on the
552
        available channels specified in the configuration.
553

554
        Args:
555
            h5_file (h5py.File): The h5py.File object representing the HDF5 file.
556

557
        Returns:
558
            DataFrame: A concatenated pandas DataFrame containing the channels.
559

560
        Raises:
561
            ValueError: If the group_name for any channel does not exist in the file.
562

563
        """
564
        all_keys = parse_h5_keys(h5_file)  # Parses all channels present
1✔
565

566
        # Check if the provided group_name actually exists in the file
567
        for channel in self._config["dataframe"]["channels"]:
1✔
568
            if channel == "timeStamp":
1✔
569
                group_name = self._config["dataframe"]["channels"][channel]["group_name"] + "time"
1✔
570
            else:
571
                group_name = self._config["dataframe"]["channels"][channel]["group_name"] + "value"
1✔
572

573
            if group_name not in all_keys:
1✔
574
                raise ValueError(
1✔
575
                    f"The group_name for channel {channel} does not exist.",
576
                )
577

578
        # Create a generator expression to generate data frames for each channel
579
        data_frames = (
1✔
580
            self.create_dataframe_per_channel(h5_file, each) for each in self.available_channels
581
        )
582

583
        # Use the reduce function to join the data frames into a single DataFrame
584
        return reduce(
1✔
585
            lambda left, right: left.join(right, how="outer"),
586
            data_frames,
587
        )
588

589
    def create_dataframe_per_file(
1✔
590
        self,
591
        file_path: Path,
592
    ) -> DataFrame:
593
        """
594
        Create pandas DataFrames for the given file.
595

596
        This method loads an HDF5 file specified by `file_path` and constructs a pandas DataFrame
597
        from the datasets within the file. The order of datasets in the DataFrames is the opposite
598
        of the order specified by channel names.
599

600
        Args:
601
            file_path (Path): Path to the input HDF5 file.
602

603
        Returns:
604
            DataFrame: pandas DataFrame
605

606
        """
607
        # Loads h5 file and creates a dataframe
608
        with h5py.File(file_path, "r") as h5_file:
1✔
609
            self.reset_multi_index()  # Reset MultiIndexes for next file
1✔
610
            df = self.concatenate_channels(h5_file)
1✔
611
            df = df.dropna(subset=self._config["dataframe"].get("tof_column", "dldTimeSteps"))
1✔
612
            # correct the 3 bit shift which encodes the detector ID in the 8s time
613
            if self._config["dataframe"].get("split_sector_id_from_dld_time", False):
1✔
614
                df = split_dld_time_from_sector_id(df, config=self._config)
1✔
615
            return df
1✔
616
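# Conceptual sketch (editor-added, not part of loader.py): one way a detector
# ID packed into the lowest 3 bits of a time value can be split off, as the
# comment above hints. The actual behaviour lives in
# sed.loader.utils.split_dld_time_from_sector_id and may differ in detail.
import numpy as np

raw = np.array([0b101011, 0b110100])  # packed as (time_steps << 3) | sector_id
sector_id = raw & 0b111               # lowest 3 bits: detector/sector ID
time_steps = raw >> 3                 # remaining bits: time steps
print(sector_id, time_steps)          # [3 4] [5 6]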

617
    def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> Union[bool, Exception]:
1✔
618
        """
619
        Converts an HDF5 file to Parquet format to create a buffer file.
620

621
        This method uses the `create_dataframe_per_file` method to create a dataframe from an
622
        individual HDF5 file. The resulting dataframe is then saved to a Parquet file.
623

624
        Args:
625
            h5_path (Path): Path to the input HDF5 file.
626
            parquet_path (Path): Path to the output Parquet file.
627

628
        Raises:
629
            ValueError: If an error occurs during the conversion process.
630

631
        """
632
        try:
1✔
633
            (
1✔
634
                self.create_dataframe_per_file(h5_path)
635
                .reset_index(level=self.multi_index)
636
                .to_parquet(parquet_path, index=False)
637
            )
638
        except Exception as exc:  # pylint: disable=broad-except
×
639
            self.failed_files_error.append(f"{parquet_path}: {type(exc)} {exc}")
×
640
            return exc
×
641
        return None
1✔
642

643
    def buffer_file_handler(
1✔
644
        self,
645
        data_parquet_dir: Path,
646
        detector: str,
647
        force_recreate: bool,
648
    ) -> Tuple[List[Path], List, List]:
649
        """
650
        Handles the conversion of buffer files (h5 to parquet) and returns the filenames.
651

652
        Args:
653
            data_parquet_dir (Path): Directory where the parquet files will be stored.
654
            detector (str): Detector name.
655
            force_recreate (bool): Forces recreation of buffer files
656

657
        Returns:
658
            Tuple[List[Path], List, List]: Three lists, one for
659
            parquet file paths, one for metadata and one for schema.
660

661
        Raises:
662
            FileNotFoundError: If the conversion fails for any files or no data is available.
663
        """
664

665
        # Create the directory for buffer parquet files
666
        buffer_file_dir = data_parquet_dir.joinpath("buffer")
1✔
667
        buffer_file_dir.mkdir(parents=True, exist_ok=True)
1✔
668

669
        # Create two separate lists for h5 and parquet file paths
670
        h5_filenames = [Path(file) for file in self.files]
1✔
671
        parquet_filenames = [
1✔
672
            buffer_file_dir.joinpath(Path(file).stem + detector) for file in self.files
673
        ]
674
        existing_parquet_filenames = [file for file in parquet_filenames if file.exists()]
1✔
675

676
        # Raise an error if no h5 files are available for conversion
677
        if len(h5_filenames) == 0:
1✔
678
            raise ValueError("No data available. Probably failed reading all h5 files")
×
679

680
        if not force_recreate:
1✔
681
            # Check if the available channels match the schema of the existing parquet files
682
            parquet_schemas = [pq.read_schema(file) for file in existing_parquet_filenames]
1✔
683
            config_schema = set(self.get_channels(formats="all", index=True))
1✔
684
            if self._config["dataframe"].get("split_sector_id_from_dld_time", False):
1✔
685
                config_schema.add(self._config["dataframe"].get("sector_id_column", False))
1✔
686

687
            for i, schema in enumerate(parquet_schemas):
1✔
688
                schema_set = set(schema.names)
1✔
689
                if schema_set != config_schema:
1✔
690
                    missing_in_parquet = config_schema - schema_set
1✔
691
                    missing_in_config = schema_set - config_schema
1✔
692

693
                    missing_in_parquet_str = (
1✔
694
                        f"Missing in parquet: {missing_in_parquet}" if missing_in_parquet else ""
695
                    )
696
                    missing_in_config_str = (
1✔
697
                        f"Missing in config: {missing_in_config}" if missing_in_config else ""
698
                    )
699

700
                    raise ValueError(
1✔
701
                        "The available channels do not match the schema of file",
702
                        f"{existing_parquet_filenames[i]}",
703
                        f"{missing_in_parquet_str}",
704
                        f"{missing_in_config_str}",
705
                        "Please check the configuration file or set force_recreate to True.",
706
                    )
707

708
        # Choose files to read
709
        files_to_read = [
1✔
710
            (h5_path, parquet_path)
711
            for h5_path, parquet_path in zip(h5_filenames, parquet_filenames)
712
            if force_recreate or not parquet_path.exists()
713
        ]
714

715
        print(f"Reading files: {len(files_to_read)} new files of {len(h5_filenames)} total.")
1✔
716

717
        # Initialize the indices for create_buffer_file conversion
718
        self.reset_multi_index()
1✔
719

720
        # Convert the remaining h5 files to parquet in parallel if there are any
721
        if len(files_to_read) > 0:
1✔
722
            error = Parallel(n_jobs=len(files_to_read), verbose=10)(
1✔
723
                delayed(self.create_buffer_file)(h5_path, parquet_path)
724
                for h5_path, parquet_path in files_to_read
725
            )
726
            if any(error):
1✔
727
                raise RuntimeError(f"Conversion failed for some files. {error}")
×
728

729
        # Raise an error if the conversion failed for any files
730
        # TODO: merge this and the previous error tracking
731
        if self.failed_files_error:
1✔
732
            raise FileNotFoundError(
×
733
                "Conversion failed for the following files:\n" + "\n".join(self.failed_files_error),
734
            )
735

736
        print("All files converted successfully!")
1✔
737

738
        # read all parquet metadata and schema
739
        metadata = [pq.read_metadata(file) for file in parquet_filenames]
1✔
740
        schema = [pq.read_schema(file) for file in parquet_filenames]
1✔
741

742
        return parquet_filenames, metadata, schema
1✔
743

744
    def parquet_handler(
1✔
745
        self,
746
        data_parquet_dir: Path,
747
        detector: str = "",
748
        parquet_path: Path = None,
749
        converted: bool = False,
750
        load_parquet: bool = False,
751
        save_parquet: bool = False,
752
        force_recreate: bool = False,
753
    ) -> Tuple[dd.DataFrame, dd.DataFrame]:
754
        """
755
        Handles loading and saving of parquet files based on the provided parameters.
756

757
        Args:
758
            data_parquet_dir (Path): Directory where the parquet files are located.
759
            detector (str, optional): Adds an identifier for parquets to distinguish multidetector
760
                systems.
761
            parquet_path (Path, optional): Path to the combined parquet file.
762
            converted (bool, optional): True if data is augmented by adding additional columns
763
                externally and saved into converted folder.
764
            load_parquet (bool, optional): Loads the entire parquet into the dd dataframe.
765
            save_parquet (bool, optional): Saves the entire dataframe into a parquet.
766
            force_recreate (bool, optional): Forces recreation of buffer file.
767
        Returns:
768
            tuple: A tuple containing two dataframes:
769
            - dataframe_electron: Dataframe containing the loaded/augmented electron data.
770
            - dataframe_pulse: Dataframe containing the loaded/augmented timed data.
771

772
        Raises:
773
            FileNotFoundError: If the requested parquet file is not found.
774

775
        """
776

777
        # Construct the parquet path if not provided
778
        if parquet_path is None:
1✔
779
            parquet_name = "_".join(str(run) for run in self.runs)
1✔
780
            parquet_dir = data_parquet_dir.joinpath("converted") if converted else data_parquet_dir
1✔
781

782
            parquet_path = parquet_dir.joinpath(
1✔
783
                "run_" + parquet_name + detector,
784
            ).with_suffix(".parquet")
785

786
        # Check if load_parquet is flagged and then load the file if it exists
787
        if load_parquet:
1✔
788
            try:
×
NEW
789
                dataframe_electron = dd.read_parquet(parquet_path)
×
NEW
790
                dataframe_pulse = dataframe_electron
×
791
            except Exception as exc:
×
792
                raise FileNotFoundError(
×
793
                    "The final parquet for this run(s) does not exist yet. "
794
                    "If it is in another location, please provide the path as parquet_path.",
795
                ) from exc
796

797
        else:
798
            # Obtain the parquet filenames, metadata and schema from the method
799
            # which handles buffer file creation/reading
800
            filenames, metadata, _ = self.buffer_file_handler(
1✔
801
                data_parquet_dir,
802
                detector,
803
                force_recreate,
804
            )
805

806
            # Read all parquet files into one dataframe using dask
807
            dataframe = dd.read_parquet(filenames, calculate_divisions=True)
1✔
808

809
            # Channels to fill NaN values
810
            channels: List[str] = self.get_channels(["per_pulse", "per_train"])
1✔
811

812
            overlap = min(file.num_rows for file in metadata)
1✔
813

814
            print("Filling nan values...")
1✔
815
            dataframe = dfops.forward_fill_lazy(
1✔
816
                df=dataframe,
817
                columns=channels,
818
                before=overlap,
819
                iterations=self._config["dataframe"].get("forward_fill_iterations", 2),
820
            )
821
            # Remove the NaNs from per_electron channels
822
            dataframe_electron = dataframe.dropna(
1✔
823
                subset=self.get_channels(["per_electron"]),
824
            )
825
            dataframe_pulse = dataframe[
1✔
826
                self.multi_index + self.get_channels(["per_pulse", "per_train"])
827
            ]
828
            dataframe_pulse = dataframe_pulse[
1✔
829
                (dataframe_pulse["electronId"] == 0) | (np.isnan(dataframe_pulse["electronId"]))
830
            ]
831

832
        # Save the dataframe as parquet if requested
833
        if save_parquet:
1✔
834
            dataframe_electron.compute().reset_index(drop=True).to_parquet(parquet_path)
×
835
            print("Combined parquet file saved.")
×
836

837
        return dataframe_electron, dataframe_pulse
1✔
838
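# Conceptual sketch (editor-added, not part of loader.py): why the per_pulse
# and per_train channels need forward filling. After the join they hold a value
# only on the first electron row of each pulse/train; a forward fill propagates
# it to the remaining electron rows. The real implementation,
# dfops.forward_fill_lazy, does this lazily on the dask dataframe; before=overlap
# presumably lets the fill look back into the preceding partition so that values
# can also cross file boundaries.
import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "trainId": [100, 100, 100, 101],
        "pulseId": [1, 1, 2, 1],
        "electronId": [0, 1, 0, 0],
        "dldPosX": [12.0, 13.5, 11.0, 14.2],    # per_electron: present on every row
        "bam": [0.3, np.nan, 0.4, np.nan],      # per_pulse: present once, NaN elsewhere
    },
)
df["bam"] = df["bam"].ffill()
print(df["bam"].tolist())  # [0.3, 0.3, 0.4, 0.4]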

839
    def parse_metadata(self, scicat_token: str = None) -> dict:
1✔
840
        """Uses the MetadataRetriever class to fetch metadata from scicat for each run.
841

842
        Args:
            scicat_token (str, optional): The scicat token to use for fetching metadata.

        Returns:
843
            dict: Metadata dictionary
844
845
        """
846
        metadata_retriever = MetadataRetriever(self._config["metadata"], scicat_token)
×
847
        metadata = metadata_retriever.get_metadata(
×
848
            beamtime_id=self._config["core"]["beamtime_id"],
849
            runs=self.runs,
850
            metadata=self.metadata,
851
        )
852

853
        return metadata
×
854

855
    def get_count_rate(
1✔
856
        self,
857
        fids: Sequence[int] = None,  # noqa: ARG002
858
        **kwds,  # noqa: ARG002
859
    ):
860
        return None, None
1✔
861

862
    def get_elapsed_time(self, fids=None, **kwds):  # noqa: ARG002
1✔
863
        return None
1✔
864

865
    def read_dataframe(
1✔
866
        self,
867
        files: Union[str, Sequence[str]] = None,
868
        folders: Union[str, Sequence[str]] = None,
869
        runs: Union[str, Sequence[str]] = None,
870
        ftype: str = "h5",
871
        metadata: dict = None,
872
        collect_metadata: bool = False,
873
        **kwds,
874
    ) -> Tuple[dd.DataFrame, dd.DataFrame, dict]:
875
        """
876
        Read express data from the DAQ, generating a parquet in between.
877

878
        Args:
879
            files (Union[str, Sequence[str]], optional): File path(s) to process. Defaults to None.
880
            folders (Union[str, Sequence[str]], optional): Path to folder(s) where files are stored
881
                The folder path takes priority: if specified, the given files are ignored.
882
                Defaults to None.
883
            runs (Union[str, Sequence[str]], optional): Run identifier(s). Corresponding files will
884
                be located in the location provided by ``folders``. Takes precedence over
885
                ``files`` and ``folders``. Defaults to None.
886
            ftype (str, optional): The file extension type. Defaults to "h5".
887
            metadata (dict, optional): Additional metadata. Defaults to None.
888
            collect_metadata (bool, optional): Whether to collect metadata. Defaults to False.
889

890
        Returns:
891
            Tuple[dd.DataFrame, dd.DataFrame, dict]: A tuple containing the electron dataframe,
                the timed dataframe, and the metadata dictionary.
892

893
        Raises:
894
            ValueError: If neither 'runs' nor 'files'/'data_raw_dir' is provided.
895
            FileNotFoundError: If the conversion fails for some files or no data is available.
896
        """
897
        t0 = time.time()
1✔
898

899
        data_raw_dir, data_parquet_dir = self.initialize_paths()
1✔
900

901
        # Prepare a list of names for the runs to read and parquets to write
902
        if runs is not None:
1✔
903
            files = []
1✔
904
            if isinstance(runs, (str, int)):
1✔
905
                runs = [runs]
1✔
906
            for run in runs:
1✔
907
                run_files = self.get_files_from_run_id(
1✔
908
                    run_id=run,
909
                    folders=[str(folder.resolve()) for folder in data_raw_dir],
910
                    extension=ftype,
911
                    daq=self._config["dataframe"]["daq"],
912
                )
913
                files.extend(run_files)
1✔
914
            self.runs = list(runs)
1✔
915
            super().read_dataframe(files=files, ftype=ftype)
1✔
916

917
        else:
918
            # This call takes care of files and folders. As we have converted runs into files
919
            # already, they are just stored in the class by this call.
920
            super().read_dataframe(
1✔
921
                files=files,
922
                folders=folders,
923
                ftype=ftype,
924
                metadata=metadata,
925
            )
926

927
        df, df_timed = self.parquet_handler(data_parquet_dir, **kwds)
1✔
928

929
        metadata = self.parse_metadata(**kwds) if collect_metadata else {}
1✔
930
        print(f"loading complete in {time.time() - t0: .2f} s")
1✔
931

932
        return df, df_timed, metadata
1✔
933

934

935
LOADER = FlashLoader
1✔
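As the module and class docstrings state, read_dataframe is the entry point used by other
modules. Below is a minimal usage sketch, assuming a complete FLASH config dict (with the
"core" and "dataframe" sections referenced throughout this file) and raw h5 files on disk;
the run numbers are placeholders.

from sed.loader.flash.loader import FlashLoader

config: dict = ...  # full FLASH config; see the keys referenced in loader.py above

loader = FlashLoader(config=config)
df, df_timed, metadata = loader.read_dataframe(
    runs=["43878", "43879"],   # placeholder run numbers
    collect_metadata=False,    # True would fetch SciCat metadata via parse_metadata
)
# df is a dask dataframe of electron-resolved events; df_timed holds the pulse- and
# train-resolved channels. Passing save_parquet=True additionally writes the combined
# electron dataframe to a single parquet file under the parquet directory resolved by
# initialize_paths.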