OpenCOMPES / sed / build 9991515592

18 Jul 2024 12:29PM UTC coverage: 92.722% (+0.04%) from 92.682%

Pull Request #479: Flash minor changes (Merge to #469)
Commit by zain-sohail: "roll back to iterations"

92 of 93 new or added lines in 9 files covered (98.92%).
5 existing lines in 3 files now uncovered.
7071 of 7626 relevant lines covered (92.72%).
0.93 hits per line.

Source file: /sed/loader/flash/loader.py (94.74% covered)
"""
This module implements the flash data loader.
This loader currently supports hextof, wespe and instruments with similar structure.
The raw hdf5 data is combined and saved into buffer files and loaded as a dask dataframe.
The dataframe is an amalgamation of all h5 files for a combination of runs, where the NaNs are
automatically forward-filled across different files.
This can then be saved as a parquet for out-of-sed processing and reread back to access other
sed functionality.
"""
from __future__ import annotations

import re
import time
from collections.abc import Sequence
from pathlib import Path

import dask.dataframe as dd
from natsort import natsorted

from sed.loader.base.loader import BaseLoader
from sed.loader.flash.buffer_handler import BufferHandler
from sed.loader.flash.instruments import wespe_convert
from sed.loader.flash.metadata import MetadataRetriever


class FlashLoader(BaseLoader):
    """
    The class generates multiindexed multidimensional pandas dataframes from the new FLASH
    dataformat resolved by both macro and microbunches alongside electrons.
    Only the read_dataframe (inherited and implemented) method is accessed by other modules.
    """

    __name__ = "flash"

    supported_file_types = ["h5"]

    def __init__(self, config: dict) -> None:
        """
        Initializes the FlashLoader.

        Args:
            config (dict): Configuration dictionary.
        """
        super().__init__(config=config)
        self.instrument: str = self._config["core"].get("instrument", "hextof")  # default is hextof
        self.raw_dir: str = None
        self.processed_dir: str = None

    def _initialize_dirs(self) -> None:
        """
        Initializes the directories on Maxwell based on configuration. If 'paths' is provided in
        the configuration, the raw data directory and parquet data directory are taken from there.
        Otherwise, the beamtime_id and year are used to locate the data directories.
        The first path that has either an online- or express- prefix, or the daq name, is taken
        as the raw data directory.

        Raises:
            ValueError: If required values are missing from the configuration.
            FileNotFoundError: If the raw data directories are not found.
        """
        # Parse the config to locate the raw beamtime directory.
        # Only raw_dir is necessary; processed_dir can be derived from raw_dir if not provided.
        if "paths" in self._config["core"]:
            raw_dir = Path(self._config["core"]["paths"].get("raw", ""))
            processed_dir = Path(
                self._config["core"]["paths"].get("processed", raw_dir.joinpath("processed")),
            )

        else:
            try:
                beamtime_id = self._config["core"]["beamtime_id"]
                year = self._config["core"]["year"]

            except KeyError as exc:
                raise ValueError(
                    "The beamtime_id and year are required.",
                ) from exc

            beamtime_dir = Path(
                self._config["dataframe"]["beamtime_dir"][self._config["core"]["beamline"]],
            )
            beamtime_dir = beamtime_dir.joinpath(f"{year}/data/{beamtime_id}/")

            # Walk the beamtime tree with pathlib glob to find the raw data directory
            raw_paths: list[Path] = []

            for path in beamtime_dir.joinpath("raw").glob("**/*"):
                if path.is_dir():
                    dir_name = path.name
                    if dir_name.startswith(("online-", "express-")):
                        raw_paths.append(path.joinpath(self._config["dataframe"]["daq"]))
                    elif dir_name == self._config["dataframe"]["daq"].upper():
                        raw_paths.append(path)

            if not raw_paths:
                raise FileNotFoundError("Raw data directories not found.")

            raw_dir = raw_paths[0].resolve()

            processed_dir = beamtime_dir.joinpath("processed")

        processed_dir.mkdir(parents=True, exist_ok=True)

        self.raw_dir = str(raw_dir)
        self.processed_dir = str(processed_dir)

    @property
    def available_runs(self) -> list[int]:
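        """Returns all run IDs found in the raw data directory, in sorted order.

        Run numbers are parsed from file names containing ``run<number>``.
        """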
        # Get all files in raw_dir with "run" in their names
        files = list(Path(self.raw_dir).glob("*run*"))

        # Extract run IDs from filenames
        run_ids = set()
        for file in files:
            match = re.search(r"run(\d+)", file.name)
            if match:
                run_ids.add(int(match.group(1)))

        # Return run IDs in sorted order
        return sorted(run_ids)

    def get_files_from_run_id(  # type: ignore[override]
        self,
        run_id: str | int,
        folders: str | Sequence[str] = None,
        extension: str = "h5",
    ) -> list[str]:
        """
        Returns a list of filenames for a given run located in the specified directory
        for the specified data acquisition (daq).

        Args:
            run_id (str | int): The run identifier to locate.
            folders (str | Sequence[str], optional): The directory(ies) where the raw
                data is located. Defaults to config["core"]["base_folder"].
            extension (str, optional): The file extension. Defaults to "h5".

        Returns:
            list[str]: A list of path strings representing the collected file names.

        Raises:
            FileNotFoundError: If no files are found for the given run in the directory.
        """
        # Define the stream name prefixes based on the data acquisition identifier
        stream_name_prefixes = self._config["dataframe"]["stream_name_prefixes"]

        if folders is None:
            folders = self._config["core"]["base_folder"]

        if isinstance(folders, str):
            folders = [folders]

        daq = self._config["dataframe"].get("daq")

        # Generate the file pattern to search for in the directory
        file_pattern = f"{stream_name_prefixes[daq]}_run{run_id}_*." + extension

        files: list[Path] = []
        # Use pathlib to search for matching files in each directory
        for folder in folders:
            files.extend(
                natsorted(
                    Path(folder).glob(file_pattern),
                    key=lambda filename: str(filename).rsplit("_", maxsplit=1)[-1],
                ),
            )

        # Check if any files are found
        if not files:
            raise FileNotFoundError(
                f"No files found for run {run_id} in directory {str(folders)}",
            )

        # Return the list of found files
        return [str(file.resolve()) for file in files]

    def parse_metadata(self, scicat_token: str = None) -> dict:
        """Uses the MetadataRetriever class to fetch metadata from scicat for each run.

        Args:
            scicat_token (str, optional): The scicat token to use for fetching metadata.

        Returns:
            dict: Metadata dictionary.
        """
        metadata_retriever = MetadataRetriever(self._config["metadata"], scicat_token)
        metadata = metadata_retriever.get_metadata(
            beamtime_id=self._config["core"]["beamtime_id"],
            runs=self.runs,
            metadata=self.metadata,
        )

        return metadata

    def get_count_rate(
        self,
        fids: Sequence[int] = None,  # noqa: ARG002
        **kwds,  # noqa: ARG002
    ):
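        """Count rates are not computed for this loader; returns (None, None)."""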
        return None, None

    def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float | list[float]:  # type: ignore[override]
        """
        Calculates the elapsed time.

        Args:
            fids (Sequence[int]): A sequence of file IDs. Defaults to all files.
            **kwds:
                - runs: A sequence of run IDs. Takes precedence over fids.
                - aggregate: Whether to return the sum of the elapsed times across
                    the specified files or the elapsed time for each file. Defaults to True.

        Returns:
            float | list[float]: The elapsed time(s) in seconds.

        Raises:
            KeyError: If a file ID in fids or a run ID in 'runs' does not exist in the metadata.
        """
        try:
            file_statistics = self.metadata["file_statistics"]["timed"]
        except Exception as exc:
            raise KeyError(
                "File statistics missing. Use 'read_dataframe' first.",
            ) from exc
        time_stamp_alias = self._config["dataframe"].get("time_stamp_alias", "timeStamp")

        def get_elapsed_time_from_fid(fid):
            try:
                fid = str(fid)  # Ensure the key is a string
                time_stamps = file_statistics[fid]["columns"][time_stamp_alias]
                elapsed_time = time_stamps["max"] - time_stamps["min"]
            except KeyError as exc:
                raise KeyError(
                    f"Timestamp metadata missing in file {fid}. "
                    "Add timestamp column and alias to config before loading.",
                ) from exc

            return elapsed_time

        def get_elapsed_time_from_run(run_id):
            if self.raw_dir is None:
                self._initialize_dirs()
            files = self.get_files_from_run_id(run_id=run_id, folders=self.raw_dir)
            fids = [self.files.index(file) for file in files]
            return sum(get_elapsed_time_from_fid(fid) for fid in fids)

        elapsed_times = []
        runs = kwds.pop("runs", None)
        aggregate = kwds.pop("aggregate", True)

        if len(kwds) > 0:
            raise TypeError(f"get_elapsed_time() got unexpected keyword arguments {kwds.keys()}.")

        if runs is not None:
            elapsed_times = [get_elapsed_time_from_run(run) for run in runs]
        else:
            if fids is None:
                fids = range(len(self.files))
            elapsed_times = [get_elapsed_time_from_fid(fid) for fid in fids]

        if aggregate:
            elapsed_times = sum(elapsed_times)

        return elapsed_times

    def read_dataframe(
        self,
        files: str | Sequence[str] = None,
        folders: str | Sequence[str] = None,
        runs: str | int | Sequence[str | int] = None,
        ftype: str = "h5",
        metadata: dict = {},
        collect_metadata: bool = False,
        detector: str = "",
        force_recreate: bool = False,
        processed_dir: str | Path = None,
        debug: bool = False,
        **kwds,
    ) -> tuple[dd.DataFrame, dd.DataFrame, dict]:
        """
        Read express data from the DAQ, generating a parquet in between.

        Args:
            files (str | Sequence[str], optional): File path(s) to process. Defaults to None.
            folders (str | Sequence[str], optional): Path to folder(s) where files are stored.
                The folder path has priority such that, if specified, the specified files
                will be ignored. Defaults to None.
            runs (str | int | Sequence[str | int], optional): Run identifier(s).
                Corresponding files will be located in the location provided by ``folders``.
                Takes precedence over ``files`` and ``folders``. Defaults to None.
            ftype (str, optional): The file extension type. Defaults to "h5".
            metadata (dict, optional): Additional metadata. Defaults to an empty dict.
            collect_metadata (bool, optional): Whether to collect metadata. Defaults to False.
            detector (str, optional): Detector name, used as a suffix for the buffer files.
                Defaults to "".
            force_recreate (bool, optional): Whether to force recreation of the buffer files.
                Defaults to False.
            processed_dir (str | Path, optional): Directory for the buffer files.
                Defaults to self.processed_dir.
            debug (bool, optional): Passed through to the buffer handler. Defaults to False.
            **kwds: Additional keyword arguments passed to ``parse_metadata``.

        Returns:
            tuple[dd.DataFrame, dd.DataFrame, dict]: A tuple containing the concatenated
            electron DataFrame, the timed DataFrame, and the metadata dictionary.

        Raises:
            ValueError: If neither 'runs' nor 'files'/'raw_dir' is provided.
            FileNotFoundError: If the conversion fails for some files or no data is available.
        """
        t0 = time.time()

        self._initialize_dirs()
        # Prepare a list of names for the runs to read and parquets to write
        if runs is not None:
            files = []
            runs_ = [str(runs)] if isinstance(runs, (str, int)) else list(map(str, runs))
            for run in runs_:
                run_files = self.get_files_from_run_id(
                    run_id=run,
                    folders=self.raw_dir,
                )
                files.extend(run_files)
            self.runs = runs_
            super().read_dataframe(files=files, ftype=ftype)
        else:
            # This call takes care of files and folders. As we have converted runs into files
            # already, they are just stored in the class by this call.
            super().read_dataframe(
                files=files,
                folders=folders,
                ftype=ftype,
                metadata=metadata,
            )

        bh = BufferHandler(
            config=self._config,
        )

        # If processed_dir is None, use self.processed_dir
        processed_dir = processed_dir or self.processed_dir
        processed_dir = Path(processed_dir)

        # Obtain the parquet filenames, metadata, and schema from the method
        # which handles buffer file creation/reading
        h5_paths = [Path(file) for file in self.files]
        bh.run(
            h5_paths=h5_paths,
            folder=processed_dir,
            force_recreate=force_recreate,
            suffix=detector,
            debug=debug,
        )
        df = bh.df["electron"]
        df_timed = bh.df["timed"]

        if self.instrument == "wespe":
            df, df_timed = wespe_convert(df, df_timed)

        self.metadata.update(self.parse_metadata(**kwds) if collect_metadata else {})
        self.metadata.update(bh.metadata)

        print(f"loading complete in {time.time() - t0: .2f} s")

        return df, df_timed, self.metadata


LOADER = FlashLoader
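
Example usage: the following is a minimal sketch, not part of the module above. It assumes a
configuration with explicit raw and processed paths; the DAQ name, stream name prefix, paths,
and run numbers are placeholders, and a real sed config defines many more fields than the keys
this module reads.

from sed.loader.flash.loader import FlashLoader

# Hypothetical minimal configuration; only keys read by this module are shown.
config = {
    "core": {
        "instrument": "hextof",
        "paths": {
            "raw": "/path/to/beamtime/raw",        # placeholder
            "processed": "/path/to/processed",     # placeholder
        },
    },
    "dataframe": {
        "daq": "fl1user3",                                              # assumed DAQ name
        "stream_name_prefixes": {"fl1user3": "FLASH1_USER3_stream_2"},  # assumed prefix
    },
}

loader = FlashLoader(config=config)

# Convert the raw h5 files of two (placeholder) runs into buffer files and
# load them as dask dataframes.
df, df_timed, metadata = loader.read_dataframe(runs=[43878, 43879])

print(loader.available_runs)                  # run IDs found in the raw directory
print(loader.get_elapsed_time(runs=[43878]))  # elapsed time of one run, in seconds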