OpenCOMPES / sed / build 12876237185

20 Jan 2025 09:59PM UTC coverage: 92.167% (+0.4%) from 91.801%

Pull Request #437: Upgrade to V1
Commit: Merge pull request #553 from OpenCOMPES/fix-552 ("update tests for env variables")

2227 of 2364 new or added lines in 53 files covered (94.2%)
4 existing lines in 1 file now uncovered
7695 of 8349 relevant lines covered (92.17%)
0.92 hits per line

Source file: /src/sed/loader/flash/loader.py (92.72% covered)
"""
This module implements the flash data loader.
This loader currently supports hextof, wespe, and instruments with a similar structure.
The raw hdf5 data is combined, saved into buffer files, and loaded as a dask dataframe.
The dataframe is an amalgamation of all h5 files for a combination of runs, where NaNs are
automatically forward-filled across different files.
This can then be saved as a parquet file for out-of-sed processing and read back in to
access other sed functionality.
"""
from __future__ import annotations

import re
import time
from collections.abc import Sequence
from pathlib import Path

import dask.dataframe as dd
from natsort import natsorted

from sed.core.logging import set_verbosity
from sed.core.logging import setup_logging
from sed.loader.base.loader import BaseLoader
from sed.loader.flash.buffer_handler import BufferHandler
from sed.loader.flash.instruments import wespe_convert
from sed.loader.flash.metadata import MetadataRetriever

# Configure logging
logger = setup_logging("flash_loader")


class FlashLoader(BaseLoader):
    """
    This class generates multi-indexed, multidimensional pandas dataframes from the new FLASH
    data format, resolved by both macro- and microbunches alongside electrons.
    Only the read_dataframe (inherited and implemented) method is accessed by other modules.

    Args:
        config (dict): Config dictionary.
        verbose (bool, optional): Option to print out diagnostic information.
            Defaults to True.
    """

    __name__ = "flash"

    supported_file_types = ["h5"]

    def __init__(self, config: dict, verbose: bool = True) -> None:
        """
        Initializes the FlashLoader.

        Args:
            config (dict): Configuration dictionary.
            verbose (bool, optional): Option to print out diagnostic information.
                Defaults to True.
        """
        super().__init__(config=config, verbose=verbose)

        set_verbosity(logger, self._verbose)

        self.instrument: str = self._config["core"].get("instrument", "hextof")  # default is hextof
        self.raw_dir: str | None = None
        self.processed_dir: str | None = None

    @property
    def verbose(self) -> bool:
        """Accessor to the verbosity flag.

        Returns:
            bool: Verbosity flag.
        """
        return self._verbose

    @verbose.setter
    def verbose(self, verbose: bool):
        """Setter for the verbosity.

        Args:
            verbose (bool): Option to turn on verbose output. Sets loglevel to INFO.
        """
        self._verbose = verbose
        set_verbosity(logger, self._verbose)

    def _initialize_dirs(self) -> None:
        """
        Initializes the directories on Maxwell based on the configuration. If "paths" is
        provided in the configuration, the raw data directory and parquet data directory
        are taken from there. Otherwise, the beamtime_id and year are used to locate the
        data directories. The first path that has an online- or express- prefix, or that
        matches the daq name, is taken as the raw data directory.

        Raises:
            ValueError: If required values are missing from the configuration.
            FileNotFoundError: If the raw data directories are not found.
        """
        # Parse the config to locate the raw beamtime directory
        # Only raw_dir is necessary; processed_dir can be derived from raw_dir if not provided
        if "paths" in self._config["core"]:
            raw_dir = Path(self._config["core"]["paths"].get("raw", ""))
            processed_dir = Path(
                self._config["core"]["paths"].get("processed", raw_dir.joinpath("processed")),
            )

        else:
            try:
                beamtime_id = self._config["core"]["beamtime_id"]
                year = self._config["core"]["year"]

            except KeyError as exc:
                raise ValueError(
                    "The beamtime_id and year are required.",
                ) from exc

            beamtime_dir = Path(
                self._config["core"]["beamtime_dir"][self._config["core"]["beamline"]],
            )
            beamtime_dir = beamtime_dir.joinpath(f"{year}/data/{beamtime_id}/")

            # Search the raw directory tree recursively for the raw data directory
            raw_paths: list[Path] = []

            for path in beamtime_dir.joinpath("raw").glob("**/*"):
                if path.is_dir():
                    dir_name = path.name
                    if dir_name.startswith(("online-", "express-")):
                        raw_paths.append(path.joinpath(self._config["dataframe"]["daq"]))
                    elif dir_name == self._config["dataframe"]["daq"].upper():
                        raw_paths.append(path)

            if not raw_paths:
                raise FileNotFoundError("Raw data directories not found.")

            raw_dir = raw_paths[0].resolve()

            processed_dir = beamtime_dir.joinpath("processed")

        processed_dir.mkdir(parents=True, exist_ok=True)

        self.raw_dir = str(raw_dir)
        self.processed_dir = str(processed_dir)

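    # A minimal sketch of the two config layouts _initialize_dirs() accepts; the key
    # names follow the code above, but the concrete values are illustrative assumptions:
    #
    #   config = {
    #       "core": {"paths": {"raw": "/path/to/raw", "processed": "/path/to/processed"}},
    #   }
    #
    # or, resolved from beamtime metadata on Maxwell:
    #
    #   config = {
    #       "core": {
    #           "beamtime_id": 11012345,           # hypothetical beamtime
    #           "year": 2023,
    #           "beamline": "pg2",
    #           "beamtime_dir": {"pg2": "/asap3/flash/gpfs/pg2/"},
    #       },
    #       "dataframe": {"daq": "fl1user3"},      # selects the raw subdirectory
    #   }
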
    @property
    def available_runs(self) -> list[int]:
        """Returns a sorted list of run IDs found in the raw data directory.

        Returns:
            list[int]: Sorted list of available run IDs.
        """
        # Get all files in raw_dir with "run" in their names
        files = list(Path(self.raw_dir).glob("*run*"))

        # Extract run IDs from filenames
        run_ids = set()
        for file in files:
            match = re.search(r"run(\d+)", file.name)
            if match:
                run_ids.add(int(match.group(1)))

        # Return run IDs in sorted order
        return sorted(run_ids)

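    # For illustration: a raw_dir containing the (hypothetical) files
    # "..._run43878_file1.h5" and "..._run43878_file2.h5" would yield [43878].
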
    def get_files_from_run_id(  # type: ignore[override]
        self,
        run_id: str | int,
        folders: str | Sequence[str] = None,
        extension: str = "h5",
    ) -> list[str]:
        """
        Returns a list of filenames for a given run located in the specified directory
        for the specified data acquisition (daq).

        Args:
            run_id (str | int): The run identifier to locate.
            folders (str | Sequence[str], optional): The directory(ies) where the raw
                data is located. Defaults to config["core"]["base_folder"].
            extension (str, optional): The file extension. Defaults to "h5".

        Returns:
            list[str]: A list of path strings representing the collected file names.

        Raises:
            FileNotFoundError: If no files are found for the given run in the directory.
        """
        # Define the stream name prefixes based on the data acquisition identifier
        stream_name_prefixes = self._config["core"]["stream_name_prefixes"]

        if folders is None:
            folders = self._config["core"]["base_folder"]

        if isinstance(folders, str):
            folders = [folders]

        daq = self._config["dataframe"]["daq"]

        # Generate the file pattern to search for in the directory
        file_pattern = f"{stream_name_prefixes[daq]}_run{run_id}_*.{extension}"

        files: list[Path] = []
        # Use pathlib to search for matching files in each directory
        for folder in folders:
            files.extend(
                natsorted(
                    Path(folder).glob(file_pattern),
                    key=lambda filename: str(filename).rsplit("_", maxsplit=1)[-1],
                ),
            )

        # Check if any files are found
        if not files:
            raise FileNotFoundError(
                f"No files found for run {run_id} in directory {str(folders)}",
            )

        # Return the list of found files
        return [str(file.resolve()) for file in files]

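    # A hypothetical example of a filename matched by the pattern above, assuming a
    # stream name prefix of "FLASH1_USER3" for the configured daq (values illustrative):
    #
    #   FLASH1_USER3_run43878_file1_20230405T120000.h5
    #
    # natsorted with the rsplit key orders files by their trailing "_"-separated
    # segment, comparing any embedded numbers numerically rather than lexically.
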
    def parse_metadata(self, token: str = None) -> dict:
        """Uses the MetadataRetriever class to fetch metadata from scicat for each run.

        Args:
            token (str, optional): The scicat token to use for fetching metadata.

        Returns:
            dict: Metadata dictionary.
        """
        metadata_retriever = MetadataRetriever(self._config["metadata"], token)
        metadata = metadata_retriever.get_metadata(
            beamtime_id=self._config["core"]["beamtime_id"],
            runs=self.runs,
            metadata=self.metadata,
        )

        return metadata

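    # Direct use is uncommon; read_dataframe(collect_metadata=True, token=...) calls
    # this internally. A minimal sketch, with a placeholder token (illustrative):
    #
    #   loader.runs = ["43878"]
    #   md = loader.parse_metadata(token="<scicat-token>")
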
    def get_count_rate(
        self,
        fids: Sequence[int] = None,  # noqa: ARG002
        **kwds,  # noqa: ARG002
    ):
        """Count rate is not implemented for this loader; always returns (None, None)."""
        return None, None

    def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float | list[float]:  # type: ignore[override]
        """
        Calculates the elapsed time.

        Args:
            fids (Sequence[int]): A sequence of file IDs. Defaults to all files.

        Keyword Args:
            runs: A sequence of run IDs. Takes precedence over fids.
            aggregate: Whether to return the sum of the elapsed times across
                the specified files or the elapsed time for each file. Defaults to True.

        Returns:
            float | list[float]: The elapsed time(s) in seconds.

        Raises:
            KeyError: If a file ID in fids or a run ID in 'runs' does not exist in the metadata.
        """
        try:
            file_statistics = self.metadata["file_statistics"]["timed"]
        except Exception as exc:
            raise KeyError(
                "File statistics missing. Use 'read_dataframe' first.",
            ) from exc
        time_stamp_alias = self._config["dataframe"].get("time_stamp_alias", "timeStamp")

        def get_elapsed_time_from_fid(fid):
            try:
                fid = str(fid)  # Ensure the key is a string
                time_stamps = file_statistics[fid]["columns"][time_stamp_alias]
                elapsed_time = time_stamps["max"] - time_stamps["min"]
            except KeyError as exc:
                raise KeyError(
                    f"Timestamp metadata missing in file {fid}. "
                    "Add timestamp column and alias to config before loading.",
                ) from exc

            return elapsed_time

        def get_elapsed_time_from_run(run_id):
            if self.raw_dir is None:
                self._initialize_dirs()
            files = self.get_files_from_run_id(run_id=run_id, folders=self.raw_dir)
            fids = [self.files.index(file) for file in files]
            return sum(get_elapsed_time_from_fid(fid) for fid in fids)

        elapsed_times = []
        runs = kwds.pop("runs", None)
        aggregate = kwds.pop("aggregate", True)

        if len(kwds) > 0:
            raise TypeError(f"get_elapsed_time() got unexpected keyword arguments {kwds.keys()}.")

        if runs is not None:
            elapsed_times = [get_elapsed_time_from_run(run) for run in runs]
        else:
            if fids is None:
                fids = range(len(self.files))
            elapsed_times = [get_elapsed_time_from_fid(fid) for fid in fids]

        if aggregate:
            elapsed_times = sum(elapsed_times)

        return elapsed_times

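    # A minimal usage sketch, assuming read_dataframe() has already populated the
    # file statistics metadata (run number is illustrative):
    #
    #   total_s = loader.get_elapsed_time(runs=[43878])        # summed over the run's files
    #   per_file = loader.get_elapsed_time(aggregate=False)    # one value per loaded file
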
    def read_dataframe(
        self,
        files: str | Sequence[str] = None,
        folders: str | Sequence[str] = None,
        runs: str | int | Sequence[str | int] = None,
        ftype: str = "h5",
        metadata: dict = {},
        collect_metadata: bool = False,
        **kwds,
    ) -> tuple[dd.DataFrame, dd.DataFrame, dict]:
        """
        Read express data from the DAQ, generating a parquet in between.

        Args:
            files (str | Sequence[str], optional): File path(s) to process. Defaults to None.
            folders (str | Sequence[str], optional): Path to folder(s) where files are stored.
                Takes priority over ``files``: if specified, any specified files are ignored.
                Defaults to None.
            runs (str | int | Sequence[str | int], optional): Run identifier(s).
                Corresponding files will be located in the location provided by ``folders``.
                Takes precedence over ``files`` and ``folders``. Defaults to None.
            ftype (str, optional): The file extension type. Defaults to "h5".
            metadata (dict, optional): Additional metadata. Defaults to an empty dict.
            collect_metadata (bool, optional): Whether to collect metadata. Defaults to False.

        Keyword Args:
            detector (str, optional): The detector to use. Defaults to "".
            force_recreate (bool, optional): Whether to force recreation of the buffer files.
                Defaults to False.
            processed_dir (str, optional): The directory to save the processed files.
                Defaults to None.
            debug (bool, optional): Whether to run buffer creation in serial. Defaults to False.
            remove_invalid_files (bool, optional): Whether to exclude invalid files.
                Defaults to False.
            token (str, optional): The scicat token to use for fetching metadata. If provided,
                will be saved to a .env file for future use. If not provided, the environment
                variables will be checked when collect_metadata is True.
            filter_timed_by_electron (bool, optional): When True, the timed dataframe will only
                contain data points where valid electron events were detected. When False, all
                timed data points are included regardless of electron detection. Defaults to True.

        Returns:
            tuple[dd.DataFrame, dd.DataFrame, dict]: A tuple containing the dataframe, the
            timed dataframe, and the metadata dictionary.

        Raises:
            ValueError: If neither 'runs' nor 'files'/'raw_dir' is provided.
            FileNotFoundError: If the conversion fails for some files or no data is available.
            ValueError: If collect_metadata is True and no token is available.
        """
        detector = kwds.pop("detector", "")
        force_recreate = kwds.pop("force_recreate", False)
        processed_dir = kwds.pop("processed_dir", None)
        debug = kwds.pop("debug", False)
        remove_invalid_files = kwds.pop("remove_invalid_files", False)
        token = kwds.pop("token", None)
        filter_timed_by_electron = kwds.pop("filter_timed_by_electron", True)

        if len(kwds) > 0:
            raise ValueError(f"Unexpected keyword arguments: {kwds.keys()}")
        t0 = time.time()

        self._initialize_dirs()
        # Prepare a list of names for the runs to read and parquets to write
        if runs is not None:
            files = []
            runs_ = [str(runs)] if isinstance(runs, (str, int)) else list(map(str, runs))
            for run in runs_:
                run_files = self.get_files_from_run_id(
                    run_id=run,
                    folders=self.raw_dir,
                )
                files.extend(run_files)
            self.runs = runs_
            super().read_dataframe(files=files, ftype=ftype)
        else:
            # This call takes care of files and folders. As we have converted runs into files
            # already, they are just stored in the class by this call.
            super().read_dataframe(
                files=files,
                folders=folders,
                ftype=ftype,
                metadata=metadata,
            )

        bh = BufferHandler(
            config=self._config,
        )

        # If processed_dir is None, use self.processed_dir
        processed_dir = processed_dir or self.processed_dir
        processed_dir = Path(processed_dir)

        # Obtain the parquet filenames, metadata, and schema from the method
        # which handles buffer file creation/reading
        h5_paths = [Path(file) for file in self.files]
        df, df_timed = bh.process_and_load_dataframe(
            h5_paths=h5_paths,
            folder=processed_dir,
            force_recreate=force_recreate,
            suffix=detector,
            debug=debug,
            remove_invalid_files=remove_invalid_files,
            filter_timed_by_electron=filter_timed_by_electron,
        )

        if self.instrument == "wespe":
            df, df_timed = wespe_convert(df, df_timed)

        self.metadata.update(self.parse_metadata(token) if collect_metadata else {})
        self.metadata.update(bh.metadata)

        logger.info(f"loading complete in {time.time() - t0: .2f} s")

        return df, df_timed, self.metadata


LOADER = FlashLoader
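
# A minimal end-to-end sketch of driving this loader; the config dict, run number,
# and output path are illustrative assumptions, not shipped values:
#
#   from sed.loader.flash.loader import FlashLoader
#
#   loader = FlashLoader(config=config)
#   df, df_timed, metadata = loader.read_dataframe(runs=[43878])
#   df.to_parquet("run_43878_parquet")  # persist for out-of-sed processing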