
OpenCOMPES / sed / build 10948927564 (github)

19 Sep 2024 09:07PM UTC coverage: 92.693% (+0.7%) from 91.948%

Pull Request #437: Upgrade to V1
rettigl: Merge remote-tracking branch 'origin/main' into v1_feature_branch

1255 of 1301 new or added lines in 50 files covered. (96.46%)
5 existing lines in 3 files now uncovered.
7129 of 7691 relevant lines covered (92.69%)
0.93 hits per line

Source file: /sed/loader/flash/buffer_handler.py (99.15% covered)
from __future__ import annotations

import os
from pathlib import Path

import dask.dataframe as dd
import pyarrow.parquet as pq
from joblib import delayed
from joblib import Parallel

from sed.core.dfops import forward_fill_lazy
from sed.core.logging import setup_logging
from sed.loader.flash.dataframe import DataFrameCreator
from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import get_dtypes
from sed.loader.flash.utils import InvalidFileError
from sed.loader.utils import get_parquet_metadata
from sed.loader.utils import split_dld_time_from_sector_id


DF_TYP = ["electron", "timed"]

logger = setup_logging(__name__)


class BufferFilePaths:
    """
    A class for handling the paths to the raw and buffer files of electron and timed dataframes.
    A list of file sets (dicts) is created for each H5 file, containing the paths to the raw
    file and to the electron and timed buffer files.

    Structure of the file sets:
    {
        "raw": Path to the H5 file,
        "electron": Path to the electron buffer file,
        "timed": Path to the timed buffer file,
    }
    """

    def __init__(
        self,
        config: dict,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
        remove_invalid_files: bool,
    ) -> None:
        """Initializes the BufferFilePaths.

        Args:
            config (dict): The dataframe configuration dictionary.
            h5_paths (list[Path]): List of paths to the H5 files.
            folder (Path): Path to the folder for processed files.
            suffix (str): Suffix for buffer file names.
            remove_invalid_files (bool): Whether to exclude files that fail channel validation.
        """
        suffix = f"_{suffix}" if suffix else ""
        folder = folder / "buffer"
        folder.mkdir(parents=True, exist_ok=True)

        if remove_invalid_files:
            h5_paths = self.remove_invalid_files(config, h5_paths)

        self._file_paths = self._create_file_paths(h5_paths, folder, suffix)

    def _create_file_paths(
        self,
        h5_paths: list[Path],
        folder: Path,
        suffix: str,
    ) -> list[dict[str, Path]]:
        return [
            {
                "raw": h5_path,
                **{typ: folder / f"{typ}_{h5_path.stem}{suffix}" for typ in DF_TYP},
            }
            for h5_path in h5_paths
        ]

    def __getitem__(self, key) -> list[Path]:
        if isinstance(key, str):
            return [file_set[key] for file_set in self._file_paths]
        return self._file_paths[key]

    def __iter__(self):
        return iter(self._file_paths)

    def __len__(self):
        return len(self._file_paths)

    def file_sets_to_process(self, force_recreate: bool = False) -> list[dict[str, Path]]:
        """Returns a list of file sets that need to be processed."""
        if force_recreate:
            return self._file_paths
        return [file_set for file_set in self if any(not file_set[key].exists() for key in DF_TYP)]

    def remove_invalid_files(self, config, h5_paths: list[Path]) -> list[Path]:
        valid_h5_paths = []
        for h5_path in h5_paths:
            try:
                dfc = DataFrameCreator(config_dataframe=config, h5_path=h5_path)
                dfc.validate_channel_keys()
                valid_h5_paths.append(h5_path)
            except InvalidFileError as e:
                logger.info(f"Skipping invalid file: {h5_path.stem}\n{e}")

        return valid_h5_paths

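# A minimal usage sketch for BufferFilePaths (illustrative only: `cfg` stands in
# for a FLASH dataframe config dict, and the run file names are hypothetical):
#
#     fp = BufferFilePaths(
#         config=cfg,
#         h5_paths=[Path("raw/run_1.h5"), Path("raw/run_2.h5")],
#         folder=Path("processed"),
#         suffix="v1",
#         remove_invalid_files=False,
#     )
#     fp["electron"]  # [processed/buffer/electron_run_1_v1, .../electron_run_2_v1]
#     fp[0]["raw"]    # Path("raw/run_1.h5")
#     len(fp)         # 2
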

class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        self._config: dict = config["dataframe"]
        self.n_cores: int = config["core"].get("num_cores", os.cpu_count() - 1)
        self.fp: BufferFilePaths | None = None
        self.df: dict[str, dd.DataFrame | None] = {typ: None for typ in DF_TYP}
        self.fill_channels: list[str] = get_channels(
            self._config,
            ["per_pulse", "per_train"],
            extend_aux=True,
        )
        self.metadata: dict = {}

    def _schema_check(self, files: list[Path], expected_schema_set: set) -> None:
        """
        Checks the schema of the Parquet files.

        Raises:
            ValueError: If the schema of the Parquet files does not match the configuration.
        """
        existing = [file for file in files if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing]

        for filename, schema in zip(existing, parquet_schemas):
            schema_set = set(schema.names)
            if schema_set != expected_schema_set:
                missing_in_parquet = expected_schema_set - schema_set
                missing_in_config = schema_set - expected_schema_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )

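    # For illustration (channel names hypothetical): if a buffer file stores
    # columns {"trainId", "pulseId", "dldPosX"} while the config expects
    # {"trainId", "pulseId", "dldPosY"}, the raised error reports both
    # "Missing in parquet: {'dldPosY'}" and "Missing in config: {'dldPosX'}".
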
    def _save_buffer_file(self, paths: dict[str, Path]) -> None:
        """
        Creates the electron and timed buffer files from the raw H5 file.
        First the dataframe is accessed and forward filled in the non-electron channels.
        Then the data types are set. For the electron dataframe, rows with missing values
        in the electron channels are dropped. For the timed dataframe, only the train and
        pulse channels are kept, and it is pulse resolved (no longer electron resolved).
        Both are saved as parquet files.

        Args:
            paths (dict[str, Path]): Dictionary containing the paths to the H5 and buffer files.
        """

        # Create a DataFrameCreator instance and get the dataframe from the H5 file
        df = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]).df

        # forward fill all the non-electron channels
        df[self.fill_channels] = df[self.fill_channels].ffill()

        # electron resolved dataframe: drop rows without electron data, set the dtypes,
        # reset the index, and save to parquet
        electron_channels = get_channels(self._config, "per_electron")
        dtypes = get_dtypes(self._config, df.columns.values)
        df.dropna(subset=electron_channels).astype(dtypes).reset_index().to_parquet(
            paths["electron"],
        )

        # timed dataframe:
        # drop the electron channels and only take rows with the first electronId
        df_timed = df[self.fill_channels].loc[:, :, 0]
        dtypes = get_dtypes(self._config, df_timed.columns.values)
        df_timed.astype(dtypes).reset_index().to_parquet(paths["timed"])
        logger.debug(f"Processed {paths['raw'].stem}")

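    # A note on `df[self.fill_channels].loc[:, :, 0]` above: the dataframe from
    # DataFrameCreator is assumed to carry a three-level (trainId, pulseId,
    # electronId) index, so selecting 0 on the last level keeps one row per
    # pulse. A toy pandas equivalent of that selection (hypothetical data,
    # written with the unambiguous IndexSlice form):
    #
    #     import pandas as pd
    #     idx = pd.MultiIndex.from_tuples(
    #         [(1, 0, 0), (1, 0, 1), (1, 1, 0)],
    #         names=["trainId", "pulseId", "electronId"],
    #     )
    #     toy = pd.DataFrame({"bam": [0.1, 0.1, 0.2]}, index=idx)
    #     toy.loc[pd.IndexSlice[:, :, 0], :]  # keeps rows (1, 0, 0) and (1, 1, 0)
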
    def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None:
        """
        Creates the buffer files that are missing.

        Args:
            force_recreate (bool): Flag to force recreation of buffer files.
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        file_sets = self.fp.file_sets_to_process(force_recreate)
        logger.info(f"Reading files: {len(file_sets)} new files of {len(self.fp)} total.")
        n_cores = min(len(file_sets), self.n_cores)
        if n_cores > 0:
            if debug:
                for file_set in file_sets:
                    self._save_buffer_file(file_set)
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(file_set) for file_set in file_sets
                )

    def _get_dataframes(self) -> None:
        """
        Reads the buffer files from a folder.

        First the buffer files are read into a dask dataframe.
        The dataframe is forward filled lazily with the non-electron channels.
        For the electron dataframe, rows with missing values in the electron
        channels are dropped, and the sector ID is split from the DLD time.
        For the timed dataframe, only the train and pulse channels are kept, and
        it is pulse resolved (no longer electron resolved). If time_index is True,
        the timeIndex is calculated and set as the index (slow operation).
        """
        if not self.fp:
            raise FileNotFoundError("Buffer files do not exist.")
        # Loop over the electron and timed dataframes
        file_stats = {}
        filling = {}
        for typ in DF_TYP:
            # Read the parquet files into a dask dataframe
            df = dd.read_parquet(self.fp[typ], calculate_divisions=True)
            # Get the metadata from the parquet files
            file_stats[typ] = get_parquet_metadata(self.fp[typ])

            # Forward fill the non-electron channels across files
            overlap = min(file["num_rows"] for file in file_stats[typ].values())
            iterations = self._config.get("forward_fill_iterations", 2)
            df = forward_fill_lazy(
                df=df,
                columns=self.fill_channels,
                before=overlap,
                iterations=iterations,
            )
            # TODO: This dict should be returned by forward_fill_lazy
            filling[typ] = {
                "columns": self.fill_channels,
                "overlap": overlap,
                "iterations": iterations,
            }

            self.df[typ] = df
        self.metadata.update({"file_statistics": file_stats, "filling": filling})
        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if (
            self._config.get("split_sector_id_from_dld_time", False)
            and self._config.get("tof_column", "dldTimeSteps") in self.df["electron"].columns
        ):
            self.df["electron"], meta = split_dld_time_from_sector_id(
                self.df["electron"],
                config=self._config,
            )
            self.metadata.update(meta)

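    # Why `before=overlap` in _get_dataframes above: dask forward fills each
    # partition independently, so a per-train value recorded only in one file
    # must be carried into the next. Using the smallest file's row count as the
    # look-back window guarantees the fill can reach back across a whole
    # partition. A toy sketch of the intended semantics (hypothetical values,
    # assuming `import pandas as pd`):
    #
    #     pdf = pd.DataFrame({"bam": [1.0, None, None, None, 2.0, None]})
    #     ddf = dd.from_pandas(pdf, npartitions=2)
    #     filled = forward_fill_lazy(df=ddf, columns=["bam"], before=3, iterations=2)
    #     filled.compute()["bam"].tolist()  # [1.0, 1.0, 1.0, 1.0, 2.0, 2.0]
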
    def process_and_load_dataframe(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        suffix: str = "",
        debug: bool = False,
        remove_invalid_files: bool = False,
    ) -> tuple[dd.DataFrame, dd.DataFrame]:
        """
        Runs the buffer file creation process.
        Does a schema check on the buffer files and creates them if they are missing.
        Performs forward filling and splits the sector ID from the DLD time lazily.

        Args:
            h5_paths (list[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for processed files.
            force_recreate (bool): Flag to force recreation of buffer files.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.
            remove_invalid_files (bool): Flag to skip files that fail channel validation.

        Returns:
            tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes.
        """
        self.fp = BufferFilePaths(self._config, h5_paths, folder, suffix, remove_invalid_files)

        if not force_recreate:
            schema_set = set(
                get_channels(self._config, formats="all", index=True, extend_aux=True),
            )
            self._schema_check(self.fp["electron"], schema_set)
            schema_set = set(
                get_channels(
                    self._config,
                    formats=["per_pulse", "per_train"],
                    index=True,
                    extend_aux=True,
                ),
            ) - {"electronId"}
            self._schema_check(self.fp["timed"], schema_set)

        self._save_buffer_files(force_recreate, debug)

        self._get_dataframes()

        return self.df["electron"], self.df["timed"]
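
# End-to-end usage sketch (illustrative: `cfg` stands in for a full sed config
# dict providing the "core" and "dataframe" sections, and the paths are
# hypothetical):
#
#     bh = BufferHandler(config=cfg)
#     df_electron, df_timed = bh.process_and_load_dataframe(
#         h5_paths=[Path("raw/run_1.h5"), Path("raw/run_2.h5")],
#         folder=Path("processed"),
#     )
#     df_electron.head()  # triggers lazy computation on the buffered data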