
OpenCOMPES / sed · build 13167225169
05 Feb 2025 09:50PM UTC · coverage: 92.174% (+0.4%) from 91.801%
push event via GitHub · committed by web-flow

Merge pull request #437 from OpenCOMPES/v1_feature_branch: Upgrade to V1

2235 of 2372 new or added lines in 53 files covered (94.22%)
4 existing lines in 1 file now uncovered
7703 of 8357 relevant lines covered (92.17%)
0.92 hits per line

Source File: /src/sed/loader/flash/buffer_handler.py · 99.27% covered

from __future__ import annotations

import os
from pathlib import Path
import time

import dask.dataframe as dd
import pyarrow.parquet as pq
from joblib import delayed
from joblib import Parallel

from sed.core.dfops import forward_fill_lazy
from sed.core.logging import setup_logging
from sed.loader.flash.dataframe import DataFrameCreator
from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import get_dtypes
from sed.loader.flash.utils import InvalidFileError
from sed.loader.utils import get_parquet_metadata
from sed.loader.utils import split_dld_time_from_sector_id


DF_TYP = ["electron", "timed"]

logger = setup_logging("flash_buffer_handler")


class BufferFilePaths:
    """
    A class for handling the paths to the raw and buffer files of electron and timed dataframes.
    A list of file sets (dicts) is created for each H5 file, containing the paths to the raw
    file and the electron and timed buffer files.

    Structure of the file sets:
    {
        "raw": Path to the H5 file,
        "electron": Path to the electron buffer file,
        "timed": Path to the timed buffer file,
    }
    """

    def __init__(
        self,
        config: dict,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
        remove_invalid_files: bool,
    ) -> None:
        """Initializes the BufferFilePaths.

        Args:
            config (dict): The dataframe section of the configuration dictionary.
            h5_paths (list[Path]): List of paths to the H5 files.
            folder (Path): Path to the folder for processed files.
            suffix (str): Suffix for buffer file names.
            remove_invalid_files (bool): Whether to exclude H5 files that fail channel validation.
        """
        suffix = f"_{suffix}" if suffix else ""
        folder = folder / "buffer"
        folder.mkdir(parents=True, exist_ok=True)

        if remove_invalid_files:
            h5_paths = self.remove_invalid_files(config, h5_paths)

        self._file_paths = self._create_file_paths(h5_paths, folder, suffix)

    def _create_file_paths(
        self,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
    ) -> list[dict[str, Path]]:
        return [
            {
                "raw": h5_path,
                **{typ: folder / f"{typ}_{h5_path.stem}{suffix}" for typ in DF_TYP},
            }
            for h5_path in h5_paths
        ]

    def __getitem__(self, key) -> list[Path]:
        if isinstance(key, str):
            return [file_set[key] for file_set in self._file_paths]
        return self._file_paths[key]

    def __iter__(self):
        return iter(self._file_paths)

    def __len__(self):
        return len(self._file_paths)

    def file_sets_to_process(self, force_recreate: bool = False) -> list[dict[str, Path]]:
        """Returns a list of file sets that need to be processed."""
        if force_recreate:
            return self._file_paths
        return [file_set for file_set in self if any(not file_set[key].exists() for key in DF_TYP)]

    def remove_invalid_files(self, config, h5_paths: list[Path]) -> list[Path]:
        """Filters out H5 files whose channel keys fail validation."""
        valid_h5_paths = []
        for h5_path in h5_paths:
            try:
                dfc = DataFrameCreator(config_dataframe=config, h5_path=h5_path)
                dfc.validate_channel_keys()
                valid_h5_paths.append(h5_path)
            except InvalidFileError as e:
                logger.info(f"Skipping invalid file: {h5_path.stem}\n{e}")

        return valid_h5_paths
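
# --- Illustrative usage sketch (editor's addition, not part of the module) ---
# A minimal example of how BufferFilePaths might be used. The argument names
# and directory layout below are hypothetical placeholders, not sed API.
def _example_buffer_file_paths(dataframe_config: dict, raw_dir: Path, out_dir: Path) -> None:
    fp = BufferFilePaths(
        config=dataframe_config,
        h5_paths=sorted(raw_dir.glob("*.h5")),
        folder=out_dir,
        suffix="v1",
        remove_invalid_files=False,
    )
    # String keys collect one path per file set; integer keys return a file set.
    electron_buffers: list[Path] = fp["electron"]
    first_set: dict[str, Path] = fp[0]
    logger.info(f"{len(fp)} file sets; first raw file: {first_set['raw'].stem}")
    logger.info(f"{len(electron_buffers)} electron buffer paths")
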

class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        self._config: dict = config["dataframe"]
        self.n_cores: int = config["core"].get("num_cores", os.cpu_count() - 1)
        self.fp: BufferFilePaths = None
        self.df: dict[str, dd.DataFrame] = {typ: None for typ in DF_TYP}
        self.fill_channels: list[str] = get_channels(
            self._config,
            ["per_pulse", "per_train"],
            extend_aux=True,
        )
        self.metadata: dict = {}
        self.filter_timed_by_electron: bool = None
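
    # Editor's note (illustrative, not part of the module): the config dict
    # consumed above is expected to look roughly like the sketch below; only
    # the keys accessed in this file are grounded, the channel layout is
    # hypothetical.
    #
    #     config = {
    #         "core": {"num_cores": 4},  # optional, defaults to os.cpu_count() - 1
    #         "dataframe": {
    #             "channels": {...},  # read by get_channels/get_dtypes
    #             "forward_fill_iterations": 2,
    #             "split_sector_id_from_dld_time": True,
    #             "tof_column": "dldTimeSteps",
    #         },
    #     }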

    def _schema_check(self, files: list[Path], expected_schema_set: set) -> None:
        """
        Checks the schema of the Parquet files.
        """
        logger.debug(f"Checking schema for {len(files)} files")
        existing = [file for file in files if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing]

        for filename, schema in zip(existing, parquet_schemas):
            schema_set = set(schema.names)
            if schema_set != expected_schema_set:
                logger.error(f"Schema mismatch in file: {filename}")
                missing_in_parquet = expected_schema_set - schema_set
                missing_in_config = schema_set - expected_schema_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )
        logger.debug("Schema check passed successfully")

    def _create_timed_dataframe(self, df: dd.DataFrame) -> dd.DataFrame:
        """Creates the timed dataframe, optionally filtering by electron events.

        Args:
            df (dd.DataFrame): The input dataframe containing all data.

        Returns:
            dd.DataFrame: The timed dataframe.
        """
        # Get the channels that belong in the timed dataframe
        timed_channels = self.fill_channels

        if self.filter_timed_by_electron:
            # Get the electron channels to use for filtering
            electron_channels = get_channels(self._config, "per_electron")
            # Keep only rows where electron data exists
            df_timed = df.dropna(subset=electron_channels)[timed_channels]
        else:
            # Take all timed data rows without filtering
            df_timed = df[timed_channels]

        # Keep only the first electron (index level electronId == 0) per train and pulse
        return df_timed.loc[:, :, 0]
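
    # --- Illustrative sketch (editor's addition, not part of the module) ---
    # A plain-pandas analogue of the `.loc[:, :, 0]` selection above, assuming
    # the (trainId, pulseId, electronId) MultiIndex built by the FLASH
    # DataFrameCreator.
    @staticmethod
    def _example_first_electron_selection() -> None:
        import pandas as pd

        idx = pd.MultiIndex.from_tuples(
            [(1, 0, 0), (1, 0, 1), (1, 1, 0)],
            names=["trainId", "pulseId", "electronId"],
        )
        df = pd.DataFrame({"bam": [0.1, 0.1, 0.2]}, index=idx)
        # Selects the same rows as df.loc[:, :, 0]: keep only electronId == 0
        print(df.xs(0, level="electronId"))  # one row per (trainId, pulseId)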

    def _save_buffer_file(self, paths: dict[str, Path]) -> None:
        """Creates the electron and timed buffer files from the raw H5 file."""
        logger.debug(f"Processing file: {paths['raw'].stem}")
        start_time = time.time()
        # Create a DataFrameCreator and get the dataframe
        df = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]).df

        # Forward fill the non-electron channels
        logger.debug(f"Forward filling {len(self.fill_channels)} channels")
        df[self.fill_channels] = df[self.fill_channels].ffill()

        # Save the electron-resolved dataframe
        electron_channels = get_channels(self._config, "per_electron")
        dtypes = get_dtypes(self._config, df.columns.values)
        electron_df = df.dropna(subset=electron_channels).astype(dtypes).reset_index()
        logger.debug(f"Saving electron buffer with shape: {electron_df.shape}")
        electron_df.to_parquet(paths["electron"])

        # Create and save the timed dataframe
        df_timed = self._create_timed_dataframe(df)
        dtypes = get_dtypes(self._config, df_timed.columns.values)
        timed_df = df_timed.astype(dtypes).reset_index()
        logger.debug(f"Saving timed buffer with shape: {timed_df.shape}")
        timed_df.to_parquet(paths["timed"])

        logger.debug(f"Processed {paths['raw'].stem} in {time.time() - start_time:.2f}s")

    def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None:
        """
        Creates the buffer files that are missing.

        Args:
            force_recreate (bool): Flag to force recreation of buffer files.
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        file_sets = self.fp.file_sets_to_process(force_recreate)
        logger.info(f"Reading files: {len(file_sets)} new files of {len(self.fp)} total.")
        n_cores = min(len(file_sets), self.n_cores)
        if n_cores > 0:
            if debug:
                for file_set in file_sets:
                    self._save_buffer_file(file_set)
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(file_set) for file_set in file_sets
                )
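
    # Editor's note (illustrative, not part of the module): the Parallel /
    # delayed idiom above, reduced to a standalone joblib example.
    # delayed(f)(x) records the call without running it; Parallel executes
    # the recorded calls on n_jobs worker processes:
    #
    #     from joblib import Parallel, delayed
    #     results = Parallel(n_jobs=2)(delayed(pow)(x, 2) for x in range(4))
    #     # results == [0, 1, 4, 9]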

    def _get_dataframes(self) -> None:
        """
        Reads the buffer files from the buffer folder.

        First, the buffer files are read into a lazy dask dataframe.
        The dataframe is forward filled lazily with the non-electron channels.
        For the electron dataframe, all rows without electron data are dropped,
        and the sector ID is split from the DLD time.
        For the timed dataframe, only the train and pulse channels are taken, and
        it is pulse resolved (no longer electron resolved). If time_index is True,
        the timeIndex is calculated and set as the index (slow operation).
        """
        if not self.fp:
            raise FileNotFoundError("Buffer files do not exist.")
        # Loop over the electron and timed dataframes
        file_stats = {}
        filling = {}
        for typ in DF_TYP:
            # Read the parquet files into a dask dataframe
            df = dd.read_parquet(self.fp[typ], calculate_divisions=True)
            # Get the metadata from the parquet files
            file_stats[typ] = get_parquet_metadata(self.fp[typ])

            # Forward fill the non-electron channels across files
            overlap = min(file["num_rows"] for file in file_stats[typ].values())
            iterations = self._config.get("forward_fill_iterations", 2)
            df = forward_fill_lazy(
                df=df,
                columns=self.fill_channels,
                before=overlap,
                iterations=iterations,
            )
            # TODO: This dict should be returned by forward_fill_lazy
            filling[typ] = {
                "columns": self.fill_channels,
                "overlap": overlap,
                "iterations": iterations,
            }

            self.df[typ] = df
        self.metadata.update({"file_statistics": file_stats, "filling": filling})
        # Correct the 3-bit shift which encodes the detector ID in the DLD time
        if (
            self._config.get("split_sector_id_from_dld_time", False)
            and self._config.get("tof_column", "dldTimeSteps") in self.df["electron"].columns
        ):
            self.df["electron"], meta = split_dld_time_from_sector_id(
                self.df["electron"],
                config=self._config,
            )
            self.metadata.update(meta)
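
    # --- Illustrative sketch (editor's addition, not part of the module) ---
    # What "forward fill across files" means, reduced to plain dask. This is
    # NOT sed's forward_fill_lazy implementation, just the underlying idea:
    # each partition borrows `before` rows from its neighbour so fills can
    # cross partition (i.e. file) boundaries. The `before` window must cover
    # the longest run of NaNs, which is why the method above sizes it from
    # the smallest file and runs several iterations.
    @staticmethod
    def _example_cross_partition_ffill() -> None:
        import pandas as pd

        pdf = pd.DataFrame({"pulse": [1.0, None, None, None, 2.0, None]})
        ddf = dd.from_pandas(pdf, npartitions=2)
        filled = ddf.map_overlap(lambda part: part.ffill(), before=3, after=0)
        print(filled.compute())  # all NaNs filled, even at the partition edge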

    def process_and_load_dataframe(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        suffix: str = "",
        debug: bool = False,
        remove_invalid_files: bool = False,
        filter_timed_by_electron: bool = True,
    ) -> tuple[dd.DataFrame, dd.DataFrame]:
        """
        Runs the buffer file creation process.
        Does a schema check on the buffer files and creates them if they are missing.
        Performs forward filling and splits the sector ID from the DLD time lazily.

        Args:
            h5_paths (list[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for processed files.
            force_recreate (bool): Flag to force recreation of buffer files.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.
            remove_invalid_files (bool): Flag to remove invalid files.
            filter_timed_by_electron (bool): Flag to filter timed data by valid electron events.

        Returns:
            tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes.
        """
        self.fp = BufferFilePaths(self._config, h5_paths, folder, suffix, remove_invalid_files)
        self.filter_timed_by_electron = filter_timed_by_electron

        if not force_recreate:
            schema_set = set(
                get_channels(self._config, formats="all", index=True, extend_aux=True),
            )
            self._schema_check(self.fp["electron"], schema_set)
            schema_set = set(
                get_channels(
                    self._config,
                    formats=["per_pulse", "per_train"],
                    index=True,
                    extend_aux=True,
                ),
            ) - {"electronId"}
            self._schema_check(self.fp["timed"], schema_set)

        self._save_buffer_files(force_recreate, debug)

        self._get_dataframes()

        return self.df["electron"], self.df["timed"]
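
# --- Illustrative usage sketch (editor's addition, not part of the module) ---
# End-to-end usage of BufferHandler, assuming a full sed config dict and a
# directory of raw FLASH H5 files; all names below are placeholders.
def _example_process_and_load(config: dict, raw_dir: Path, out_dir: Path) -> None:
    bh = BufferHandler(config=config)
    df_electron, df_timed = bh.process_and_load_dataframe(
        h5_paths=sorted(raw_dir.glob("*.h5")),
        folder=out_dir,
    )
    # Both dataframes are lazy; nothing is computed until explicitly requested.
    logger.info(f"electron partitions: {df_electron.npartitions}")
    logger.info(f"timed columns: {list(df_timed.columns)}")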