• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenCOMPES / sed / 9506623971

13 Jun 2024 08:53PM UTC coverage: 92.335% (+0.6%) from 91.755%
9506623971

Pull #329

github

web-flow
Merge f010a2ef7 into 7e5b76a83
Pull Request #329: Refactor flashloader

699 of 724 new or added lines in 15 files covered. (96.55%)

3 existing lines in 1 file now uncovered.

6553 of 7097 relevant lines covered (92.33%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.84
/sed/loader/flash/buffer_handler.py
1
from __future__ import annotations
1✔
2

3
import os
1✔
4
from itertools import compress
1✔
5
from pathlib import Path
1✔
6

7
import dask.dataframe as dd
1✔
8
import h5py
1✔
9
import pyarrow.parquet as pq
1✔
10
from joblib import delayed
1✔
11
from joblib import Parallel
1✔
12

13
from sed.core.dfops import forward_fill_lazy
1✔
14
from sed.loader.flash.dataframe import DataFrameCreator
1✔
15
from sed.loader.flash.utils import get_channels
1✔
16
from sed.loader.flash.utils import initialize_paths
1✔
17
from sed.loader.utils import get_parquet_metadata
1✔
18
from sed.loader.utils import split_dld_time_from_sector_id
1✔
19

20

21
class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary. Must contain a
                "dataframe" section (channel definitions) and a "core" section
                (optional "num_cores").
        """
        self._config = config["dataframe"]
        # Default to all-but-one cores. os.cpu_count() may return None on some
        # platforms, so guard against it and never drop below one worker.
        self.n_cores = config["core"].get(
            "num_cores",
            max((os.cpu_count() or 2) - 1, 1),
        )

        # Parquet buffer files corresponding to the requested H5 files.
        self.buffer_paths: list[Path] = []
        # H5 files that still need to be converted to buffer files.
        self.missing_h5_files: list[Path] = []
        # Target parquet paths for the missing H5 files (parallel lists).
        self.save_paths: list[Path] = []

        self.df_electron: dd.DataFrame = None
        self.df_pulse: dd.DataFrame = None
        self.metadata: dict = {}

    def _schema_check(self) -> None:
        """
        Checks the schema of the existing Parquet buffer files against the
        channel configuration.

        Raises:
            ValueError: If the schema of a Parquet file does not match the
                configuration, naming the offending file and the differing
                columns.
        """
        existing_parquet_filenames = [file for file in self.buffer_paths if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing_parquet_filenames]
        config_schema_set = set(
            get_channels(self._config["channels"], formats="all", index=True, extend_aux=True),
        )

        for filename, schema in zip(existing_parquet_filenames, parquet_schemas):
            schema_set = set(schema.names)
            # Compare against a per-file copy: mutating the shared set would
            # leak the sector_id requirement into the checks of later files.
            expected_set = set(config_schema_set)
            # for retro compatibility when sectorID was also saved in buffer
            if self._config["sector_id_column"] in schema_set:
                expected_set.add(self._config["sector_id_column"])
            if schema_set != expected_set:
                missing_in_parquet = expected_set - schema_set
                missing_in_config = schema_set - expected_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                # Name the offending file so the user knows what to recreate.
                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )

    def _get_files_to_read(
        self,
        h5_paths: list[Path],
        folder: Path,
        prefix: str,
        suffix: str,
        force_recreate: bool,
    ) -> None:
        """
        Determines the list of files to read and the corresponding buffer files to create.

        Args:
            h5_paths (List[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for buffer files.
            prefix (str): Prefix for buffer file names.
            suffix (str): Suffix for buffer file names.
            force_recreate (bool): Flag to force recreation of buffer files.
        """
        # Getting the paths of the buffer files, with subfolder as buffer and no extension
        self.buffer_paths = initialize_paths(
            filenames=[h5_path.stem for h5_path in h5_paths],
            folder=folder,
            subfolder="buffer",
            prefix=prefix,
            suffix=suffix,
            extension="",
        )
        # read only the files that do not exist or if force_recreate is True
        files_to_read = [
            force_recreate or not parquet_path.exists() for parquet_path in self.buffer_paths
        ]

        # Get the list of H5 files to read and the corresponding buffer files to create
        self.missing_h5_files = list(compress(h5_paths, files_to_read))
        self.save_paths = list(compress(self.buffer_paths, files_to_read))

        print(f"Reading files: {len(self.missing_h5_files)} new files of {len(h5_paths)} total.")

    def _save_buffer_file(self, h5_path: Path, parquet_path: Path) -> None:
        """
        Creates a single buffer file. Useful because h5py.File cannot be pickled if left open.

        Args:
            h5_path (Path): Path to the H5 file.
            parquet_path (Path): Path to the buffer file.
        """
        # Use a context manager so the h5 file is closed even if DataFrame
        # creation raises (the open/close pair previously leaked on error).
        with h5py.File(h5_path, "r") as h5_file:
            # Create a DataFrameCreator instance and extract the DataFrame
            # while the file is still open.
            dfc = DataFrameCreator(config_dataframe=self._config, h5_file=h5_file)
            df = dfc.df

        # Reset the index of the DataFrame and save it as a parquet file
        df.reset_index().to_parquet(parquet_path)

    def _save_buffer_files(self, debug: bool) -> None:
        """
        Creates the buffer files for all missing H5 files.

        Args:
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        n_cores = min(len(self.missing_h5_files), self.n_cores)
        if n_cores > 0:
            if debug:
                for h5_path, parquet_path in zip(self.missing_h5_files, self.save_paths):
                    self._save_buffer_file(h5_path, parquet_path)
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(h5_path, parquet_path)
                    for h5_path, parquet_path in zip(self.missing_h5_files, self.save_paths)
                )

    def _fill_dataframes(self):
        """
        Reads all parquet files into one dataframe using dask and fills NaN values.

        Populates ``self.df_electron`` (rows with a valid time-of-flight value)
        and ``self.df_pulse`` (index plus per-pulse/per-train channels), and
        records file statistics and forward-fill settings in ``self.metadata``.
        """
        dataframe = dd.read_parquet(self.buffer_paths, calculate_divisions=True)
        file_metadata = get_parquet_metadata(
            self.buffer_paths,
            time_stamp_col=self._config.get("time_stamp_alias", "timeStamp"),
        )
        self.metadata["file_statistics"] = file_metadata

        channels: list[str] = get_channels(
            self._config["channels"],
            ["per_pulse", "per_train"],
            extend_aux=True,
        )
        index: list[str] = get_channels(index=True)
        # The forward fill must look back at most one file; use the smallest
        # file as the overlap window.
        overlap = min(file["num_rows"] for file in file_metadata.values())
        iterations = self._config.get("forward_fill_iterations", 2)

        dataframe = forward_fill_lazy(
            df=dataframe,
            columns=channels,
            before=overlap,
            iterations=iterations,
        )
        self.metadata["forward_fill"] = {
            "columns": channels,
            "overlap": overlap,
            "iterations": iterations,
        }

        # Drop rows with nan values in the tof column
        tof_column = self._config.get("tof_column", "dldTimeSteps")
        df_electron = dataframe.dropna(subset=tof_column)

        # Set the dtypes of the channels here as there should be no null values
        channel_dtypes = get_channels(self._config["channels"], "all")
        config_channels = self._config["channels"]
        dtypes = {
            channel: config_channels[channel].get("dtype")
            for channel in channel_dtypes
            if config_channels[channel].get("dtype") is not None
        }

        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if self._config.get("split_sector_id_from_dld_time", False):
            df_electron, meta = split_dld_time_from_sector_id(
                df_electron,
                config=self._config,
            )
            self.metadata.update(meta)

        self.df_electron = df_electron.astype(dtypes)
        self.df_pulse = dataframe[index + channels]

    def run(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        prefix: str = "",
        suffix: str = "",
        debug: bool = False,
    ) -> None:
        """
        Runs the buffer file creation process.

        Args:
            h5_paths (List[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for buffer files.
            force_recreate (bool): Flag to force recreation of buffer files.
            prefix (str): Prefix for buffer file names.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.
        """
        self._get_files_to_read(h5_paths, folder, prefix, suffix, force_recreate)

        if not force_recreate:
            self._schema_check()

        self._save_buffer_files(debug)

        self._fill_dataframes()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc