OpenCOMPES / sed / build 9987723124

18 Jul 2024 07:58AM UTC. Coverage: 92.701% (+0.8%), up from 91.936%.

Pull Request #437: Upgrade to V1 (via github)
Commit (rettigl): "fix energy calibration and delay range"

1174 of 1217 new or added lines in 50 files covered (96.47%)
2 existing lines in 2 files now uncovered
7061 of 7617 relevant lines covered (92.7%)
0.93 hits per line
Source File: /sed/loader/flash/buffer_handler.py (98.78% covered)

The line flagged as new and uncovered in this file is the
config_schema_set.add(...) call in the retro-compatibility branch of
_schema_check.

from __future__ import annotations

import os
from itertools import compress
from pathlib import Path

import dask.dataframe as dd
import pyarrow.parquet as pq
from joblib import delayed
from joblib import Parallel

from sed.core.dfops import forward_fill_lazy
from sed.loader.flash.dataframe import DataFrameCreator
from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import initialize_paths
from sed.loader.utils import get_parquet_metadata
from sed.loader.utils import split_dld_time_from_sector_id


class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """
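
    # Overview of the config keys this class reads (inferred from the method
    # bodies below, not an exhaustive schema of the sed configuration):
    #   config["core"]["num_cores"]                           - number of parallel workers
    #   config["dataframe"]["channels"]                       - channel definitions (each may carry a "dtype")
    #   config["dataframe"]["sector_id_column"]               - legacy sector-ID column name
    #   config["dataframe"]["time_stamp_alias"]               - timestamp column, default "timeStamp"
    #   config["dataframe"]["forward_fill_iterations"]        - forward-fill passes, default 2
    #   config["dataframe"]["split_sector_id_from_dld_time"]  - enable the 3-bit sector-ID split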
    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        self._config = config["dataframe"]
        self.n_cores = config["core"].get("num_cores", os.cpu_count() - 1)

        self.buffer_paths: list[Path] = []
        self.missing_h5_files: list[Path] = []
        self.save_paths: list[Path] = []

        self.df_electron: dd.DataFrame = None
        self.df_pulse: dd.DataFrame = None
        self.metadata: dict = {}

    def _schema_check(self) -> None:
        """
        Checks the schema of the Parquet files.

        Raises:
            ValueError: If the schema of the Parquet files does not match the configuration.
        """
        existing_parquet_filenames = [file for file in self.buffer_paths if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing_parquet_filenames]
        config_schema_set = set(
            get_channels(self._config["channels"], formats="all", index=True, extend_aux=True),
        )

        for filename, schema in zip(existing_parquet_filenames, parquet_schemas):
            # for backwards compatibility, when the sectorID was also saved in the buffer files
            if self._config["sector_id_column"] in schema.names:
                config_schema_set.add(
                    self._config["sector_id_column"],
                )
            schema_set = set(schema.names)
            if schema_set != config_schema_set:
                missing_in_parquet = config_schema_set - schema_set
                missing_in_config = schema_set - config_schema_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )

    def _get_files_to_read(
        self,
        h5_paths: list[Path],
        folder: Path,
        prefix: str,
        suffix: str,
        force_recreate: bool,
    ) -> None:
        """
        Determines the list of files to read and the corresponding buffer files to create.

        Args:
            h5_paths (list[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for buffer files.
            prefix (str): Prefix for buffer file names.
            suffix (str): Suffix for buffer file names.
            force_recreate (bool): Flag to force recreation of buffer files.
        """
        # Get the paths of the buffer files, in a "buffer" subfolder and without an extension
        self.buffer_paths = initialize_paths(
            filenames=[h5_path.stem for h5_path in h5_paths],
            folder=folder,
            subfolder="buffer",
            prefix=prefix,
            suffix=suffix,
            extension="",
        )
        # Read only the files that do not yet exist, or all of them if force_recreate is True
        files_to_read = [
            force_recreate or not parquet_path.exists() for parquet_path in self.buffer_paths
        ]

        # Select the H5 files to read and the corresponding buffer files to create
        self.missing_h5_files = list(compress(h5_paths, files_to_read))
        self.save_paths = list(compress(self.buffer_paths, files_to_read))

        print(f"Reading files: {len(self.missing_h5_files)} new files of {len(h5_paths)} total.")

    def _save_buffer_file(self, h5_path: Path, parquet_path: Path) -> None:
        """
        Creates a single buffer file.

        Args:
            h5_path (Path): Path to the H5 file.
            parquet_path (Path): Path to the buffer file.
        """
        # Create a DataFrameCreator instance and get the dataframe from the h5 file
        df = DataFrameCreator(config_dataframe=self._config, h5_path=h5_path).df

        # Reset the index of the DataFrame and save it as a parquet file
        df.reset_index().to_parquet(parquet_path)

    def _save_buffer_files(self, debug: bool) -> None:
        """
        Creates the buffer files.

        Args:
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        n_cores = min(len(self.missing_h5_files), self.n_cores)
        paths = zip(self.missing_h5_files, self.save_paths)
        if n_cores > 0:
            if debug:
                for h5_path, parquet_path in paths:
                    self._save_buffer_file(h5_path, parquet_path)
            else:
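                # joblib's delayed() wraps each call into a lazy task; Parallel
                # then executes one task per (h5, parquet) pair on n_cores workers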
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(h5_path, parquet_path)
                    for h5_path, parquet_path in paths
                )

    def _fill_dataframes(self):
        """
        Reads all parquet files into one dataframe using dask and fills NaN values.
        """
        dataframe = dd.read_parquet(self.buffer_paths, calculate_divisions=True)
        file_metadata = get_parquet_metadata(
            self.buffer_paths,
            time_stamp_col=self._config.get("time_stamp_alias", "timeStamp"),
        )
        self.metadata["file_statistics"] = file_metadata

        fill_channels: list[str] = get_channels(
            self._config["channels"],
            ["per_pulse", "per_train"],
            extend_aux=True,
        )
        index: list[str] = get_channels(index=True)
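        # Assumed rationale: the forward fill may need to look across file
        # boundaries, and capping the overlap at the smallest file's row count
        # keeps the lookback within a single neighboring partition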
        overlap = min(file["num_rows"] for file in file_metadata.values())

        dataframe = forward_fill_lazy(
            df=dataframe,
            columns=fill_channels,
            before=overlap,
            iterations=self._config.get("forward_fill_iterations", 2),
        )
        self.metadata["forward_fill"] = {
            "columns": fill_channels,
            "overlap": overlap,
            "iterations": self._config.get("forward_fill_iterations", 2),
        }

        # Drop rows with nan values in electron channels
        df_electron = dataframe.dropna(
            subset=get_channels(self._config["channels"], ["per_electron"]),
        )

        # Set the dtypes of the channels here as there should be no null values
        channel_dtypes = get_channels(self._config["channels"], "all")
        config_channels = self._config["channels"]
        dtypes = {
            channel: config_channels[channel].get("dtype")
            for channel in channel_dtypes
            if config_channels[channel].get("dtype") is not None
        }

        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if self._config.get("split_sector_id_from_dld_time", False):
            df_electron, meta = split_dld_time_from_sector_id(
                df_electron,
                config=self._config,
            )
            self.metadata.update(meta)
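
        # Two views are kept: df_electron holds only complete electron events
        # (NaN rows dropped, dtypes enforced), while df_pulse retains the index
        # plus the forward-filled per-pulse/per-train channels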
        self.df_electron = df_electron.astype(dtypes)
        self.df_pulse = dataframe[index + fill_channels]

    def run(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        prefix: str = "",
        suffix: str = "",
        debug: bool = False,
    ) -> None:
        """
        Runs the buffer file creation process.

        Args:
            h5_paths (list[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for buffer files.
            force_recreate (bool): Flag to force recreation of buffer files.
            prefix (str): Prefix for buffer file names.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.
        """
        self._get_files_to_read(h5_paths, folder, prefix, suffix, force_recreate)

        if not force_recreate:
            self._schema_check()

        self._save_buffer_files(debug)

        self._fill_dataframes()
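
Usage sketch (not part of the report): a minimal example of how this class is
driven, assuming a sed-style config dict and a folder of FLASH h5 files. The
config below is a placeholder; a real sed config defines the full set of
per_electron/per_pulse/per_train channels, so treat this as an illustration of
the call sequence only.

    from pathlib import Path

    from sed.loader.flash.buffer_handler import BufferHandler

    # Placeholder config; the keys mirror those accessed in the class above
    config = {
        "core": {"num_cores": 4},
        "dataframe": {
            "channels": {},  # channel definitions omitted in this sketch
            "sector_id_column": "dldSectorID",
            "split_sector_id_from_dld_time": True,
        },
    }

    bh = BufferHandler(config=config)
    # Convert any missing h5 files to parquet buffers, then build the dask frames
    bh.run(
        h5_paths=sorted(Path("raw").glob("*.h5")),
        folder=Path("processed"),
        force_recreate=False,
        debug=False,
    )
    electrons = bh.df_electron  # per-electron dask dataframe
    pulses = bh.df_pulse        # per-pulse/per-train dask dataframe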