OpenCOMPES / sed / build 12811799296

16 Jan 2025 03:10PM UTC coverage: 91.652% (-0.5%) from 92.167%

Pull Request #534: Hextof lab loader
Merge 788d18966 into 1752d347e

66 of 117 new or added lines in 7 files covered. (56.41%)
75 existing lines in 4 files now uncovered.
7729 of 8433 relevant lines covered (91.65%)
0.92 hits per line

Source File: /src/sed/loader/flash/buffer_handler.py (96.69% covered)
from __future__ import annotations

import os
from pathlib import Path
import time

import dask.dataframe as dd
import pyarrow.parquet as pq
from joblib import delayed
from joblib import Parallel
from pandas import MultiIndex

from sed.core.dfops import forward_fill_lazy
from sed.core.logging import setup_logging
from sed.loader.flash.dataframe import BaseDataFrameCreator
from sed.loader.flash.dataframe import CFELDataFrameCreator
from sed.loader.flash.dataframe import DataFrameCreator
from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import get_dtypes
from sed.loader.flash.utils import InvalidFileError
from sed.loader.utils import get_parquet_metadata
from sed.loader.utils import split_dld_time_from_sector_id


DF_TYP = ["electron", "timed"]

logger = setup_logging("flash_buffer_handler")


class BufferFilePaths:
    """
    A class for handling the paths to the raw and buffer files of electron and timed dataframes.
    A list of file sets (dict) is created for each H5 file, containing the paths to the raw file
    and the electron and timed buffer files.

    Structure of the file sets:
    {
        "raw": Path to the H5 file,
        "electron": Path to the electron buffer file,
        "timed": Path to the timed buffer file,
    }
    """

    def __init__(
        self,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
    ) -> None:
        """Initializes the BufferFilePaths.

        Args:
            h5_paths (list[Path]): List of paths to the H5 files.
            folder (Path): Path to the folder for processed files.
            suffix (str): Suffix for buffer file names.
        """
        suffix = f"_{suffix}" if suffix else ""
        folder = folder / "buffer"
        folder.mkdir(parents=True, exist_ok=True)

        self._file_paths = self._create_file_paths(h5_paths, folder, suffix)

    def _create_file_paths(
        self,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
    ) -> list[dict[str, Path]]:
        return [
            {
                "raw": h5_path,
                **{typ: folder / f"{typ}_{h5_path.stem}{suffix}" for typ in DF_TYP},
            }
            for h5_path in h5_paths
        ]

    def __getitem__(self, key) -> list[Path]:
        if isinstance(key, str):
            return [file_set[key] for file_set in self._file_paths]
        return self._file_paths[key]

    def __iter__(self):
        return iter(self._file_paths)

    def __len__(self):
        return len(self._file_paths)

    def file_sets_to_process(self, force_recreate: bool = False) -> list[dict[str, Path]]:
        """Returns a list of file sets that need to be processed."""
        if force_recreate:
            return self._file_paths
        return [file_set for file_set in self if any(not file_set[key].exists() for key in DF_TYP)]

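Taken on its own, the path mapping above is easy to exercise. A minimal usage sketch (the paths are placeholders, not from the repo):

```python
# Hypothetical paths; BufferFilePaths only does path arithmetic, but
# __init__ calls mkdir(), so the folder must be writable.
from pathlib import Path

h5_paths = [Path("/data/raw/run_001.h5"), Path("/data/raw/run_002.h5")]
fp = BufferFilePaths(h5_paths, folder=Path("/data/processed"), suffix="v1")

print(fp["electron"])  # [.../buffer/electron_run_001_v1, .../buffer/electron_run_002_v1]
print(fp[0]["raw"])    # /data/raw/run_001.h5
print(len(fp.file_sets_to_process()))  # 2 while no buffer files exist yet
```
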
class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        self._config: dict = config["dataframe"]
        self.n_cores: int = config["core"].get("num_cores", os.cpu_count() - 1)
        self.fp: BufferFilePaths = None
        self.df: dict[str, dd.DataFrame] = {typ: None for typ in DF_TYP}
        fill_formats = self._config.get("fill_formats")
        self.fill_channels: list[str] = get_channels(
            self._config,
            fill_formats,
            extend_aux=True,
        )
        self.metadata: dict = {}
        self.filter_timed_by_electron: bool = None

        core_beamline = config["core"].get("beamline")
        self.DataFrameCreator: type[BaseDataFrameCreator] = None
        if core_beamline == "pg2":
            self.DataFrameCreator = DataFrameCreator
        elif core_beamline == "cfel":
            self.DataFrameCreator = CFELDataFrameCreator
        else:
            raise ValueError(f"Unsupported core beamline: {core_beamline}")

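For orientation, this is the shape of the configuration that `__init__` reads. Only the keys used in this file are shown; a real sed config defines many more (the channel definitions in particular):

```python
# Sketch of the config keys consumed by BufferHandler; values are
# illustrative, not a complete sed configuration.
config = {
    "core": {
        "beamline": "pg2",  # "pg2" -> DataFrameCreator, "cfel" -> CFELDataFrameCreator
        "num_cores": 4,     # optional, defaults to os.cpu_count() - 1
    },
    "dataframe": {
        # channel definitions live here; optional keys read by this class:
        # "fill_formats", "forward_fill_iterations",
        # "split_sector_id_from_dld_time", "tof_column"
    },
}
```
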
    def _validate_h5_files(self, config, h5_paths: list[Path]) -> list[Path]:
        """Returns the subset of h5_paths whose channel keys validate successfully."""
        valid_h5_paths = []
        for h5_path in h5_paths:
            try:
                dfc = self.DataFrameCreator(config_dataframe=config, h5_path=h5_path)
                dfc.validate_channel_keys()
                valid_h5_paths.append(h5_path)
            except InvalidFileError as e:
                logger.info(f"Skipping invalid file: {h5_path.stem}\n{e}")

        return valid_h5_paths

    def _schema_check(self, files: list[Path], expected_schema_set: set) -> None:
        """
        Checks the schema of the Parquet files.
        """
        logger.debug(f"Checking schema for {len(files)} files")
        existing = [file for file in files if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing]

        for filename, schema in zip(existing, parquet_schemas):
            schema_set = set(schema.names)
            if schema_set != expected_schema_set:
                logger.error(f"Schema mismatch in file: {filename}")
                missing_in_parquet = expected_schema_set - schema_set
                missing_in_config = schema_set - expected_schema_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )
        logger.debug("Schema check passed successfully")

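The check above deliberately reads only Parquet metadata. A standalone sketch of the pyarrow call (the path is a placeholder):

```python
# pq.read_schema reads just the Parquet footer, so validating column names
# never loads any row data.
import pyarrow.parquet as pq

schema = pq.read_schema("/data/processed/buffer/electron_run_001_v1")
print(set(schema.names))  # compare against the expected channel set
```
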
    def _create_timed_dataframe(self, df: dd.DataFrame) -> dd.DataFrame:
        """Creates the timed dataframe, optionally filtering by electron events.

        Args:
            df (dd.DataFrame): The input dataframe containing all data

        Returns:
            dd.DataFrame: The timed dataframe
        """
        # Get channels that should be in timed dataframe
        timed_channels = self.fill_channels

        if self.filter_timed_by_electron:
            # Get electron channels to use for filtering
            electron_channels = get_channels(self._config, "per_electron")
            # Filter rows where electron data exists
            df_timed = df.dropna(subset=electron_channels)[timed_channels]
        else:
            # Take all timed data rows without filtering
            df_timed = df[timed_channels]

        # Take only first electron per event
        return df_timed.loc[:, :, 0]

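The trailing `.loc[:, :, 0]` relies on the dataframe carrying a three-level index. A minimal pandas sketch of that selection (the level names are assumed here, matching the train/pulse/electron structure this loader works with):

```python
import pandas as pd

# Toy per-electron dataframe: two electrons in the first pulse, one in the next.
idx = pd.MultiIndex.from_tuples(
    [(0, 0, 0), (0, 0, 1), (0, 1, 0)],
    names=["trainId", "pulseId", "electronId"],
)
df = pd.DataFrame({"bam": [1.0, 1.0, 2.0]}, index=idx)

# Select electronId == 0: one row per (train, pulse), electronId level dropped.
print(df.loc[:, :, 0])
```
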
    def _save_buffer_file(self, paths: dict[str, Path]) -> None:
        """Creates the electron and timed buffer files from the raw H5 file."""
        logger.debug(f"Processing file: {paths['raw'].stem}")
        start_time = time.time()
        # Create DataFrameCreator and get the dataframe
        df = self.DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]).df

        # Forward fill non-electron channels
        logger.debug(f"Forward filling {len(self.fill_channels)} channels")
        df[self.fill_channels] = df[self.fill_channels].ffill()

        # Save electron resolved dataframe
        electron_channels = get_channels(self._config, "per_electron")
        dtypes = get_dtypes(self._config, df.columns.values)
        electron_df = df.dropna(subset=electron_channels).astype(dtypes).reset_index()
        logger.debug(f"Saving electron buffer with shape: {electron_df.shape}")
        electron_df.to_parquet(paths["electron"])

        # Create and save timed dataframe
        if isinstance(df.index, MultiIndex):
            # Per-electron MultiIndex: optionally filter by electron events and
            # take only the first electron per (train, pulse)
            df_timed = self._create_timed_dataframe(df)
        else:
            # Flat index: take the timed channels directly
            df_timed = df[self.fill_channels]
        dtypes = get_dtypes(self._config, df_timed.columns.values)
        timed_df = df_timed.astype(dtypes).reset_index()
        logger.debug(f"Saving timed buffer with shape: {timed_df.shape}")
        timed_df.to_parquet(paths["timed"])

        logger.debug(f"Processed {paths['raw'].stem} in {time.time() - start_time:.2f}s")

    def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None:
        """
        Creates the buffer files that are missing.

        Args:
            force_recreate (bool): Flag to force recreation of buffer files.
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        file_sets = self.fp.file_sets_to_process(force_recreate)
        logger.info(f"Reading files: {len(file_sets)} new files of {len(self.fp)} total.")
        n_cores = min(len(file_sets), self.n_cores)
        if n_cores > 0:
            if debug:
                for file_set in file_sets:
                    self._save_buffer_file(file_set)
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(file_set) for file_set in file_sets
                )

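The Parallel/delayed pattern above, in isolation: `delayed(f)(x)` records a task instead of calling `f`, and `Parallel` runs the recorded tasks across workers:

```python
from joblib import Parallel, delayed

def square(n: int) -> int:
    return n * n

# verbose=10 prints progress, as in _save_buffer_files above.
results = Parallel(n_jobs=2, verbose=10)(delayed(square)(n) for n in range(4))
print(results)  # [0, 1, 4, 9]
```
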
    def _get_dataframes(self) -> None:
        """
        Reads the buffer files from a folder.

        First, the buffer files are read into a dask dataframe, which is
        lazily forward filled with the non-electron channels.
        For the electron dataframe, rows without electron data are dropped,
        and the sector ID is split from the DLD time.
        For the timed dataframe, only the train and pulse channels are taken,
        and it is pulse resolved (no longer electron resolved). If time_index
        is True, the timeIndex is calculated and set as the index (slow operation).
        """
        if not self.fp:
            raise FileNotFoundError("Buffer files do not exist.")
        # Loop over the electron and timed dataframes
        file_stats = {}
        filling = {}
        for typ in DF_TYP:
            # Read the parquet files into a dask dataframe
            df = dd.read_parquet(self.fp[typ], calculate_divisions=True)
            # Get the metadata from the parquet files
            file_stats[typ] = get_parquet_metadata(self.fp[typ])

            # Forward fill the non-electron channels across files
            overlap = min(file["num_rows"] for file in file_stats[typ].values())
            iterations = self._config.get("forward_fill_iterations", 2)
            df = forward_fill_lazy(
                df=df,
                columns=self.fill_channels,
                before=overlap,
                iterations=iterations,
            )
            # TODO: This dict should be returned by forward_fill_lazy
            filling[typ] = {
                "columns": self.fill_channels,
                "overlap": overlap,
                "iterations": iterations,
            }

            self.df[typ] = df
        self.metadata.update({"file_statistics": file_stats, "filling": filling})
        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if (
            self._config.get("split_sector_id_from_dld_time", False)
            and self._config.get("tof_column", "dldTimeSteps") in self.df["electron"].columns
        ):
            self.df["electron"], meta = split_dld_time_from_sector_id(
                self.df["electron"],
                config=self._config,
            )
            self.metadata.update(meta)

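A sketch of the lazy forward fill used above, under my reading of `forward_fill_lazy`: each partition looks back up to `before` rows into its predecessor, and `iterations` repeats the pass so fills can propagate across several partition boundaries:

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd
from sed.core.dfops import forward_fill_lazy

# One valid value followed by NaNs that span partition boundaries.
pdf = pd.DataFrame({"bam": [1.0] + [np.nan] * 5, "x": range(6)})
ddf = dd.from_pandas(pdf, npartitions=3)

# before=2 looks back one whole 2-row partition; a second iteration lets the
# fill reach the third partition as well (assumed semantics).
filled = forward_fill_lazy(df=ddf, columns=["bam"], before=2, iterations=2)
print(filled.compute())
```
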
    def process_and_load_dataframe(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        suffix: str = "",
        debug: bool = False,
        remove_invalid_files: bool = False,
        filter_timed_by_electron: bool = True,
    ) -> tuple[dd.DataFrame, dd.DataFrame]:
        """
        Runs the buffer file creation process.
        Does a schema check on the buffer files and creates them if they are missing.
        Performs forward filling and splits the sector ID from the DLD time lazily.

        Args:
            h5_paths (List[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for processed files.
            force_recreate (bool): Flag to force recreation of buffer files.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.
            remove_invalid_files (bool): Flag to remove invalid files.
            filter_timed_by_electron (bool): Flag to filter timed data by valid electron events.

        Returns:
            Tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes.
        """
        self.filter_timed_by_electron = filter_timed_by_electron
        if remove_invalid_files:
            h5_paths = self._validate_h5_files(self._config, h5_paths)

        self.fp = BufferFilePaths(h5_paths, folder, suffix)

        if not force_recreate:
            schema_set = set(
                get_channels(self._config, formats="all", index=True, extend_aux=True),
            )
            self._schema_check(self.fp["electron"], schema_set)
            schema_set = set(
                get_channels(
                    self._config,
                    formats=["per_pulse", "per_train"],
                    index=True,
                    extend_aux=True,
                ),
            ) - {"electronId"}
            self._schema_check(self.fp["timed"], schema_set)

        self._save_buffer_files(force_recreate, debug)

        self._get_dataframes()

        return self.df["electron"], self.df["timed"]
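
End to end, the intended entry point is `process_and_load_dataframe`. A hedged usage sketch (the paths are placeholders, and the config must be a complete sed configuration):

```python
from pathlib import Path
from sed.loader.flash.buffer_handler import BufferHandler

config = {...}  # placeholder: full sed config with "core" and "dataframe" sections

bh = BufferHandler(config=config)
df_electron, df_timed = bh.process_and_load_dataframe(
    h5_paths=sorted(Path("/data/raw").glob("*.h5")),
    folder=Path("/data/processed"),
    remove_invalid_files=True,   # skip H5 files that fail channel validation
    filter_timed_by_electron=True,
)
print(df_electron.npartitions, df_timed.npartitions)
```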