OpenCOMPES / sed / build 9588080384

19 Jun 2024 09:00PM UTC coverage: 91.995% (+0.08%) from 91.919%

Pull Request #411: Energy calibration bias shift
rettigl: Merge branch 'reader_benchmarks' into energy_calibration_bias_shift (via github)

87 of 99 new or added lines in 3 files covered. (87.88%)
123 existing lines in 3 files now uncovered.
6493 of 7058 relevant lines covered (91.99%)
0.92 hits per line

Source File: /sed/loader/mpes/loader.py (86.52% covered)

"""
module sed.loader.mpes, code for loading hdf5 files delayed into a dask dataframe.
Mostly ported from https://github.com/mpes-kit/mpes.
@author: L. Rettig
"""
import datetime
import glob
import json
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Sequence
from typing import Tuple
from typing import Union
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.request import urlopen

import dask
import dask.array as da
import dask.dataframe as ddf
import h5py
import numpy as np
import scipy.interpolate as sint
from natsort import natsorted

from sed.loader.base.loader import BaseLoader


def hdf5_to_dataframe(
    files: Sequence[str],
    channels: Dict[str, Any] = None,
    time_stamps: bool = False,
    time_stamp_alias: str = "timeStamps",
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
    **kwds,
) -> ddf.DataFrame:
    """Function to read a selection of hdf5-files, and generate a delayed dask
    dataframe from provided groups in the files. Optionally, aliases can be defined.

    Args:
        files (Sequence[str]): A list of the file paths to load.
        channels (Dict[str, Any], optional): hdf5 channel names to load. Each entry in the dict
            should contain the keys "format" and "dataset_key". Defaults to load all groups
            containing "Stream", and to read the attribute "Name" from each group.
        time_stamps (bool, optional): Option to calculate time stamps. Defaults to
            False.
        time_stamp_alias (str): Alias name for the timestamp column.
            Defaults to "timeStamps".
        ms_markers_key (str): hdf5 path containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        ddf.DataFrame: The delayed Dask DataFrame
    """
    # Read a file to parse the file structure
    test_fid = kwds.pop("test_fid", 0)
    test_proc = h5py.File(files[test_fid])

    if channels is None:
        channels = get_datasets_and_aliases(
            h5file=test_proc,
            search_pattern="Stream",
        )

    channel_list = []
    column_names = []

    for name, channel in channels.items():
        if (
            channel["format"] == "per_electron"
            and channel["dataset_key"] in test_proc
            or channel["format"] == "per_file"
            and channel["dataset_key"] in test_proc.attrs
        ):
            channel_list.append(channel)
            column_names.append(name)
        else:
            print(
                f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                "Skipping the channel.",
            )

    if time_stamps:
        column_names.append(time_stamp_alias)

    test_array = hdf5_to_array(
        h5file=test_proc,
        channels=channel_list,
        time_stamps=time_stamps,
        ms_markers_key=ms_markers_key,
        first_event_time_stamp_key=first_event_time_stamp_key,
    )

    # Delay-read all files
    arrays = [
        da.from_delayed(
            dask.delayed(hdf5_to_array)(
                h5file=h5py.File(f),
                channels=channel_list,
                time_stamps=time_stamps,
                ms_markers_key=ms_markers_key,
                first_event_time_stamp_key=first_event_time_stamp_key,
            ),
            dtype=test_array.dtype,
            shape=(test_array.shape[0], np.nan),
        )
        for f in files
    ]
    array_stack = da.concatenate(arrays, axis=1).T

    return ddf.from_dask_array(array_stack, columns=column_names)
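

# Illustrative usage sketch (comments only, not part of the module). The file
# names are hypothetical; time_stamps=True assumes the files carry the default
# "msMarkers" dataset and "FirstEventTimeStamp" attribute:
#
#     df = hdf5_to_dataframe(
#         files=["Scan0010_1.h5", "Scan0010_2.h5"],
#         time_stamps=True,
#     )
#     print(df.columns)  # channel aliases read from the file, plus "timeStamps"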


def hdf5_to_timed_dataframe(
    files: Sequence[str],
    channels: Dict[str, Any] = None,
    time_stamps: bool = False,
    time_stamp_alias: str = "timeStamps",
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
    **kwds,
) -> ddf.DataFrame:
    """Function to read a selection of hdf5-files, and generate a delayed dask
    dataframe from provided groups in the files. Optionally, aliases can be defined.
    Returns a dataframe for evenly spaced time intervals.

    Args:
        files (Sequence[str]): A list of the file paths to load.
        channels (Dict[str, Any], optional): hdf5 channel names to load. Each entry in the dict
            should contain the keys "format" and "dataset_key". Defaults to load all groups
            containing "Stream", and to read the attribute "Name" from each group.
        time_stamps (bool, optional): Option to calculate time stamps. Defaults to
            False.
        time_stamp_alias (str): Alias name for the timestamp column.
            Defaults to "timeStamps".
        ms_markers_key (str): hdf5 dataset containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        ddf.DataFrame: The delayed Dask DataFrame
    """
    # Read a file to parse the file structure
    test_fid = kwds.pop("test_fid", 0)
    test_proc = h5py.File(files[test_fid])

    if channels is None:
        channels = get_datasets_and_aliases(
            h5file=test_proc,
            search_pattern="Stream",
        )

    channel_list = []
    column_names = []

    for name, channel in channels.items():
        if (
            channel["format"] == "per_electron"
            and channel["dataset_key"] in test_proc
            or channel["format"] == "per_file"
            and channel["dataset_key"] in test_proc.attrs
        ):
            channel_list.append(channel)
            column_names.append(name)
        else:
            print(
                f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                "Skipping the channel.",
            )

    if time_stamps:
        column_names.append(time_stamp_alias)

    test_array = hdf5_to_timed_array(
        h5file=test_proc,
        channels=channel_list,
        time_stamps=time_stamps,
        ms_markers_key=ms_markers_key,
        first_event_time_stamp_key=first_event_time_stamp_key,
    )

    # Delay-read all files
    arrays = [
        da.from_delayed(
            dask.delayed(hdf5_to_timed_array)(
                h5file=h5py.File(f),
                channels=channel_list,
                time_stamps=time_stamps,
                ms_markers_key=ms_markers_key,
                first_event_time_stamp_key=first_event_time_stamp_key,
            ),
            dtype=test_array.dtype,
            shape=(test_array.shape[0], np.nan),
        )
        for f in files
    ]
    array_stack = da.concatenate(arrays, axis=1).T

    return ddf.from_dask_array(array_stack, columns=column_names)
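

# Sketch of the difference to hdf5_to_dataframe (hypothetical file): the timed
# variant returns one row per millisecond marker rather than one per electron,
# so the two frames share columns but differ in length:
#
#     timed_df = hdf5_to_timed_dataframe(
#         files=["Scan0010_1.h5"],
#         time_stamps=True,
#     )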


def get_datasets_and_aliases(
    h5file: h5py.File,
    search_pattern: str = None,
    alias_key: str = "Name",
) -> Dict[str, Any]:
    """Read datasets and aliases from a provided hdf5 file handle

    Args:
        h5file (h5py.File):
            The hdf5 file handle
        search_pattern (str, optional):
            Search pattern to select groups. Defaults to include all groups.
        alias_key (str, optional):
            Attribute key where aliases are stored. Defaults to "Name".

    Returns:
        Dict[str, Any]:
            A dict of aliases and groupnames parsed from the file
    """
    # get group names:
    dataset_names = list(h5file)

    # Filter the group names
    if search_pattern is None:
        filtered_dataset_names = dataset_names
    else:
        filtered_dataset_names = [name for name in dataset_names if search_pattern in name]

    alias_dict = {}
    for name in filtered_dataset_names:
        alias_dict[name] = get_attribute(h5file[name], alias_key)

    return {
        alias_dict[name]: {"format": "per_electron", "dataset_key": name}
        for name in filtered_dataset_names
    }
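

# Illustrative sketch of the returned mapping (dataset names and aliases are
# hypothetical): for a file with groups "Stream_0" and "Stream_1" whose "Name"
# attributes are "X" and "Y", the call
#
#     get_datasets_and_aliases(h5file=h5f, search_pattern="Stream")
#
# would return
#
#     {
#         "X": {"format": "per_electron", "dataset_key": "Stream_0"},
#         "Y": {"format": "per_electron", "dataset_key": "Stream_1"},
#     }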


def hdf5_to_array(
    h5file: h5py.File,
    channels: Sequence[Dict[str, Any]],
    time_stamps=False,
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
) -> np.ndarray:
    """Reads the content of the given groups in an hdf5 file, and returns a
    2-dimensional array with the corresponding values.

    Args:
        h5file (h5py.File):
            hdf5 file handle to read from
        channels (Sequence[Dict[str, Any]]):
            channel dicts containing group names and types to read.
        time_stamps (bool, optional):
            Option to calculate time stamps. Defaults to False.
        ms_markers_key (str): hdf5 dataset containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        np.ndarray: The 2-dimensional data array containing the values of the groups.
    """

    # Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)

    # determine group length from per_electron column:
    nelectrons = 0
    for channel in channels:
        if channel["format"] == "per_electron":
            nelectrons = len(h5file[channel["dataset_key"]])
            break
    if nelectrons == 0:
        raise ValueError("No 'per_electron' columns defined, or no hits found in file.")

    # Read out groups:
    data_list = []
    for channel in channels:
        if channel["format"] == "per_electron":
            g_dataset = np.asarray(h5file[channel["dataset_key"]])
        elif channel["format"] == "per_file":
            value = float(get_attribute(h5file, channel["dataset_key"]))
            g_dataset = np.asarray([value] * nelectrons)
        else:
            raise ValueError(
                f"Invalid 'format':{channel['format']} for channel {channel['dataset_key']}.",
            )
        if "data_type" in channel.keys():
            g_dataset = g_dataset.astype(channel["data_type"])
        else:
            g_dataset = g_dataset.astype("float32")
        if len(g_dataset) != nelectrons:
            raise ValueError(f"Inconsistent entries found for channel {channel['dataset_key']}.")
        data_list.append(g_dataset)

    # calculate time stamps
    if time_stamps:
        # create target array for time stamps
        time_stamp_data = np.zeros(nelectrons)
        # the ms marker contains a list of events that occurred at full ms intervals.
        # It's monotonically increasing, and can contain duplicates
        ms_marker = np.asarray(h5file[ms_markers_key])

        # try to get start timestamp from "FirstEventTimeStamp" attribute
        try:
            start_time_str = get_attribute(h5file, first_event_time_stamp_key)
            start_time = datetime.datetime.strptime(
                start_time_str,
                "%Y-%m-%dT%H:%M:%S.%f%z",
            ).timestamp()
        except KeyError:
            # get the start time of the file from its modification date if the key
            # does not exist (old files)
            start_time = os.path.getmtime(h5file.filename)  # in seconds
            # the modification time points to the time when the file was finished, so we
            # need to correct for the time it took to write the file
            start_time -= len(ms_marker) / 1000

        # fill in range before 1st marker
        time_stamp_data[0 : ms_marker[0]] = start_time
        for i in range(len(ms_marker) - 1):
            # linear interpolation between ms: Disabled, because it takes a lot of
            # time, and external signals are anyway not better synchronized than 1 ms
            # time_stamp_data[ms_marker[n] : ms_marker[n + 1]] = np.linspace(
            #     start_time + n,
            #     start_time + n + 1,
            #     ms_marker[n + 1] - ms_marker[n],
            # )
            time_stamp_data[ms_marker[i] : ms_marker[i + 1]] = start_time + (i + 1) / 1000
        # fill any remaining points
        time_stamp_data[ms_marker[len(ms_marker) - 1] : len(time_stamp_data)] = (
            start_time + len(ms_marker) / 1000
        )

        data_list.append(time_stamp_data)

    return np.asarray(data_list)
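

# Illustrative sketch of the expected channel dicts (dataset keys are
# hypothetical); the result has one row per channel and one column per
# electron, plus a trailing timestamp row if time_stamps=True:
#
#     channels = [
#         {"format": "per_electron", "dataset_key": "Stream_0"},
#         {"format": "per_file", "dataset_key": "KTOF:Lens:Extr:V"},
#     ]
#     arr = hdf5_to_array(h5file=h5py.File("Scan0010_1.h5"), channels=channels)
#     arr.shape  # (2, nelectrons)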


def hdf5_to_timed_array(
    h5file: h5py.File,
    channels: Sequence[Dict[str, Any]],
    time_stamps=False,
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
) -> np.ndarray:
    """Reads the content of the given groups in an hdf5 file, and returns a
    timed version of a 2-dimensional array with the corresponding values.

    Args:
        h5file (h5py.File):
            hdf5 file handle to read from
        channels (Sequence[Dict[str, Any]]):
            channel dicts containing group names and types to read.
        time_stamps (bool, optional):
            Option to calculate time stamps. Defaults to False.
        ms_markers_key (str): hdf5 dataset containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        np.ndarray: the array of the values at evenly spaced timing obtained from
        the ms_markers.
    """

    # Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)

    # Read out groups:
    data_list = []
    ms_marker = np.asarray(h5file[ms_markers_key])
    for channel in channels:
        timed_dataset = np.zeros_like(ms_marker)
        if channel["format"] == "per_electron":
            g_dataset = np.asarray(h5file[channel["dataset_key"]])
            for i, point in enumerate(ms_marker):
                timed_dataset[i] = g_dataset[int(point) - 1]
        elif channel["format"] == "per_file":
            value = float(get_attribute(h5file, channel["dataset_key"]))
            timed_dataset[:] = value
        else:
            raise ValueError(
                f"Invalid 'format':{channel['format']} for channel {channel['dataset_key']}.",
            )
        if "data_type" in channel.keys():
            timed_dataset = timed_dataset.astype(channel["data_type"])
        else:
            timed_dataset = timed_dataset.astype("float32")

        data_list.append(timed_dataset)

    # calculate time stamps
    if time_stamps:
        # try to get start timestamp from "FirstEventTimeStamp" attribute
        try:
            start_time_str = get_attribute(h5file, first_event_time_stamp_key)
            start_time = datetime.datetime.strptime(
                start_time_str,
                "%Y-%m-%dT%H:%M:%S.%f%z",
            ).timestamp()
        except KeyError:
            # get the start time of the file from its modification date if the key
            # does not exist (old files)
            start_time = os.path.getmtime(h5file.filename)  # in seconds
            # the modification time points to the time when the file was finished, so we
            # need to correct for the time it took to write the file
            start_time -= len(ms_marker) / 1000

        time_stamp_data = start_time + np.arange(len(ms_marker)) / 1000

        data_list.append(time_stamp_data)

    return np.asarray(data_list)
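

# Sketch (hypothetical file): with the same channel dicts as in the sketch
# above, the timed variant samples the last electron value before each
# millisecond marker, so the result has one column per millisecond instead of
# one per electron:
#
#     arr = hdf5_to_timed_array(h5file=h5py.File("Scan0010_1.h5"), channels=channels)
#     arr.shape  # (2, len(msMarkers))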


def get_attribute(h5group: h5py.Group, attribute: str) -> str:
    """Reads, decodes and returns an attribute from an hdf5 group

    Args:
        h5group (h5py.Group):
            The hdf5 group to read from
        attribute (str):
            The name of the attribute

    Returns:
        str: The parsed attribute data
    """
    try:
        content = h5group.attrs[attribute].decode("utf-8")
    except AttributeError:  # No need to decode
        content = h5group.attrs[attribute]
    except KeyError as exc:  # No such attribute
        raise KeyError(f"Attribute '{attribute}' not found!") from exc

    return content
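

# Sketch (hypothetical group and attribute): handles both byte-string and
# plain attributes, and raises a readable KeyError otherwise:
#
#     name = get_attribute(h5f["Stream_0"], "Name")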


def get_count_rate(
    h5file: h5py.File,
    ms_markers_key: str = "msMarkers",
) -> Tuple[np.ndarray, np.ndarray]:
    """Create count rate in the file from the msMarker column.

    Args:
        h5file (h5py.File): The h5file from which to get the count rate.
        ms_markers_key (str, optional): The hdf5 path where the millisecond markers
            are stored. Defaults to "msMarkers".

    Returns:
        Tuple[np.ndarray, np.ndarray]: The count rate in Hz and the seconds into the
        scan.
    """
    ms_markers = np.asarray(h5file[ms_markers_key])
    secs = np.arange(0, len(ms_markers)) / 1000
    msmarker_spline = sint.InterpolatedUnivariateSpline(secs, ms_markers, k=1)
    rate_spline = msmarker_spline.derivative()
    count_rate = rate_spline(secs)

    return (count_rate, secs)
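

# Sketch (hypothetical file): since msMarkers holds the cumulative event count
# at each millisecond, the derivative of its linear spline is the rate in Hz:
#
#     count_rate, secs = get_count_rate(h5py.File("Scan0010_1.h5"))
#     count_rate.mean()  # average count rate over the file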


def get_elapsed_time(
    h5file: h5py.File,
    ms_markers_key: str = "msMarkers",
) -> float:
    """Return the elapsed time in the file from the msMarkers wave

    Args:
        h5file (h5py.File): The h5file from which to get the elapsed time.
        ms_markers_key (str, optional): The hdf5 path where the millisecond markers
            are stored. Defaults to "msMarkers".

    Return:
        float: The acquisition time of the file in seconds.
    """
    secs = h5file[ms_markers_key].len() / 1000

    return secs
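

# Sketch: one marker is written per millisecond, so a hypothetical file with
# 600000 entries in "msMarkers" yields
#
#     get_elapsed_time(h5py.File("Scan0010_1.h5"))  # 600.0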


def get_archiver_data(
    archiver_url: str,
    archiver_channel: str,
    ts_from: float,
    ts_to: float,
) -> Tuple[np.ndarray, np.ndarray]:
    """Extract time stamps and corresponding data from an EPICS archiver instance

    Args:
        archiver_url (str): URL of the archiver data extraction interface
        archiver_channel (str): EPICS channel to extract data for
        ts_from (float): starting time stamp of the range of interest
        ts_to (float): ending time stamp of the range of interest

    Returns:
        Tuple[np.ndarray, np.ndarray]: The extracted time stamps and corresponding data
    """
    iso_from = datetime.datetime.utcfromtimestamp(ts_from).isoformat()
    iso_to = datetime.datetime.utcfromtimestamp(ts_to).isoformat()
    req_str = archiver_url + archiver_channel + "&from=" + iso_from + "Z&to=" + iso_to + "Z"
    with urlopen(req_str) as req:
        data = json.load(req)
        secs = [x["secs"] + x["nanos"] * 1e-9 for x in data[0]["data"]]
        vals = [x["val"] for x in data[0]["data"]]

    return (np.asarray(secs), np.asarray(vals))
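

# Sketch of a call (URL and channel are hypothetical; the exact query format
# depends on the archiver deployment, which supplies the "...?pv=" prefix):
#
#     secs, vals = get_archiver_data(
#         archiver_url="http://aa.example.org/retrieval/data/getData.json?pv=",
#         archiver_channel="KTOF:Lens:Extr:V",
#         ts_from=1718800000.0,
#         ts_to=1718803600.0,
#     )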


class MpesLoader(BaseLoader):
    """Mpes implementation of the Loader. Reads from h5 files or folders of the
    SPECS Metis 1000 (FHI Berlin)

    Args:
        config (dict, optional): Config dictionary. Defaults to None.
    """

    __name__ = "mpes"

    supported_file_types = ["h5"]

    def __init__(
        self,
        config: dict = None,
    ):
        super().__init__(config=config)

        self.read_timestamps = self._config.get("dataframe", {}).get(
            "read_timestamps",
            False,
        )

    def read_dataframe(
        self,
        files: Union[str, Sequence[str]] = None,
        folders: Union[str, Sequence[str]] = None,
        runs: Union[str, Sequence[str]] = None,
        ftype: str = "h5",
        metadata: dict = None,
        collect_metadata: bool = False,
        time_stamps: bool = False,
        **kwds,
    ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]:
        """Read stored hdf5 files from a list or from folder and returns a dask
        dataframe and corresponding metadata.

        Args:
            files (Union[str, Sequence[str]], optional): File path(s) to process.
                Defaults to None.
            folders (Union[str, Sequence[str]], optional): Path to folder(s) where files
                are stored. Path has priority such that if it's specified, the specified
                files will be ignored. Defaults to None.
            runs (Union[str, Sequence[str]], optional): Run identifier(s). Corresponding
                files will be located in the location provided by ``folders``. Takes
                precedence over ``files`` and ``folders``. Defaults to None.
            ftype (str, optional): File extension to use. If a folder path is given,
                all files with the specified extension are read into the dataframe
                in the reading order. Defaults to "h5".
            metadata (dict, optional): Manual meta data dictionary. Auto-generated
                meta data are added to it. Defaults to None.
            collect_metadata (bool): Option to collect metadata from files. Requires
                a valid config dict. Defaults to False.
            time_stamps (bool, optional): Option to create a time_stamp column in
                the dataframe from ms-Markers in the files. Defaults to False.
            **kwds: Keyword parameters.

                - **channels** : Dict of channel dicts to load, as for ``hdf5_to_dataframe``.
                - **time_stamp_alias**: Alias for the timestamp column
                - **ms_markers_key**: HDF5 path of the millisecond marker column.
                - **first_event_time_stamp_key**: Attribute name containing the start
                  timestamp of the file.

                Additional keywords are passed to ``hdf5_to_dataframe``.

        Raises:
            ValueError: Raised if neither files nor folders are provided.
            FileNotFoundError: Raised if a file or folder is not found.

        Returns:
            Tuple[ddf.DataFrame, ddf.DataFrame, dict]: Dask dataframe, timed Dask
            dataframe and metadata read from specified files.
        """
        # if runs is provided, try to locate the respective files relative to the provided folder.
        if runs is not None:  # pylint: disable=duplicate-code
            files = []
            if isinstance(runs, (str, int)):
                runs = [runs]
            for run in runs:
                files.extend(
                    self.get_files_from_run_id(run_id=run, folders=folders, extension=ftype),
                )
            self.runs = list(runs)
            super().read_dataframe(
                files=files,
                ftype=ftype,
                metadata=metadata,
            )
        else:
            # pylint: disable=duplicate-code
            super().read_dataframe(
                files=files,
                folders=folders,
                runs=runs,
                ftype=ftype,
                metadata=metadata,
            )

        channels = kwds.pop(
            "channels",
            self._config.get("dataframe", {}).get("channels", None),
        )
        time_stamp_alias = kwds.pop(
            "time_stamp_alias",
            self._config.get("dataframe", {}).get(
                "time_stamp_alias",
                "timeStamps",
            ),
        )
        ms_markers_key = kwds.pop(
            "ms_markers_key",
            self._config.get("dataframe", {}).get(
                "ms_markers_key",
                "msMarkers",
            ),
        )
        first_event_time_stamp_key = kwds.pop(
            "first_event_time_stamp_key",
            self._config.get("dataframe", {}).get(
                "first_event_time_stamp_key",
                "FirstEventTimeStamp",
            ),
        )
        df = hdf5_to_dataframe(
            files=self.files,
            channels=channels,
            time_stamps=time_stamps,
            time_stamp_alias=time_stamp_alias,
            ms_markers_key=ms_markers_key,
            first_event_time_stamp_key=first_event_time_stamp_key,
            **kwds,
        )
        timed_df = hdf5_to_timed_dataframe(
            files=self.files,
            channels=channels,
            time_stamps=time_stamps,
            time_stamp_alias=time_stamp_alias,
            ms_markers_key=ms_markers_key,
            first_event_time_stamp_key=first_event_time_stamp_key,
            **kwds,
        )

        if collect_metadata:
            metadata = self.gather_metadata(
                files=self.files,
                metadata=self.metadata,
            )
        else:
            metadata = self.metadata

        return df, timed_df, metadata
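
    # Illustrative usage sketch (comments only; config dict and run id are
    # hypothetical). Files are located via get_files_from_run_id, then read
    # into an event dataframe and a timed dataframe:
    #
    #     loader = MpesLoader(config=my_config_dict)
    #     df, timed_df, metadata = loader.read_dataframe(
    #         runs=["10"],
    #         time_stamps=True,
    #     )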

    def get_files_from_run_id(
        self,
        run_id: str,
        folders: Union[str, Sequence[str]] = None,
        extension: str = "h5",
        **kwds,  # noqa: ARG002
    ) -> List[str]:
        """Locate the files for a given run identifier.

        Args:
            run_id (str): The run identifier to locate.
            folders (Union[str, Sequence[str]], optional): The directory(ies) where the raw
                data is located. Defaults to config["core"]["paths"]["data_raw_dir"].
            extension (str, optional): The file extension. Defaults to "h5".
            kwds: Keyword arguments

        Return:
            List[str]: List of file path strings to the location of run data.
        """
        if folders is None:
            folders = self._config["core"]["paths"]["data_raw_dir"]

        if isinstance(folders, str):
            folders = [folders]

        files: List[str] = []
        for folder in folders:
            run_files = natsorted(
                glob.glob(
                    folder + "/**/Scan" + str(run_id).zfill(4) + "_*." + extension,
                    recursive=True,
                ),
            )
            files.extend(run_files)

        # Check if any files are found
        if not files:
            raise FileNotFoundError(
                f"No files found for run {run_id} in directory {str(folders)}",
            )

        # Return the list of found files
        return files
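
    # Sketch of the glob pattern this builds (folder and run id hypothetical):
    # run_id "10" with extension "h5" searches recursively for
    # "<folder>/**/Scan0010_*.h5", so e.g.
    #
    #     loader.get_files_from_run_id(run_id="10", folders="/path/to/raw")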

    def get_start_and_end_time(self) -> Tuple[float, float]:
        """Extract the start and end time stamps from the loaded files

        Returns:
            Tuple[float, float]: A tuple containing the start and end time stamps
        """
        h5file = h5py.File(self.files[0])
        channels = []
        for channel in self._config["dataframe"]["channels"].values():
            if channel["format"] == "per_electron":
                channels = [channel]
                break
        if not channels:
            raise ValueError("No valid 'per_electron' channels found.")
        timestamps = hdf5_to_array(
            h5file,
            channels=channels,
            time_stamps=True,
        )
        ts_from = timestamps[-1][1]
        h5file = h5py.File(self.files[-1])
        timestamps = hdf5_to_array(
            h5file,
            channels=channels,
            time_stamps=True,
        )
        ts_to = timestamps[-1][-1]
        return (ts_from, ts_to)

    def gather_metadata(
        self,
        files: Sequence[str],
        metadata: dict = None,
    ) -> dict:
        """Collect meta data from files

        Args:
            files (Sequence[str]): List of files loaded
            metadata (dict, optional): Manual meta data dictionary. Auto-generated
                meta data are added to it. Defaults to None.

        Returns:
            dict: The completed metadata dictionary.
        """

        if metadata is None:
            metadata = {}
        print("Gathering metadata from different locations")
        # Read events in with ms time stamps
        print("Collecting time stamps...")
        (ts_from, ts_to) = self.get_start_and_end_time()

        metadata["timing"] = {
            "acquisition_start": datetime.datetime.utcfromtimestamp(ts_from)
            .replace(tzinfo=datetime.timezone.utc)
            .isoformat(),
            "acquisition_stop": datetime.datetime.utcfromtimestamp(ts_to)
            .replace(tzinfo=datetime.timezone.utc)
            .isoformat(),
            "acquisition_duration": int(ts_to - ts_from),
            "collection_time": float(ts_to - ts_from),
        }

        # import meta data from data file
        if "file" not in metadata:  # If already present, the value is assumed to be a dictionary
            metadata["file"] = {}

        print("Collecting file metadata...")
        with h5py.File(files[0], "r") as h5file:
            for key, value in h5file.attrs.items():
                key = key.replace("VSet", "V")
                metadata["file"][key] = value

        metadata["entry_identifier"] = os.path.dirname(
            os.path.realpath(files[0]),
        )

        print("Collecting data from the EPICS archive...")
        # Get metadata from Epics archive if not present already
        epics_channels = self._config["metadata"]["epics_pvs"]

        start = datetime.datetime.utcfromtimestamp(ts_from).isoformat()

        channels_missing = set(epics_channels) - set(
            metadata["file"].keys(),
        )
        for channel in channels_missing:
            try:
                _, vals = get_archiver_data(
                    archiver_url=self._config["metadata"].get("archiver_url"),
                    archiver_channel=channel,
                    ts_from=ts_from,
                    ts_to=ts_to,
                )
                metadata["file"][f"{channel}"] = np.mean(vals)

            except IndexError:
                metadata["file"][f"{channel}"] = np.nan
                print(
                    f"Data for channel {channel} doesn't exist for time {start}",
                )
            except HTTPError as exc:
                print(
                    f"Incorrect URL for the archive channel {channel}. "
                    "Make sure that the channel name and file start and end times are "
                    "correct.",
                )
                print("Error code: ", exc)
            except URLError as exc:
                print(
                    f"Cannot access the archive URL for channel {channel}. "
                    "Make sure that you are within the FHI network. "
                    f"Skipping over channels {channels_missing}.",
                )
                print("Error code: ", exc)
                break

        # Determine the correct aperture_config
        stamps = sorted(
            list(self._config["metadata"]["aperture_config"]) + [start],
        )
        current_index = stamps.index(start)
        timestamp = stamps[current_index - 1]  # pick last configuration before file date

        # Aperture metadata
        if "instrument" not in metadata.keys():
            metadata["instrument"] = {"analyzer": {}}
        metadata["instrument"]["analyzer"]["fa_shape"] = "circle"
        metadata["instrument"]["analyzer"]["ca_shape"] = "circle"
        metadata["instrument"]["analyzer"]["fa_size"] = np.nan
        metadata["instrument"]["analyzer"]["ca_size"] = np.nan
        # get field aperture shape and size
        if {
            self._config["metadata"]["fa_in_channel"],
            self._config["metadata"]["fa_hor_channel"],
        }.issubset(set(metadata["file"].keys())):
            fa_in = metadata["file"][self._config["metadata"]["fa_in_channel"]]
            fa_hor = metadata["file"][self._config["metadata"]["fa_hor_channel"]]
            for key, value in self._config["metadata"]["aperture_config"][timestamp][
                "fa_size"
            ].items():
                if value[0][0] < fa_in < value[0][1] and value[1][0] < fa_hor < value[1][1]:
                    try:
                        k_float = float(key)
                        metadata["instrument"]["analyzer"]["fa_size"] = k_float
                    except ValueError:  # store string if numeric interpretation fails
                        metadata["instrument"]["analyzer"]["fa_shape"] = key
                    break
            else:
                print("Field aperture size not found.")

        # get contrast aperture shape and size
        if self._config["metadata"]["ca_in_channel"] in metadata["file"]:
            ca_in = metadata["file"][self._config["metadata"]["ca_in_channel"]]
            for key, value in self._config["metadata"]["aperture_config"][timestamp][
                "ca_size"
            ].items():
                if value[0] < ca_in < value[1]:
                    try:
                        k_float = float(key)
                        metadata["instrument"]["analyzer"]["ca_size"] = k_float
                    except ValueError:  # store string if numeric interpretation fails
                        metadata["instrument"]["analyzer"]["ca_shape"] = key
                    break
            else:
                print("Contrast aperture size not found.")

        # Storing the lens modes corresponding to lens voltages.
        # Use lens voltages present in first lens_mode entry.
        lens_list = self._config["metadata"]["lens_mode_config"][
            next(iter(self._config["metadata"]["lens_mode_config"]))
        ].keys()

        lens_volts = np.array(
            [metadata["file"].get(f"KTOF:Lens:{lens}:V", np.NaN) for lens in lens_list],
        )
        for mode, value in self._config["metadata"]["lens_mode_config"].items():
            lens_volts_config = np.array([value[k] for k in lens_list])
            if np.allclose(
                lens_volts,
                lens_volts_config,
                rtol=0.005,
            ):  # Equal up to 0.5% tolerance
                metadata["instrument"]["analyzer"]["lens_mode"] = mode
                break
        else:
            print(
                "Lens mode for given lens voltages not found. "
                "Storing lens mode from the user, if provided.",
            )

        # Determining projection from the lens mode
        try:
            lens_mode = metadata["instrument"]["analyzer"]["lens_mode"]
            if "spatial" in lens_mode.split("_")[1]:
                metadata["instrument"]["analyzer"]["projection"] = "real"
                metadata["instrument"]["analyzer"]["scheme"] = "spatial dispersive"
            else:
                metadata["instrument"]["analyzer"]["projection"] = "reciprocal"
                metadata["instrument"]["analyzer"]["scheme"] = "momentum dispersive"
        except IndexError:
            print(
                "Lens mode must have the form, '6kV_kmodem4.0_20VTOF_v3.sav'. "
                "Can't determine projection. "
                "Storing projection from the user, if provided.",
            )
        except KeyError:
            print(
                "Lens mode not found. Can't determine projection. "
                "Storing projection from the user, if provided.",
            )

        return metadata

    def get_count_rate(
        self,
        fids: Sequence[int] = None,
        **kwds,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Create count rate from the msMarker column for the files specified in
        ``fids``.

        Args:
            fids (Sequence[int], optional): the file ids to include.
                Defaults to list of all file ids.
            kwds: Keyword arguments:

                - **ms_markers_key**: HDF5 path of the ms-markers

        Returns:
            Tuple[np.ndarray, np.ndarray]: Arrays containing countrate and seconds
            into the scan.
        """
        if fids is None:
            fids = range(0, len(self.files))

        ms_markers_key = kwds.pop(
            "ms_markers_key",
            self._config.get("dataframe", {}).get(
                "ms_markers_key",
                "msMarkers",
            ),
        )

        secs_list = []
        count_rate_list = []
        accumulated_time = 0
        for fid in fids:
            count_rate_, secs_ = get_count_rate(
                h5py.File(self.files[fid]),
                ms_markers_key=ms_markers_key,
            )
            secs_list.append((accumulated_time + secs_).T)
            count_rate_list.append(count_rate_.T)
            accumulated_time += secs_[-1]

        count_rate = np.concatenate(count_rate_list)
        secs = np.concatenate(secs_list)

        return count_rate, secs

    def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float:
        """Return the elapsed time in the files specified in ``fids`` from
        the msMarkers column.

        Args:
            fids (Sequence[int], optional): the file ids to include.
                Defaults to list of all file ids.
            kwds: Keyword arguments:

                - **ms_markers_key**: HDF5 path of the millisecond marker column.

        Return:
            float: The elapsed time in the files in seconds.
        """
        if fids is None:
            fids = range(0, len(self.files))

        ms_markers_key = kwds.pop(
            "ms_markers_key",
            self._config.get("dataframe", {}).get(
                "ms_markers_key",
                "msMarkers",
            ),
        )

        secs = 0.0
        for fid in fids:
            secs += get_elapsed_time(
                h5py.File(self.files[fid]),
                ms_markers_key=ms_markers_key,
            )

        return secs


LOADER = MpesLoader
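

# Illustrative sketch of how the module-level LOADER alias might be consumed
# (the config dict is hypothetical; LOADER itself is defined above):
#
#     from sed.loader.mpes.loader import LOADER
#
#     loader = LOADER(config=my_config_dict)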