• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenCOMPES / sed / 9852407573

09 Jul 2024 06:57AM UTC coverage: 92.453% (-0.02%) from 92.472%
9852407573

Pull #469

github

web-flow
Merge branch 'v1_feature_branch' into fix-459
Pull Request #469: Update to the BufferHandler

163 of 171 new or added lines in 10 files covered. (95.32%)

5 existing lines in 3 files now uncovered.

6872 of 7433 relevant lines covered (92.45%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.95
/sed/loader/flash/buffer_handler.py
1
from __future__ import annotations
1✔
2

3
import os
1✔
4
from pathlib import Path
1✔
5

6
import dask.dataframe as dd
1✔
7
import pyarrow.parquet as pq
1✔
8
from joblib import delayed
1✔
9
from joblib import Parallel
1✔
10

11
from sed.core.dfops import forward_fill_lazy
1✔
12
from sed.loader.flash.dataframe import DataFrameCreator
1✔
13
from sed.loader.flash.utils import get_channels
1✔
14
from sed.loader.flash.utils import get_dtypes
1✔
15
from sed.loader.utils import get_parquet_metadata
1✔
16
from sed.loader.utils import split_dld_time_from_sector_id
1✔
17

18

19
# The two dataframe flavors each raw H5 file is split into: electron-resolved
# and pulse-resolved ("timed") data.
DF_TYP = ["electron", "timed"]


class BufferFilePaths:
    """
    A class for handling the paths to the raw and buffer files of electron and timed dataframes.
    A list of file sets (dict) are created for each H5 file containing the paths to the raw file
    and the electron and timed buffer files.

    Structure of the file sets:
    {
        "raw": Path to the H5 file,
        "electron": Path to the electron buffer file,
        "timed": Path to the timed buffer file,
    }
    """

    def __init__(self, h5_paths: list[Path], folder: Path, suffix: str) -> None:
        """Initializes the BufferFilePaths.

        Args:
            h5_paths (list[Path]): List of paths to the H5 files.
            folder (Path): Path to the folder for processed files.
            suffix (str): Suffix for buffer file names (may be empty).
        """
        suffix = f"_{suffix}" if suffix else ""
        # buffer files live in a "buffer" subfolder of the processed folder
        folder = folder / "buffer"
        folder.mkdir(parents=True, exist_ok=True)

        # a list of file sets containing the paths to the raw, electron and timed buffer files
        self._file_paths = [
            {
                "raw": h5_path,
                **{typ: folder / f"{typ}_{h5_path.stem}{suffix}" for typ in DF_TYP},
            }
            for h5_path in h5_paths
        ]

    def __getitem__(self, key) -> list[Path] | dict[str, Path]:
        """Return all paths of one type for a string key ("raw", "electron", "timed"),
        or a single file set for an integer index.
        """
        if isinstance(key, str):
            return [file_set[key] for file_set in self._file_paths]
        return self._file_paths[key]

    def __iter__(self):
        """Iterate over the file sets (one dict per H5 file)."""
        return iter(self._file_paths)

    def __len__(self):
        """Number of file sets, i.e. number of H5 files."""
        return len(self._file_paths)

    def to_process(self, force_recreate: bool = False) -> list[dict[str, Path]]:
        """Returns a list of file sets that need to be processed.

        Args:
            force_recreate (bool): If True, every file set is returned regardless
                of whether its buffer files already exist.

        Returns:
            list[dict[str, Path]]: File sets for which at least one buffer file
                (electron or timed) is missing.
        """
        if force_recreate:
            return list(self)
        return [
            file_set for file_set in self if any(not file_set[typ].exists() for typ in DF_TYP)
        ]
76

77

78
class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        self._config: dict = config["dataframe"]
        # Default to all-but-one cores; os.cpu_count() may return None on some
        # platforms, in which case fall back to a single worker.
        self.n_cores: int = config["core"].get("num_cores", (os.cpu_count() or 2) - 1)
        self.fp: BufferFilePaths = None
        self.df: dict[str, dd.DataFrame] = {typ: None for typ in DF_TYP}
        # Channels that are not electron-resolved and hence need forward filling
        self.fill_channels: list[str] = get_channels(
            self._config["channels"],
            ["per_pulse", "per_train"],
            extend_aux=True,
        )
        self.metadata: dict = {}

    def _schema_check(self, files: list[Path], expected_schema_set: set) -> None:
        """
        Checks the schema of the Parquet files.

        Args:
            files (list[Path]): Buffer files to check; non-existing files are skipped.
            expected_schema_set (set): Column names expected from the configuration.

        Raises:
            ValueError: If the schema of the Parquet files does not match the configuration.
        """
        existing = [file for file in files if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing]

        for filename, schema in zip(existing, parquet_schemas):
            schema_set = set(schema.names)
            if schema_set != expected_schema_set:
                missing_in_parquet = expected_schema_set - schema_set
                missing_in_config = schema_set - expected_schema_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                # Name the offending file so the user knows which buffer is stale
                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )

    def _save_buffer_file(self, paths: dict[str, Path]) -> None:
        """
        Creates the electron and timed buffer files from the raw H5 file.
        First the dataframe is accessed and forward filled in the non-electron channels.
        Then the data types are set. For the electron dataframe, all values not in the electron
        channels are dropped. For the timed dataframe, only the train and pulse channels are taken
        and it pulse resolved (no longer electron resolved). Both are saved as parquet files.

        Args:
            paths (dict[str, Path]): Dictionary containing the paths to the H5 and buffer files.
        """

        # Create a DataFrameCreator instance and the h5 file
        df = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]).df

        # forward fill all the non-electron channels
        df[self.fill_channels] = df[self.fill_channels].ffill()

        # Reset the index of the DataFrame and save both the electron and timed dataframes
        # electron resolved dataframe
        electron_channels = get_channels(self._config["channels"], "per_electron")
        dtypes = get_dtypes(self._config["channels"], "all")
        df.dropna(subset=electron_channels).astype(dtypes).reset_index().to_parquet(
            paths["electron"],
        )

        # timed dataframe
        dtypes = get_dtypes(self._config["channels"], ["per_pulse", "per_train"])
        # drop the electron channels and only take rows with the first electronId
        df[self.fill_channels].loc[:, :, 0].astype(dtypes).reset_index().to_parquet(paths["timed"])

    def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None:
        """
        Creates the buffer files that are missing.

        Args:
            force_recreate (bool): Flag to force recreation of buffer files.
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        to_process = self.fp.to_process(force_recreate)
        print(f"Reading files: {len(to_process)} new files of {len(self.fp)} total.")
        # never request more workers than there are files to process
        n_cores = min(len(to_process), self.n_cores)
        if n_cores > 0:
            if debug:
                # serialize the creation so exceptions surface in the main process
                for file_set in to_process:
                    self._save_buffer_file(file_set)
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(file_set) for file_set in to_process
                )

    def _fill_dataframes(self):
        """
        Reads all parquet files into one dataframe using dask and fills NaN values.
        """
        # Loop over the electron and timed dataframes
        file_stats = {}
        filling = {}
        for typ in DF_TYP:
            # Read the parquet files into a dask dataframe
            df = dd.read_parquet(self.fp[typ], calculate_divisions=True)
            # Get the metadata from the parquet files
            file_stats[typ] = get_parquet_metadata(
                self.fp[typ],
                time_stamp_col=self._config.get("time_stamp_alias", "timeStamp"),
            )

            # Forward fill the non-electron channels across files
            # overlap is bounded by the smallest file so the fill window always fits
            overlap = min(file["num_rows"] for file in file_stats[typ].values())
            iterations = self._config.get("forward_fill_iterations", 2)
            df = forward_fill_lazy(
                df=df,
                columns=self.fill_channels,
                before=overlap,
                iterations=iterations,
            )
            # TODO: This dict should be returned by forward_fill_lazy
            filling[typ] = {
                "columns": self.fill_channels,
                "overlap": overlap,
                "iterations": iterations,
            }

            self.df[typ] = df
        self.metadata.update({"file_statistics": file_stats, "filling": filling})

    def run(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        suffix: str = "",
        debug: bool = False,
    ) -> None:
        """
        Runs the buffer file creation process.
        Does a schema check on the buffer files and creates them if they are missing.
        Performs forward filling and splits the sector ID from the DLD time lazily.

        Args:
            h5_paths (List[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for processed files.
            force_recreate (bool): Flag to force recreation of buffer files.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.
        """
        self.fp = BufferFilePaths(h5_paths, folder, suffix)

        if not force_recreate:
            # electron buffers must contain every configured channel (incl. index)
            schema_set = set(
                get_channels(self._config["channels"], formats="all", index=True, extend_aux=True),
            )
            self._schema_check(self.fp["electron"], schema_set)
            # timed buffers carry only pulse/train channels; electronId is dropped
            # since the timed dataframe is pulse resolved
            schema_set = set(
                get_channels(
                    self._config["channels"],
                    formats=["per_pulse", "per_train"],
                    index=True,
                    extend_aux=True,
                ),
            ) - {"electronId"}
            self._schema_check(self.fp["timed"], schema_set)

        self._save_buffer_files(force_recreate, debug)

        self._fill_dataframes()

        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if self._config.get("split_sector_id_from_dld_time", False):
            self.df["electron"], meta = split_dld_time_from_sector_id(
                self.df["electron"],
                config=self._config,
            )
            self.metadata.update(meta)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc