# /sed/loader/mpes/loader.py
"""
module sed.loader.mpes, code for loading hdf5 files delayed into a dask dataframe.
Mostly ported from https://github.com/mpes-kit/mpes.
@author: L. Rettig
"""
import datetime
import glob
import json
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Sequence
from typing import Tuple
from typing import Union
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.request import urlopen

import dask
import dask.array as da
import dask.dataframe as ddf
import h5py
import numpy as np
import scipy.interpolate as sint
from natsort import natsorted

from sed.loader.base.loader import BaseLoader


def hdf5_to_dataframe(
    files: Sequence[str],
    channels: Dict[str, Any] = None,
    time_stamps: bool = False,
    time_stamp_alias: str = "timeStamps",
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
    **kwds,
) -> ddf.DataFrame:
    """Function to read a selection of hdf5-files, and generate a delayed dask
    dataframe from provided groups in the files. Optionally, aliases can be defined.

    Args:
        files (Sequence[str]): A list of the file paths to load.
        channels (Dict[str, Any], optional): hdf5 channel names to load. Each entry in the dict
            should contain the keys "format" and "dataset_key". Defaults to load all groups
            containing "Stream", and to read the attribute "Name" from each group.
        time_stamps (bool, optional): Option to calculate time stamps. Defaults to
            False.
        time_stamp_alias (str): Alias name for the timestamp column.
            Defaults to "timeStamps".
        ms_markers_key (str): hdf5 path containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        ddf.DataFrame: The delayed Dask DataFrame
    """
    # Read a file to parse the file structure
    test_fid = kwds.pop("test_fid", 0)
    test_proc = h5py.File(files[test_fid])

    if channels is None:
        channels = get_datasets_and_aliases(
            h5file=test_proc,
            search_pattern="Stream",
        )

    electron_channels = []
    column_names = []

    for name, channel in channels.items():
        if channel["format"] == "per_electron":
            if channel["dataset_key"] in test_proc:
                electron_channels.append(channel)
                column_names.append(name)
            else:
                print(
                    f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                    "Skipping the channel.",
                )

    if time_stamps:
        column_names.append(time_stamp_alias)

    test_array = hdf5_to_array(
        h5file=test_proc,
        channels=electron_channels,
        time_stamps=time_stamps,
        ms_markers_key=ms_markers_key,
        first_event_time_stamp_key=first_event_time_stamp_key,
    )

    # Delay-read all files
    arrays = [
        da.from_delayed(
            dask.delayed(hdf5_to_array)(
                h5file=h5py.File(f),
                channels=electron_channels,
                time_stamps=time_stamps,
                ms_markers_key=ms_markers_key,
                first_event_time_stamp_key=first_event_time_stamp_key,
            ),
            dtype=test_array.dtype,
            shape=(test_array.shape[0], np.nan),
        )
        for f in files
    ]
    array_stack = da.concatenate(arrays, axis=1).T

    dataframe = ddf.from_dask_array(array_stack, columns=column_names)

    for name, channel in channels.items():
        if channel["format"] == "per_file":
            if channel["dataset_key"] in test_proc.attrs:
                values = [float(get_attribute(h5py.File(f), channel["dataset_key"])) for f in files]
                delayeds = [
                    add_value(partition, name, value)
                    for partition, value in zip(dataframe.partitions, values)
                ]
                dataframe = ddf.from_delayed(delayeds)

            else:
                print(
                    f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                    "Skipping the channel.",
                )

    return dataframe
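
# Illustrative usage sketch (not part of the original module): building a
# channels dict by hand instead of relying on the "Stream" auto-detection.
# The file names and the "Stream_0"/"Stream_2" dataset keys below are
# hypothetical placeholders, not guaranteed contents of any real file.
#
#     channels = {
#         "X": {"format": "per_electron", "dataset_key": "Stream_0"},
#         "t": {"format": "per_electron", "dataset_key": "Stream_2", "dtype": "int32"},
#     }
#     df = hdf5_to_dataframe(
#         files=["Scan0001_1.h5", "Scan0001_2.h5"],
#         channels=channels,
#         time_stamps=True,
#     )
#     df.head()  # triggers reading of the first partition only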


def hdf5_to_timed_dataframe(
    files: Sequence[str],
    channels: Dict[str, Any] = None,
    time_stamps: bool = False,
    time_stamp_alias: str = "timeStamps",
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
    **kwds,
) -> ddf.DataFrame:
    """Function to read a selection of hdf5-files, and generate a delayed dask
    dataframe from provided groups in the files. Optionally, aliases can be defined.
    Returns a dataframe for evenly spaced time intervals.

    Args:
        files (Sequence[str]): A list of the file paths to load.
        channels (Dict[str, Any], optional): hdf5 channel names to load. Each entry in the dict
            should contain the keys "format" and "dataset_key". Defaults to load all groups
            containing "Stream", and to read the attribute "Name" from each group.
        time_stamps (bool, optional): Option to calculate time stamps. Defaults to
            False.
        time_stamp_alias (str): Alias name for the timestamp column.
            Defaults to "timeStamps".
        ms_markers_key (str): hdf5 dataset containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        ddf.DataFrame: The delayed Dask DataFrame
    """
    # Read a file to parse the file structure
    test_fid = kwds.pop("test_fid", 0)
    test_proc = h5py.File(files[test_fid])

    if channels is None:
        channels = get_datasets_and_aliases(
            h5file=test_proc,
            search_pattern="Stream",
        )

    electron_channels = []
    column_names = []

    for name, channel in channels.items():
        if channel["format"] == "per_electron":
            if channel["dataset_key"] in test_proc:
                electron_channels.append(channel)
                column_names.append(name)
            else:
                print(
                    f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                    "Skipping the channel.",
                )

    if time_stamps:
        column_names.append(time_stamp_alias)

    test_array = hdf5_to_timed_array(
        h5file=test_proc,
        channels=electron_channels,
        time_stamps=time_stamps,
        ms_markers_key=ms_markers_key,
        first_event_time_stamp_key=first_event_time_stamp_key,
    )

    # Delay-read all files
    arrays = [
        da.from_delayed(
            dask.delayed(hdf5_to_timed_array)(
                h5file=h5py.File(f),
                channels=electron_channels,
                time_stamps=time_stamps,
                ms_markers_key=ms_markers_key,
                first_event_time_stamp_key=first_event_time_stamp_key,
            ),
            dtype=test_array.dtype,
            shape=(test_array.shape[0], np.nan),
        )
        for f in files
    ]
    array_stack = da.concatenate(arrays, axis=1).T

    dataframe = ddf.from_dask_array(array_stack, columns=column_names)

    for name, channel in channels.items():
        if channel["format"] == "per_file":
            if channel["dataset_key"] in test_proc.attrs:
                values = [float(get_attribute(h5py.File(f), channel["dataset_key"])) for f in files]
                delayeds = [
                    add_value(partition, name, value)
                    for partition, value in zip(dataframe.partitions, values)
                ]
                dataframe = ddf.from_delayed(delayeds)

            else:
                print(
                    f"Entry \"{channel['dataset_key']}\" for channel \"{name}\" not found.",
                    "Skipping the channel.",
                )

    return dataframe


@dask.delayed
def add_value(partition: ddf.DataFrame, name: str, value: float) -> ddf.DataFrame:
    """Dask delayed helper function to add a value to each dataframe partition

    Args:
        partition (ddf.DataFrame): Dask dataframe partition
        name (str): Name of the column to add
        value (float): value to add to this partition

    Returns:
        ddf.DataFrame: Dataframe partition with added column
    """
    partition[name] = value
    return partition
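
# Illustrative sketch (assumption, not from the original module): because each
# input file becomes one dataframe partition, a "per_file" value can be
# broadcast by pairing partitions with one value per file, as used in
# hdf5_to_dataframe above:
#
#     values = [1.2, 3.4]  # hypothetical per-file attribute values
#     delayeds = [
#         add_value(partition, "sampleBias", value)
#         for partition, value in zip(dataframe.partitions, values)
#     ]
#     dataframe = ddf.from_delayed(delayeds)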


def get_datasets_and_aliases(
    h5file: h5py.File,
    search_pattern: str = None,
    alias_key: str = "Name",
) -> Dict[str, Any]:
    """Read datasets and aliases from a provided hdf5 file handle

    Args:
        h5file (h5py.File):
            The hdf5 file handle
        search_pattern (str, optional):
            Search pattern to select groups. Defaults to include all groups.
        alias_key (str, optional):
            Attribute key where aliases are stored. Defaults to "Name".

    Returns:
        Dict[str, Any]:
            A dict of aliases and groupnames parsed from the file
    """
    # get group names:
    dataset_names = list(h5file)

    # Filter the group names
    if search_pattern is None:
        filtered_dataset_names = dataset_names
    else:
        filtered_dataset_names = [name for name in dataset_names if search_pattern in name]

    alias_dict = {}
    for name in filtered_dataset_names:
        alias_dict[name] = get_attribute(h5file[name], alias_key)

    return {
        alias_dict[name]: {"format": "per_electron", "dataset_key": name}
        for name in filtered_dataset_names
    }
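
# Illustrative sketch of the returned mapping (hypothetical group names and
# aliases, assuming each "Stream" group carries a "Name" attribute):
#
#     get_datasets_and_aliases(h5file, search_pattern="Stream")
#     # -> {
#     #     "X": {"format": "per_electron", "dataset_key": "Stream_0"},
#     #     "Y": {"format": "per_electron", "dataset_key": "Stream_1"},
#     # }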


def hdf5_to_array(
    h5file: h5py.File,
    channels: Sequence[Dict[str, Any]],
    time_stamps=False,
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
) -> np.ndarray:
    """Reads the content of the given groups in an hdf5 file, and returns a
    2-dimensional array with the corresponding values.

    Args:
        h5file (h5py.File):
            hdf5 file handle to read from
        channels (Sequence[Dict[str, Any]]):
            channel dicts containing dataset keys and dtypes to read.
        time_stamps (bool, optional):
            Option to calculate time stamps. Defaults to False.
        ms_markers_key (str): hdf5 dataset containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        np.ndarray: The 2-dimensional data array containing the values of the groups.
    """

    # Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)
    # Read out groups:
    data_list = []
    for channel in channels:
        if channel["format"] == "per_electron":
            g_dataset = np.asarray(h5file[channel["dataset_key"]])
        else:
            raise ValueError(
                f"Invalid 'format':{channel['format']} for channel {channel['dataset_key']}.",
            )
        if "dtype" in channel.keys():
            g_dataset = g_dataset.astype(channel["dtype"])
        else:
            g_dataset = g_dataset.astype("float32")
        data_list.append(g_dataset)

    # calculate time stamps
    if time_stamps:
        # create target array for time stamps
        time_stamp_data = np.zeros(len(data_list[0]))
        # the ms marker contains a list of events that occurred at full ms intervals.
        # It's monotonically increasing, and can contain duplicates
        ms_marker = np.asarray(h5file[ms_markers_key])

        # try to get start timestamp from "FirstEventTimeStamp" attribute
        try:
            start_time_str = get_attribute(h5file, first_event_time_stamp_key)
            start_time = datetime.datetime.strptime(
                start_time_str,
                "%Y-%m-%dT%H:%M:%S.%f%z",
            ).timestamp()
        except KeyError:
            # get the start time of the file from its modification date if the key
            # does not exist (old files)
            start_time = os.path.getmtime(h5file.filename)  # modification time in seconds
            # the modification time points to the time when the file was finished, so we
            # need to correct for the time it took to write the file
            start_time -= len(ms_marker) / 1000

        # fill in range before 1st marker
        time_stamp_data[0 : ms_marker[0]] = start_time
        for i in range(len(ms_marker) - 1):
            # linear interpolation between ms: Disabled, because it takes a lot of
            # time, and external signals are anyway not better synchronized than 1 ms
            # time_stamp_data[ms_marker[n] : ms_marker[n + 1]] = np.linspace(
            #     start_time + n,
            #     start_time + n + 1,
            #     ms_marker[n + 1] - ms_marker[n],
            # )
            time_stamp_data[ms_marker[i] : ms_marker[i + 1]] = start_time + (i + 1) / 1000
        # fill any remaining points
        time_stamp_data[ms_marker[len(ms_marker) - 1] : len(time_stamp_data)] = (
            start_time + len(ms_marker) / 1000
        )

        data_list.append(time_stamp_data)

    return np.asarray(data_list)
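
# Worked sketch of the time stamp reconstruction above (made-up numbers): with
# start_time = 1000.0 s and ms_marker = [3, 5], events 0-2 arrived within the
# first millisecond and events 3-4 within the second. The resulting per-event
# time stamps are therefore
#
#     time_stamp_data = [1000.0, 1000.0, 1000.0, 1000.001, 1000.001, ...]
#
# i.e. every event is stamped with 1 ms granularity rather than interpolated.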


def hdf5_to_timed_array(
    h5file: h5py.File,
    channels: Sequence[Dict[str, Any]],
    time_stamps=False,
    ms_markers_key: str = "msMarkers",
    first_event_time_stamp_key: str = "FirstEventTimeStamp",
) -> np.ndarray:
    """Reads the content of the given groups in an hdf5 file, and returns a
    timed version of a 2-dimensional array with the corresponding values.

    Args:
        h5file (h5py.File):
            hdf5 file handle to read from
        channels (Sequence[Dict[str, Any]]):
            channel dicts containing dataset keys and dtypes to read.
        time_stamps (bool, optional):
            Option to calculate time stamps. Defaults to False.
        ms_markers_key (str): hdf5 dataset containing timestamp information.
            Defaults to "msMarkers".
        first_event_time_stamp_key (str): h5 attribute containing the start
            timestamp of a file. Defaults to "FirstEventTimeStamp".

    Returns:
        np.ndarray: the array of the values at evenly spaced timing obtained from
        the ms_markers.
    """

    # Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)

    # Read out groups:
    data_list = []
    ms_marker = np.asarray(h5file[ms_markers_key])
    for channel in channels:
        timed_dataset = np.zeros_like(ms_marker)
        if channel["format"] == "per_electron":
            g_dataset = np.asarray(h5file[channel["dataset_key"]])
            for i, point in enumerate(ms_marker):
                timed_dataset[i] = g_dataset[int(point) - 1]
        else:
            raise ValueError(
                f"Invalid 'format':{channel['format']} for channel {channel['dataset_key']}.",
            )
        if "dtype" in channel.keys():
            timed_dataset = timed_dataset.astype(channel["dtype"])
        else:
            timed_dataset = timed_dataset.astype("float32")

        data_list.append(timed_dataset)

    # calculate time stamps
    if time_stamps:
        # try to get start timestamp from "FirstEventTimeStamp" attribute
        try:
            start_time_str = get_attribute(h5file, first_event_time_stamp_key)
            start_time = datetime.datetime.strptime(
                start_time_str,
                "%Y-%m-%dT%H:%M:%S.%f%z",
            ).timestamp()
        except KeyError:
            # get the start time of the file from its modification date if the key
            # does not exist (old files)
            start_time = os.path.getmtime(h5file.filename)  # modification time in seconds
            # the modification time points to the time when the file was finished, so we
            # need to correct for the time it took to write the file
            start_time -= len(ms_marker) / 1000

        time_stamp_data = start_time + np.arange(len(ms_marker)) / 1000

        data_list.append(time_stamp_data)

    return np.asarray(data_list)
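
# Worked sketch of the sampling above (made-up numbers): with ms_marker = [3, 5],
# the timed array takes the value of the last event recorded before each
# millisecond boundary, i.e. g_dataset[2] for the first millisecond and
# g_dataset[4] for the second, yielding one row per millisecond.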


def get_attribute(h5group: h5py.Group, attribute: str) -> str:
    """Reads, decodes and returns an attribute from an hdf5 group

    Args:
        h5group (h5py.Group):
            The hdf5 group to read from
        attribute (str):
            The name of the attribute

    Returns:
        str: The parsed attribute data
    """
    try:
        content = h5group.attrs[attribute].decode("utf-8")
    except AttributeError:  # No need to decode
        content = h5group.attrs[attribute]
    except KeyError as exc:  # No such attribute
        raise KeyError(f"Attribute '{attribute}' not found!") from exc

    return content
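
# Illustrative sketch (hypothetical attribute): hdf5 attributes may be stored
# as bytes or as str, which is why the decode is wrapped in try/except:
#
#     get_attribute(h5file["Stream_0"], "Name")  # b"X" -> "X", "X" -> "X"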


def get_count_rate(
    h5file: h5py.File,
    ms_markers_key: str = "msMarkers",
) -> Tuple[np.ndarray, np.ndarray]:
    """Create count rate in the file from the msMarker column.

    Args:
        h5file (h5py.File): The h5file from which to get the count rate.
        ms_markers_key (str, optional): The hdf5 path where the millisecond markers
            are stored. Defaults to "msMarkers".

    Returns:
        Tuple[np.ndarray, np.ndarray]: The count rate in Hz and the seconds into the
        scan.
    """
    ms_markers = np.asarray(h5file[ms_markers_key])
    secs = np.arange(0, len(ms_markers)) / 1000
    msmarker_spline = sint.InterpolatedUnivariateSpline(secs, ms_markers, k=1)
    rate_spline = msmarker_spline.derivative()
    count_rate = rate_spline(secs)

    return (count_rate, secs)
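
# Worked sketch of the spline logic above (made-up numbers): ms_markers counts
# accumulated events at each full millisecond, so its first derivative with
# respect to time is the instantaneous count rate in Hz. For
# ms_markers = [100, 300] at secs = [0.000, 0.001], the linear (k=1) spline
# has slope (300 - 100) / 0.001 = 200000 counts/s.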


def get_elapsed_time(
    h5file: h5py.File,
    ms_markers_key: str = "msMarkers",
) -> float:
    """Return the elapsed time in the file from the msMarkers wave

    Args:
        h5file (h5py.File): The h5file from which to get the elapsed time.
        ms_markers_key (str, optional): The hdf5 path where the millisecond markers
            are stored. Defaults to "msMarkers".

    Returns:
        float: The acquisition time of the file in seconds.
    """
    secs = h5file[ms_markers_key].len() / 1000

    return secs


def get_archiver_data(
    archiver_url: str,
    archiver_channel: str,
    ts_from: float,
    ts_to: float,
) -> Tuple[np.ndarray, np.ndarray]:
    """Extract time stamps and corresponding data from an EPICS archiver instance

    Args:
        archiver_url (str): URL of the archiver data extraction interface
        archiver_channel (str): EPICS channel to extract data for
        ts_from (float): starting time stamp of the range of interest
        ts_to (float): ending time stamp of the range of interest

    Returns:
        Tuple[np.ndarray, np.ndarray]: The extracted time stamps and corresponding data
    """
    iso_from = datetime.datetime.utcfromtimestamp(ts_from).isoformat()
    iso_to = datetime.datetime.utcfromtimestamp(ts_to).isoformat()
    req_str = archiver_url + archiver_channel + "&from=" + iso_from + "Z&to=" + iso_to + "Z"
    with urlopen(req_str) as req:
        data = json.load(req)
        secs = [x["secs"] + x["nanos"] * 1e-9 for x in data[0]["data"]]
        vals = [x["val"] for x in data[0]["data"]]

    return (np.asarray(secs), np.asarray(vals))
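
# Illustrative sketch of the request assembled above (hypothetical URL and
# channel; the real values come from the "metadata" section of the config, and
# the archiver_url is assumed to already contain the query prefix):
#
#     get_archiver_data(
#         archiver_url="http://archiver.example.org/retrieval/data/getData.json?pv=",
#         archiver_channel="KTOF:Lens:Extr:V",
#         ts_from=1655900000.0,
#         ts_to=1655900060.0,
#     )
#     # requests ...getData.json?pv=KTOF:Lens:Extr:V&from=<ISO>Z&to=<ISO>Z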


class MpesLoader(BaseLoader):
    """Mpes implementation of the Loader. Reads from h5 files or folders of the
    SPECS Metis 1000 (FHI Berlin).

    Args:
        config (dict, optional): Config dictionary. Defaults to None.
    """

    __name__ = "mpes"

    supported_file_types = ["h5"]

    def __init__(
        self,
        config: dict = None,
    ):
        super().__init__(config=config)

        self.read_timestamps = self._config.get("dataframe", {}).get(
            "read_timestamps",
            False,
        )

    def read_dataframe(
        self,
        files: Union[str, Sequence[str]] = None,
        folders: Union[str, Sequence[str]] = None,
        runs: Union[str, Sequence[str]] = None,
        ftype: str = "h5",
        metadata: dict = None,
        collect_metadata: bool = False,
        time_stamps: bool = False,
        **kwds,
    ) -> Tuple[ddf.DataFrame, ddf.DataFrame, dict]:
        """Read stored hdf5 files from a list or from folder and returns a dask
        dataframe and corresponding metadata.

        Args:
            files (Union[str, Sequence[str]], optional): File path(s) to process.
                Defaults to None.
            folders (Union[str, Sequence[str]], optional): Path to folder(s) where files
                are stored. If specified, takes precedence over ``files``, which are
                then ignored. Defaults to None.
            runs (Union[str, Sequence[str]], optional): Run identifier(s). Corresponding
                files will be located in the location provided by ``folders``. Takes
                precedence over ``files`` and ``folders``. Defaults to None.
            ftype (str, optional): File extension to use. If a folder path is given,
                all files with the specified extension are read into the dataframe
                in the reading order. Defaults to "h5".
            metadata (dict, optional): Manual meta data dictionary. Auto-generated
                meta data are added to it. Defaults to None.
            collect_metadata (bool): Option to collect metadata from files. Requires
                a valid config dict. Defaults to False.
            time_stamps (bool, optional): Option to create a time_stamp column in
                the dataframe from ms-Markers in the files. Defaults to False.
            **kwds: Keyword parameters.

                - **channels**: Dictionary of channel dicts to load, with aliases as
                  keys, see ``hdf5_to_dataframe``.
                - **time_stamp_alias**: Alias for the timestamp column
                - **ms_markers_key**: HDF5 path of the millisecond marker column.
                - **first_event_time_stamp_key**: Attribute name containing the start
                  timestamp of the file.

                Additional keywords are passed to ``hdf5_to_dataframe``.

        Raises:
            ValueError: Raised if neither files nor folders are provided.
            FileNotFoundError: Raised if a file or folder is not found.

        Returns:
            Tuple[ddf.DataFrame, ddf.DataFrame, dict]: Dask dataframe, timed Dask
            dataframe and metadata read from specified files.
        """
        # if runs is provided, try to locate the respective files relative to the provided folder.
        if runs is not None:  # pylint: disable=duplicate-code
            files = []
            if isinstance(runs, (str, int)):
                runs = [runs]
            for run in runs:
                files.extend(
                    self.get_files_from_run_id(run_id=run, folders=folders, extension=ftype),
                )
            self.runs = list(runs)
            super().read_dataframe(
                files=files,
                ftype=ftype,
                metadata=metadata,
            )
        else:
            # pylint: disable=duplicate-code
            super().read_dataframe(
                files=files,
                folders=folders,
                runs=runs,
                ftype=ftype,
                metadata=metadata,
            )

        channels = kwds.pop(
            "channels",
            self._config.get("dataframe", {}).get("channels", None),
        )
        time_stamp_alias = kwds.pop(
            "time_stamp_alias",
            self._config.get("dataframe", {}).get(
                "time_stamp_alias",
                "timeStamps",
            ),
        )
        ms_markers_key = kwds.pop(
            "ms_markers_key",
            self._config.get("dataframe", {}).get(
                "ms_markers_key",
                "msMarkers",
            ),
        )
        first_event_time_stamp_key = kwds.pop(
            "first_event_time_stamp_key",
            self._config.get("dataframe", {}).get(
                "first_event_time_stamp_key",
                "FirstEventTimeStamp",
            ),
        )
        df = hdf5_to_dataframe(
            files=self.files,
            channels=channels,
            time_stamps=time_stamps,
            time_stamp_alias=time_stamp_alias,
            ms_markers_key=ms_markers_key,
            first_event_time_stamp_key=first_event_time_stamp_key,
            **kwds,
        )
        timed_df = hdf5_to_timed_dataframe(
            files=self.files,
            channels=channels,
            time_stamps=time_stamps,
            time_stamp_alias=time_stamp_alias,
            ms_markers_key=ms_markers_key,
            first_event_time_stamp_key=first_event_time_stamp_key,
            **kwds,
        )

        if collect_metadata:
            metadata = self.gather_metadata(
                files=self.files,
                metadata=self.metadata,
            )
        else:
            metadata = self.metadata

        return df, timed_df, metadata
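
    # Illustrative usage sketch (hypothetical config dict and data folder, not
    # from the original module):
    #
    #     loader = MpesLoader(config=config_dict)
    #     df, timed_df, metadata = loader.read_dataframe(
    #         folders="/path/to/data",
    #         time_stamps=True,
    #         collect_metadata=False,
    #     )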

    def get_files_from_run_id(
        self,
        run_id: str,
        folders: Union[str, Sequence[str]] = None,
        extension: str = "h5",
        **kwds,  # noqa: ARG002
    ) -> List[str]:
        """Locate the files for a given run identifier.

        Args:
            run_id (str): The run identifier to locate.
            folders (Union[str, Sequence[str]], optional): The directory(ies) where the raw
                data is located. Defaults to config["core"]["paths"]["data_raw_dir"].
            extension (str, optional): The file extension. Defaults to "h5".
            kwds: Keyword arguments

        Returns:
            List[str]: List of file path strings to the location of run data.
        """
        if folders is None:
            folders = self._config["core"]["paths"]["data_raw_dir"]

        if isinstance(folders, str):
            folders = [folders]

        files: List[str] = []
        for folder in folders:
            run_files = natsorted(
                glob.glob(
                    folder + "/**/Scan" + str(run_id).zfill(4) + "_*." + extension,
                    recursive=True,
                ),
            )
            files.extend(run_files)

        # Check if any files are found
        if not files:
            raise FileNotFoundError(
                f"No files found for run {run_id} in directory {str(folders)}",
            )

        # Return the list of found files
        return files
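
    # Illustrative sketch (hypothetical layout): run_id=30 with extension "h5"
    # matches files like "<folder>/2024/Scan0030_1.h5" via the recursive
    # pattern "<folder>/**/Scan0030_*.h5", returned in natural sort order
    # (Scan0030_2.h5 before Scan0030_10.h5).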

    def get_start_and_end_time(self) -> Tuple[float, float]:
        """Extract the start and end time stamps from the loaded files

        Returns:
            Tuple[float, float]: A tuple containing the start and end time stamps
        """
        h5file = h5py.File(self.files[0])
        channels = []
        for channel in self._config["dataframe"]["channels"].values():
            if channel["format"] == "per_electron":
                channels = [channel]
                break
        if not channels:
            raise ValueError("No valid 'per_electron' channels found.")
        timestamps = hdf5_to_array(
            h5file,
            channels=channels,
            time_stamps=True,
        )
        ts_from = timestamps[-1][1]
        h5file = h5py.File(self.files[-1])
        timestamps = hdf5_to_array(
            h5file,
            channels=channels,
            time_stamps=True,
        )
        ts_to = timestamps[-1][-1]
        return (ts_from, ts_to)

    def gather_metadata(
        self,
        files: Sequence[str],
        metadata: dict = None,
    ) -> dict:
        """Collect meta data from files

        Args:
            files (Sequence[str]): List of files loaded
            metadata (dict, optional): Manual meta data dictionary. Auto-generated
                meta data are added to it. Defaults to None.

        Returns:
            dict: The completed metadata dictionary.
        """

        if metadata is None:
            metadata = {}
        print("Gathering metadata from different locations")
        # Read events in with ms time stamps
        print("Collecting time stamps...")
        (ts_from, ts_to) = self.get_start_and_end_time()

        metadata["timing"] = {
            "acquisition_start": datetime.datetime.utcfromtimestamp(ts_from)
            .replace(tzinfo=datetime.timezone.utc)
            .isoformat(),
            "acquisition_stop": datetime.datetime.utcfromtimestamp(ts_to)
            .replace(tzinfo=datetime.timezone.utc)
            .isoformat(),
            "acquisition_duration": int(ts_to - ts_from),
            "collection_time": float(ts_to - ts_from),
        }

        # import meta data from data file
        if "file" not in metadata:  # If already present, the value is assumed to be a dictionary
            metadata["file"] = {}

        print("Collecting file metadata...")
        with h5py.File(files[0], "r") as h5file:
            for key, value in h5file.attrs.items():
                key = key.replace("VSet", "V")
                metadata["file"][key] = value

        metadata["entry_identifier"] = os.path.dirname(
            os.path.realpath(files[0]),
        )

        print("Collecting data from the EPICS archive...")
        # Get metadata from Epics archive if not present already
        epics_channels = self._config["metadata"]["epics_pvs"]

        start = datetime.datetime.utcfromtimestamp(ts_from).isoformat()

        channels_missing = set(epics_channels) - set(
            metadata["file"].keys(),
        )
        for channel in channels_missing:
            try:
                _, vals = get_archiver_data(
                    archiver_url=self._config["metadata"].get("archiver_url"),
                    archiver_channel=channel,
                    ts_from=ts_from,
                    ts_to=ts_to,
                )
                metadata["file"][f"{channel}"] = np.mean(vals)

            except IndexError:
                metadata["file"][f"{channel}"] = np.nan
                print(
                    f"Data for channel {channel} doesn't exist for time {start}",
                )
            except HTTPError as exc:
                print(
                    f"Incorrect URL for the archive channel {channel}. "
                    "Make sure that the channel name and file start and end times are "
                    "correct.",
                )
                print("Error code: ", exc)
            except URLError as exc:
                print(
                    f"Cannot access the archive URL for channel {channel}. "
                    f"Make sure that you are within the FHI network. "
                    f"Skipping over channels {channels_missing}.",
                )
                print("Error code: ", exc)
                break

        # Determine the correct aperture_config
        stamps = sorted(
            list(self._config["metadata"]["aperture_config"]) + [start],
        )
        current_index = stamps.index(start)
        timestamp = stamps[current_index - 1]  # pick last configuration before file date

        # Aperture metadata
        if "instrument" not in metadata.keys():
            metadata["instrument"] = {"analyzer": {}}
        metadata["instrument"]["analyzer"]["fa_shape"] = "circle"
        metadata["instrument"]["analyzer"]["ca_shape"] = "circle"
        metadata["instrument"]["analyzer"]["fa_size"] = np.nan
        metadata["instrument"]["analyzer"]["ca_size"] = np.nan
        # get field aperture shape and size
        if {
            self._config["metadata"]["fa_in_channel"],
            self._config["metadata"]["fa_hor_channel"],
        }.issubset(set(metadata["file"].keys())):
            fa_in = metadata["file"][self._config["metadata"]["fa_in_channel"]]
            fa_hor = metadata["file"][self._config["metadata"]["fa_hor_channel"]]
            for key, value in self._config["metadata"]["aperture_config"][timestamp][
                "fa_size"
            ].items():
                if value[0][0] < fa_in < value[0][1] and value[1][0] < fa_hor < value[1][1]:
                    try:
                        k_float = float(key)
                        metadata["instrument"]["analyzer"]["fa_size"] = k_float
                    except ValueError:  # store string if numeric interpretation fails
                        metadata["instrument"]["analyzer"]["fa_shape"] = key
                    break
            else:
                print("Field aperture size not found.")

        # get contrast aperture shape and size
        if self._config["metadata"]["ca_in_channel"] in metadata["file"]:
            ca_in = metadata["file"][self._config["metadata"]["ca_in_channel"]]
            for key, value in self._config["metadata"]["aperture_config"][timestamp][
                "ca_size"
            ].items():
                if value[0] < ca_in < value[1]:
                    try:
                        k_float = float(key)
                        metadata["instrument"]["analyzer"]["ca_size"] = k_float
                    except ValueError:  # store string if numeric interpretation fails
                        metadata["instrument"]["analyzer"]["ca_shape"] = key
                    break
            else:
                print("Contrast aperture size not found.")

        # Storing the lens modes corresponding to lens voltages.
        # Use lens voltages present in the first lens_mode entry.
        lens_list = self._config["metadata"]["lens_mode_config"][
            next(iter(self._config["metadata"]["lens_mode_config"]))
        ].keys()

        lens_volts = np.array(
            [metadata["file"].get(f"KTOF:Lens:{lens}:V", np.nan) for lens in lens_list],
        )
        for mode, value in self._config["metadata"]["lens_mode_config"].items():
            lens_volts_config = np.array([value[k] for k in lens_list])
            if np.allclose(
                lens_volts,
                lens_volts_config,
                rtol=0.005,
            ):  # Equal up to 0.5% tolerance
                metadata["instrument"]["analyzer"]["lens_mode"] = mode
                break
        else:
            print(
                "Lens mode for given lens voltages not found. "
                "Storing lens mode from the user, if provided.",
            )

        # Determining projection from the lens mode
        try:
            lens_mode = metadata["instrument"]["analyzer"]["lens_mode"]
            if "spatial" in lens_mode.split("_")[1]:
                metadata["instrument"]["analyzer"]["projection"] = "real"
                metadata["instrument"]["analyzer"]["scheme"] = "spatial dispersive"
            else:
                metadata["instrument"]["analyzer"]["projection"] = "reciprocal"
                metadata["instrument"]["analyzer"]["scheme"] = "momentum dispersive"
        except IndexError:
            print(
                "Lens mode must have the form '6kV_kmodem4.0_20VTOF_v3.sav'. "
                "Can't determine projection. "
                "Storing projection from the user, if provided.",
            )
        except KeyError:
            print(
                "Lens mode not found. Can't determine projection. "
                "Storing projection from the user, if provided.",
            )

        return metadata
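
    # Illustrative sketch of the nested config consumed above (hypothetical
    # values): aperture_config maps a timestamp to "fa_size" entries keyed by
    # aperture size, each holding a pair of (in, hor) voltage ranges, and
    # "ca_size" entries holding a single voltage range:
    #
    #     aperture_config:
    #       "2018-01-23T19:35:15":
    #         fa_size:
    #           "750": [[-3.0, -1.4], [-5.4, -4.6]]
    #         ca_size:
    #           "50": [8.0, 8.4]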

    def get_count_rate(
        self,
        fids: Sequence[int] = None,
        **kwds,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Create count rate from the msMarker column for the files specified in
        ``fids``.

        Args:
            fids (Sequence[int], optional): The file ids to include. Defaults to
                the list of all file ids.
            kwds: Keyword arguments:

                - **ms_markers_key**: HDF5 path of the ms-markers

        Returns:
            Tuple[np.ndarray, np.ndarray]: Arrays containing count rate and seconds
            into the scan.
        """
        if fids is None:
            fids = range(0, len(self.files))

        ms_markers_key = kwds.pop(
            "ms_markers_key",
            self._config.get("dataframe", {}).get(
                "ms_markers_key",
                "msMarkers",
            ),
        )

        secs_list = []
        count_rate_list = []
        accumulated_time = 0
        for fid in fids:
            count_rate_, secs_ = get_count_rate(
                h5py.File(self.files[fid]),
                ms_markers_key=ms_markers_key,
            )
            secs_list.append((accumulated_time + secs_).T)
            count_rate_list.append(count_rate_.T)
            accumulated_time += secs_[-1]

        count_rate = np.concatenate(count_rate_list)
        secs = np.concatenate(secs_list)

        return count_rate, secs

    def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float:
        """Return the elapsed time in the files specified in ``fids`` from
        the msMarkers column.

        Args:
            fids (Sequence[int], optional): The file ids to include. Defaults to
                the list of all file ids.
            kwds: Keyword arguments:

                - **ms_markers_key**: HDF5 path of the millisecond marker column.

        Returns:
            float: The elapsed time in the files in seconds.
        """
        if fids is None:
            fids = range(0, len(self.files))

        ms_markers_key = kwds.pop(
            "ms_markers_key",
            self._config.get("dataframe", {}).get(
                "ms_markers_key",
                "msMarkers",
            ),
        )

        secs = 0.0
        for fid in fids:
            secs += get_elapsed_time(
                h5py.File(self.files[fid]),
                ms_markers_key=ms_markers_key,
            )

        return secs


LOADER = MpesLoader