OpenCOMPES / sed / 6520232780

14 Oct 2023 10:12PM UTC coverage: 90.267% (-0.3%) from 90.603%

Pull Request #181 (rettigl): define jitter_amps as single amplitude in default config

4229 of 4685 relevant lines covered (90.27%)

0.9 hits per line

Source File: /sed/loader/flash/loader.py
"""
This module implements the flash data loader.
This loader currently supports hextof, wespe and instruments with similar structure.
The raw hdf5 data is combined and saved into buffer files and loaded as a dask dataframe.
The dataframe is an amalgamation of all h5 files for a combination of runs, where the NaNs are
automatically forward filled across different files.
This can then be saved as a parquet for out-of-sed processing and reread back to access other
sed functionality.
"""
from functools import reduce
from itertools import compress
from pathlib import Path
from typing import List
from typing import Sequence
from typing import Tuple
from typing import Union

import dask.dataframe as dd
import h5py
import numpy as np
from joblib import delayed
from joblib import Parallel
from natsort import natsorted
from pandas import DataFrame
from pandas import MultiIndex
from pandas import Series

from sed.loader.base.loader import BaseLoader
from sed.loader.flash.metadata import MetadataRetriever
from sed.loader.utils import parse_h5_keys


class FlashLoader(BaseLoader):
    """
    The class generates multiindexed multidimensional pandas dataframes from the new FLASH
    dataformat resolved by both macro and microbunches alongside electrons.
    Only the read_dataframe (inherited and implemented) method is accessed by other modules.
    """

    __name__ = "flash"

    supported_file_types = ["h5"]

    def __init__(self, config: dict) -> None:

        super().__init__(config=config)
        self.multi_index = ["trainId", "pulseId", "electronId"]
        self.index_per_electron: MultiIndex = None
        self.index_per_pulse: MultiIndex = None
        self.failed_files_error: List[str] = []

    def initialize_paths(self) -> Tuple[List[Path], Path]:
        """
        Initializes the paths based on the configuration.

        Returns:
            Tuple[List[Path], Path]: A tuple containing a list of raw data directories
            paths and the parquet data directory path.

        Raises:
            ValueError: If required values are missing from the configuration.
            FileNotFoundError: If the raw data directories are not found.
        """
        # Parses to locate the raw beamtime directory from config file
        if "paths" in self._config["core"]:
            data_raw_dir = [
                Path(self._config["core"]["paths"].get("data_raw_dir", "")),
            ]
            data_parquet_dir = Path(
                self._config["core"]["paths"].get("data_parquet_dir", ""),
            )

        else:
            try:
                beamtime_id = self._config["core"]["beamtime_id"]
                year = self._config["core"]["year"]
                daq = self._config["dataframe"]["daq"]
            except KeyError as exc:
                raise ValueError(
                    "The beamtime_id, year and daq are required.",
                ) from exc

            beamtime_dir = Path(
                self._config["dataframe"]["beamtime_dir"][self._config["core"]["beamline"]],
            )
            beamtime_dir = beamtime_dir.joinpath(f"{year}/data/{beamtime_id}/")

            # Use pathlib walk to reach the raw data directory
            data_raw_dir = []
            raw_path = beamtime_dir.joinpath("raw")

            for path in raw_path.glob("**/*"):
                if path.is_dir():
                    dir_name = path.name
                    if dir_name.startswith("express-") or dir_name.startswith(
                        "online-",
                    ):
                        data_raw_dir.append(path.joinpath(daq))
                    elif dir_name == daq.upper():
                        data_raw_dir.append(path)

            if not data_raw_dir:
                raise FileNotFoundError("Raw data directories not found.")

            parquet_path = "processed/parquet"
            data_parquet_dir = beamtime_dir.joinpath(parquet_path)

        data_parquet_dir.mkdir(parents=True, exist_ok=True)

        return data_raw_dir, data_parquet_dir
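
    # Illustrative sketch (not part of the loader): a minimal "core.paths" config section
    # that the branch above would pick up; the directory values are hypothetical.
    #
    #   core:
    #     paths:
    #       data_raw_dir: "/path/to/beamtime/raw/hdf/express-0"
    #       data_parquet_dir: "/path/to/beamtime/processed/parquet"
    #
    # Without "paths", the loader instead needs core.beamtime_id, core.year, core.beamline
    # and dataframe.daq to resolve the directories from dataframe.beamtime_dir.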

    def get_files_from_run_id(
        self,
        run_id: str,
        folders: Union[str, Sequence[str]] = None,
        extension: str = "h5",
        **kwds,
    ) -> List[str]:
        """Returns a list of filenames for a given run located in the specified directory
        for the specified data acquisition (daq).

        Args:
            run_id (str): The run identifier to locate.
            folders (Union[str, Sequence[str]], optional): The directory(ies) where the raw
                data is located. Defaults to config["core"]["base_folder"].
            extension (str, optional): The file extension. Defaults to "h5".
            kwds: Keyword arguments:
                - daq (str): The data acquisition identifier.
                Defaults to config["dataframe"]["daq"].

        Returns:
            List[str]: A list of path strings representing the collected file names.

        Raises:
            FileNotFoundError: If no files are found for the given run in the directory.
        """
        # Define the stream name prefixes based on the data acquisition identifier
        stream_name_prefixes = self._config["dataframe"]["stream_name_prefixes"]

        if folders is None:
            folders = self._config["core"]["base_folder"]

        if isinstance(folders, str):
            folders = [folders]

        daq = kwds.pop("daq", self._config.get("dataframe", {}).get("daq"))

        # Generate the file patterns to search for in the directory
        file_pattern = f"{stream_name_prefixes[daq]}_run{run_id}_*." + extension

        files: List[Path] = []
        # Use pathlib to search for matching files in each directory
        for folder in folders:
            files.extend(
                natsorted(
                    Path(folder).glob(file_pattern),
                    key=lambda filename: str(filename).rsplit("_", maxsplit=1)[-1],
                ),
            )

        # Check if any files are found
        if not files:
            raise FileNotFoundError(
                f"No files found for run {run_id} in directory {str(folders)}",
            )

        # Return the list of found files
        return [str(file.resolve()) for file in files]
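
    # Illustrative sketch (not part of the loader): with a hypothetical prefix mapping
    # stream_name_prefixes = {"fl1user3": "FLASH1_USER3_stream_2"} and run_id "43878",
    # the glob pattern built above would be "FLASH1_USER3_stream_2_run43878_*.h5",
    # and matching files are sorted naturally by their trailing file index.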

    @property
    def available_channels(self) -> List:
        """Returns the channel names that are available for use,
        excluding pulseId, defined by the json file"""
        available_channels = list(self._config["dataframe"]["channels"].keys())
        available_channels.remove("pulseId")
        return available_channels

    def get_channels_by_format(self, formats: List[str]) -> List:
        """
        Returns a list of channels with the specified format.

        Args:
            formats (List[str]): The desired formats ('per_pulse', 'per_electron',
                or 'per_train').

        Returns:
            List: A list of channels with the specified format(s).
        """
        channels = []
        for format_ in formats:
            for key in self.available_channels:
                channel_format = self._config["dataframe"]["channels"][key]["format"]
                if channel_format == format_:
                    if key == "dldAux":
                        aux_channels = self._config["dataframe"]["channels"][key][
                            "dldAuxChannels"
                        ].keys()
                        channels.extend(aux_channels)
                    else:
                        channels.append(key)
        return channels
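
    # Illustrative sketch (not part of the loader): a minimal channel configuration as
    # consumed above; the channel and group names are hypothetical.
    #
    #   dataframe:
    #     channels:
    #       dldPosX:
    #         format: per_electron
    #         group_name: "/some/hdf5/group/"
    #         slice: 0
    #       dldAux:
    #         format: per_pulse
    #         group_name: "/some/other/group/"
    #         dldAuxChannels:
    #           sampleBias: 0
    #
    # get_channels_by_format(["per_pulse"]) would then return the dldAux sub-channels
    # (e.g. "sampleBias") rather than "dldAux" itself.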

    def reset_multi_index(self) -> None:
        """Resets the index per pulse and electron"""
        self.index_per_electron = None
        self.index_per_pulse = None

    def create_multi_index_per_electron(self, h5_file: h5py.File) -> None:
        """
        Creates an index per electron using pulseId for usage with the electron
            resolved pandas DataFrame.

        Args:
            h5_file (h5py.File): The HDF5 file object.

        Notes:
            - This method relies on the 'pulseId' channel to determine
                the macrobunch IDs.
            - It creates a MultiIndex with trainId, pulseId, and electronId
                as the index levels.
        """

        # Macrobunch IDs obtained from the pulseId channel
        [train_id, np_array] = self.create_numpy_array_per_channel(
            h5_file,
            "pulseId",
        )

        # Create a series with the macrobunches as index and
        # microbunches as values
        macrobunches = (
            Series(
                (np_array[i] for i in train_id.index),
                name="pulseId",
                index=train_id,
            )
            - self._config["dataframe"]["ubid_offset"]
        )

        # Explode dataframe to get all microbunch values per macrobunch,
        # remove NaN values and convert to type int
        microbunches = macrobunches.explode().dropna().astype(int)

        # Create temporary index values
        index_temp = MultiIndex.from_arrays(
            (microbunches.index, microbunches.values),
            names=["trainId", "pulseId"],
        )

        # Calculate the electron counts per pulseId; unique() preserves the order of appearance
        electron_counts = index_temp.value_counts()[index_temp.unique()].values

        # Series object for indexing with electrons
        electrons = (
            Series(
                [np.arange(electron_counts[i]) for i in range(electron_counts.size)],
            )
            .explode()
            .astype(int)
        )

        # Create a pandas MultiIndex using the exploded datasets
        self.index_per_electron = MultiIndex.from_arrays(
            (microbunches.index, microbunches.values, electrons),
            names=self.multi_index,
        )
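
    # Illustrative sketch (not part of the loader): a toy example of the index built above.
    # Suppose train 100 recorded electrons in pulses [2, 2, 3] (after subtracting the
    # ubid_offset). Counting electrons per (trainId, pulseId) pair gives the MultiIndex rows
    # (100, 2, 0), (100, 2, 1), (100, 3, 0), i.e. each electron receives a running
    # electronId within its pulse.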

    def create_multi_index_per_pulse(
        self,
        train_id: Series,
        np_array: np.ndarray,
    ) -> None:
        """
        Creates an index per pulse using a pulse resolved channel's macrobunch ID, for usage with
        the pulse resolved pandas DataFrame.

        Args:
            train_id (Series): The train ID Series.
            np_array (np.ndarray): The numpy array containing the pulse resolved data.

        Notes:
            - This method creates a MultiIndex with trainId and pulseId as the index levels.
        """

        # Create a pandas MultiIndex, useful for comparing electron and
        # pulse resolved dataframes
        self.index_per_pulse = MultiIndex.from_product(
            (train_id, np.arange(0, np_array.shape[1])),
            names=["trainId", "pulseId"],
        )

    def create_numpy_array_per_channel(
        self,
        h5_file: h5py.File,
        channel: str,
    ) -> Tuple[Series, np.ndarray]:
        """
        Returns a numpy array for a given channel name for a given file.

        Args:
            h5_file (h5py.File): The h5py file object.
            channel (str): The name of the channel.

        Returns:
            Tuple[Series, np.ndarray]: A tuple containing the train ID Series and the numpy array
            for the channel's data.

        """
        # Get the data from the necessary h5 file and channel
        group = h5_file[self._config["dataframe"]["channels"][channel]["group_name"]]

        channel_dict = self._config["dataframe"]["channels"][channel]  # channel parameters

        train_id = Series(group["index"], name="trainId")  # macrobunch

        # unpacks the timeStamp or value
        if channel == "timeStamp":
            np_array = group["time"][()]
        else:
            np_array = group["value"][()]

        # Use predefined axis and slice from the json file
        # to choose correct dimension for necessary channel
        if "slice" in channel_dict:
            np_array = np.take(
                np_array,
                channel_dict["slice"],
                axis=1,
            )
        return train_id, np_array

    def create_dataframe_per_electron(
        self,
        np_array: np.ndarray,
        train_id: Series,
        channel: str,
    ) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per electron].

        Args:
            np_array (np.ndarray): The numpy array containing the channel data.
            train_id (Series): The train ID Series.
            channel (str): The name of the channel.

        Returns:
            DataFrame: The pandas DataFrame for the channel's data.

        Notes:
            The microbunch resolved data is exploded and converted to a DataFrame. The MultiIndex
            is set, and the NaN values are dropped, alongside the pulseId = 0 (meaningless).

        """
        return (
            Series((np_array[i] for i in train_id.index), name=channel)
            .explode()
            .dropna()
            .to_frame()
            .set_index(self.index_per_electron)
            .drop(
                index=np.arange(-self._config["dataframe"]["ubid_offset"], 0),
                level=1,
                errors="ignore",
            )
        )

    def create_dataframe_per_pulse(
        self,
        np_array: np.ndarray,
        train_id: Series,
        channel: str,
        channel_dict: dict,
    ) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per pulse].

        Args:
            np_array (np.ndarray): The numpy array containing the channel data.
            train_id (Series): The train ID Series.
            channel (str): The name of the channel.
            channel_dict (dict): The dictionary containing channel parameters.

        Returns:
            DataFrame: The pandas DataFrame for the channel's data.

        Notes:
            - For auxiliary channels, the macrobunch resolved data is repeated 499 times to be
              compared to electron resolved data for each auxiliary channel. The data is then
              converted to a multicolumn DataFrame.
            - For all other pulse resolved channels, the macrobunch resolved data is exploded
              to a DataFrame and the MultiIndex is set.

        """

        # Special case for auxiliary channels
        if channel == "dldAux":
            # Checks the channel dictionary for correct slices and creates a multicolumn DataFrame
            data_frames = (
                Series(
                    (np_array[i, value] for i in train_id.index),
                    name=key,
                    index=train_id,
                ).to_frame()
                for key, value in channel_dict["dldAuxChannels"].items()
            )

            # Multiindex set and combined dataframe returned
            data = reduce(DataFrame.combine_first, data_frames)

        # For all other pulse resolved channels
        else:
            # Macrobunch resolved data is exploded to a DataFrame and the MultiIndex is set

            # Creates the index_per_pulse for the given channel
            self.create_multi_index_per_pulse(train_id, np_array)
            data = (
                Series((np_array[i] for i in train_id.index), name=channel)
                .explode()
                .to_frame()
                .set_index(self.index_per_pulse)
            )

        return data

    def create_dataframe_per_train(
        self,
        np_array: np.ndarray,
        train_id: Series,
        channel: str,
    ) -> DataFrame:
        """
        Returns a pandas DataFrame for a given channel name of type [per train].

        Args:
            np_array (np.ndarray): The numpy array containing the channel data.
            train_id (Series): The train ID Series.
            channel (str): The name of the channel.

        Returns:
            DataFrame: The pandas DataFrame for the channel's data.
        """
        return (
            Series((np_array[i] for i in train_id.index), name=channel)
            .to_frame()
            .set_index(train_id)
        )

    def create_dataframe_per_channel(
        self,
        h5_file: h5py.File,
        channel: str,
    ) -> Union[Series, DataFrame]:
        """
        Returns a pandas DataFrame for a given channel name from a given file.

        This method takes an h5py.File object `h5_file` and a channel name `channel`, and returns
        a pandas DataFrame containing the data for that channel from the file. The format of the
        DataFrame depends on the channel's format specified in the configuration.

        Args:
            h5_file (h5py.File): The h5py.File object representing the HDF5 file.
            channel (str): The name of the channel.

        Returns:
            Union[Series, DataFrame]: A pandas Series or DataFrame representing the channel's data.

        Raises:
            ValueError: If the channel has an undefined format.

        """
        [train_id, np_array] = self.create_numpy_array_per_channel(
            h5_file,
            channel,
        )  # numpy Array created
        channel_dict = self._config["dataframe"]["channels"][channel]  # channel parameters

        # If np_array is size zero, fill with NaNs
        if np_array.size == 0:
            # Fill the np_array with NaN values of the same shape as train_id
            np_array = np.full_like(train_id, np.nan, dtype=np.double)
            # Create a Series using np_array, with train_id as the index
            data = Series(
                (np_array[i] for i in train_id.index),
                name=channel,
                index=train_id,
            )

        # Electron resolved data is treated here
        if channel_dict["format"] == "per_electron":
            # If index_per_electron is None, create it for the given file
            if self.index_per_electron is None:
                self.create_multi_index_per_electron(h5_file)

            # Create a DataFrame for electron-resolved data
            data = self.create_dataframe_per_electron(
                np_array,
                train_id,
                channel,
            )

        # Pulse resolved data is treated here
        elif channel_dict["format"] == "per_pulse":
            # Create a DataFrame for pulse-resolved data
            data = self.create_dataframe_per_pulse(
                np_array,
                train_id,
                channel,
                channel_dict,
            )

        # Train resolved data is treated here
        elif channel_dict["format"] == "per_train":
            # Create a DataFrame for train-resolved data
            data = self.create_dataframe_per_train(np_array, train_id, channel)

        else:
            raise ValueError(
                channel
                + " has an undefined format. Available formats are \
                per_pulse, per_electron and per_train",
            )

        return data

    def concatenate_channels(
        self,
        h5_file: h5py.File,
    ) -> DataFrame:
        """
        Concatenates the channels from the provided h5py.File into a pandas DataFrame.

        This method takes an h5py.File object `h5_file` and concatenates the channels present in
        the file into a single pandas DataFrame. The concatenation is performed based on the
        available channels specified in the configuration.

        Args:
            h5_file (h5py.File): The h5py.File object representing the HDF5 file.

        Returns:
            DataFrame: A concatenated pandas DataFrame containing the channels.

        Raises:
            ValueError: If the group_name for any channel does not exist in the file.

        """
        all_keys = parse_h5_keys(h5_file)  # Parses all channels present

        # Check if the provided group_name actually exists in the file
        for channel in self._config["dataframe"]["channels"]:
            if channel == "timeStamp":
                group_name = self._config["dataframe"]["channels"][channel]["group_name"] + "time"
            else:
                group_name = self._config["dataframe"]["channels"][channel]["group_name"] + "value"

            if group_name not in all_keys:
                raise ValueError(
                    f"The group_name for channel {channel} does not exist.",
                )

        # Create a generator expression to generate data frames for each channel
        data_frames = (
            self.create_dataframe_per_channel(h5_file, each) for each in self.available_channels
        )

        # Use the reduce function to join the data frames into a single DataFrame
        return reduce(
            lambda left, right: left.join(right, how="outer"),
            data_frames,
        )

    def create_dataframe_per_file(
        self,
        file_path: Path,
    ) -> DataFrame:
        """
        Create pandas DataFrames for the given file.

        This method loads an HDF5 file specified by `file_path` and constructs a pandas DataFrame
        from the datasets within the file. The order of datasets in the DataFrames is the opposite
        of the order specified by channel names.

        Args:
            file_path (Path): Path to the input HDF5 file.

        Returns:
            DataFrame: pandas DataFrame

        """
        # Loads h5 file and creates a dataframe
        with h5py.File(file_path, "r") as h5_file:
            self.reset_multi_index()  # Reset MultiIndexes for next file
            return self.concatenate_channels(h5_file)

    def create_buffer_file(self, h5_path: Path, parquet_path: Path) -> None:
        """
        Converts an HDF5 file to Parquet format to create a buffer file.

        This method uses the `create_dataframe_per_file` method to create a dataframe from an
        individual HDF5 file. The resulting dataframe is then saved to a Parquet file.

        Args:
            h5_path (Path): Path to the input HDF5 file.
            parquet_path (Path): Path to the output Parquet file.

        Raises:
            ValueError: If an error occurs during the conversion process.

        """
        try:
            (
                self.create_dataframe_per_file(h5_path)
                .reset_index(level=self.multi_index)
                .to_parquet(parquet_path, index=False)
            )
        except ValueError as failed_string_error:
            self.failed_files_error.append(
                f"{parquet_path}: {failed_string_error}",
            )

    def buffer_file_handler(self, data_parquet_dir: Path, detector: str):
        """
        Handles the conversion of buffer files (h5 to parquet) and returns the filenames.

        Args:
            data_parquet_dir (Path): Directory where the parquet files will be stored.
            detector (str): Detector name.

        Returns:
            Tuple[List[Path], List[Path]]: Two lists, one for h5 file paths and one for
            corresponding parquet file paths.

        Raises:
            FileNotFoundError: If the conversion fails for any files or no data is available.
        """

        # Create the directory for buffer parquet files
        buffer_file_dir = data_parquet_dir.joinpath("buffer")
        buffer_file_dir.mkdir(parents=True, exist_ok=True)

        # Create two separate lists for h5 and parquet file paths
        h5_filenames = [Path(file) for file in self.files]
        parquet_filenames = [
            buffer_file_dir.joinpath(Path(file).stem + detector) for file in self.files
        ]

        # Raise a ValueError if no h5 files are available for conversion
        if len(h5_filenames) == 0:
            raise ValueError("No data available. Probably failed reading all h5 files")

        # Choose files to read
        files_to_read = [
            (h5_path, parquet_path)
            for h5_path, parquet_path in zip(h5_filenames, parquet_filenames)
            if not parquet_path.exists()
        ]

        print(f"Reading files: {len(files_to_read)} new files of {len(h5_filenames)} total.")

        # Initialize the indices for create_buffer_file conversion
        self.reset_multi_index()

        # Convert the remaining h5 files to parquet in parallel if there are any
        if len(files_to_read) > 0:
            Parallel(n_jobs=len(files_to_read), verbose=10)(
                delayed(self.create_buffer_file)(h5_path, parquet_path)
                for h5_path, parquet_path in files_to_read
            )

        # Raise an error if the conversion failed for any files
        if self.failed_files_error:
            raise FileNotFoundError(
                "Conversion failed for the following files:\n" + "\n".join(self.failed_files_error),
            )

        print("All files converted successfully!")

        return h5_filenames, parquet_filenames

    def fill_na(
        self,
        dataframes: List[dd.DataFrame],
    ) -> dd.DataFrame:
        """
        Fill NaN values in the given dataframes using intrafile forward filling.

        Args:
            dataframes (List[dd.DataFrame]): List of dataframes to fill NaN values.

        Returns:
            dd.DataFrame: Concatenated dataframe with filled NaN values.

        Notes:
            This method is specific to the flash data structure and is used to fill NaN values in
            certain channels that only store information at a lower frequency. The low frequency
            channels are exploded to match the dimensions of higher frequency channels, but they
            may contain NaNs in the other columns. This method fills the NaNs for the specific
            channels (per_pulse and per_train).

        """
        # Channels to fill NaN values
        channels: List[str] = self.get_channels_by_format(["per_pulse", "per_train"])

        # Fill NaN values within each dataframe
        for i, _ in enumerate(dataframes):
            dataframes[i][channels] = dataframes[i][channels].fillna(
                method="ffill",
            )

        # Forward fill between consecutive dataframes
        for i in range(1, len(dataframes)):
            # Select pulse channels from current dataframe
            subset = dataframes[i][channels]
            # Find columns with NaN values in the first row
            is_null = subset.loc[0].isnull().values.compute()
            # Execute if there are NaN values in the first row
            if is_null.sum() > 0:
                # Select channel names with only NaNs
                channels_to_overwrite = list(compress(channels, is_null[0]))
                # Get values for those channels from the previous dataframe
                values = dataframes[i - 1][channels].tail(1).values[0]
                # Create a dictionary to fill NaN values
                fill_dict = dict(zip(channels, values))
                fill_dict = {k: v for k, v in fill_dict.items() if k in channels_to_overwrite}
                # Fill NaN values with the corresponding values from the
                # previous dataframe
                dataframes[i][channels_to_overwrite] = subset[channels_to_overwrite].fillna(
                    fill_dict,
                )

        # Concatenate the filled dataframes
        return dd.concat(dataframes)
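
    # Illustrative sketch (not part of the loader): suppose file 1 ends with a per_train
    # value of 12.5 for a slow channel and file 2 starts with NaN for that channel because
    # its first trains carry no new reading. The loop above copies 12.5 from the tail of
    # file 1 into the leading NaNs of file 2 before the dataframes are concatenated, so the
    # forward fill also bridges file boundaries.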

    def parquet_handler(
        self,
        data_parquet_dir: Path,
        detector: str = "",
        parquet_path: Path = None,
        converted: bool = False,
        load_parquet: bool = False,
        save_parquet: bool = False,
    ):
        """
        Handles loading and saving of parquet files based on the provided parameters.

        Args:
            data_parquet_dir (Path): Directory where the parquet files are located.
            detector (str, optional): Adds an identifier for parquets to distinguish multidetector
                systems.
            parquet_path (str, optional): Path to the combined parquet file.
            converted (bool, optional): True if data is augmented by adding additional columns
                externally and saved into converted folder.
            load_parquet (bool, optional): Loads the entire parquet into the dd dataframe.
            save_parquet (bool, optional): Saves the entire dataframe into a parquet.

        Returns:
            dataframe: Dataframe containing the loaded or processed data.

        Raises:
            FileNotFoundError: If the requested parquet file is not found.

        """

        # Construct the parquet path if not provided
        if parquet_path is None:
            parquet_name = "_".join(str(run) for run in self.runs)
            parquet_dir = data_parquet_dir.joinpath("converted") if converted else data_parquet_dir

            parquet_path = parquet_dir.joinpath(
                "run_" + parquet_name + detector,
            ).with_suffix(".parquet")

        # Check if load_parquet is flagged and then load the file if it exists
        if load_parquet:
            try:
                dataframe = dd.read_parquet(parquet_path)
            except Exception as exc:
                raise FileNotFoundError(
                    "The final parquet for this run(s) does not exist yet. "
                    "If it is in another location, please provide the path as parquet_path.",
                ) from exc

        else:
            # Obtain the filenames from the method which handles buffer file creation/reading
            _, parquet_filenames = self.buffer_file_handler(
                data_parquet_dir,
                detector,
            )

            # Read all parquet files using dask and concatenate into one dataframe after filling
            dataframe = self.fill_na(
                [dd.read_parquet(file) for file in parquet_filenames],
            )

            dataframe = dataframe.dropna(
                subset=self.get_channels_by_format(["per_electron"]),
            )

        # Save the dataframe as parquet if requested
        if save_parquet:
            dataframe.compute().reset_index(drop=True).to_parquet(parquet_path)
            print("Combined parquet file saved.")

        return dataframe

    def parse_metadata(self) -> dict:
        """Uses the MetadataRetriever class to fetch metadata from scicat for each run.

        Returns:
            dict: Metadata dictionary
        """
        metadata_retriever = MetadataRetriever(self._config["metadata"])
        metadata = metadata_retriever.get_metadata(
            beamtime_id=self._config["core"]["beamtime_id"],
            runs=self.runs,
            metadata=self.metadata,
        )

        return metadata

    def get_count_rate(
        self,
        fids: Sequence[int] = None,
        **kwds,
    ):
        """Count rate is not implemented for this loader; returns (None, None)."""
        return None, None

    def get_elapsed_time(self, fids=None, **kwds):
        """Elapsed time is not implemented for this loader; returns None."""
        return None

    def read_dataframe(
        self,
        files: Union[str, Sequence[str]] = None,
        folders: Union[str, Sequence[str]] = None,
        runs: Union[str, Sequence[str]] = None,
        ftype: str = "h5",
        metadata: dict = None,
        collect_metadata: bool = False,
        **kwds,
    ) -> Tuple[dd.DataFrame, dict]:
        """
        Read express data from the DAQ, generating a parquet in between.

        Args:
            files (Union[str, Sequence[str]], optional): File path(s) to process. Defaults to None.
            folders (Union[str, Sequence[str]], optional): Path to folder(s) where files are
                stored. Path has priority such that if it's specified, the specified files will
                be ignored. Defaults to None.
            runs (Union[str, Sequence[str]], optional): Run identifier(s). Corresponding files will
                be located in the location provided by ``folders``. Takes precedence over
                ``files`` and ``folders``. Defaults to None.
            ftype (str, optional): The file extension type. Defaults to "h5".
            metadata (dict, optional): Additional metadata. Defaults to None.
            collect_metadata (bool, optional): Whether to collect metadata. Defaults to False.

        Returns:
            Tuple[dd.DataFrame, dict]: A tuple containing the concatenated DataFrame and metadata.

        Raises:
            ValueError: If neither 'runs' nor 'files'/'data_raw_dir' is provided.
            FileNotFoundError: If the conversion fails for some files or no data is available.
        """

        data_raw_dir, data_parquet_dir = self.initialize_paths()

        # Prepare a list of names for the runs to read and parquets to write
        if runs is not None:
            files = []
            if isinstance(runs, (str, int)):
                runs = [runs]
            for run in runs:
                run_files = self.get_files_from_run_id(
                    run_id=run,
                    folders=[str(folder.resolve()) for folder in data_raw_dir],
                    extension=ftype,
                    daq=self._config["dataframe"]["daq"],
                )
                files.extend(run_files)
            self.runs = list(runs)
            super().read_dataframe(files=files, ftype=ftype)

        else:
            # This call takes care of files and folders. As we have converted runs into files
            # already, they are just stored in the class by this call.
            super().read_dataframe(
                files=files,
                folders=folders,
                ftype=ftype,
                metadata=metadata,
            )

        dataframe = self.parquet_handler(data_parquet_dir, **kwds)

        metadata = self.parse_metadata() if collect_metadata else {}

        return dataframe, metadata


LOADER = FlashLoader
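
# Illustrative sketch (not part of the module): minimal usage of the loader as described in
# the module docstring. The config contents and run number are hypothetical; in practice the
# loader is usually driven through sed's higher-level processing pipeline.
#
#   config: dict = ...  # parsed sed configuration with "core" and "dataframe" sections
#   loader = FlashLoader(config=config)
#   dataframe, metadata = loader.read_dataframe(runs=["43878"], collect_metadata=False)
#   # per_pulse/per_train channels are forward filled, rows without electron data are
#   # dropped, and buffer parquet files are cached under data_parquet_dir/buffer.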