OpenCOMPES / sed / build 13419398366

19 Feb 2025 06:09PM UTC coverage: 91.6% (-0.6%) from 92.174%

Pull Request #534: Hextof lab loader (merge df78f6964 into 6b927a2db)

71 of 124 new or added lines in 7 files covered. (57.26%)
3 existing lines in 1 file now uncovered.
7731 of 8440 relevant lines covered (91.6%)
0.92 hits per line

Source File: /src/sed/loader/flash/loader.py (92.16% covered)

"""
This module implements the flash data loader.
This loader currently supports hextof, wespe and instruments with similar structure.
The raw hdf5 data is combined and saved into buffer files and loaded as a dask dataframe.
The dataframe is an amalgamation of all h5 files for a combination of runs, where the NaNs are
automatically forward-filled across different files.
This can then be saved as a parquet for out-of-sed processing and reread back to access other
sed functionality.
"""
from __future__ import annotations

import re
import time
from collections.abc import Sequence
from pathlib import Path

import dask.dataframe as dd
from natsort import natsorted

from sed.core.logging import set_verbosity
from sed.core.logging import setup_logging
from sed.loader.base.loader import BaseLoader
from sed.loader.flash.buffer_handler import BufferHandler
from sed.loader.flash.instruments import wespe_convert
from sed.loader.flash.metadata import MetadataRetriever

# Configure logging
logger = setup_logging("flash_loader")
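
# Overall flow implemented in this module (schematic summary of the docstring above):
# raw FLASH h5 files are combined into buffer files by the BufferHandler, loaded as dask
# dataframes (electron and timed), and can be written to parquet for use outside of sed.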


class FlashLoader(BaseLoader):
    """
    The class generates multiindexed multidimensional pandas dataframes from the new FLASH
    dataformat resolved by both macro and microbunches alongside electrons.
    Only the read_dataframe (inherited and implemented) method is accessed by other modules.

    Args:
        config (dict): Config dictionary.
        verbose (bool, optional): Option to print out diagnostic information.
            Defaults to True.
    """

    __name__ = "flash"

    supported_file_types = ["h5"]

    def __init__(self, config: dict, verbose: bool = True) -> None:
        """
        Initializes the FlashLoader.

        Args:
            config (dict): Configuration dictionary.
            verbose (bool, optional): Option to print out diagnostic information.
                Defaults to True.
        """
        super().__init__(config=config, verbose=verbose)

        set_verbosity(logger, self._verbose)

        self.instrument: str = self._config["core"].get("instrument", "hextof")  # default is hextof
        self.raw_dir: str = None
        self.processed_dir: str = None
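
    # A minimal config sketch covering the keys read by __init__ and _initialize_dirs
    # (hypothetical values, not taken from this build; real configs carry many more
    # dataframe/channel settings):
    #   {"core": {"instrument": "hextof",
    #             "paths": {"raw": "/path/to/raw", "processed": "/path/to/processed"}},
    #    "dataframe": {"daq": "fl1user3"}}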

    @property
    def verbose(self) -> bool:
        """Accessor to the verbosity flag.

        Returns:
            bool: Verbosity flag.
        """
        return self._verbose

    @verbose.setter
    def verbose(self, verbose: bool):
        """Setter for the verbosity.

        Args:
            verbose (bool): Option to turn on verbose output. Sets loglevel to INFO.
        """
        self._verbose = verbose
        set_verbosity(logger, self._verbose)

    def _initialize_dirs(self) -> None:
        """
        Initializes the directories on Maxwell based on configuration. If 'paths' is provided
        in the configuration, the raw data directory and parquet data directory are taken from
        there. Otherwise, the beamtime_id and year are used to locate the data directories.
        The first path found that has an online- or express- prefix, or that matches the daq
        name, is taken as the raw data directory.

        Raises:
            ValueError: If required values are missing from the configuration.
            FileNotFoundError: If the raw data directories are not found.
        """
        # Parse the config to locate the raw beamtime directory
        # Only raw_dir is necessary; processed_dir can be derived from raw_dir if not provided
        if "paths" in self._config["core"]:
            raw_dir = Path(self._config["core"]["paths"].get("raw", ""))
            processed_dir = Path(
                self._config["core"]["paths"].get("processed", raw_dir.joinpath("processed")),
            )

        else:
            try:
                beamtime_id = self._config["core"]["beamtime_id"]
                year = self._config["core"]["year"]

            except KeyError as exc:
                raise ValueError(
                    "The beamtime_id and year are required.",
                ) from exc

            beamtime_dir = Path(
                self._config["core"]["beamtime_dir"][self._config["core"]["beamline"]],
            )
            beamtime_dir = beamtime_dir.joinpath(f"{year}/data/{beamtime_id}/")

            # Use pathlib glob to find the raw data directory
            raw_paths: list[Path] = []

            for path in beamtime_dir.joinpath("raw").glob("**/*"):
                if path.is_dir():
                    dir_name = path.name
                    if dir_name.startswith(("online-", "express-")):
                        raw_paths.append(path.joinpath(self._config["dataframe"]["daq"]))
                    elif dir_name == self._config["dataframe"]["daq"].upper():
                        raw_paths.append(path)

            if not raw_paths:
                raise FileNotFoundError("Raw data directories not found.")

            raw_dir = raw_paths[0].resolve()

            processed_dir = beamtime_dir.joinpath("processed")

        processed_dir.mkdir(parents=True, exist_ok=True)

        self.raw_dir = str(raw_dir)
        self.processed_dir = str(processed_dir)
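
    # Illustration of the beamtime-based resolution above (all values hypothetical):
    # with a configured beamtime_dir of "/asap3/flash/gpfs/pg2", year 2024, beamtime_id
    # 11012345 and daq "fl1user3", a directory such as
    # ".../2024/data/11012345/raw/online-0/fl1user3" would be picked as raw_dir.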

    @property
    def available_runs(self) -> list[int]:
        """Returns a sorted list of run IDs found in the raw data directory.

        Returns:
            list[int]: Sorted list of available run IDs.
        """
        # Get all files in raw_dir with "run" in their names
        files = list(Path(self.raw_dir).glob("*run*"))

        # Extract run IDs from filenames
        run_ids = set()
        for file in files:
            match = re.search(r"run(\d+)", file.name)
            if match:
                run_ids.add(int(match.group(1)))

        # Return run IDs in sorted order
        return sorted(list(run_ids))
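
    # Example: a file named "FLASH1_USER3_stream_2_run43878_file1_20230130T153807.1.h5"
    # (a hypothetical FLASH-style name) yields run ID 43878 via the r"run(\d+)" search above.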

    def get_files_from_run_id(  # type: ignore[override]
        self,
        run_id: str | int,
        folders: str | Sequence[str] = None,
        extension: str = "h5",
    ) -> list[str]:
        """
        Returns a list of filenames for a given run located in the specified directory
        for the specified data acquisition (daq).

        Args:
            run_id (str | int): The run identifier to locate.
            folders (str | Sequence[str], optional): The directory(ies) where the raw
                data is located. Defaults to config["core"]["base_folder"].
            extension (str, optional): The file extension. Defaults to "h5".

        Returns:
            list[str]: A list of path strings representing the collected file names.

        Raises:
            FileNotFoundError: If no files are found for the given run in the directory.
        """
        # Define the stream name prefixes based on the data acquisition identifier
        stream_name_prefixes = self._config["core"].get("stream_name_prefixes")

        if folders is None:
            folders = self._config["core"]["base_folder"]

        if isinstance(folders, str):
            folders = [folders]

        daq = self._config["dataframe"]["daq"]

        # Generate the file patterns to search for in the directory
        if stream_name_prefixes:
            file_pattern = f"{stream_name_prefixes[daq]}_run{run_id}_*." + extension
        else:
            file_pattern = f"*{run_id}*." + extension

        files: list[Path] = []
        # Use pathlib to search for matching files in each directory
        for folder in folders:
            files.extend(
                natsorted(
                    Path(folder).glob(file_pattern),
                    key=lambda filename: str(filename).rsplit("_", maxsplit=1)[-1],
                ),
            )

        # Check if any files are found
        if not files:
            raise FileNotFoundError(
                f"No files found for run {run_id} in directory {str(folders)}",
            )

        # Return the list of found files
        return [str(file.resolve()) for file in files]
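
    # Sketch of the pattern logic above with hypothetical config values: given
    # stream_name_prefixes = {"fl1user3": "FLASH1_USER3_stream_2"} and daq = "fl1user3",
    # run 43878 is matched by "FLASH1_USER3_stream_2_run43878_*.h5"; without configured
    # prefixes, the fallback glob "*43878*.h5" is used.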

    def parse_metadata(self, token: str = None) -> dict:
        """Uses the MetadataRetriever class to fetch metadata from scicat for each run.

        Args:
            token (str, optional): The scicat token to use for fetching metadata.

        Returns:
            dict: Metadata dictionary
        """
        metadata_retriever = MetadataRetriever(self._config["metadata"], token)
        metadata = metadata_retriever.get_metadata(
            beamtime_id=self._config["core"]["beamtime_id"],
            runs=self.runs,
            metadata=self.metadata,
        )

        return metadata
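
    # Typically invoked indirectly (hypothetical call): read_dataframe(runs=[...],
    # collect_metadata=True, token="...") fetches SciCat metadata for the loaded runs.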

    def get_count_rate(
        self,
        fids: Sequence[int] = None,  # noqa: ARG002
        **kwds,  # noqa: ARG002
    ):
        """Count rate is not implemented for this loader; always returns (None, None)."""
        return None, None

    def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float | list[float]:  # type: ignore[override]
        """
        Calculates the elapsed time.

        Args:
            fids (Sequence[int]): A sequence of file IDs. Defaults to all files.

        Keyword Args:
            runs: A sequence of run IDs. Takes precedence over fids.
            aggregate: Whether to return the sum of the elapsed times across
                the specified files or the elapsed time for each file. Defaults to True.

        Returns:
            float | list[float]: The elapsed time(s) in seconds.

        Raises:
            KeyError: If a file ID in fids or a run ID in 'runs' does not exist in the metadata.
        """
        try:
            file_statistics = self.metadata["file_statistics"]["timed"]
        except Exception as exc:
            raise KeyError(
                "File statistics missing. Use 'read_dataframe' first.",
            ) from exc
        time_stamp_alias = self._config["dataframe"].get("time_stamp_alias", "timeStamp")

        def get_elapsed_time_from_fid(fid):
            try:
                fid = str(fid)  # Ensure the key is a string
                time_stamps = file_statistics[fid]["columns"][time_stamp_alias]
                elapsed_time = time_stamps["max"] - time_stamps["min"]
            except KeyError as exc:
                raise KeyError(
                    f"Timestamp metadata missing in file {fid}. "
                    "Add timestamp column and alias to config before loading.",
                ) from exc

            return elapsed_time

        def get_elapsed_time_from_run(run_id):
            if self.raw_dir is None:
                self._initialize_dirs()
            files = self.get_files_from_run_id(run_id=run_id, folders=self.raw_dir)
            fids = [self.files.index(file) for file in files]
            return sum(get_elapsed_time_from_fid(fid) for fid in fids)

        elapsed_times = []
        runs = kwds.pop("runs", None)
        aggregate = kwds.pop("aggregate", True)

        if len(kwds) > 0:
            raise TypeError(f"get_elapsed_time() got unexpected keyword arguments {kwds.keys()}.")

        if runs is not None:
            elapsed_times = [get_elapsed_time_from_run(run) for run in runs]
        else:
            if fids is None:
                fids = range(len(self.files))
            elapsed_times = [get_elapsed_time_from_fid(fid) for fid in fids]

        if aggregate:
            elapsed_times = sum(elapsed_times)

        return elapsed_times
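
    # Worked example for the logic above (hypothetical metadata): if
    # file_statistics["0"]["columns"]["timeStamp"] contains {"min": 1000.0, "max": 1120.0},
    # then get_elapsed_time(fids=[0]) returns 120.0 seconds; with aggregate=False,
    # a per-file list is returned instead of the sum.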

    def read_dataframe(
        self,
        files: str | Sequence[str] = None,
        folders: str | Sequence[str] = None,
        runs: str | int | Sequence[str | int] = None,
        ftype: str = "h5",
        metadata: dict = {},
        collect_metadata: bool = False,
        **kwds,
    ) -> tuple[dd.DataFrame, dd.DataFrame, dict]:
        """
        Read express data from the DAQ, generating a parquet in between.

        Args:
            files (str | Sequence[str], optional): File path(s) to process. Defaults to None.
            folders (str | Sequence[str], optional): Path to folder(s) where files are stored.
                Takes priority such that, if specified, the provided files are ignored.
                Defaults to None.
            runs (str | int | Sequence[str | int], optional): Run identifier(s).
                Corresponding files will be located in the location provided by ``folders``.
                Takes precedence over ``files`` and ``folders``. Defaults to None.
            ftype (str, optional): The file extension type. Defaults to "h5".
            metadata (dict, optional): Additional metadata. Defaults to {}.
            collect_metadata (bool, optional): Whether to collect metadata. Defaults to False.

        Keyword Args:
            detector (str, optional): The detector to use. Defaults to "".
            force_recreate (bool, optional): Whether to force recreation of the buffer files.
                Defaults to False.
            processed_dir (str, optional): The directory to save the processed files.
                Defaults to None.
            debug (bool, optional): Whether to run buffer creation in serial. Defaults to False.
            remove_invalid_files (bool, optional): Whether to exclude invalid files.
                Defaults to False.
            token (str, optional): The scicat token to use for fetching metadata. If provided,
                will be saved to .env file for future use. If not provided, will check environment
                variables when collect_metadata is True.
            filter_timed_by_electron (bool, optional): When True, the timed dataframe will only
                contain data points where valid electron events were detected. When False, all
                timed data points are included regardless of electron detection. Defaults to True.

        Returns:
            tuple[dd.DataFrame, dd.DataFrame, dict]: A tuple containing the electron dataframe,
            the timed dataframe, and the metadata dictionary.

        Raises:
            ValueError: If neither 'runs' nor 'files'/'raw_dir' is provided.
            FileNotFoundError: If the conversion fails for some files or no data is available.
            ValueError: If collect_metadata is True and no token is available.
        """
        detector = kwds.pop("detector", "")
        force_recreate = kwds.pop("force_recreate", False)
        processed_dir = kwds.pop("processed_dir", None)
        debug = kwds.pop("debug", False)
        remove_invalid_files = kwds.pop("remove_invalid_files", False)
        token = kwds.pop("token", None)
        filter_timed_by_electron = kwds.pop("filter_timed_by_electron", True)

        if len(kwds) > 0:
            raise ValueError(f"Unexpected keyword arguments: {kwds.keys()}")
        t0 = time.time()

        self._initialize_dirs()
        # Prepare a list of names for the runs to read and parquets to write
        if runs is not None:
            files = []
            runs_ = [str(runs)] if isinstance(runs, (str, int)) else list(map(str, runs))
            for run in runs_:
                run_files = self.get_files_from_run_id(
                    run_id=run,
                    folders=self.raw_dir,
                )
                files.extend(run_files)
            self.runs = runs_
            super().read_dataframe(files=files, ftype=ftype)
        else:
            # This call takes care of files and folders. As we have converted runs into files
            # already, they are just stored in the class by this call.
            super().read_dataframe(
                files=files,
                folders=folders,
                ftype=ftype,
                metadata=metadata,
            )

        bh = BufferHandler(
            config=self._config,
        )

        # if processed_dir is None, use self.processed_dir
        processed_dir = processed_dir or self.processed_dir
        processed_dir = Path(processed_dir)

        # Create or read the buffer files and load the electron and timed dataframes
        h5_paths = [Path(file) for file in self.files]
        df, df_timed = bh.process_and_load_dataframe(
            h5_paths=h5_paths,
            folder=processed_dir,
            force_recreate=force_recreate,
            suffix=detector,
            debug=debug,
            remove_invalid_files=remove_invalid_files,
            filter_timed_by_electron=filter_timed_by_electron,
        )

        if self.instrument == "wespe":
            df, df_timed = wespe_convert(df, df_timed)

        self.metadata.update(self.parse_metadata(token) if collect_metadata else {})
        self.metadata.update(bh.metadata)

        print(f"loading complete in {time.time() - t0: .2f} s")

        return df, df_timed, self.metadata


LOADER = FlashLoader
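
A minimal usage sketch for this loader (illustrative only: the paths, DAQ name, and run
number below are hypothetical placeholders, and a real sed config requires the full schema
with channel definitions):

    from sed.loader.flash.loader import FlashLoader

    # Hypothetical minimal configuration; see the sed documentation for the full schema.
    config = {
        "core": {
            "paths": {
                "raw": "/path/to/beamtime/raw",     # location of the raw h5 files
                "processed": "/path/to/processed",  # buffer/parquet files are written here
            },
        },
        "dataframe": {"daq": "fl1user3"},           # DAQ identifier used in file patterns
        "metadata": {},
    }

    loader = FlashLoader(config=config, verbose=True)

    # Locate the files of one run, create/read buffer files, and load dask dataframes.
    df, df_timed, metadata = loader.read_dataframe(runs=[43878], ftype="h5")
    print(df.columns)  # df is a lazy dask dataframe; call .compute() to materialize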