OpenCOMPES / sed — build 6498082478
12 Oct 2023 04:01PM UTC — coverage: 99.663% (+9.1%) from 90.587%
Pull Request #151: Documentation PR (merge 103c9a6f3 into ec5bccda4)
4734 of 4750 relevant lines covered (99.66%), 1.0 hits per line

Source file: /sed/loader/flash/loader.py (97.97% covered)

"""
This module implements the flash data loader.
This loader currently supports hextof, wespe and instruments with similar structure.
The raw hdf5 data is combined and saved into buffer files and loaded as a dask dataframe.
The dataframe is an amalgamation of all h5 files for a combination of runs, where the NaNs are
automatically forward filled across different files.
This can then be saved as a parquet for out-of-sed processing and reread back to access other
sed functionality.
"""
import time
from functools import reduce
from pathlib import Path
from typing import List
from typing import Sequence
from typing import Tuple
from typing import Union

import dask.dataframe as dd
import h5py
import numpy as np
import pyarrow as pa
from joblib import delayed
from joblib import Parallel
from natsort import natsorted
from pandas import DataFrame
from pandas import MultiIndex
from pandas import Series

from sed.core import dfops
from sed.loader.base.loader import BaseLoader
from sed.loader.flash.metadata import MetadataRetriever
from sed.loader.utils import parse_h5_keys


class FlashLoader(BaseLoader):
    """
    The class generates multiindexed multidimensional pandas dataframes from the new FLASH
    data format resolved by both macro and microbunches alongside electrons.
    Only the read_dataframe (inherited and implemented) method is accessed by other modules.
    """

    __name__ = "flash"

    supported_file_types = ["h5"]

    def __init__(self, config: dict) -> None:

        super().__init__(config=config)
        self.multi_index = ["trainId", "pulseId", "electronId"]
        self.index_per_electron: MultiIndex = None
        self.index_per_pulse: MultiIndex = None
        self.failed_files_error: List[str] = []

    def initialize_paths(self) -> Tuple[List[Path], Path]:
        """
        Initializes the paths based on the configuration.

        Returns:
            Tuple[List[Path], Path]: A tuple containing a list of raw data directory
            paths and the parquet data directory path.

        Raises:
            ValueError: If required values are missing from the configuration.
            FileNotFoundError: If the raw data directories are not found.
        """
        # Parse the config file to locate the raw beamtime directory
        if "paths" in self._config["core"]:
            data_raw_dir = [
                Path(self._config["core"]["paths"].get("data_raw_dir", "")),
            ]
            data_parquet_dir = Path(
                self._config["core"]["paths"].get("data_parquet_dir", ""),
            )

        else:
            try:
                beamtime_id = self._config["core"]["beamtime_id"]
                year = self._config["core"]["year"]
                daq = self._config["dataframe"]["daq"]
            except KeyError as exc:
                raise ValueError(
                    "The beamtime_id, year and daq are required.",
                ) from exc

            beamtime_dir = Path(
                self._config["dataframe"]["beamtime_dir"][self._config["core"]["beamline"]],
            )
            beamtime_dir = beamtime_dir.joinpath(f"{year}/data/{beamtime_id}/")

            # Use pathlib walk to reach the raw data directory
            data_raw_dir = []
            raw_path = beamtime_dir.joinpath("raw")

            for path in raw_path.glob("**/*"):
                if path.is_dir():
                    dir_name = path.name
                    if dir_name.startswith("express-") or dir_name.startswith(
                        "online-",
                    ):
                        data_raw_dir.append(path.joinpath(daq))
                    elif dir_name == daq.upper():
                        data_raw_dir.append(path)

            if not data_raw_dir:
                raise FileNotFoundError("Raw data directories not found.")

            parquet_path = "processed/parquet"
            data_parquet_dir = beamtime_dir.joinpath(parquet_path)

        data_parquet_dir.mkdir(parents=True, exist_ok=True)

        return data_raw_dir, data_parquet_dir

    def get_files_from_run_id(
        self,
        run_id: str,
        folders: Union[str, Sequence[str]] = None,
        extension: str = "h5",
        **kwds,
    ) -> List[str]:
        """Returns a list of filenames for a given run located in the specified directory
        for the specified data acquisition (daq).

        Args:
            run_id (str): The run identifier to locate.
            folders (Union[str, Sequence[str]], optional): The directory(ies) where the raw
                data is located. Defaults to config["core"]["base_folder"].
            extension (str, optional): The file extension. Defaults to "h5".
            kwds: Keyword arguments:
                - daq (str): The data acquisition identifier.
                Defaults to config["dataframe"]["daq"].

        Returns:
            List[str]: A list of path strings representing the collected file names.

        Raises:
            FileNotFoundError: If no files are found for the given run in the directory.
        """
        # Define the stream name prefixes based on the data acquisition identifier
        stream_name_prefixes = self._config["dataframe"]["stream_name_prefixes"]

        if folders is None:
            folders = self._config["core"]["base_folder"]

        if isinstance(folders, str):
            folders = [folders]

        daq = kwds.pop("daq", self._config.get("dataframe", {}).get("daq"))

        # Generate the file patterns to search for in the directory
        file_pattern = f"{stream_name_prefixes[daq]}_run{run_id}_*." + extension

        files: List[Path] = []
        # Use pathlib to search for matching files in each directory
        for folder in folders:
            files.extend(
                natsorted(
                    Path(folder).glob(file_pattern),
                    key=lambda filename: str(filename).rsplit("_", maxsplit=1)[-1],
                ),
            )

        # Check if any files are found
        if not files:
            raise FileNotFoundError(
                f"No files found for run {run_id} in directory {str(folders)}",
            )

        # Return the list of found files
        return [str(file.resolve()) for file in files]

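    # Illustrative example (hypothetical values, not part of the original file): if the
    # configured stream name prefix for the chosen daq were "FLASH1_USER3_stream_2" and
    # run_id were "44824", the glob pattern above would become
    # "FLASH1_USER3_stream_2_run44824_*.h5"; matches are natsorted by their trailing
    # "_<suffix>" segment.
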
    @property
    def available_channels(self) -> List:
        """Returns the channel names that are available for use,
        excluding pulseId, defined by the json file"""
        available_channels = list(self._config["dataframe"]["channels"].keys())
        available_channels.remove("pulseId")
        return available_channels

    def get_channels_by_format(self, formats: List[str]) -> List:
        """
        Returns a list of channels with the specified format.

        Args:
            formats (List[str]): The desired formats ('per_pulse', 'per_electron',
                or 'per_train').

        Returns:
            List: A list of channels with the specified format(s).
        """
        channels = []
        for format_ in formats:
            for key in self.available_channels:
                channel_format = self._config["dataframe"]["channels"][key]["format"]
                if channel_format == format_:
                    if key == "dldAux":
                        aux_channels = self._config["dataframe"]["channels"][key][
                            "dldAuxChannels"
                        ].keys()
                        channels.extend(aux_channels)
                    else:
                        channels.append(key)
        return channels

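    # Illustrative channel-config snippet (hypothetical values, not part of the original
    # file), showing the structure the loader expects under config["dataframe"]["channels"]:
    #   "dldPosX": {"format": "per_electron", "group_name": "...", "slice": 1},
    #   "dldAux": {"format": "per_pulse", "group_name": "...",
    #              "dldAuxChannels": {"sampleBias": 0, "tofVoltage": 1}},
    #   "delayStage": {"format": "per_train", "group_name": "..."},
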
    def reset_multi_index(self) -> None:
        """Resets the index per pulse and electron"""
        self.index_per_electron = None
        self.index_per_pulse = None

    def create_multi_index_per_electron(self, h5_file: h5py.File) -> None:
        """
        Creates an index per electron using pulseId for usage with the electron
            resolved pandas DataFrame.

        Args:
            h5_file (h5py.File): The HDF5 file object.

        Notes:
            - This method relies on the 'pulseId' channel to determine
                the macrobunch IDs.
            - It creates a MultiIndex with trainId, pulseId, and electronId
                as the index levels.
        """

        # Macrobunch IDs obtained from the pulseId channel
        [train_id, np_array] = self.create_numpy_array_per_channel(
            h5_file,
            "pulseId",
        )

        # Create a series with the macrobunches as index and
        # microbunches as values
        macrobunches = (
            Series(
                (np_array[i] for i in train_id.index),
                name="pulseId",
                index=train_id,
            )
            - self._config["dataframe"]["ubid_offset"]
        )

        # Explode dataframe to get all microbunch values per macrobunch,
        # remove NaN values and convert to type int
        microbunches = macrobunches.explode().dropna().astype(int)

        # Create temporary index values
        index_temp = MultiIndex.from_arrays(
            (microbunches.index, microbunches.values),
            names=["trainId", "pulseId"],
        )

        # Calculate the electron counts per pulseId; unique() preserves the order of appearance
        electron_counts = index_temp.value_counts()[index_temp.unique()].values

        # Series object for indexing with electrons
        electrons = (
            Series(
                [np.arange(electron_counts[i]) for i in range(electron_counts.size)],
            )
            .explode()
            .astype(int)
        )

        # Create a pandas MultiIndex using the exploded datasets
        self.index_per_electron = MultiIndex.from_arrays(
            (microbunches.index, microbunches.values, electrons),
            names=self.multi_index,
        )

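    # Illustrative sketch (hypothetical values, not part of the original file): for a single
    # train t whose offset-corrected pulse IDs explode to [5, 5, 7], the resulting index rows
    # are (trainId, pulseId, electronId) = (t, 5, 0), (t, 5, 1), (t, 7, 0), i.e. electronId
    # enumerates the electrons within each (trainId, pulseId) pair.
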
    def create_multi_index_per_pulse(
        self,
        train_id: Series,
        np_array: np.ndarray,
    ) -> None:
        """
        Creates an index per pulse using a pulse resolved channel's macrobunch ID, for usage with
        the pulse resolved pandas DataFrame.

        Args:
            train_id (Series): The train ID Series.
            np_array (np.ndarray): The numpy array containing the pulse resolved data.

        Notes:
            - This method creates a MultiIndex with trainId and pulseId as the index levels.
        """

        # Create a pandas MultiIndex, useful for comparing electron and
        # pulse resolved dataframes
        self.index_per_pulse = MultiIndex.from_product(
            (train_id, np.arange(0, np_array.shape[1])),
            names=["trainId", "pulseId"],
        )

    def create_numpy_array_per_channel(
        self,
        h5_file: h5py.File,
        channel: str,
    ) -> Tuple[Series, np.ndarray]:
        """
        Returns a numpy array for a given channel name for a given file.

        Args:
            h5_file (h5py.File): The h5py file object.
            channel (str): The name of the channel.

        Returns:
            Tuple[Series, np.ndarray]: A tuple containing the train ID Series and the numpy array
            for the channel's data.

        """
        # Get the data from the necessary h5 file and channel
        group = h5_file[self._config["dataframe"]["channels"][channel]["group_name"]]

        channel_dict = self._config["dataframe"]["channels"][channel]  # channel parameters

        train_id = Series(group["index"], name="trainId")  # macrobunch

        # unpacks the timeStamp or value
        if channel == "timeStamp":
            np_array = group["time"][()]
        else:
            np_array = group["value"][()]

        # Use predefined axis and slice from the json file
        # to choose correct dimension for necessary channel
        if "slice" in channel_dict:
            np_array = np.take(
                np_array,
                channel_dict["slice"],
                axis=1,
            )
        return train_id, np_array

    def create_dataframe_per_electron(
        self,
        np_array: np.ndarray,
        train_id: Series,
        channel: str,
    ) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per electron].

        Args:
            np_array (np.ndarray): The numpy array containing the channel data.
            train_id (Series): The train ID Series.
            channel (str): The name of the channel.

        Returns:
            DataFrame: The pandas DataFrame for the channel's data.

        Notes:
            The microbunch resolved data is exploded and converted to a DataFrame. The MultiIndex
            is set, and the NaN values are dropped, alongside the pulseId = 0 (meaningless).

        """
        return (
            Series((np_array[i] for i in train_id.index), name=channel)
            .explode()
            .dropna()
            .to_frame()
            .set_index(self.index_per_electron)
            .drop(
                index=np.arange(-self._config["dataframe"]["ubid_offset"], 0),
                level=1,
                errors="ignore",
            )
        )

    def create_dataframe_per_pulse(
        self,
        np_array: np.ndarray,
        train_id: Series,
        channel: str,
        channel_dict: dict,
    ) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per pulse].

        Args:
            np_array (np.ndarray): The numpy array containing the channel data.
            train_id (Series): The train ID Series.
            channel (str): The name of the channel.
            channel_dict (dict): The dictionary containing channel parameters.

        Returns:
            DataFrame: The pandas DataFrame for the channel's data.

        Notes:
            - For auxiliary channels, the macrobunch resolved data is repeated 499 times to be
              compared to electron resolved data for each auxiliary channel. The data is then
              converted to a multicolumn DataFrame.
            - For all other pulse resolved channels, the macrobunch resolved data is exploded
              to a DataFrame and the MultiIndex is set.

        """

        # Special case for auxiliary channels
        if channel == "dldAux":
            # Checks the channel dictionary for correct slices and creates a multicolumn DataFrame
            data_frames = (
                Series(
                    (np_array[i, value] for i in train_id.index),
                    name=key,
                    index=train_id,
                ).to_frame()
                for key, value in channel_dict["dldAuxChannels"].items()
            )

            # Multiindex set and combined dataframe returned
            data = reduce(DataFrame.combine_first, data_frames)

        # For all other pulse resolved channels
        else:
            # Macrobunch resolved data is exploded to a DataFrame and the MultiIndex is set

            # Creates the index_per_pulse for the given channel
            self.create_multi_index_per_pulse(train_id, np_array)
            data = (
                Series((np_array[i] for i in train_id.index), name=channel)
                .explode()
                .to_frame()
                .set_index(self.index_per_pulse)
            )

        return data

    def create_dataframe_per_train(
        self,
        np_array: np.ndarray,
        train_id: Series,
        channel: str,
    ) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per train].

        Args:
            np_array (np.ndarray): The numpy array containing the channel data.
            train_id (Series): The train ID Series.
            channel (str): The name of the channel.

        Returns:
            DataFrame: The pandas DataFrame for the channel's data.
        """
        return (
            Series((np_array[i] for i in train_id.index), name=channel)
            .to_frame()
            .set_index(train_id)
        )

    def create_dataframe_per_channel(
        self,
        h5_file: h5py.File,
        channel: str,
    ) -> Union[Series, DataFrame]:
        """
        Returns a pandas DataFrame for a given channel name from a given file.

        This method takes an h5py.File object `h5_file` and a channel name `channel`, and returns
        a pandas DataFrame containing the data for that channel from the file. The format of the
        DataFrame depends on the channel's format specified in the configuration.

        Args:
            h5_file (h5py.File): The h5py.File object representing the HDF5 file.
            channel (str): The name of the channel.

        Returns:
            Union[Series, DataFrame]: A pandas Series or DataFrame representing the channel's data.

        Raises:
            ValueError: If the channel has an undefined format.

        """
        [train_id, np_array] = self.create_numpy_array_per_channel(
            h5_file,
            channel,
        )  # numpy Array created
        channel_dict = self._config["dataframe"]["channels"][channel]  # channel parameters

        # If np_array is size zero, fill with NaNs
        if np_array.size == 0:
            # Fill the np_array with NaN values of the same shape as train_id
            np_array = np.full_like(train_id, np.nan, dtype=np.double)
            # Create a Series using np_array, with train_id as the index
            data = Series(
                (np_array[i] for i in train_id.index),
                name=channel,
                index=train_id,
            )

        # Electron resolved data is treated here
        if channel_dict["format"] == "per_electron":
            # If index_per_electron is None, create it for the given file
            if self.index_per_electron is None:
                self.create_multi_index_per_electron(h5_file)

            # Create a DataFrame for electron-resolved data
            data = self.create_dataframe_per_electron(
                np_array,
                train_id,
                channel,
            )

        # Pulse resolved data is treated here
        elif channel_dict["format"] == "per_pulse":
            # Create a DataFrame for pulse-resolved data
            data = self.create_dataframe_per_pulse(
                np_array,
                train_id,
                channel,
                channel_dict,
            )

        # Train resolved data is treated here
        elif channel_dict["format"] == "per_train":
            # Create a DataFrame for train-resolved data
            data = self.create_dataframe_per_train(np_array, train_id, channel)

        else:
            raise ValueError(
                channel
                + " has an undefined format. Available formats are \
                per_pulse, per_electron and per_train",
            )

        return data

    def concatenate_channels(
        self,
        h5_file: h5py.File,
    ) -> DataFrame:
        """
        Concatenates the channels from the provided h5py.File into a pandas DataFrame.

        This method takes an h5py.File object `h5_file` and concatenates the channels present in
        the file into a single pandas DataFrame. The concatenation is performed based on the
        available channels specified in the configuration.

        Args:
            h5_file (h5py.File): The h5py.File object representing the HDF5 file.

        Returns:
            DataFrame: A concatenated pandas DataFrame containing the channels.

        Raises:
            ValueError: If the group_name for any channel does not exist in the file.

        """
        all_keys = parse_h5_keys(h5_file)  # Parses all channels present

        # Check if the provided group_name actually exists in the file
        for channel in self._config["dataframe"]["channels"]:
            if channel == "timeStamp":
                group_name = self._config["dataframe"]["channels"][channel]["group_name"] + "time"
            else:
                group_name = self._config["dataframe"]["channels"][channel]["group_name"] + "value"

            if group_name not in all_keys:
                raise ValueError(
                    f"The group_name for channel {channel} does not exist.",
                )

        # Create a generator expression to generate data frames for each channel
        data_frames = (
            self.create_dataframe_per_channel(h5_file, each) for each in self.available_channels
        )

        # Use the reduce function to join the data frames into a single DataFrame
        return reduce(
            lambda left, right: left.join(right, how="outer"),
            data_frames,
        )

    def create_dataframe_per_file(
        self,
        file_path: Path,
    ) -> DataFrame:
        """
        Create a pandas DataFrame for the given file.

        This method loads an HDF5 file specified by `file_path` and constructs a pandas DataFrame
        from the datasets within the file. The order of datasets in the DataFrame is the opposite
        of the order specified by channel names.

        Args:
            file_path (Path): Path to the input HDF5 file.

        Returns:
            DataFrame: pandas DataFrame

        """
        # Loads h5 file and creates a dataframe
        with h5py.File(file_path, "r") as h5_file:
            self.reset_multi_index()  # Reset MultiIndexes for next file
            return self.concatenate_channels(h5_file)

    def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> None:
        """
        Converts an HDF5 file to Parquet format to create a buffer file.

        This method uses the `create_dataframe_per_file` method to create a dataframe from an
        individual HDF5 file. The resulting dataframe is then saved to a Parquet file.

        Args:
            h5_path (Path): Path to the input HDF5 file.
            parquet_path (Path): Path to the output Parquet file.

        Raises:
            ValueError: If an error occurs during the conversion process.

        """
        try:
            (
                self.create_dataframe_per_file(h5_path)
                .reset_index(level=self.multi_index)
                .to_parquet(parquet_path, index=False)
            )
        except ValueError as failed_string_error:
            self.failed_files_error.append(
                f"{parquet_path}: {failed_string_error}",
            )

    def buffer_file_handler(self, data_parquet_dir: Path, detector: str):
        """
        Handles the conversion of buffer files (h5 to parquet) and returns the filenames.

        Args:
            data_parquet_dir (Path): Directory where the parquet files will be stored.
            detector (str): Detector name.

        Returns:
            Tuple[List[Path], List[Path]]: Two lists, one for h5 file paths and one for
            corresponding parquet file paths.

        Raises:
            ValueError: If no h5 files are available for conversion.
            FileNotFoundError: If the conversion fails for any files.
        """

        # Create the directory for buffer parquet files
        buffer_file_dir = data_parquet_dir.joinpath("buffer")
        buffer_file_dir.mkdir(parents=True, exist_ok=True)

        # Create two separate lists for h5 and parquet file paths
        h5_filenames = [Path(file) for file in self.files]
        parquet_filenames = [
            buffer_file_dir.joinpath(Path(file).stem + detector) for file in self.files
        ]

        # Raise a ValueError if no data is available after the conversion
        if len(h5_filenames) == 0:
            raise ValueError("No data available. Probably failed reading all h5 files")

        # Choose files to read
        files_to_read = [
            (h5_path, parquet_path)
            for h5_path, parquet_path in zip(h5_filenames, parquet_filenames)
            if not parquet_path.exists()
        ]

        print(f"Reading files: {len(files_to_read)} new files of {len(h5_filenames)} total.")

        # Initialize the indices for create_buffer_file conversion
        self.reset_multi_index()

        # Convert the remaining h5 files to parquet in parallel if there are any
        if len(files_to_read) > 0:
            Parallel(n_jobs=len(files_to_read), verbose=10)(
                delayed(self.create_buffer_file)(h5_path, parquet_path)
                for h5_path, parquet_path in files_to_read
            )

        # Raise an error if the conversion failed for any files
        if self.failed_files_error:
            raise FileNotFoundError(
                "Conversion failed for the following files:\n" + "\n".join(self.failed_files_error),
            )

        print("All files converted successfully!")

        return h5_filenames, parquet_filenames

    def parquet_handler(
        self,
        data_parquet_dir: Path,
        detector: str = "",
        parquet_path: Path = None,
        converted: bool = False,
        load_parquet: bool = False,
        save_parquet: bool = False,
    ):
        """
        Handles loading and saving of parquet files based on the provided parameters.

        Args:
            data_parquet_dir (Path): Directory where the parquet files are located.
            detector (str, optional): Adds an identifier for parquets to distinguish multidetector
                systems.
            parquet_path (Path, optional): Path to the combined parquet file.
            converted (bool, optional): True if data is augmented by adding additional columns
                externally and saved into converted folder.
            load_parquet (bool, optional): Loads the entire parquet into the dd dataframe.
            save_parquet (bool, optional): Saves the entire dataframe into a parquet.

        Returns:
            dataframe: Dataframe containing the loaded or processed data.

        Raises:
            FileNotFoundError: If the requested parquet file is not found.

        """

        # Construct the parquet path if not provided
        if parquet_path is None:
            parquet_name = "_".join(str(run) for run in self.runs)
            parquet_dir = data_parquet_dir.joinpath("converted") if converted else data_parquet_dir

            parquet_path = parquet_dir.joinpath(
                "run_" + parquet_name + detector,
            ).with_suffix(".parquet")

        # Check if load_parquet is flagged and then load the file if it exists
        if load_parquet:
            try:
                dataframe = dd.read_parquet(parquet_path)
            except Exception as exc:
                raise FileNotFoundError(
                    "The final parquet for this run(s) does not exist yet. "
                    "If it is in another location, please provide the path as parquet_path.",
                ) from exc

        else:
            # Obtain the filenames from the method which handles buffer file creation/reading
            _, parquet_filenames = self.buffer_file_handler(
                data_parquet_dir,
                detector,
            )
            # Read all parquet files into one dataframe using dask
            dataframe = dd.read_parquet(parquet_filenames, calculate_divisions=True)
            # Channels to fill NaN values
            print("Filling nan values...")
            channels: List[str] = self.get_channels_by_format(["per_pulse", "per_train"])

            overlap = min(pa.parquet.read_metadata(prq).num_rows for prq in parquet_filenames)

            dataframe = dfops.forward_fill_lazy(
                df=dataframe,
                channels=channels,
                before=overlap,
                iterations=self._config["dataframe"].get("forward_fill_iterations", 2),
            )
            # Remove the NaNs from per_electron channels
            dataframe = dataframe.dropna(
                subset=self.get_channels_by_format(["per_electron"]),
            )
        # Save the dataframe as parquet if requested
        if save_parquet:
            dataframe.compute().reset_index(drop=True).to_parquet(parquet_path)
            print("Combined parquet file saved.")

        return dataframe

    def parse_metadata(self) -> dict:
        """Uses the MetadataRetriever class to fetch metadata from scicat for each run.

        Returns:
            dict: Metadata dictionary
        """
        metadata_retriever = MetadataRetriever(self._config["metadata"])
        metadata = metadata_retriever.get_metadata(
            beamtime_id=self._config["core"]["beamtime_id"],
            runs=self.runs,
            metadata=self.metadata,
        )

        return metadata

    def get_count_rate(
        self,
        fids: Sequence[int] = None,
        **kwds,
    ):
        # Count rate is not implemented for this loader.
        return None, None

    def get_elapsed_time(self, fids=None, **kwds):
        # Elapsed time is not implemented for this loader.
        return None

    def read_dataframe(
        self,
        files: Union[str, Sequence[str]] = None,
        folders: Union[str, Sequence[str]] = None,
        runs: Union[str, Sequence[str]] = None,
        ftype: str = "h5",
        metadata: dict = None,
        collect_metadata: bool = False,
        **kwds,
    ) -> Tuple[dd.DataFrame, dict]:
        """
        Read express data from the DAQ, generating a parquet in between.

        Args:
            files (Union[str, Sequence[str]], optional): File path(s) to process. Defaults to None.
            folders (Union[str, Sequence[str]], optional): Path to folder(s) where files are
                stored. Path has priority such that if it's specified, the specified files will
                be ignored. Defaults to None.
            runs (Union[str, Sequence[str]], optional): Run identifier(s). Corresponding files will
                be located in the location provided by ``folders``. Takes precedence over
                ``files`` and ``folders``. Defaults to None.
            ftype (str, optional): The file extension type. Defaults to "h5".
            metadata (dict, optional): Additional metadata. Defaults to None.
            collect_metadata (bool, optional): Whether to collect metadata. Defaults to False.

        Returns:
            Tuple[dd.DataFrame, dict]: A tuple containing the concatenated DataFrame and metadata.

        Raises:
            ValueError: If neither 'runs' nor 'files'/'data_raw_dir' is provided.
            FileNotFoundError: If the conversion fails for some files or no data is available.
        """
        t0 = time.time()

        data_raw_dir, data_parquet_dir = self.initialize_paths()

        # Prepare a list of names for the runs to read and parquets to write
        if runs is not None:
            files = []
            if isinstance(runs, (str, int)):
                runs = [runs]
            for run in runs:
                run_files = self.get_files_from_run_id(
                    run_id=run,
                    folders=[str(folder.resolve()) for folder in data_raw_dir],
                    extension=ftype,
                    daq=self._config["dataframe"]["daq"],
                )
                files.extend(run_files)
            self.runs = list(runs)
            super().read_dataframe(files=files, ftype=ftype)

        else:
            # This call takes care of files and folders. As we have converted runs into files
            # already, they are just stored in the class by this call.
            super().read_dataframe(
                files=files,
                folders=folders,
                ftype=ftype,
                metadata=metadata,
            )

        dataframe = self.parquet_handler(data_parquet_dir, **kwds)

        metadata = self.parse_metadata() if collect_metadata else {}
        print(f"loading complete in {time.time() - t0:.2f} s")

        return dataframe, metadata


LOADER = FlashLoader
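
For orientation, a minimal usage sketch of the loader follows. It is not part of the source file above; the config value and run number are placeholders, and a FLASH-style config providing the "core", "dataframe" and "metadata" sections used throughout the loader is assumed.

from sed.loader.flash.loader import FlashLoader

config = ...  # a FLASH loader config dict with "core", "dataframe" and "metadata" sections
loader = FlashLoader(config=config)

# Locates the raw h5 files for the run, converts them to per-file parquet buffers, and
# returns a forward-filled dask dataframe plus (optionally) SciCat metadata.
dataframe, metadata = loader.read_dataframe(runs=["44824"], collect_metadata=False)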