• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenCOMPES / sed / 10064447099

23 Jul 2024 06:34PM UTC coverage: 92.688% (-0.01%) from 92.701%
10064447099

Pull #469

github

web-flow
Merge pull request #479 from OpenCOMPES/flash-minor-changes

Flash minor changes (Merge to #469)
Pull Request #469: Update to the BufferHandler

234 of 245 new or added lines in 13 files covered. (95.51%)

1 existing line in 1 file now uncovered.

7073 of 7631 relevant lines covered (92.69%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.98
/sed/loader/flash/buffer_handler.py
1
from __future__ import annotations
1✔
2

3
import os
1✔
4
from pathlib import Path
1✔
5

6
import dask.dataframe as dd
1✔
7
import pyarrow.parquet as pq
1✔
8
from joblib import delayed
1✔
9
from joblib import Parallel
1✔
10

11
from sed.core.dfops import forward_fill_lazy
1✔
12
from sed.loader.flash.dataframe import DataFrameCreator
1✔
13
from sed.loader.flash.utils import get_channels
1✔
14
from sed.loader.flash.utils import get_dtypes
1✔
15
from sed.loader.utils import get_parquet_metadata
1✔
16
from sed.loader.utils import split_dld_time_from_sector_id
1✔
17

18

19
DF_TYP = ["electron", "timed"]
1✔
20

21

22
class BufferFilePaths:
    """
    Handles the paths to the raw and buffer files of electron and timed dataframes.

    One file set (a dict) is built per H5 file, holding the path to the raw file
    together with the paths of its electron and timed buffer files.

    Structure of the file sets:
    {
        "raw": Path to the H5 file,
        "electron": Path to the electron buffer file,
        "timed": Path to the timed buffer file,
    }
    """

    def __init__(self, h5_paths: list[Path], folder: Path, suffix: str) -> None:
        """Initializes the BufferFilePaths.

        Args:
            h5_paths (list[Path]): List of paths to the H5 files.
            folder (Path): Path to the folder for processed files.
            suffix (str): Suffix for buffer file names.
        """
        name_suffix = f"_{suffix}" if suffix else ""
        buffer_folder = folder / "buffer"
        buffer_folder.mkdir(parents=True, exist_ok=True)

        # one file set per raw H5 file: "raw" plus one buffer path per dataframe type
        self._file_paths = []
        for raw_path in h5_paths:
            file_set = {"raw": raw_path}
            for typ in DF_TYP:
                file_set[typ] = buffer_folder / f"{typ}_{raw_path.stem}{name_suffix}"
            self._file_paths.append(file_set)

    def __getitem__(self, key) -> list[Path]:
        # a non-string key (int/slice) indexes the list of file sets directly;
        # a string key selects that entry from every file set
        if not isinstance(key, str):
            return self._file_paths[key]
        return [file_set[key] for file_set in self._file_paths]

    def __iter__(self):
        yield from self._file_paths

    def __len__(self):
        return len(self._file_paths)

    def file_sets_to_process(self, force_recreate: bool = False) -> list[dict[str, Path]]:
        """Returns a list of file sets that need to be processed."""
        if force_recreate:
            return self._file_paths
        # keep only file sets where at least one buffer file is missing on disk
        pending = []
        for file_set in self:
            if not all(file_set[typ].exists() for typ in DF_TYP):
                pending.append(file_set)
        return pending
73

74

75
class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        self._config: dict = config["dataframe"]
        # default to all cores but one; os.cpu_count() may return None, in which
        # case fall back to a single worker instead of raising a TypeError
        self.n_cores: int = config["core"].get("num_cores", (os.cpu_count() or 2) - 1)
        self.fp: BufferFilePaths = None
        self.df: dict[str, dd.DataFrame] = {typ: None for typ in DF_TYP}
        # non-electron channels that are forward filled (per-pulse/per-train data
        # is sparse relative to the electron-resolved index)
        self.fill_channels: list[str] = get_channels(
            self._config,
            ["per_pulse", "per_train"],
            extend_aux=True,
        )
        self.metadata: dict = {}

    def _schema_check(self, files: list[Path], expected_schema_set: set) -> None:
        """
        Checks the schema of the Parquet files.

        Args:
            files (list[Path]): Paths to the buffer files; missing files are skipped.
            expected_schema_set (set): Column names expected from the configuration.

        Raises:
            ValueError: If the schema of a Parquet file does not match the configuration.
        """
        existing = [file for file in files if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing]

        for filename, schema in zip(existing, parquet_schemas):
            schema_set = set(schema.names)
            if schema_set != expected_schema_set:
                missing_in_parquet = expected_schema_set - schema_set
                missing_in_config = schema_set - expected_schema_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                # name the offending file in the message (the loop variable was
                # previously unused and the message said "(unknown)")
                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )

    def _save_buffer_file(self, paths: dict[str, Path]) -> None:
        """
        Creates the electron and timed buffer files from the raw H5 file.
        First the dataframe is accessed and forward filled in the non-electron channels.
        Then the data types are set. For the electron dataframe, all values not in the electron
        channels are dropped. For the timed dataframe, only the train and pulse channels are taken
        and it pulse resolved (no longer electron resolved). Both are saved as parquet files.

        Args:
            paths (dict[str, Path]): Dictionary containing the paths to the H5 and buffer files.
        """

        # Create a DataFrameCreator instance and the h5 file
        df = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]).df

        # forward fill all the non-electron channels
        df[self.fill_channels] = df[self.fill_channels].ffill()

        # Reset the index of the DataFrame and save both the electron and timed dataframes
        # electron resolved dataframe: rows without electron data are dropped
        electron_channels = get_channels(self._config, "per_electron")
        dtypes = get_dtypes(self._config, df.columns.values)
        df.dropna(subset=electron_channels).astype(dtypes).reset_index().to_parquet(
            paths["electron"],
        )

        # timed dataframe
        # drop the electron channels and only take rows with the first electronId
        # (cross-section at electronId == 0 of the multi-index)
        df_timed = df[self.fill_channels].loc[:, :, 0]
        dtypes = get_dtypes(self._config, df_timed.columns.values)
        df_timed.astype(dtypes).reset_index().to_parquet(paths["timed"])

    def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None:
        """
        Creates the buffer files that are missing.

        Args:
            force_recreate (bool): Flag to force recreation of buffer files.
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        file_sets = self.fp.file_sets_to_process(force_recreate)
        print(f"Reading files: {len(file_sets)} new files of {len(self.fp)} total.")
        # never spawn more workers than there are files to process
        n_cores = min(len(file_sets), self.n_cores)
        if n_cores > 0:
            if debug:
                # serial processing keeps tracebacks readable
                for file_set in file_sets:
                    self._save_buffer_file(file_set)
                    print(f"Processed {file_set['raw'].stem}")
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(file_set) for file_set in file_sets
                )

    def _get_dataframes(self) -> None:
        """
        Reads the buffer files from a folder.

        First the buffer files are read as a dask dataframe is accessed.
        The dataframe is forward filled lazily with non-electron channels.
        For the electron dataframe, all values not in the electron channels
        are dropped, and splits the sector ID from the DLD time.
        For the timed dataframe, only the train and pulse channels are taken and
        it pulse resolved (no longer electron resolved). If time_index is True,
        the timeIndex is calculated and set as the index (slow operation).
        """
        # Loop over the electron and timed dataframes
        file_stats = {}
        filling = {}
        for typ in DF_TYP:
            # Read the parquet files into a dask dataframe
            df = dd.read_parquet(self.fp[typ], calculate_divisions=True)
            # Get the metadata from the parquet files
            file_stats[typ] = get_parquet_metadata(self.fp[typ])

            # Forward fill the non-electron channels across files; the overlap is
            # the smallest file's row count so the fill can bridge file boundaries
            overlap = min(file["num_rows"] for file in file_stats[typ].values())
            iterations = self._config.get("forward_fill_iterations", 2)
            df = forward_fill_lazy(
                df=df,
                columns=self.fill_channels,
                before=overlap,
                iterations=iterations,
            )
            # TODO: This dict should be returned by forward_fill_lazy
            filling[typ] = {
                "columns": self.fill_channels,
                "overlap": overlap,
                "iterations": iterations,
            }

            self.df[typ] = df
        self.metadata.update({"file_statistics": file_stats, "filling": filling})
        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if self._config.get("split_sector_id_from_dld_time", False):
            self.df["electron"], meta = split_dld_time_from_sector_id(
                self.df["electron"],
                config=self._config,
            )
            self.metadata.update(meta)

    def process_and_load_dataframe(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        suffix: str = "",
        debug: bool = False,
    ) -> tuple[dd.DataFrame, dd.DataFrame]:
        """
        Runs the buffer file creation process.
        Does a schema check on the buffer files and creates them if they are missing.
        Performs forward filling and splits the sector ID from the DLD time lazily.

        Args:
            h5_paths (List[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for processed files.
            force_recreate (bool): Flag to force recreation of buffer files.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.

        Returns:
            Tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes.
        """
        self.fp = BufferFilePaths(h5_paths, folder, suffix)

        if not force_recreate:
            # electron buffer files must carry every channel plus the index columns
            schema_set = set(
                get_channels(self._config, formats="all", index=True, extend_aux=True),
            )
            self._schema_check(self.fp["electron"], schema_set)
            # timed buffer files carry only pulse/train channels; electronId is
            # dropped since the timed dataframe is no longer electron resolved
            schema_set = set(
                get_channels(
                    self._config,
                    formats=["per_pulse", "per_train"],
                    index=True,
                    extend_aux=True,
                ),
            ) - {"electronId"}
            self._schema_check(self.fp["timed"], schema_set)

        self._save_buffer_files(force_recreate, debug)

        self._get_dataframes()

        return self.df["electron"], self.df["timed"]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc