• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenCOMPES / sed / 13641008901

03 Mar 2025 09:42PM UTC coverage: 92.444% (+0.3%) from 92.174%
13641008901

Pull #551

github

web-flow
Merge 037dfb355 into 541d4c8fe
Pull Request #551: Mpes elab metadata

280 of 309 new or added lines in 5 files covered. (90.61%)

10 existing lines in 1 file now uncovered.

8161 of 8828 relevant lines covered (92.44%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.03
/src/sed/loader/mpes/metadata.py
1
"""
2
The module provides a MetadataRetriever class for retrieving metadata
3
from an EPICS archiver and an elabFTW instance.
4
"""
5
from __future__ import annotations
1✔
6

7
import datetime
1✔
8
import json
1✔
9
from copy import deepcopy
1✔
10
from urllib.error import HTTPError
1✔
11
from urllib.error import URLError
1✔
12
from urllib.request import urlopen
1✔
13

14
import elabapi_python
1✔
15
import numpy as np
1✔
16

17
from sed.core.config import read_env_var
1✔
18
from sed.core.config import save_env_var
1✔
19
from sed.core.logging import setup_logging
1✔
20

21
logger = setup_logging("mpes_metadata_retriever")
1✔
22

23

24
class MetadataRetriever:
1✔
25
    """
26
    A class for retrieving metadata from an EPICS archiver and an elabFTW instance.
27
    """
28

29
    def __init__(self, metadata_config: dict, token: str = None) -> None:
1✔
30
        """
31
        Initializes the MetadataRetriever class.
32

33
        Args:
34
            metadata_config (dict): Takes a dict containing at least url for the EPICS archiver and
35
                elabFTW instance.
36
            token (str, optional): The token to use for fetching metadata. If provided,
37
                will be saved to .env file for future use.
38
        """
39
        self._config = deepcopy(metadata_config)
1✔
40
        # Token handling
41
        if token:
1✔
42
            self.token = token
1✔
43
            save_env_var("ELAB_TOKEN", self.token)
1✔
44
        else:
45
            # Try to load token from config or .env file
46
            self.token = read_env_var("ELAB_TOKEN")
1✔
47

48
        if not self.token:
1✔
49
            logger.warning(
1✔
50
                "No valid token provided for elabFTW. Fetching elabFTW metadata will be skipped.",
51
            )
52
            return
1✔
53

54
        self.url = self._config.get("elab_url")
1✔
55
        if not self.url:
1✔
56
            logger.warning(
1✔
57
                "No URL provided for elabFTW. Fetching elabFTW metadata will be skipped.",
58
            )
59
            return
1✔
60

61
        # Config
62
        self.configuration = elabapi_python.Configuration()
1✔
63
        self.configuration.api_key["api_key"] = self.token
1✔
64
        self.configuration.api_key_prefix["api_key"] = "Authorization"
1✔
65
        self.configuration.host = str(self.url)
1✔
66
        self.configuration.debug = False
1✔
67
        self.configuration.verify_ssl = False
1✔
68

69
        # create an instance of the API class
70
        self.api_client = elabapi_python.ApiClient(self.configuration)
1✔
71
        # fix issue with Authorization header not being properly set by the generated lib
72
        self.api_client.set_default_header(header_name="Authorization", header_value=self.token)
1✔
73

74
        # create an instance of Items
75
        self.itemsApi = elabapi_python.ItemsApi(self.api_client)
1✔
76
        self.experimentsApi = elabapi_python.ExperimentsApi(self.api_client)
1✔
77
        self.linksApi = elabapi_python.LinksToItemsApi(self.api_client)
1✔
78
        self.experimentsLinksApi = elabapi_python.LinksToExperimentsApi(self.api_client)
1✔
79
        self.usersApi = elabapi_python.UsersApi(self.api_client)
1✔
80

81
    def fetch_epics_metadata(self, ts_from: float, ts_to: float, metadata: dict) -> dict:
1✔
82
        """Fetch metadata from an EPICS archiver instance for times between ts_from and ts_to.
83
        Channels are defined in the config.
84

85
        Args:
86
            ts_from (float): Start timestamp of the range to collect data from.
87
            ts_to (float): End timestamp of the range to collect data from.
88
            metadata (dict): Input metadata dictionary. Will be updated
89

90
        Returns:
91
            dict: Updated metadata dictionary.
92
        """
93
        if not self._config.get("archiver_url"):
1✔
NEW
94
            logger.warning(
×
95
                "No URL provided for fetching metadata from the EPICS archiver. "
96
                "Fetching EPICS metadata will be skipped.",
97
            )
NEW
98
            return metadata
×
99

100
        logger.info("Collecting data from the EPICS archive...")
1✔
101

102
        start = datetime.datetime.utcfromtimestamp(ts_from)
1✔
103

104
        # Get metadata from Epics archive if not present already
105
        epics_channels = self._config["epics_pvs"]
1✔
106

107
        channels_missing = set(epics_channels) - set(
1✔
108
            metadata["file"].keys(),
109
        )
110
        for channel in channels_missing:
1✔
111
            try:
1✔
112
                _, vals = get_archiver_data(
1✔
113
                    archiver_url=str(self._config.get("archiver_url")),
114
                    archiver_channel=channel,
115
                    ts_from=ts_from,
116
                    ts_to=ts_to,
117
                )
118
                metadata["file"][f"{channel}"] = np.mean(vals)
1✔
119

120
            except IndexError:
1✔
NEW
121
                logger.info(
×
122
                    f"Data for channel {channel} doesn't exist for time {start}",
123
                )
124
            except HTTPError as exc:
1✔
NEW
125
                logger.warning(
×
126
                    f"Incorrect URL for the archive channel {channel}. "
127
                    "Make sure that the channel name and file start and end times are "
128
                    "correct.",
129
                )
NEW
130
                logger.warning(f"Error code: {exc}")
×
131
            except URLError as exc:
1✔
132
                logger.warning(
1✔
133
                    f"Cannot access the archive URL for channel {channel}. "
134
                    f"Make sure that you are within the FHI network."
135
                    f"Skipping over channels {channels_missing}.",
136
                )
137
                logger.warning(f"Error code: {exc}")
1✔
138
                break
1✔
139

140
        # Determine the correct aperture_config
141
        stamps = sorted(
1✔
142
            list(self._config["aperture_config"].keys()) + [start],
143
        )
144
        current_index = stamps.index(start)
1✔
145
        timestamp = stamps[current_index - 1]  # pick last configuration before file date
1✔
146

147
        # Aperture metadata
148
        if "instrument" not in metadata.keys():
1✔
149
            metadata["instrument"] = {"analyzer": {}}
1✔
150
        metadata["instrument"]["analyzer"]["fa_shape"] = "circle"
1✔
151
        metadata["instrument"]["analyzer"]["ca_shape"] = "circle"
1✔
152
        metadata["instrument"]["analyzer"]["fa_size"] = np.nan
1✔
153
        metadata["instrument"]["analyzer"]["ca_size"] = np.nan
1✔
154
        # get field aperture shape and size
155
        if {
1✔
156
            self._config["fa_in_channel"],
157
            self._config["fa_hor_channel"],
158
        }.issubset(set(metadata["file"].keys())):
159
            fa_in = metadata["file"][self._config["fa_in_channel"]]
1✔
160
            fa_hor = metadata["file"][self._config["fa_hor_channel"]]
1✔
161
            for key, value in self._config["aperture_config"][timestamp]["fa_size"].items():
1✔
162
                if value[0][0] < fa_in < value[0][1] and value[1][0] < fa_hor < value[1][1]:
1✔
163
                    try:
1✔
164
                        metadata["instrument"]["analyzer"]["fa_size"] = float(key)
1✔
NEW
165
                    except ValueError:  # store string if numeric interpretation fails
×
NEW
166
                        metadata["instrument"]["analyzer"]["fa_shape"] = key
×
167
                    break
1✔
168
            else:
NEW
169
                logger.warning("Field aperture size not found.")
×
170

171
        # get contrast aperture shape and size
172
        if self._config["ca_in_channel"] in metadata["file"]:
1✔
173
            ca_in = metadata["file"][self._config["ca_in_channel"]]
1✔
174
            for key, value in self._config["aperture_config"][timestamp]["ca_size"].items():
1✔
175
                if value[0] < ca_in < value[1]:
1✔
176
                    try:
1✔
177
                        metadata["instrument"]["analyzer"]["ca_size"] = float(key)
1✔
178
                    except ValueError:  # store string if numeric interpretation fails
1✔
179
                        metadata["instrument"]["analyzer"]["ca_shape"] = key
1✔
180
                    break
1✔
181
            else:
NEW
182
                logger.warning("Contrast aperture size not found.")
×
183

184
        # Storing the lens modes corresponding to lens voltages.
185
        # Use lens voltages present in first lens_mode entry.
186
        lens_list = self._config["lens_mode_config"][
1✔
187
            next(iter(self._config["lens_mode_config"]))
188
        ].keys()
189

190
        lens_volts = np.array(
1✔
191
            [metadata["file"].get(f"KTOF:Lens:{lens}:V", np.nan) for lens in lens_list],
192
        )
193
        for mode, value in self._config["lens_mode_config"].items():
1✔
194
            lens_volts_config = np.array([value[k] for k in lens_list])
1✔
195
            if np.allclose(
1✔
196
                lens_volts,
197
                lens_volts_config,
198
                rtol=0.005,
199
            ):  # Equal upto 0.5% tolerance
200
                metadata["instrument"]["analyzer"]["lens_mode"] = mode
1✔
201
                break
1✔
202
        else:
203
            logger.warning(
1✔
204
                "Lens mode for given lens voltages not found. "
205
                "Storing lens mode from the user, if provided.",
206
            )
207

208
        # Determining projection from the lens mode
209
        try:
1✔
210
            lens_mode = metadata["instrument"]["analyzer"]["lens_mode"]
1✔
211
            if "spatial" in lens_mode.split("_")[1]:
1✔
NEW
212
                metadata["instrument"]["analyzer"]["projection"] = "real"
×
NEW
213
                metadata["instrument"]["analyzer"]["scheme"] = "spatial dispersive"
×
214
            else:
215
                metadata["instrument"]["analyzer"]["projection"] = "reciprocal"
1✔
216
                metadata["instrument"]["analyzer"]["scheme"] = "momentum dispersive"
1✔
217
        except IndexError:
1✔
NEW
218
            logger.warning(
×
219
                "Lens mode must have the form, '6kV_kmodem4.0_20VTOF_v3.sav'. "
220
                "Can't determine projection. "
221
                "Storing projection from the user, if provided.",
222
            )
223
        except KeyError:
1✔
224
            logger.warning(
1✔
225
                "Lens mode not found. Can't determine projection. "
226
                "Storing projection from the user, if provided.",
227
            )
228

229
        return metadata
1✔
230

231
    def fetch_elab_metadata(self, runs: list[str], metadata: dict) -> dict:
1✔
232
        """Fetch metadata from an elabFTW instance
233

234
        Args:
235
            runs (list[str]): List of runs for which to fetch metadata
236
            metadata (dict): Input metadata dictionary. Will be updated
237

238
        Returns:
239
            dict: Updated metadata dictionary
240
        """
241
        if not self.token:
1✔
242
            logger.warning(
1✔
243
                "No valid token found. Token is required for metadata collection. Either provide "
244
                "a token parameter or set the ELAB_TOKEN environment variable.",
245
            )
246
            return metadata
1✔
247

248
        if not self.url:
1✔
249
            logger.warning(
1✔
250
                "No URL provided for fetching metadata from elabFTW. "
251
                "Fetching elabFTW metadata will be skipped.",
252
            )
253
            return metadata
1✔
254

255
        logger.info("Collecting data from the elabFTW instance...")
1✔
256
        # Get the experiment
257
        try:
1✔
258
            experiment = self.experimentsApi.read_experiments(q=f"'Metis scan {runs[0]}'")[0]
1✔
NEW
259
        except IndexError:
×
NEW
260
            logger.warning(f"No elabFTW entry found for run {runs[0]}")
×
NEW
261
            return metadata
×
262

263
        if "elabFTW" not in metadata:
1✔
264
            metadata["elabFTW"] = {}
1✔
265

266
        exp_id = experiment.id
1✔
267
        # Get user information
268
        user = self.usersApi.read_user(experiment.userid)
1✔
269
        metadata["elabFTW"]["user"] = {}
1✔
270
        metadata["elabFTW"]["user"]["name"] = user.fullname
1✔
271
        metadata["elabFTW"]["user"]["email"] = user.email
1✔
272
        metadata["elabFTW"]["user"]["id"] = user.userid
1✔
273
        if user.orcid:
1✔
274
            metadata["elabFTW"]["user"]["orcid"] = user.orcid
1✔
275
        # Get the links to items
276
        links = self.linksApi.read_entity_items_links(entity_type="experiments", id=exp_id)
1✔
277
        # Get the items
278
        items = [self.itemsApi.get_item(link.entityid) for link in links]
1✔
279
        items_dict = {item.category_title: item for item in items}
1✔
280
        items_dict["scan"] = experiment
1✔
281

282
        # Sort the metadata
283
        for category, item in items_dict.items():
1✔
284
            category = category.replace(":", "").replace(" ", "_").lower()
1✔
285
            if category not in metadata["elabFTW"]:
1✔
286
                metadata["elabFTW"][category] = {}
1✔
287
            metadata["elabFTW"][category]["title"] = item.title
1✔
288
            metadata["elabFTW"][category]["summary"] = item.body
1✔
289
            metadata["elabFTW"][category]["id"] = item.id
1✔
290
            metadata["elabFTW"][category]["elabid"] = item.elabid
1✔
291
            if item.sharelink:
1✔
292
                metadata["elabFTW"][category]["link"] = item.sharelink
1✔
293
            if item.metadata is not None:
1✔
294
                metadata_json = json.loads(item.metadata)
1✔
295
                for key, val in metadata_json["extra_fields"].items():
1✔
296
                    if val["value"] is not None and val["value"] != "" and val["value"] != ["None"]:
1✔
297
                        try:
1✔
298
                            metadata["elabFTW"][category][key] = float(val["value"])
1✔
299
                        except ValueError:
1✔
300
                            metadata["elabFTW"][category][key] = val["value"]
1✔
301

302
        # group beam profiles:
303
        if (
1✔
304
            "laser_status" in metadata["elabFTW"]
305
            and "pump_profile_x" in metadata["elabFTW"]["laser_status"]
306
            and "pump_profile_y" in metadata["elabFTW"]["laser_status"]
307
        ):
NEW
308
            metadata["elabFTW"]["laser_status"]["pump_profile"] = [
×
309
                float(metadata["elabFTW"]["laser_status"]["pump_profile_x"]),
310
                float(metadata["elabFTW"]["laser_status"]["pump_profile_y"]),
311
            ]
312
        if (
1✔
313
            "laser_status" in metadata["elabFTW"]
314
            and "probe_profile_x" in metadata["elabFTW"]["laser_status"]
315
            and "probe_profile_y" in metadata["elabFTW"]["laser_status"]
316
        ):
NEW
317
            metadata["elabFTW"]["laser_status"]["probe_profile"] = [
×
318
                float(metadata["elabFTW"]["laser_status"]["probe_profile_x"]),
319
                float(metadata["elabFTW"]["laser_status"]["probe_profile_y"]),
320
            ]
321

322
        # fix preparation date
323
        if "sample" in metadata["elabFTW"] and "preparation_date" in metadata["elabFTW"]["sample"]:
1✔
NEW
324
            metadata["elabFTW"]["sample"]["preparation_date"] = (
×
325
                datetime.datetime.strptime(
326
                    metadata["elabFTW"]["sample"]["preparation_date"],
327
                    "%Y-%m-%d",
328
                )
329
                .replace(tzinfo=datetime.timezone.utc)
330
                .isoformat()
331
            )
332

333
        # fix polarizations
334
        if (
1✔
335
            "scan" in metadata["elabFTW"]
336
            and "pump_polarization" in metadata["elabFTW"]["scan"]
337
            and isinstance(metadata["elabFTW"]["scan"]["pump_polarization"], str)
338
        ):
NEW
339
            if metadata["elabFTW"]["scan"]["pump_polarization"] == "s":
×
NEW
340
                metadata["elabFTW"]["scan"]["pump_polarization"] = 90
×
NEW
341
            elif metadata["elabFTW"]["scan"]["pump_polarization"] == "p":
×
NEW
342
                metadata["elabFTW"]["scan"]["pump_polarization"] = 0
×
343

344
        if (
1✔
345
            "scan" in metadata["elabFTW"]
346
            and "probe_polarization" in metadata["elabFTW"]["scan"]
347
            and isinstance(metadata["elabFTW"]["scan"]["probe_polarization"], str)
348
        ):
NEW
349
            if metadata["elabFTW"]["scan"]["probe_polarization"] == "s":
×
NEW
350
                metadata["elabFTW"]["scan"]["probe_polarization"] = 90
×
NEW
351
            elif metadata["elabFTW"]["scan"]["probe_polarization"] == "p":
×
NEW
352
                metadata["elabFTW"]["scan"]["probe_polarization"] = 0
×
353

354
        # remove pump information if pump not applied:
355
        if not metadata["elabFTW"]["scan"].get("pump_status", 0):
1✔
356
            if "pump_photon_energy" in metadata["elabFTW"].get("laser_status", {}):
1✔
NEW
357
                del metadata["elabFTW"]["laser_status"]["pump_photon_energy"]
×
358
            if "pump_repetition_rate" in metadata["elabFTW"].get("laser_status", {}):
1✔
NEW
359
                del metadata["elabFTW"]["laser_status"]["pump_repetition_rate"]
×
360

361
        return metadata
1✔
362

363

364
def get_archiver_data(
    archiver_url: str,
    archiver_channel: str,
    ts_from: float,
    ts_to: float,
) -> tuple[np.ndarray, np.ndarray]:
    """Extract time stamps and corresponding data from an EPICS archiver instance

    The request URL is built by appending the channel name and the time range
    (as ISO formatted UTC times with a trailing "Z") to ``archiver_url``.

    Args:
        archiver_url (str): URL of the archiver data extraction interface
        archiver_channel (str): EPICS channel to extract data for
        ts_from (float): starting time stamp of the range of interest
        ts_to (float): ending time stamp of the range of interest

    Raises:
        IndexError: If the response does not contain any channel entry.
        urllib.error.URLError: If the URL cannot be opened (raised by ``urlopen``).

    Returns:
        tuple[np.ndarray, np.ndarray]: The extracted time stamps and corresponding data
    """
    utc = datetime.timezone.utc
    # ``utcfromtimestamp`` is deprecated; convert with an explicit time zone and drop
    # it again, so that the ISO string carries no "+00:00" offset before the "Z".
    iso_from = datetime.datetime.fromtimestamp(ts_from, utc).replace(tzinfo=None).isoformat()
    iso_to = datetime.datetime.fromtimestamp(ts_to, utc).replace(tzinfo=None).isoformat()
    req_str = f"{archiver_url}{archiver_channel}&from={iso_from}Z&to={iso_to}Z"

    # Only the download needs the open connection; the parsing is done afterwards
    with urlopen(req_str) as req:
        data = json.load(req)

    samples = data[0]["data"]
    secs = [sample["secs"] + sample["nanos"] * 1e-9 for sample in samples]
    vals = [sample["val"] for sample in samples]

    return (np.asarray(secs), np.asarray(vals))
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc