OpenCOMPES / sed / build 10493123392

21 Aug 2024 03:46PM UTC coverage: 92.722% (+0.03%) from 92.688%

Pull Request #484 (github): FlashLoader: Remove invalid files by catching exception
zain-sohail: filter out all negative pulse values as they are invalid

67 of 70 new or added lines in 6 files covered (95.71%).
3 existing lines in 1 file now uncovered.
7122 of 7681 relevant lines covered (92.72%).
0.93 hits per line.

Source file: /tests/loader/flash/test_buffer_handler.py (98.23% covered)
"""Test cases for the BufferHandler class in the Flash module."""
from copy import deepcopy
from pathlib import Path

import numpy as np
import pandas as pd
import pytest
from h5py import File

from sed.loader.flash.buffer_handler import BufferFilePaths
from sed.loader.flash.buffer_handler import BufferHandler
from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import InvalidFileError


def create_parquet_dir(config: dict, folder: str) -> Path:
    """
    Creates a directory for storing Parquet files based on the provided configuration
    and folder name.
    """
    parquet_path = Path(config["core"]["paths"]["processed"])
    parquet_path = parquet_path.joinpath(folder)
    parquet_path.mkdir(parents=True, exist_ok=True)
    return parquet_path


def test_buffer_file_paths(config: dict, h5_paths: list[Path]) -> None:
    """
    Test the ability of BufferFilePaths to identify files that need to be read and
    to manage buffer file paths using a directory structure.

    This test performs several checks to ensure that BufferFilePaths correctly identifies
    which HDF5 files need to be read and properly manages the paths for saving buffer
    files. It follows these steps:
    1. Creates a directory structure for storing buffer files and initializes BufferFilePaths.
    2. Checks if the file_sets_to_process method populates the dict of missing file sets and
       verifies that initially, all provided files are considered missing.
    3. Checks that the paths for saving buffer files are correctly generated.
    4. Creates a single buffer file and reruns file_sets_to_process to ensure that
       BufferFilePaths recognizes one less missing file.
    5. Checks if the force_recreate parameter forces BufferFilePaths to consider all files.
    6. Cleans up by removing the created buffer file.
    7. Tests the handling of a suffix in buffer file names (for multidetector setups) by
       rerunning the checks with modified file name parameters.
    """
    folder = create_parquet_dir(config, "get_files_to_read")
    fp = BufferFilePaths(config, h5_paths, folder, suffix="", remove_invalid_files=False)

    # check that all files are to be read
    assert len(fp.file_sets_to_process()) == len(h5_paths)

    # create expected paths
    expected_buffer_electron_paths = [
        folder / f"buffer/electron_{Path(path).stem}" for path in h5_paths
    ]
    expected_buffer_timed_paths = [folder / f"buffer/timed_{Path(path).stem}" for path in h5_paths]

    # check that all buffer paths are correct
    assert np.all(fp["electron"] == expected_buffer_electron_paths)
    assert np.all(fp["timed"] == expected_buffer_timed_paths)

    # create a single buffer file to check if the set of files to process changes
    path = {
        "raw": h5_paths[0],
        "electron": expected_buffer_electron_paths[0],
        "timed": expected_buffer_timed_paths[0],
    }
    bh = BufferHandler(config)
    bh._save_buffer_file(path)

    # check again for files to read and expect one less file
    fp = BufferFilePaths(config, h5_paths, folder, suffix="", remove_invalid_files=False)
    assert len(fp.file_sets_to_process()) == len(h5_paths) - 1

    # check that all files are to be read if force_recreate is set to True
    assert len(fp.file_sets_to_process(force_recreate=True)) == len(h5_paths)

    # remove buffer files
    Path(path["electron"]).unlink()
    Path(path["timed"]).unlink()

    # Test for adding a suffix
    fp = BufferFilePaths(config, h5_paths, folder, "suffix", remove_invalid_files=False)

    # expected buffer paths with type prefix and suffix
    for typ in ["electron", "timed"]:
        expected_buffer_paths = [
            folder / "buffer" / f"{typ}_{Path(path).stem}_suffix" for path in h5_paths
        ]
        assert np.all(fp[typ] == expected_buffer_paths)


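# --- Illustrative sketch (editor's addition, not part of the original test module) ---
# A minimal example of the BufferFilePaths workflow exercised above, assuming
# `config` and `h5_paths` shaped like the test fixtures. The helper name
# `example_pending_file_sets` is hypothetical.
def example_pending_file_sets(config: dict, h5_paths: list[Path], folder: Path):
    """Return the file sets that still need to be converted to buffer files (sketch)."""
    fp = BufferFilePaths(config, h5_paths, folder, suffix="", remove_invalid_files=False)
    # Each file set pairs a "raw" HDF5 source with its "electron"/"timed" buffer
    # targets, as the `path` dict in test_buffer_file_paths suggests.
    return fp.file_sets_to_process()

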
def test_buffer_schema_mismatch(config: dict, h5_paths: list[Path]) -> None:
    """
    Test function to verify schema mismatch handling in the BufferHandler's
    'process_and_load_dataframe' method.

    The test validates the error handling mechanism when the available channels do not match the
    schema of the existing parquet files.

    Test Steps:
    - Attempt to read a dataframe after adding a new channel 'gmdTunnel2' to the configuration.
    - Check for an expected error related to the mismatch between available channels and schema.
    - Force recreation of the dataframe with the added channel, ensuring successful dataframe
      creation.
    - Simulate a missing channel scenario by removing 'gmdTunnel2' from the configuration.
    - Check for an error indicating a missing channel in the configuration.
    - Clean up created buffer files after the test.
    """
    folder = create_parquet_dir(config, "schema_mismatch")
    bh = BufferHandler(config)
    bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, debug=True)

    # Manipulate the configuration to introduce a new channel 'gmdTunnel2'
    config_dict = config
    config_dict["dataframe"]["channels"]["gmdTunnel2"] = {
        "index_key": "/FL1/Photon Diagnostic/GMD/Pulse resolved energy/energy tunnel/index",
        "dataset_key": "/FL1/Photon Diagnostic/GMD/Pulse resolved energy/energy tunnel/value",
        "format": "per_pulse",
        "slice": 0,
    }

    # Reread the dataframe with the modified configuration, expecting a schema mismatch error
    with pytest.raises(ValueError) as e:
        bh = BufferHandler(config)
        bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, debug=True)
    expected_error = e.value.args[0]

    # Validate the specific error messages for the schema mismatch
    assert "The available channels do not match the schema of file" in expected_error
    assert "Missing in parquet: {'gmdTunnel2'}" in expected_error
    assert "Please check the configuration file or set force_recreate to True." in expected_error

    # Force recreation of the dataframe, including the added channel 'gmdTunnel2'
    bh = BufferHandler(config)
    bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, force_recreate=True, debug=True)

    # Remove 'gmdTunnel2' from the configuration to simulate a missing channel scenario
    del config["dataframe"]["channels"]["gmdTunnel2"]
    # this also raises an error, but a different one than before
    with pytest.raises(ValueError) as e:
        # Attempt to read the dataframe again to check for the missing channel error
        bh = BufferHandler(config)
        bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, debug=True)

    expected_error = e.value.args[0]
    # Check for the specific error message indicating a missing channel in the configuration
    assert "Missing in config: {'gmdTunnel2'}" in expected_error

    # Clean up created buffer files after the test
    for path in bh.fp["electron"]:
        path.unlink()
    for path in bh.fp["timed"]:
        path.unlink()


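# --- Illustrative sketch (editor's addition, not part of the original test module) ---
# How the schema mismatch tested above surfaces in practice: once buffer files
# exist, a changed channel configuration makes loading fail with a ValueError
# whose message names the channels missing on either side. The helper name
# `example_schema_mismatch_message` is hypothetical.
def example_schema_mismatch_message(config: dict, h5_paths: list[Path], folder: Path) -> str:
    """Return the mismatch message, or an empty string if the schema matches (sketch)."""
    try:
        BufferHandler(config).process_and_load_dataframe(h5_paths=h5_paths, folder=folder)
        return ""
    except ValueError as err:
        # e.g. "Missing in parquet: {'gmdTunnel2'}" or "Missing in config: {'gmdTunnel2'}"
        return err.args[0]

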
def test_save_buffer_files(config: dict, h5_paths: list[Path]) -> None:
    """
    Test the BufferHandler's ability to save buffer files serially and in parallel.

    This test ensures that the BufferHandler can run both serially and in parallel, saving the
    output to buffer files, and then compares the resulting DataFrames to ensure they are
    identical. This verifies that parallel processing does not affect the integrity of the
    saved data. After the comparison, it cleans up by removing the created buffer files.
    """
    folder_serial = create_parquet_dir(config, "save_buffer_files_serial")
    bh_serial = BufferHandler(config)
    bh_serial.process_and_load_dataframe(h5_paths, folder_serial, debug=True)

    folder_parallel = create_parquet_dir(config, "save_buffer_files_parallel")
    bh_parallel = BufferHandler(config)
    bh_parallel.process_and_load_dataframe(h5_paths, folder_parallel)

    df_serial = pd.read_parquet(folder_serial)
    df_parallel = pd.read_parquet(folder_parallel)

    pd.testing.assert_frame_equal(df_serial, df_parallel)

    # remove buffer files
    for df_type in ["electron", "timed"]:
        for path in bh_serial.fp[df_type]:
            path.unlink()
        for path in bh_parallel.fp[df_type]:
            path.unlink()


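# --- Illustrative sketch (editor's addition, not part of the original test module) ---
# The serial/parallel equivalence check above, condensed: build the buffers
# twice (serially with debug=True, then in parallel) and compare the parquet
# output. The helper name `example_check_parallel_integrity` is hypothetical.
def example_check_parallel_integrity(
    config: dict,
    h5_paths: list[Path],
    folder_serial: Path,
    folder_parallel: Path,
) -> None:
    """Raise AssertionError if parallel processing altered the saved data (sketch)."""
    BufferHandler(config).process_and_load_dataframe(h5_paths, folder_serial, debug=True)
    BufferHandler(config).process_and_load_dataframe(h5_paths, folder_parallel)
    pd.testing.assert_frame_equal(
        pd.read_parquet(folder_serial),
        pd.read_parquet(folder_parallel),
    )

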
def test_save_buffer_files_exception(
    config: dict,
    h5_paths: list[Path],
    h5_file_copy: File,
    h5_file2_copy: File,
    tmp_path: Path,
) -> None:
    """Test function to verify exception handling in the BufferHandler's
    'process_and_load_dataframe' method. The test checks for exceptions raised due to missing
    channels in the configuration and empty datasets.

    Test Steps:
    - Create a directory structure for storing buffer files and initialize the BufferHandler.
    - Check for an exception when a channel is missing in the configuration.
    - Create an empty dataset in the HDF5 file to simulate an invalid file scenario.
    - Check for an expected error related to the missing index dataset that invalidates the file.
    - Create empty datasets in the second HDF5 file to invalidate it as well.
    - Check that no error is raised when 'remove_invalid_files' is set to True and the invalid
      file is removed.
    - Check for an error when only a single invalid file is provided, as it will not be buffered.
    """
    folder_parallel = create_parquet_dir(config, "save_buffer_files_exception")
    config_ = deepcopy(config)

    # check exception in case of a missing channel in the config
    channel = "dldPosX"
    del config_["dataframe"]["channels"][channel]["index_key"]

    # testing exception in parallel execution
    with pytest.raises(ValueError):
        bh = BufferHandler(config_)
        bh.process_and_load_dataframe(h5_paths, folder_parallel, debug=False)

    # check exception message with empty dataset
    config_ = deepcopy(config)
    channel = "testChannel"
    channel_index_key = "test/dataset/empty/index"
    empty_dataset_key = "test/dataset/empty/value"
    config_["dataframe"]["channels"][channel] = {
        "index_key": channel_index_key,
        "dataset_key": empty_dataset_key,
        "format": "per_train",
    }

    # create an empty dataset
    h5_file_copy.create_dataset(
        name=empty_dataset_key,
        shape=0,
    )

    # expect an invalid file error because the missing index dataset invalidates the entire file
    with pytest.raises(InvalidFileError):
        bh = BufferHandler(config_)
        bh.process_and_load_dataframe(
            [tmp_path / "copy.h5"],
            folder_parallel,
            debug=False,
            force_recreate=True,
        )

    # create empty datasets in the second file copy
    h5_file2_copy.create_dataset(
        name=channel_index_key,
        shape=0,
    )
    h5_file2_copy.create_dataset(
        name=empty_dataset_key,
        shape=0,
    )

    # if remove_invalid_files is True, the file should be removed and no error should be raised
    bh = BufferHandler(config_)
    try:
        bh.process_and_load_dataframe(
            [tmp_path / "copy.h5", tmp_path / "copy2.h5"],
            folder_parallel,
            debug=False,
            force_recreate=True,
            remove_invalid_files=True,
        )
    except InvalidFileError:
        pytest.fail(
            "InvalidFileError should not be raised when remove_invalid_files is set to True",
        )

    # with only a single file, the file will not be buffered so a FileNotFoundError should be raised
    with pytest.raises(FileNotFoundError):
        bh.process_and_load_dataframe(
            [tmp_path / "copy.h5"],
            folder_parallel,
            debug=False,
            force_recreate=True,
            remove_invalid_files=True,
        )


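# --- Illustrative sketch (editor's addition, not part of the original test module) ---
# The invalid-file handling exercised above, condensed: with
# remove_invalid_files=True an invalid HDF5 file is dropped instead of raising
# InvalidFileError; the test also shows that when the only provided file is
# invalid, nothing is buffered and a FileNotFoundError follows. The helper name
# `example_load_skipping_invalid` is hypothetical.
def example_load_skipping_invalid(config: dict, paths: list[Path], folder: Path) -> BufferHandler:
    bh = BufferHandler(config)
    bh.process_and_load_dataframe(
        paths,
        folder,
        force_recreate=True,
        remove_invalid_files=True,  # drop invalid files rather than raising
    )
    return bh

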
def test_get_filled_dataframe(config: dict, h5_paths: list[Path]) -> None:
    """Test function to verify the creation of a filled dataframe from the buffer files."""
    folder = create_parquet_dir(config, "get_filled_dataframe")
    bh = BufferHandler(config)
    bh.process_and_load_dataframe(h5_paths, folder)

    df = pd.read_parquet(folder)

    # the electron dataframe carries all parquet columns plus the sector ID
    assert np.all(list(bh.df["electron"].columns) == list(df.columns) + ["dldSectorID"])

    # the timed dataframe contains the pulse- and train-resolved channels
    # (with index and auxiliary columns), minus the per-electron index
    channel_pulse = set(
        get_channels(
            config["dataframe"],
            formats=["per_pulse", "per_train"],
            index=True,
            extend_aux=True,
        ),
    ) - {"electronId"}
    assert set(bh.df["timed"].columns) == channel_pulse

    # remove buffer files
    for df_type in ["electron", "timed"]:
        for path in bh.fp[df_type]:
            path.unlink()


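# --- Illustrative sketch (editor's addition, not part of the original test module) ---
# How the expected timed-dataframe columns in the final assertion are derived:
# pulse- and train-resolved channels, with index and auxiliary columns included,
# minus the per-electron index. `example_timed_columns` is a hypothetical helper.
def example_timed_columns(config: dict) -> set:
    channels = get_channels(
        config["dataframe"],
        formats=["per_pulse", "per_train"],
        index=True,
        extend_aux=True,
    )
    return set(channels) - {"electronId"}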