• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenCOMPES / sed / 12737093410

12 Jan 2025 09:08PM UTC coverage: 92.047% (+0.2%) from 91.801%
12737093410

Pull #437

github

web-flow
Merge pull request #542 from OpenCOMPES/more-broken-file-fixes

add further exceptions for completely empty files, and exceptions
Pull Request #437: Upgrade to V1

2103 of 2238 new or added lines in 53 files covered. (93.97%)

4 existing lines in 1 file now uncovered.

7581 of 8236 relevant lines covered (92.05%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.21
/src/sed/loader/flash/buffer_handler.py
1
from __future__ import annotations
1✔
2

3
import os
1✔
4
from pathlib import Path
1✔
5

6
import dask.dataframe as dd
1✔
7
import pyarrow.parquet as pq
1✔
8
from joblib import delayed
1✔
9
from joblib import Parallel
1✔
10

11
from sed.core.dfops import forward_fill_lazy
1✔
12
from sed.core.logging import setup_logging
1✔
13
from sed.loader.flash.dataframe import DataFrameCreator
1✔
14
from sed.loader.flash.utils import get_channels
1✔
15
from sed.loader.flash.utils import get_dtypes
1✔
16
from sed.loader.flash.utils import InvalidFileError
1✔
17
from sed.loader.utils import get_parquet_metadata
1✔
18
from sed.loader.utils import split_dld_time_from_sector_id
1✔
19

20

21
# Dataframe flavors produced by the buffer pipeline: "electron" is
# electron-resolved, "timed" is pulse/train-resolved.
DF_TYP = ["electron", "timed"]

# Module-level logger configured via the project's logging setup.
logger = setup_logging(__name__)
24

25

26
class BufferFilePaths:
    """
    A class for handling the paths to the raw and buffer files of electron and timed dataframes.
    A list of file sets (dict) are created for each H5 file containing the paths to the raw file
    and the electron and timed buffer files.

    Structure of the file sets:
    {
        "raw": Path to the H5 file,
        "electron": Path to the electron buffer file,
        "timed": Path to the timed buffer file,
    }
    """

    def __init__(
        self,
        config: dict,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
        remove_invalid_files: bool,
    ) -> None:
        """Initializes the BufferFilePaths.

        Args:
            config (dict): Dataframe configuration, used only when validating files.
            h5_paths (list[Path]): List of paths to the H5 files.
            folder (Path): Path to the folder for processed files.
            suffix (str): Suffix for buffer file names.
            remove_invalid_files (bool): If True, drop H5 files that fail channel validation.
        """
        name_suffix = f"_{suffix}" if suffix else ""
        buffer_folder = folder / "buffer"
        buffer_folder.mkdir(parents=True, exist_ok=True)

        paths = self.remove_invalid_files(config, h5_paths) if remove_invalid_files else h5_paths
        self._file_paths = self._create_file_paths(paths, buffer_folder, name_suffix)

    def _create_file_paths(
        self,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
    ) -> list[dict[str, Path]]:
        """Builds one file set (raw + one buffer path per dataframe type) per H5 file."""
        file_sets = []
        for h5_path in h5_paths:
            entry: dict[str, Path] = {"raw": h5_path}
            for typ in DF_TYP:
                entry[typ] = folder / f"{typ}_{h5_path.stem}{suffix}"
            file_sets.append(entry)
        return file_sets

    def __getitem__(self, key) -> list[Path]:
        """String keys select that path across all file sets; other keys index the list."""
        if not isinstance(key, str):
            return self._file_paths[key]
        return [file_set[key] for file_set in self._file_paths]

    def __iter__(self):
        yield from self._file_paths

    def __len__(self):
        return len(self._file_paths)

    def file_sets_to_process(self, force_recreate: bool = False) -> list[dict[str, Path]]:
        """Returns a list of file sets that need to be processed."""
        if force_recreate:
            return self._file_paths
        pending = []
        for file_set in self:
            # A set needs processing if any of its buffer files is missing.
            if any(not file_set[typ].exists() for typ in DF_TYP):
                pending.append(file_set)
        return pending

    def remove_invalid_files(self, config, h5_paths: list[Path]) -> list[Path]:
        """Filters h5_paths down to those whose channel keys validate successfully."""
        valid: list[Path] = []
        for path in h5_paths:
            try:
                creator = DataFrameCreator(config_dataframe=config, h5_path=path)
                creator.validate_channel_keys()
            except InvalidFileError as err:
                logger.info(f"Skipping invalid file: {path.stem}\n{err}")
            else:
                valid.append(path)
        return valid
106

107

108
class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary, expected to contain
                "dataframe" and "core" sections.
        """
        self._config: dict = config["dataframe"]
        # os.cpu_count() may return None on platforms where it cannot be
        # determined; fall back to a single worker instead of raising TypeError.
        self.n_cores: int = config["core"].get("num_cores", (os.cpu_count() or 2) - 1)
        # Set by process_and_load_dataframe; None until then.
        self.fp: BufferFilePaths | None = None
        self.df: dict[str, dd.DataFrame] = {typ: None for typ in DF_TYP}
        # Non-electron channels that get forward filled.
        self.fill_channels: list[str] = get_channels(
            self._config,
            ["per_pulse", "per_train"],
            extend_aux=True,
        )
        self.metadata: dict = {}
        self.filter_timed_by_electron: bool | None = None

    def _schema_check(self, files: list[Path], expected_schema_set: set) -> None:
        """
        Checks the schema of the Parquet files.

        Args:
            files (list[Path]): Buffer files to check; files that do not exist are skipped.
            expected_schema_set (set): Column names each file is expected to contain.

        Raises:
            ValueError: If the schema of the Parquet files does not match the configuration.
        """
        existing = [file for file in files if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing]

        for filename, schema in zip(existing, parquet_schemas):
            schema_set = set(schema.names)
            if schema_set != expected_schema_set:
                missing_in_parquet = expected_schema_set - schema_set
                missing_in_config = schema_set - expected_schema_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                # Report the offending file; previously `filename` was computed
                # by the loop but never included in the message.
                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )

    def _create_timed_dataframe(self, df: dd.DataFrame) -> dd.DataFrame:
        """Creates the timed dataframe, optionally filtering by electron events.

        Args:
            df (dd.DataFrame): The input dataframe containing all data

        Returns:
            dd.DataFrame: The timed dataframe
        """
        # Get channels that should be in timed dataframe
        timed_channels = self.fill_channels

        if self.filter_timed_by_electron:
            # Get electron channels to use for filtering
            electron_channels = get_channels(self._config, "per_electron")
            # Filter rows where electron data exists
            df_timed = df.dropna(subset=electron_channels)[timed_channels]
        else:
            # Take all timed data rows without filtering
            df_timed = df[timed_channels]

        # Take only first electron per event
        return df_timed.loc[:, :, 0]

    def _save_buffer_file(self, paths: dict[str, Path]) -> None:
        """
        Creates the electron and timed buffer files from the raw H5 file.
        First the dataframe is accessed and forward filled in the non-electron channels.
        Then the data types are set. For the electron dataframe, all values not in the electron
        channels are dropped. For the timed dataframe, only the train and pulse channels are taken
        and it pulse resolved (no longer electron resolved). Both are saved as parquet files.

        Args:
            paths (dict[str, Path]): Dictionary containing the paths to the H5 and buffer files.
        """
        # Create a DataFrameCreator instance and get the h5 file
        df = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]).df

        # forward fill all the non-electron channels
        df[self.fill_channels] = df[self.fill_channels].ffill()

        # Save electron resolved dataframe
        electron_channels = get_channels(self._config, "per_electron")
        dtypes = get_dtypes(self._config, df.columns.values)
        df.dropna(subset=electron_channels).astype(dtypes).reset_index().to_parquet(
            paths["electron"],
        )

        # Create and save timed dataframe
        df_timed = self._create_timed_dataframe(df)
        dtypes = get_dtypes(self._config, df_timed.columns.values)
        df_timed.astype(dtypes).reset_index().to_parquet(paths["timed"])

        logger.debug(f"Processed {paths['raw'].stem}")

    def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None:
        """
        Creates the buffer files that are missing.

        Args:
            force_recreate (bool): Flag to force recreation of buffer files.
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        file_sets = self.fp.file_sets_to_process(force_recreate)
        logger.info(f"Reading files: {len(file_sets)} new files of {len(self.fp)} total.")
        n_cores = min(len(file_sets), self.n_cores)
        if n_cores > 0:
            if debug:
                # Serial processing: easier stack traces when debugging.
                for file_set in file_sets:
                    self._save_buffer_file(file_set)
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(file_set) for file_set in file_sets
                )

    def _get_dataframes(self) -> None:
        """
        Reads the buffer files from a folder.

        First the buffer files are read as a dask dataframe is accessed.
        The dataframe is forward filled lazily with non-electron channels.
        For the electron dataframe, all values not in the electron channels
        are dropped, and splits the sector ID from the DLD time.
        For the timed dataframe, only the train and pulse channels are taken and
        it pulse resolved (no longer electron resolved). If time_index is True,
        the timeIndex is calculated and set as the index (slow operation).

        Raises:
            FileNotFoundError: If no buffer file paths are available (fp unset or empty).
        """
        # NOTE: an empty BufferFilePaths (len 0) is also falsy, so this rejects
        # both the unset case and the no-files case.
        if not self.fp:
            raise FileNotFoundError("Buffer files do not exist.")
        # Loop over the electron and timed dataframes
        file_stats = {}
        filling = {}
        for typ in DF_TYP:
            # Read the parquet files into a dask dataframe
            df = dd.read_parquet(self.fp[typ], calculate_divisions=True)
            # Get the metadata from the parquet files
            file_stats[typ] = get_parquet_metadata(self.fp[typ])

            # Forward fill the non-electron channels across files; the overlap
            # is bounded by the smallest file so filling can cross boundaries.
            overlap = min(file["num_rows"] for file in file_stats[typ].values())
            iterations = self._config.get("forward_fill_iterations", 2)
            df = forward_fill_lazy(
                df=df,
                columns=self.fill_channels,
                before=overlap,
                iterations=iterations,
            )
            # TODO: This dict should be returned by forward_fill_lazy
            filling[typ] = {
                "columns": self.fill_channels,
                "overlap": overlap,
                "iterations": iterations,
            }

            self.df[typ] = df
        self.metadata.update({"file_statistics": file_stats, "filling": filling})
        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if (
            self._config.get("split_sector_id_from_dld_time", False)
            and self._config.get("tof_column", "dldTimeSteps") in self.df["electron"].columns
        ):
            self.df["electron"], meta = split_dld_time_from_sector_id(
                self.df["electron"],
                config=self._config,
            )
            self.metadata.update(meta)

    def process_and_load_dataframe(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        suffix: str = "",
        debug: bool = False,
        remove_invalid_files: bool = False,
        filter_timed_by_electron: bool = True,
    ) -> tuple[dd.DataFrame, dd.DataFrame]:
        """
        Runs the buffer file creation process.
        Does a schema check on the buffer files and creates them if they are missing.
        Performs forward filling and splits the sector ID from the DLD time lazily.

        Args:
            h5_paths (List[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for processed files.
            force_recreate (bool): Flag to force recreation of buffer files.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.
            remove_invalid_files (bool): Flag to remove invalid files.
            filter_timed_by_electron (bool): Flag to filter timed data by valid electron events.

        Returns:
            Tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes.
        """
        self.fp = BufferFilePaths(self._config, h5_paths, folder, suffix, remove_invalid_files)
        self.filter_timed_by_electron = filter_timed_by_electron

        if not force_recreate:
            schema_set = set(
                get_channels(self._config, formats="all", index=True, extend_aux=True),
            )
            self._schema_check(self.fp["electron"], schema_set)
            # The timed dataframe is pulse resolved, so electronId is excluded.
            schema_set = set(
                get_channels(
                    self._config,
                    formats=["per_pulse", "per_train"],
                    index=True,
                    extend_aux=True,
                ),
            ) - {"electronId"}
            self._schema_check(self.fp["timed"], schema_set)

        self._save_buffer_files(force_recreate, debug)

        self._get_dataframes()

        return self.df["electron"], self.df["timed"]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc