OpenCOMPES / sed / 13419398366

19 Feb 2025 06:09PM UTC coverage: 91.6% (-0.6%) from 92.174%
Pull Request #534: Hextof lab loader
Merge df78f6964 into 6b927a2db

71 of 124 new or added lines in 7 files covered. (57.26%)

3 existing lines in 1 file now uncovered.

7731 of 8440 relevant lines covered (91.6%)

0.92 hits per line

Source file: /src/sed/loader/flash/buffer_handler.py (96.67% covered)
from __future__ import annotations

import os
import time
from pathlib import Path

import dask.dataframe as dd
import pyarrow.parquet as pq
from joblib import delayed
from joblib import Parallel
from pandas import MultiIndex

from sed.core.dfops import forward_fill_lazy
from sed.core.logging import setup_logging
from sed.loader.flash.dataframe import BaseDataFrameCreator
from sed.loader.flash.dataframe import CFELDataFrameCreator
from sed.loader.flash.dataframe import DataFrameCreator
from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import get_dtypes
from sed.loader.flash.utils import InvalidFileError
from sed.loader.utils import get_parquet_metadata
from sed.loader.utils import split_dld_time_from_sector_id


DF_TYP = ["electron", "timed"]

logger = setup_logging("flash_buffer_handler")


class BufferFilePaths:
    """
    A class for handling the paths to the raw and buffer files of electron and timed dataframes.
    A list of file sets (dicts) is created, one per H5 file, containing the paths to the raw
    file and to the electron and timed buffer files.

    Structure of the file sets:
    {
        "raw": Path to the H5 file,
        "electron": Path to the electron buffer file,
        "timed": Path to the timed buffer file,
    }
    """

    def __init__(
        self,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
    ) -> None:
        """Initializes the BufferFilePaths.

        Args:
            h5_paths (list[Path]): List of paths to the H5 files.
            folder (Path): Path to the folder for processed files.
            suffix (str): Suffix for buffer file names.
        """
        suffix = f"_{suffix}" if suffix else ""
        folder = folder / "buffer"
        folder.mkdir(parents=True, exist_ok=True)

        self._file_paths = self._create_file_paths(h5_paths, folder, suffix)

    def _create_file_paths(
        self,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
    ) -> list[dict[str, Path]]:
        return [
            {
                "raw": h5_path,
                **{typ: folder / f"{typ}_{h5_path.stem}{suffix}" for typ in DF_TYP},
            }
            for h5_path in h5_paths
        ]

    def __getitem__(self, key) -> list[Path]:
        if isinstance(key, str):
            return [file_set[key] for file_set in self._file_paths]
        return self._file_paths[key]  # not covered

    def __iter__(self):
        return iter(self._file_paths)

    def __len__(self):
        return len(self._file_paths)

    def file_sets_to_process(self, force_recreate: bool = False) -> list[dict[str, Path]]:
        """Returns a list of file sets that need to be processed."""
        if force_recreate:
            return self._file_paths
        return [file_set for file_set in self if any(not file_set[key].exists() for key in DF_TYP)]
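
To illustrate the file-set mapping, a minimal usage sketch (paths and suffix are hypothetical; note that the buffer file names carry no extension):

from pathlib import Path

from sed.loader.flash.buffer_handler import BufferFilePaths

fp = BufferFilePaths(
    h5_paths=[Path("raw/run_001.h5"), Path("raw/run_002.h5")],
    folder=Path("processed"),
    suffix="v1",
)
fp["electron"]  # [processed/buffer/electron_run_001_v1, processed/buffer/electron_run_002_v1]
fp[0]["raw"]  # raw/run_001.h5
fp.file_sets_to_process()  # only the file sets whose buffer files are missing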

class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        self._config: dict = config["dataframe"]
        self.n_cores: int = config["core"].get("num_cores", os.cpu_count() - 1)
        self.fp: BufferFilePaths = None
        self.df: dict[str, dd.DataFrame] = {typ: None for typ in DF_TYP}
        fill_formats = self._config.get("fill_formats", ["per_train", "per_pulse"])
        self.fill_channels: list[str] = get_channels(
            self._config,
            fill_formats,
            extend_aux=True,
        )
        self.metadata: dict = {}
        self.filter_timed_by_electron: bool = None

        core_beamline = config["core"].get("beamline")
        self.DataFrameCreator: type[BaseDataFrameCreator] = None
        if core_beamline == "pg2":
            self.DataFrameCreator = DataFrameCreator
        elif core_beamline == "cfel":  # not covered
            self.DataFrameCreator = CFELDataFrameCreator  # not covered
        else:
            raise ValueError(f"Unsupported core beamline: {core_beamline}")  # not covered
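
The constructor touches only the "core" and "dataframe" sections of the config. A minimal sketch (values are placeholders; the "dataframe" section must also hold the channel definitions that get_channels and get_dtypes expect):

from sed.loader.flash.buffer_handler import BufferHandler

config = {
    "core": {"beamline": "pg2", "num_cores": 4},
    "dataframe": {
        "fill_formats": ["per_train", "per_pulse"],
        # ... channel definitions ...
    },
}
handler = BufferHandler(config)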

    def _validate_h5_files(self, config, h5_paths: list[Path]) -> list[Path]:
        valid_h5_paths = []
        for h5_path in h5_paths:
            try:
                dfc = self.DataFrameCreator(config_dataframe=config, h5_path=h5_path)
                dfc.validate_channel_keys()
                valid_h5_paths.append(h5_path)
            except InvalidFileError as e:
                logger.info(f"Skipping invalid file: {h5_path.stem}\n{e}")

        return valid_h5_paths

    def _schema_check(self, files: list[Path], expected_schema_set: set) -> None:
        """
        Checks the schema of the Parquet files.
        """
        logger.debug(f"Checking schema for {len(files)} files")
        existing = [file for file in files if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing]

        for filename, schema in zip(existing, parquet_schemas):
            schema_set = set(schema.names)
            if schema_set != expected_schema_set:
                logger.error(f"Schema mismatch in file: {filename}")
                missing_in_parquet = expected_schema_set - schema_set
                missing_in_config = schema_set - expected_schema_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )
        logger.debug("Schema check passed successfully")

    def _create_timed_dataframe(self, df: dd.DataFrame) -> dd.DataFrame:
        """Creates the timed dataframe, optionally filtering by electron events.

        Args:
            df (dd.DataFrame): The input dataframe containing all data.

        Returns:
            dd.DataFrame: The timed dataframe.
        """
        # Get channels that should be in the timed dataframe
        timed_channels = self.fill_channels

        if self.filter_timed_by_electron:
            # Get electron channels to use for filtering
            electron_channels = get_channels(self._config, "per_electron")
            # Filter rows where electron data exists
            df_timed = df.dropna(subset=electron_channels)[timed_channels]
        else:
            # Take all timed data rows without filtering
            df_timed = df[timed_channels]

        return df_timed
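
The electron filter above is a plain dropna-then-select pattern; a small pandas sketch with hypothetical channel names:

import pandas as pd

df = pd.DataFrame(
    {
        "dldPosX": [1.0, None, 3.0],  # per-electron channel; NaN means no electron hit
        "bam": [0.1, 0.2, 0.3],  # per-pulse channel kept in the timed dataframe
    }
)
df_timed = df.dropna(subset=["dldPosX"])[["bam"]]  # keeps rows 0 and 2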

    def _save_buffer_file(self, paths: dict[str, Path]) -> None:
        """Creates the electron and timed buffer files from the raw H5 file."""
        logger.debug(f"Processing file: {paths['raw'].stem}")
        start_time = time.time()
        # Create DataFrameCreator and get the dataframe
        df = self.DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]).df

        # Forward fill non-electron channels
        logger.debug(f"Forward filling {len(self.fill_channels)} channels")
        df[self.fill_channels] = df[self.fill_channels].ffill()

        # Save electron resolved dataframe
        electron_channels = get_channels(self._config, "per_electron")
        dtypes = get_dtypes(self._config, df.columns.values)
        electron_df = df.dropna(subset=electron_channels).astype(dtypes).reset_index()
        logger.debug(f"Saving electron buffer with shape: {electron_df.shape}")
        electron_df.to_parquet(paths["electron"])

        # Create and save timed dataframe
        df_timed = self._create_timed_dataframe(df)
        if isinstance(df.index, MultiIndex):
            # only take rows with the first electronId
            df_timed = df_timed.loc[:, :, 0]
        dtypes = get_dtypes(self._config, df_timed.columns.values)
        timed_df = df_timed.astype(dtypes).reset_index()
        logger.debug(f"Saving timed buffer with shape: {timed_df.shape}")
        timed_df.to_parquet(paths["timed"])

        logger.debug(f"Processed {paths['raw'].stem} in {time.time() - start_time:.2f}s")
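
The .loc[:, :, 0] selection keeps, for every (trainId, pulseId) pair, the row with the first electronId. The same pattern on a toy pandas frame (values are illustrative; the index levels follow the comment above):

import pandas as pd

idx = pd.MultiIndex.from_tuples(
    [(100, 0, 0), (100, 0, 1), (100, 1, 0)],
    names=["trainId", "pulseId", "electronId"],
)
df = pd.DataFrame({"bam": [0.1, 0.1, 0.2]}, index=idx)
df.loc[:, :, 0]  # one row per (trainId, pulseId), electronId == 0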

    def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None:
        """
        Creates the buffer files that are missing.

        Args:
            force_recreate (bool): Flag to force recreation of buffer files.
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        file_sets = self.fp.file_sets_to_process(force_recreate)
        logger.info(f"Reading files: {len(file_sets)} new files of {len(self.fp)} total.")
        n_cores = min(len(file_sets), self.n_cores)
        if n_cores > 0:
            if debug:
                for file_set in file_sets:
                    self._save_buffer_file(file_set)
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(file_set) for file_set in file_sets
                )

    def _get_dataframes(self) -> None:
        """
        Reads the buffer files from a folder.

        First the buffer files are read into a dask dataframe, which is lazily
        forward filled with the non-electron channels.
        For the electron dataframe, all rows with missing values in the electron
        channels are dropped, and the sector ID is split from the DLD time.
        For the timed dataframe, only the train and pulse channels are taken, so
        it is pulse resolved (no longer electron resolved). If time_index is True,
        the timeIndex is calculated and set as the index (slow operation).
        """
        if not self.fp:
            raise FileNotFoundError("Buffer files do not exist.")
        # Loop over the electron and timed dataframes
        file_stats = {}
        filling = {}
        for typ in DF_TYP:
            # Read the parquet files into a dask dataframe
            df = dd.read_parquet(self.fp[typ])  # , calculate_divisions=True)
            # Get the metadata from the parquet files
            file_stats[typ] = get_parquet_metadata(self.fp[typ])

            # Forward fill the non-electron channels across files
            overlap = min(file["num_rows"] for file in file_stats[typ].values())
            iterations = self._config.get("forward_fill_iterations", 2)
            if iterations:
                df = forward_fill_lazy(
                    df=df,
                    columns=self.fill_channels,
                    before=overlap,
                    iterations=iterations,
                )
                # TODO: This dict should be returned by forward_fill_lazy
                filling[typ] = {
                    "columns": self.fill_channels,
                    "overlap": overlap,
                    "iterations": iterations,
                }

            self.df[typ] = df
        self.metadata.update({"file_statistics": file_stats, "filling": filling})
        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if (
            self._config.get("split_sector_id_from_dld_time", False)
            and self._config.get("tof_column", "dldTimeSteps") in self.df["electron"].columns
        ):
            self.df["electron"], meta = split_dld_time_from_sector_id(
                self.df["electron"],
                config=self._config,
            )
            self.metadata.update(meta)
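
The forward_fill_lazy call above stitches fills across dask partitions, with before=overlap rows carried over from the preceding partition so fills can cross file boundaries, repeated iterations times. A toy sketch (column name hypothetical):

import dask.dataframe as dd
import pandas as pd

from sed.core.dfops import forward_fill_lazy

pdf = pd.DataFrame({"trainEnergy": [1.0, None, None, 2.0, None, None]})
ddf = dd.from_pandas(pdf, npartitions=3)  # partition edges fall inside the NaN runs
filled = forward_fill_lazy(df=ddf, columns=["trainEnergy"], before=2, iterations=2)
filled.compute()  # NaNs filled with the last preceding value, across partitions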

    def process_and_load_dataframe(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        suffix: str = "",
        debug: bool = False,
        remove_invalid_files: bool = False,
        filter_timed_by_electron: bool = True,
    ) -> tuple[dd.DataFrame, dd.DataFrame]:
        """
        Runs the buffer file creation process.
        Does a schema check on the buffer files and creates them if they are missing.
        Performs forward filling and splits the sector ID from the DLD time lazily.

        Args:
            h5_paths (list[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for processed files.
            force_recreate (bool): Flag to force recreation of buffer files.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode, which serializes buffer file creation.
            remove_invalid_files (bool): Flag to remove invalid files.
            filter_timed_by_electron (bool): Flag to filter timed data by valid electron events.

        Returns:
            tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes.
        """
        self.filter_timed_by_electron = filter_timed_by_electron
        if remove_invalid_files:
            h5_paths = self._validate_h5_files(self._config, h5_paths)

        self.fp = BufferFilePaths(h5_paths, folder, suffix)

        if not force_recreate:
            schema_set = set(
                get_channels(self._config, formats="all", index=True, extend_aux=True),
            )
            self._schema_check(self.fp["electron"], schema_set)
            # schema_set = set(
            #     get_channels(
            #         self._config,
            #         formats=["per_pulse", "per_train"],
            #         index=True,
            #         extend_aux=True,
            #     ),
            # ) - {"electronId"}
            # self._schema_check(self.fp["timed"], schema_set)

        self._save_buffer_files(force_recreate, debug)

        self._get_dataframes()

        return self.df["electron"], self.df["timed"]
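
Putting it together, a minimal end-to-end sketch of the public entry point (paths are placeholders; config is a configuration dict like the sketch after __init__ above):

from pathlib import Path

from sed.loader.flash.buffer_handler import BufferHandler

h5_paths = sorted(Path("raw").glob("*.h5"))
handler = BufferHandler(config)
df_electron, df_timed = handler.process_and_load_dataframe(
    h5_paths=h5_paths,
    folder=Path("processed"),
    remove_invalid_files=True,  # skip files that fail channel-key validation
)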