/sed/loader/mpes/loader.py

"""
module sed.loader.mpes, code for loading hdf5 files delayed into a dask dataframe.
Mostly ported from https://github.com/mpes-kit/mpes.
@author: L. Rettig
"""
from __future__ import annotations

import datetime
import glob
import json
import os
from collections.abc import Sequence
from typing import Any
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.request import urlopen

import dask
import dask.array as da
import dask.dataframe as ddf
import h5py
import numpy as np
import scipy.interpolate as sint
from natsort import natsorted

from sed.loader.base.loader import BaseLoader

def hdf5_to_dataframe(
    files: Sequence[str],
    channels: dict[str, Any] = None,
    time_stamps: bool = False,
    time_stamp_alias: str = "timeStamps",
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
    **kwds,
) -> ddf.DataFrame:
    """Function to read a selection of hdf5-files, and generate a delayed dask
    dataframe from provided groups in the files. Optionally, aliases can be defined.

    Args:
        files (Sequence[str]): A list of the file paths to load.
        channels (dict[str, Any], optional): hdf5 channel names to load. Each entry in the dict
            should contain the keys "format" and "dataset_key". Defaults to load all groups
            containing "Stream", and to read the attribute "Name" from each group.
        time_stamps (bool, optional): Option to calculate time stamps. Defaults to
            False.
        time_stamp_alias (str): Alias name for the timestamp column.
            Defaults to "timeStamps".
        ms_markers_key (str): hdf5 path containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        ddf.DataFrame: The delayed Dask DataFrame
    """
    # Read a file to parse the file structure
    test_fid = kwds.pop("test_fid", 0)
    test_proc = h5py.File(files[test_fid])

    if channels is None:
        channels = get_datasets_and_aliases(
            h5file=test_proc,
            search_pattern="Stream",
        )

    electron_channels = []
    column_names = []

    for name, channel in channels.items():
        if channel["format"] == "per_electron":
            if channel["dataset_key"] in test_proc:
                electron_channels.append(channel)
                column_names.append(name)
            else:
                print(
                    f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                    "Skipping the channel.",
                )

    if time_stamps:
        column_names.append(time_stamp_alias)

    test_array = hdf5_to_array(
        h5file=test_proc,
        channels=electron_channels,
        time_stamps=time_stamps,
        ms_markers_key=ms_markers_key,
        first_event_time_stamp_key=first_event_time_stamp_key,
    )

    # Delay-read all files
    arrays = [
        da.from_delayed(
            dask.delayed(hdf5_to_array)(
                h5file=h5py.File(f),
                channels=electron_channels,
                time_stamps=time_stamps,
                ms_markers_key=ms_markers_key,
                first_event_time_stamp_key=first_event_time_stamp_key,
            ),
            dtype=test_array.dtype,
            shape=(test_array.shape[0], np.nan),
        )
        for f in files
    ]
    array_stack = da.concatenate(arrays, axis=1).T

    dataframe = ddf.from_dask_array(array_stack, columns=column_names)

    for name, channel in channels.items():
        if channel["format"] == "per_file":
            if channel["dataset_key"] in test_proc.attrs:
                values = [float(get_attribute(h5py.File(f), channel["dataset_key"])) for f in files]
                delayeds = [
                    add_value(partition, name, value)
                    for partition, value in zip(dataframe.partitions, values)
                ]
                dataframe = ddf.from_delayed(delayeds)

            else:
                print(
                    f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                    "Skipping the channel.",
                )

    return dataframe

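# A minimal usage sketch of the function above. The file name and channel
# mapping are hypothetical; the actual dataset keys depend on the instrument
# configuration:
#
#     channels = {"X": {"format": "per_electron", "dataset_key": "Stream_0"}}
#     df = hdf5_to_dataframe(
#         files=["Scan0030_1.h5"],
#         channels=channels,
#         time_stamps=True,
#     )
#     df.head()  # computing a partition triggers the delayed reads
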
def hdf5_to_timed_dataframe(
    files: Sequence[str],
    channels: dict[str, Any] = None,
    time_stamps: bool = False,
    time_stamp_alias: str = "timeStamps",
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
    **kwds,
) -> ddf.DataFrame:
    """Function to read a selection of hdf5-files, and generate a delayed dask
    dataframe from provided groups in the files. Optionally, aliases can be defined.
    Returns a dataframe for evenly spaced time intervals.

    Args:
        files (Sequence[str]): A list of the file paths to load.
        channels (dict[str, Any], optional): hdf5 channel names to load. Each entry in the dict
            should contain the keys "format" and "dataset_key". Defaults to load all groups
            containing "Stream", and to read the attribute "Name" from each group.
        time_stamps (bool, optional): Option to calculate time stamps. Defaults to
            False.
        time_stamp_alias (str): Alias name for the timestamp column.
            Defaults to "timeStamps".
        ms_markers_key (str): hdf5 dataset containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        ddf.DataFrame: The delayed Dask DataFrame
    """
    # Read a file to parse the file structure
    test_fid = kwds.pop("test_fid", 0)
    test_proc = h5py.File(files[test_fid])

    if channels is None:
        channels = get_datasets_and_aliases(
            h5file=test_proc,
            search_pattern="Stream",
        )

    electron_channels = []
    column_names = []

    for name, channel in channels.items():
        if channel["format"] == "per_electron":
            if channel["dataset_key"] in test_proc:
                electron_channels.append(channel)
                column_names.append(name)
            else:
                print(
                    f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                    "Skipping the channel.",
                )

    if time_stamps:
        column_names.append(time_stamp_alias)

    test_array = hdf5_to_timed_array(
        h5file=test_proc,
        channels=electron_channels,
        time_stamps=time_stamps,
        ms_markers_key=ms_markers_key,
        first_event_time_stamp_key=first_event_time_stamp_key,
    )

    # Delay-read all files
    arrays = [
        da.from_delayed(
            dask.delayed(hdf5_to_timed_array)(
                h5file=h5py.File(f),
                channels=electron_channels,
                time_stamps=time_stamps,
                ms_markers_key=ms_markers_key,
                first_event_time_stamp_key=first_event_time_stamp_key,
            ),
            dtype=test_array.dtype,
            shape=(test_array.shape[0], np.nan),
        )
        for f in files
    ]
    array_stack = da.concatenate(arrays, axis=1).T

    dataframe = ddf.from_dask_array(array_stack, columns=column_names)

    for name, channel in channels.items():
        if channel["format"] == "per_file":
            if channel["dataset_key"] in test_proc.attrs:
                values = [float(get_attribute(h5py.File(f), channel["dataset_key"])) for f in files]
                delayeds = [
                    add_value(partition, name, value)
                    for partition, value in zip(dataframe.partitions, values)
                ]
                dataframe = ddf.from_delayed(delayeds)

            else:
                print(
                    f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                    "Skipping the channel.",
                )

    return dataframe

@dask.delayed
def add_value(partition: ddf.DataFrame, name: str, value: float) -> ddf.DataFrame:
    """Dask delayed helper function to add a value to each dataframe partition

    Args:
        partition (ddf.DataFrame): Dask dataframe partition
        name (str): Name of the column to add
        value (float): value to add to this partition

    Returns:
        ddf.DataFrame: Dataframe partition with added column
    """
    partition[name] = value
    return partition

def get_datasets_and_aliases(
    h5file: h5py.File,
    search_pattern: str = None,
    alias_key: str = "Name",
) -> dict[str, Any]:
    """Read datasets and aliases from a provided hdf5 file handle

    Args:
        h5file (h5py.File):
            The hdf5 file handle
        search_pattern (str, optional):
            Search pattern to select groups. Defaults to include all groups.
        alias_key (str, optional):
            Attribute key where aliases are stored. Defaults to "Name".

    Returns:
        dict[str, Any]:
        A dict of aliases and groupnames parsed from the file
    """
    # get group names:
    dataset_names = list(h5file)

    # Filter the group names
    if search_pattern is None:
        filtered_dataset_names = dataset_names
    else:
        filtered_dataset_names = [name for name in dataset_names if search_pattern in name]

    alias_dict = {}
    for name in filtered_dataset_names:
        alias_dict[name] = get_attribute(h5file[name], alias_key)

    return {
        alias_dict[name]: {"format": "per_electron", "dataset_key": name}
        for name in filtered_dataset_names
    }

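# Hedged sketch of the returned mapping, assuming a file containing groups
# "Stream_0" and "Stream_1" whose "Name" attributes read "X" and "Y"
# (all names hypothetical):
#
#     get_datasets_and_aliases(h5file, search_pattern="Stream")
#     # -> {"X": {"format": "per_electron", "dataset_key": "Stream_0"},
#     #     "Y": {"format": "per_electron", "dataset_key": "Stream_1"}}
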
def hdf5_to_array(
    h5file: h5py.File,
    channels: Sequence[dict[str, Any]],
    time_stamps=False,
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
) -> np.ndarray:
    """Reads the content of the given groups in an hdf5 file, and returns a
    2-dimensional array with the corresponding values.

    Args:
        h5file (h5py.File):
            hdf5 file handle to read from
        channels (Sequence[dict[str, Any]]):
            channel dicts containing group names and types to read.
        time_stamps (bool, optional):
            Option to calculate time stamps. Defaults to False.
        ms_markers_key (str): hdf5 dataset containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        np.ndarray: The 2-dimensional data array containing the values of the groups.
    """
    # Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)
    # Read out groups:
    data_list = []
    for channel in channels:
        if channel["format"] == "per_electron":
            g_dataset = np.asarray(h5file[channel["dataset_key"]])
        else:
            raise ValueError(
                f"Invalid 'format':{channel['format']} for channel {channel['dataset_key']}.",
            )
        if "dtype" in channel.keys():
            g_dataset = g_dataset.astype(channel["dtype"])
        else:
            g_dataset = g_dataset.astype("float32")
        data_list.append(g_dataset)

    # calculate time stamps
    if time_stamps:
        # create target array for time stamps
        time_stamp_data = np.zeros(len(data_list[0]))
        # the ms marker contains a list of events that occurred at full ms intervals.
        # It's monotonically increasing, and can contain duplicates
        ms_marker = np.asarray(h5file[ms_markers_key])

        # try to get start timestamp from "FirstEventTimeStamp" attribute
        try:
            start_time_str = get_attribute(h5file, first_event_time_stamp_key)
            start_time = datetime.datetime.strptime(
                start_time_str,
                "%Y-%m-%dT%H:%M:%S.%f%z",
            ).timestamp()
        except KeyError:
            # get the start time of the file from its modification date if the key
            # does not exist (old files)
            start_time = os.path.getmtime(h5file.filename)  # in seconds
            # the modification time points to the time when the file was finished, so we
            # need to correct for the time it took to write the file
            start_time -= len(ms_marker) / 1000

        # fill in range before 1st marker
        time_stamp_data[0 : ms_marker[0]] = start_time
        for i in range(len(ms_marker) - 1):
            # linear interpolation between ms: Disabled, because it takes a lot of
            # time, and external signals are anyway not better synchronized than 1 ms
            # time_stamp_data[ms_marker[n] : ms_marker[n + 1]] = np.linspace(
            #     start_time + n,
            #     start_time + n + 1,
            #     ms_marker[n + 1] - ms_marker[n],
            # )
            time_stamp_data[ms_marker[i] : ms_marker[i + 1]] = start_time + (i + 1) / 1000
        # fill any remaining points
        time_stamp_data[ms_marker[len(ms_marker) - 1] : len(time_stamp_data)] = (
            start_time + len(ms_marker) / 1000
        )

        data_list.append(time_stamp_data)

    return np.asarray(data_list)

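# Worked sketch of the millisecond time-stamp fill above (hypothetical
# numbers): with ms_marker = [3, 5, 9] and ten events, events 0-2 get
# start_time, events 3-4 get start_time + 0.001, events 5-8 get
# start_time + 0.002, and event 9 gets start_time + 0.003, i.e. each event
# is stamped with the time of the most recent full millisecond.
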
def hdf5_to_timed_array(
    h5file: h5py.File,
    channels: Sequence[dict[str, Any]],
    time_stamps=False,
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
) -> np.ndarray:
    """Reads the content of the given groups in an hdf5 file, and returns a
    timed version of a 2-dimensional array with the corresponding values.

    Args:
        h5file (h5py.File):
            hdf5 file handle to read from
        channels (Sequence[dict[str, Any]]):
            channel dicts containing group names and types to read.
        time_stamps (bool, optional):
            Option to calculate time stamps. Defaults to False.
        ms_markers_key (str): hdf5 dataset containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        np.ndarray: the array of the values at evenly spaced timing obtained from
        the ms_markers.
    """
    # Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)

    # Read out groups:
    data_list = []
    ms_marker = np.asarray(h5file[ms_markers_key])
    for channel in channels:
        timed_dataset = np.zeros_like(ms_marker)
        if channel["format"] == "per_electron":
            g_dataset = np.asarray(h5file[channel["dataset_key"]])
            for i, point in enumerate(ms_marker):
                timed_dataset[i] = g_dataset[int(point) - 1]
        else:
            raise ValueError(
                f"Invalid 'format':{channel['format']} for channel {channel['dataset_key']}.",
            )
        if "dtype" in channel.keys():
            timed_dataset = timed_dataset.astype(channel["dtype"])
        else:
            timed_dataset = timed_dataset.astype("float32")

        data_list.append(timed_dataset)

    # calculate time stamps
    if time_stamps:
        # try to get start timestamp from "FirstEventTimeStamp" attribute
        try:
            start_time_str = get_attribute(h5file, first_event_time_stamp_key)
            start_time = datetime.datetime.strptime(
                start_time_str,
                "%Y-%m-%dT%H:%M:%S.%f%z",
            ).timestamp()
        except KeyError:
            # get the start time of the file from its modification date if the key
            # does not exist (old files)
            start_time = os.path.getmtime(h5file.filename)  # in seconds
            # the modification time points to the time when the file was finished, so we
            # need to correct for the time it took to write the file
            start_time -= len(ms_marker) / 1000

        time_stamp_data = start_time + np.arange(len(ms_marker)) / 1000

        data_list.append(time_stamp_data)

    return np.asarray(data_list)

def get_attribute(h5group: h5py.Group, attribute: str) -> str:
    """Reads, decodes and returns an attribute from an hdf5 group

    Args:
        h5group (h5py.Group):
            The hdf5 group to read from
        attribute (str):
            The name of the attribute

    Returns:
        str: The parsed attribute data
    """
    try:
        content = h5group.attrs[attribute].decode("utf-8")
    except AttributeError:  # No need to decode
        content = h5group.attrs[attribute]
    except KeyError as exc:  # No such attribute
        raise KeyError(f"Attribute '{attribute}' not found!") from exc

    return content

def get_count_rate(
    h5file: h5py.File,
    ms_markers_key: str = "msMarkers",
) -> tuple[np.ndarray, np.ndarray]:
    """Create count rate in the file from the msMarker column.

    Args:
        h5file (h5py.File): The h5file from which to get the count rate.
        ms_markers_key (str, optional): The hdf5 path where the millisecond markers
            are stored. Defaults to "msMarkers".

    Returns:
        tuple[np.ndarray, np.ndarray]: The count rate in Hz and the seconds into the
        scan.
    """
    ms_markers = np.asarray(h5file[ms_markers_key])
    secs = np.arange(0, len(ms_markers)) / 1000
    msmarker_spline = sint.InterpolatedUnivariateSpline(secs, ms_markers, k=1)
    rate_spline = msmarker_spline.derivative()
    count_rate = rate_spline(secs)

    return (count_rate, secs)

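# The idea behind the spline: ms_markers[i] is the cumulative event count
# after millisecond i, so the derivative of a linear interpolation through
# (seconds, cumulative counts) is the instantaneous count rate in Hz. With
# hypothetical markers [0, 2, 4, 8], the rate is 2000 Hz over the first two
# milliseconds and 4000 Hz over the third.
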
def get_elapsed_time(
    h5file: h5py.File,
    ms_markers_key: str = "msMarkers",
) -> float:
    """Return the elapsed time in the file from the msMarkers dataset

    Args:
        h5file (h5py.File): The h5file from which to get the elapsed time.
        ms_markers_key (str, optional): The hdf5 path where the millisecond markers
            are stored. Defaults to "msMarkers".

    Returns:
        float: The acquisition time of the file in seconds.
    """
    secs = h5file[ms_markers_key].len() / 1000

    return secs

def get_archiver_data(
    archiver_url: str,
    archiver_channel: str,
    ts_from: float,
    ts_to: float,
) -> tuple[np.ndarray, np.ndarray]:
    """Extract time stamps and corresponding data from an EPICS archiver instance

    Args:
        archiver_url (str): URL of the archiver data extraction interface
        archiver_channel (str): EPICS channel to extract data for
        ts_from (float): starting time stamp of the range of interest
        ts_to (float): ending time stamp of the range of interest

    Returns:
        tuple[np.ndarray, np.ndarray]: The extracted time stamps and corresponding data
    """
    iso_from = datetime.datetime.utcfromtimestamp(ts_from).isoformat()
    iso_to = datetime.datetime.utcfromtimestamp(ts_to).isoformat()
    req_str = archiver_url + archiver_channel + "&from=" + iso_from + "Z&to=" + iso_to + "Z"
    with urlopen(req_str) as req:
        data = json.load(req)
        secs = [x["secs"] + x["nanos"] * 1e-9 for x in data[0]["data"]]
        vals = [x["val"] for x in data[0]["data"]]

    return (np.asarray(secs), np.asarray(vals))

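# Hedged sketch of the assembled request, assuming (as the plain string
# concatenation above implies) that the configured archiver_url already ends
# in a "?pv=" query prefix. Host and channel are hypothetical:
#
#     archiver_url = "http://archiver.example.org/retrieval/data/getData.json?pv="
#     archiver_channel = "TEST:PV"
#     # req_str becomes:
#     # "http://archiver.example.org/retrieval/data/getData.json?pv=TEST:PV"
#     # "&from=2024-06-23T20:18:00Z&to=2024-06-23T20:19:00Z"
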
class MpesLoader(BaseLoader):
    """Mpes implementation of the Loader. Reads from h5 files or folders of the
    SPECS Metis 1000 (FHI Berlin)

    Args:
        config (dict, optional): Config dictionary. Defaults to None.
    """

    __name__ = "mpes"

    supported_file_types = ["h5"]

    def __init__(
        self,
        config: dict = None,
    ):
        super().__init__(config=config)

        self.read_timestamps = self._config.get("dataframe", {}).get(
            "read_timestamps",
            False,
        )

    def read_dataframe(
        self,
        files: str | Sequence[str] = None,
        folders: str | Sequence[str] = None,
        runs: str | Sequence[str] = None,
        ftype: str = "h5",
        metadata: dict = None,
        collect_metadata: bool = False,
        time_stamps: bool = False,
        **kwds,
    ) -> tuple[ddf.DataFrame, ddf.DataFrame, dict]:
        """Read stored hdf5 files from a list or from folder and returns a dask
        dataframe and corresponding metadata.

        Args:
            files (str | Sequence[str], optional): File path(s) to process.
                Defaults to None.
            folders (str | Sequence[str], optional): Path to folder(s) where files
                are stored. Path has priority such that if it's specified, the specified
                files will be ignored. Defaults to None.
            runs (str | Sequence[str], optional): Run identifier(s). Corresponding
                files will be located in the location provided by ``folders``. Takes
                precedence over ``files`` and ``folders``. Defaults to None.
            ftype (str, optional): File extension to use. If a folder path is given,
                all files with the specified extension are read into the dataframe
                in the reading order. Defaults to "h5".
            metadata (dict, optional): Manual metadata dictionary. Auto-generated
                metadata are added to it. Defaults to None.
            collect_metadata (bool): Option to collect metadata from files. Requires
                a valid config dict. Defaults to False.
            time_stamps (bool, optional): Option to create a time_stamp column in
                the dataframe from ms-Markers in the files. Defaults to False.
            **kwds: Keyword parameters.

                - **channels**: Dictionary of channel definitions to load (see
                  ``hdf5_to_dataframe``).
                - **time_stamp_alias**: Alias for the timestamp column
                - **ms_markers_key**: HDF5 path of the millisecond marker column.
                - **first_event_time_stamp_key**: Attribute name containing the start
                  timestamp of the file.

                Additional keywords are passed to ``hdf5_to_dataframe``.

        Raises:
            ValueError: Raised if neither files nor folders are provided.
            FileNotFoundError: Raised if a file or folder is not found.

        Returns:
            tuple[ddf.DataFrame, ddf.DataFrame, dict]: Dask dataframe, timed Dask
            dataframe and metadata read from specified files.
        """
        # if runs is provided, try to locate the respective files relative to the provided folder.
        if runs is not None:  # pylint: disable=duplicate-code
            files = []
            if isinstance(runs, (str, int)):
                runs = [runs]
            for run in runs:
                files.extend(
                    self.get_files_from_run_id(run_id=run, folders=folders, extension=ftype),
                )
            self.runs = list(runs)
            super().read_dataframe(
                files=files,
                ftype=ftype,
                metadata=metadata,
            )
        else:
            # pylint: disable=duplicate-code
            super().read_dataframe(
                files=files,
                folders=folders,
                runs=runs,
                ftype=ftype,
                metadata=metadata,
            )

        channels = kwds.pop(
            "channels",
            self._config.get("dataframe", {}).get("channels", None),
        )
        time_stamp_alias = kwds.pop(
            "time_stamp_alias",
            self._config.get("dataframe", {}).get(
                "time_stamp_alias",
                "timeStamps",
            ),
        )
        ms_markers_key = kwds.pop(
            "ms_markers_key",
            self._config.get("dataframe", {}).get(
                "ms_markers_key",
                "msMarkers",
            ),
        )
        first_event_time_stamp_key = kwds.pop(
            "first_event_time_stamp_key",
            self._config.get("dataframe", {}).get(
                "first_event_time_stamp_key",
                "FirstEventTimeStamp",
            ),
        )
        df = hdf5_to_dataframe(
            files=self.files,
            channels=channels,
            time_stamps=time_stamps,
            time_stamp_alias=time_stamp_alias,
            ms_markers_key=ms_markers_key,
            first_event_time_stamp_key=first_event_time_stamp_key,
            **kwds,
        )
        timed_df = hdf5_to_timed_dataframe(
            files=self.files,
            channels=channels,
            time_stamps=time_stamps,
            time_stamp_alias=time_stamp_alias,
            ms_markers_key=ms_markers_key,
            first_event_time_stamp_key=first_event_time_stamp_key,
            **kwds,
        )

        if collect_metadata:
            metadata = self.gather_metadata(
                files=self.files,
                metadata=self.metadata,
            )
        else:
            metadata = self.metadata

        return df, timed_df, metadata

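    # Minimal usage sketch of the loader (config contents, folder, and run
    # number are hypothetical):
    #
    #     loader = MpesLoader(config=config_dict)
    #     df, timed_df, metadata = loader.read_dataframe(
    #         runs=["30"],
    #         folders="/path/to/raw/data",
    #         time_stamps=True,
    #     )
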
    def get_files_from_run_id(
        self,
        run_id: str,
        folders: str | Sequence[str] = None,
        extension: str = "h5",
        **kwds,  # noqa: ARG002
    ) -> list[str]:
        """Locate the files for a given run identifier.

        Args:
            run_id (str): The run identifier to locate.
            folders (str | Sequence[str], optional): The directory(ies) where the raw
                data is located. Defaults to config["core"]["base_folder"]
            extension (str, optional): The file extension. Defaults to "h5".
            kwds: Keyword arguments

        Returns:
            list[str]: List of file path strings to the location of run data.
        """
        if folders is None:
            folders = self._config["core"]["paths"]["data_raw_dir"]

        if isinstance(folders, str):
            folders = [folders]

        files: list[str] = []
        for folder in folders:
            run_files = natsorted(
                glob.glob(
                    folder + "/**/Scan" + str(run_id).zfill(4) + "_*." + extension,
                    recursive=True,
                ),
            )
            files.extend(run_files)

        # Check if any files are found
        if not files:
            raise FileNotFoundError(
                f"No files found for run {run_id} in directory {str(folders)}",
            )

        # Return the list of found files
        return files

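    # Sketch of the glob pattern built above (folder name hypothetical): for
    # run_id="30", extension="h5", and folder "/data", the recursive pattern
    # is "/data/**/Scan0030_*.h5", so e.g. "/data/2024/Scan0030_1.h5" matches.
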
    def get_start_and_end_time(self) -> tuple[float, float]:
        """Extract the start and end time stamps from the loaded files

        Returns:
            tuple[float, float]: A tuple containing the start and end time stamps
        """
        h5file = h5py.File(self.files[0])
        channels = []
        for channel in self._config["dataframe"]["channels"].values():
            if channel["format"] == "per_electron":
                channels = [channel]
                break
        if not channels:
            raise ValueError("No valid 'per_electron' channels found.")
        timestamps = hdf5_to_array(
            h5file,
            channels=channels,
            time_stamps=True,
        )
        ts_from = timestamps[-1][1]
        h5file = h5py.File(self.files[-1])
        timestamps = hdf5_to_array(
            h5file,
            channels=channels,
            time_stamps=True,
        )
        ts_to = timestamps[-1][-1]
        return (ts_from, ts_to)

    def gather_metadata(
        self,
        files: Sequence[str],
        metadata: dict = None,
    ) -> dict:
        """Collect metadata from files

        Args:
            files (Sequence[str]): List of files loaded
            metadata (dict, optional): Manual metadata dictionary. Auto-generated
                metadata are added to it. Defaults to None.

        Returns:
            dict: The completed metadata dictionary.
        """
        if metadata is None:
            metadata = {}
        print("Gathering metadata from different locations")
        # Read events in with ms time stamps
        print("Collecting time stamps...")
        (ts_from, ts_to) = self.get_start_and_end_time()

        metadata["timing"] = {
            "acquisition_start": datetime.datetime.utcfromtimestamp(ts_from)
            .replace(tzinfo=datetime.timezone.utc)
            .isoformat(),
            "acquisition_stop": datetime.datetime.utcfromtimestamp(ts_to)
            .replace(tzinfo=datetime.timezone.utc)
            .isoformat(),
            "acquisition_duration": int(ts_to - ts_from),
            "collection_time": float(ts_to - ts_from),
        }

        # import metadata from data file
        if "file" not in metadata:  # If already present, the value is assumed to be a dictionary
            metadata["file"] = {}

        print("Collecting file metadata...")
        with h5py.File(files[0], "r") as h5file:
            for key, value in h5file.attrs.items():
                key = key.replace("VSet", "V")
                metadata["file"][key] = value

        metadata["entry_identifier"] = os.path.dirname(
            os.path.realpath(files[0]),
        )

        print("Collecting data from the EPICS archive...")
        # Get metadata from Epics archive if not present already
        epics_channels = self._config["metadata"]["epics_pvs"]

        start = datetime.datetime.utcfromtimestamp(ts_from).isoformat()

        channels_missing = set(epics_channels) - set(
            metadata["file"].keys(),
        )
        for channel in channels_missing:
            try:
                _, vals = get_archiver_data(
                    archiver_url=self._config["metadata"].get("archiver_url"),
                    archiver_channel=channel,
                    ts_from=ts_from,
                    ts_to=ts_to,
                )
                metadata["file"][channel] = np.mean(vals)

            except IndexError:
                metadata["file"][channel] = np.nan
                print(
                    f"Data for channel {channel} doesn't exist for time {start}",
                )
            except HTTPError as exc:
                print(
                    f"Incorrect URL for the archive channel {channel}. "
                    "Make sure that the channel name and file start and end times are "
                    "correct.",
                )
                print("Error code: ", exc)
            except URLError as exc:
                print(
                    f"Cannot access the archive URL for channel {channel}. "
                    "Make sure that you are within the FHI network. "
                    f"Skipping over channels {channels_missing}.",
                )
                print("Error code: ", exc)
                break

        # Determine the correct aperture_config
        stamps = sorted(
            list(self._config["metadata"]["aperture_config"]) + [start],
        )
        current_index = stamps.index(start)
        timestamp = stamps[current_index - 1]  # pick last configuration before file date

        # Aperture metadata
        if "instrument" not in metadata.keys():
            metadata["instrument"] = {"analyzer": {}}
        metadata["instrument"]["analyzer"]["fa_shape"] = "circle"
        metadata["instrument"]["analyzer"]["ca_shape"] = "circle"
        metadata["instrument"]["analyzer"]["fa_size"] = np.nan
        metadata["instrument"]["analyzer"]["ca_size"] = np.nan
        # get field aperture shape and size
        if {
            self._config["metadata"]["fa_in_channel"],
            self._config["metadata"]["fa_hor_channel"],
        }.issubset(set(metadata["file"].keys())):
            fa_in = metadata["file"][self._config["metadata"]["fa_in_channel"]]
            fa_hor = metadata["file"][self._config["metadata"]["fa_hor_channel"]]
            for key, value in self._config["metadata"]["aperture_config"][timestamp][
                "fa_size"
            ].items():
                if value[0][0] < fa_in < value[0][1] and value[1][0] < fa_hor < value[1][1]:
                    try:
                        k_float = float(key)
                        metadata["instrument"]["analyzer"]["fa_size"] = k_float
                    except ValueError:  # store string if numeric interpretation fails
                        metadata["instrument"]["analyzer"]["fa_shape"] = key
                    break
            else:
                print("Field aperture size not found.")

        # get contrast aperture shape and size
        if self._config["metadata"]["ca_in_channel"] in metadata["file"]:
            ca_in = metadata["file"][self._config["metadata"]["ca_in_channel"]]
            for key, value in self._config["metadata"]["aperture_config"][timestamp][
                "ca_size"
            ].items():
                if value[0] < ca_in < value[1]:
                    try:
                        k_float = float(key)
                        metadata["instrument"]["analyzer"]["ca_size"] = k_float
                    except ValueError:  # store string if numeric interpretation fails
                        metadata["instrument"]["analyzer"]["ca_shape"] = key
                    break
            else:
                print("Contrast aperture size not found.")

        # Storing the lens modes corresponding to lens voltages.
        # Use lens voltages present in first lens_mode entry.
        lens_list = self._config["metadata"]["lens_mode_config"][
            next(iter(self._config["metadata"]["lens_mode_config"]))
        ].keys()

        lens_volts = np.array(
            [metadata["file"].get(f"KTOF:Lens:{lens}:V", np.nan) for lens in lens_list],
        )
        for mode, value in self._config["metadata"]["lens_mode_config"].items():
            lens_volts_config = np.array([value[k] for k in lens_list])
            if np.allclose(
                lens_volts,
                lens_volts_config,
                rtol=0.005,
            ):  # Equal up to 0.5% tolerance
                metadata["instrument"]["analyzer"]["lens_mode"] = mode
                break
        else:
            print(
                "Lens mode for given lens voltages not found. "
                "Storing lens mode from the user, if provided.",
            )

        # Determining projection from the lens mode
        try:
            lens_mode = metadata["instrument"]["analyzer"]["lens_mode"]
            if "spatial" in lens_mode.split("_")[1]:
                metadata["instrument"]["analyzer"]["projection"] = "real"
                metadata["instrument"]["analyzer"]["scheme"] = "spatial dispersive"
            else:
                metadata["instrument"]["analyzer"]["projection"] = "reciprocal"
                metadata["instrument"]["analyzer"]["scheme"] = "momentum dispersive"
        except IndexError:
            print(
                "Lens mode must have the form '6kV_kmodem4.0_20VTOF_v3.sav'. "
                "Can't determine projection. "
                "Storing projection from the user, if provided.",
            )
        except KeyError:
            print(
                "Lens mode not found. Can't determine projection. "
                "Storing projection from the user, if provided.",
            )

        return metadata

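    # Worked sketch of the aperture-config lookup above (hypothetical dates):
    # with aperture_config keys ["2020-01-01T00:00:00", "2023-06-01T00:00:00"]
    # and a file start of "2024-06-23T20:18:00", sorting the keys together with
    # the start time and stepping one index back selects "2023-06-01T00:00:00",
    # i.e. the last configuration defined before the file was recorded.
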
    def get_count_rate(
        self,
        fids: Sequence[int] = None,
        **kwds,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Create count rate from the msMarker column for the files specified in
        ``fids``.

        Args:
            fids (Sequence[int], optional): The file ids to include. Defaults to
                list of all file ids.
            kwds: Keyword arguments:

                - **ms_markers_key**: HDF5 path of the ms-markers

        Returns:
            tuple[np.ndarray, np.ndarray]: Arrays containing countrate and seconds
            into the scan.
        """
        if fids is None:
            fids = range(0, len(self.files))

        ms_markers_key = kwds.pop(
            "ms_markers_key",
            self._config.get("dataframe", {}).get(
                "ms_markers_key",
                "msMarkers",
            ),
        )

        secs_list = []
        count_rate_list = []
        accumulated_time = 0
        for fid in fids:
            count_rate_, secs_ = get_count_rate(
                h5py.File(self.files[fid]),
                ms_markers_key=ms_markers_key,
            )
            secs_list.append((accumulated_time + secs_).T)
            count_rate_list.append(count_rate_.T)
            accumulated_time += secs_[-1]

        count_rate = np.concatenate(count_rate_list)
        secs = np.concatenate(secs_list)

        return count_rate, secs

1✔
1000
        """Return the elapsed time in the files specified in ``fids`` from
1001
        the msMarkers column.
1002

1003
        Args:
1004
            fids (Sequence[int], optional): fids (Sequence[int]): the file ids to
1005
                include. Defaults to list of all file ids.
1006
            kwds: Keyword arguments:
1007

1008
                - **ms_markers_key**: HDF5 path of the millisecond marker column.
1009

1010
        Return:
1011
            float: The elapsed time in the files in seconds.
1012
        """
1013
        if fids is None:
1✔
1014
            fids = range(0, len(self.files))
1✔
1015

1016
        ms_markers_key = kwds.pop(
1✔
1017
            "ms_markers_key",
1018
            self._config.get("dataframe", {}).get(
1019
                "ms_markers_key",
1020
                "msMarkers",
1021
            ),
1022
        )
1023

1024
        secs = 0.0
1✔
1025
        for fid in fids:
1✔
1026
            secs += get_elapsed_time(
1✔
1027
                h5py.File(self.files[fid]),
1028
                ms_markers_key=ms_markers_key,
1029
            )
1030

1031
        return secs
1✔
1032

1033

1034
LOADER = MpesLoader