• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

materialsproject / pymatgen / 4075885785

pending completion
4075885785

push

github

Shyue Ping Ong
Merge branch 'master' of github.com:materialsproject/pymatgen

96 of 96 new or added lines in 27 files covered. (100.0%)

81013 of 102710 relevant lines covered (78.88%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.32
/pymatgen/ext/optimade.py
1
"""
2
Optimade support.
3
"""
4

5
from __future__ import annotations
1✔
6

7
import logging
1✔
8
import sys
1✔
9
from collections import namedtuple
1✔
10
from os.path import join
1✔
11
from urllib.parse import urlparse
1✔
12

13
import requests
1✔
14
from tqdm import tqdm
1✔
15

16
from pymatgen.core.periodic_table import DummySpecies
1✔
17
from pymatgen.core.structure import Structure
1✔
18
from pymatgen.util.provenance import StructureNL
1✔
19

20
# from retrying import retry
21

22

23
# TODO: importing optimade-python-tool's data structures will make more sense
# Lightweight, immutable record describing one OPTIMADE provider / database.
Provider = namedtuple("Provider", "name base_url description homepage prefix")

# Module-level logger: messages at WARNING level and above are echoed to stdout.
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.WARNING)
_handler = logging.StreamHandler(sys.stdout)
_logger.addHandler(_handler)
30

31

32
class OptimadeRester:
    """
    Class to call OPTIMADE-compliant APIs, see https://optimade.org and [1].

    This class is ready to use but considered in-development and subject to change.

    [1] Andersen, C.W., *et al*.
        OPTIMADE, an API for exchanging materials data.
        Sci Data 8, 217 (2021). https://doi.org/10.1038/s41597-021-00974-z

    """

    # regenerate on-demand from official providers.json using OptimadeRester.refresh_aliases()
    # these aliases are provided as a convenient shortcut for users of the OptimadeRester class
    aliases: dict[str, str] = {
        "aflow": "http://aflow.org/API/optimade/",
        "cod": "https://www.crystallography.net/cod/optimade",
        "mcloud.2dstructures": "https://aiida.materialscloud.org/2dstructures/optimade",
        "mcloud.2dtopo": "https://aiida.materialscloud.org/2dtopo/optimade",
        "mcloud.curated-cofs": "https://aiida.materialscloud.org/curated-cofs/optimade",
        "mcloud.li-ion-conductors": "https://aiida.materialscloud.org/li-ion-conductors/optimade",
        "mcloud.optimade-sample": "https://aiida.materialscloud.org/optimade-sample/optimade",
        "mcloud.pyrene-mofs": "https://aiida.materialscloud.org/pyrene-mofs/optimade",
        "mcloud.scdm": "https://aiida.materialscloud.org/autowannier/optimade",
        "mcloud.sssp": "https://aiida.materialscloud.org/sssplibrary/optimade",
        "mcloud.stoceriaitf": "https://aiida.materialscloud.org/stoceriaitf/optimade",
        "mcloud.tc-applicability": "https://aiida.materialscloud.org/tc-applicability/optimade",
        "mcloud.threedd": "https://aiida.materialscloud.org/3dd/optimade",
        "mp": "https://optimade.materialsproject.org",
        "mpds": "https://api.mpds.io",
        "nmd": "https://nomad-lab.eu/prod/rae/optimade/",
        "odbx": "https://optimade.odbx.science",
        "omdb.omdb_production": "http://optimade.openmaterialsdb.se",
        "oqmd": "http://oqmd.org/optimade/",
        "tcod": "https://www.crystallography.net/tcod/optimade",
    }

    # The set of OPTIMADE fields that are required to define a `pymatgen.core.Structure`
    mandatory_response_fields: set[str] = {"lattice_vectors", "cartesian_site_positions", "species", "species_at_sites"}

    def __init__(self, aliases_or_resource_urls: str | list[str] | None = None, timeout: int = 5):
        """
        OPTIMADE is an effort to provide a standardized interface to retrieve information
        from many different materials science databases.

        This is a client to retrieve structures from OPTIMADE v1 compliant endpoints. It
        does not yet support all features of the OPTIMADE v1 specification but is intended
        as a way to quickly search an endpoint in a way familiar to users of pymatgen without
        needing to know the full OPTIMADE specification.

        For advanced usage, please see the OPTIMADE documentation at optimade.org and
        consider calling the APIs directly.

        For convenience, known OPTIMADE endpoints have been given aliases in pymatgen to save
        typing the full URL. The current list of aliases is:

        aflow, cod, mcloud.2dstructures, mcloud.2dtopo, mcloud.curated-cofs,
        mcloud.li-ion-conductors, mcloud.optimade-sample, mcloud.pyrene-mofs, mcloud.scdm,
        mcloud.sssp, mcloud.stoceriaitf, mcloud.tc-applicability, mcloud.threedd, mp, mpds,
        nmd, odbx, omdb.omdb_production, oqmd, tcod

        To refresh this list of aliases, generated from the current list of OPTIMADE providers
        at optimade.org, call the refresh_aliases() method.

        Args:
            aliases_or_resource_urls: the alias or structure resource URL or a list of
                aliases or resource URLs, if providing the resource URL directly it should not
                be an index, this interface can only currently access the "v1/structures"
                information from the specified resource URL. If omitted (or empty), every
                known alias is used and a warning is logged.
            timeout: number of seconds before an attempted request is abandoned, a good
                timeout is useful when querying many providers, some of which may be offline
        """
        # TODO: maybe we should use the nice pydantic models from optimade-python-tools
        #  for response validation, and use the Lark parser for filter validation
        self.session = requests.Session()
        self._timeout = timeout  # seconds

        if isinstance(aliases_or_resource_urls, str):
            aliases_or_resource_urls = [aliases_or_resource_urls]

        # this stores a dictionary with keys provider id (in the same format as the aliases)
        # and values as the corresponding URL
        self.resources = {}

        if not aliases_or_resource_urls:
            aliases_or_resource_urls = list(self.aliases)
            _logger.warning(
                "Connecting to all known OPTIMADE providers, this will be slow. Please connect to only the "
                f"OPTIMADE providers you want to query. Choose from: {', '.join(self.aliases)}"
            )

        # Providers already validated while resolving user-supplied URLs, kept so
        # that each such URL is only queried once.
        validated: dict[str, Provider] = {}

        for alias_or_resource_url in aliases_or_resource_urls:
            if alias_or_resource_url in self.aliases:
                self.resources[alias_or_resource_url] = self.aliases[alias_or_resource_url]
                continue

            provider = self._validate_provider(alias_or_resource_url)
            if provider:
                # TODO: unclear what the key should be here, the "prefix" is for the root provider,
                # may need to walk back to the index for the given provider to find the correct identifier
                self.resources[alias_or_resource_url] = alias_or_resource_url
                validated[alias_or_resource_url] = provider
            else:
                _logger.error(f"The following is not a known alias or a valid url: {alias_or_resource_url}")

        # Maps each resource URL to its Provider record (or None if validation failed).
        self._providers = {
            url: validated[url] if url in validated else self._validate_provider(provider_url=url)
            for url in self.resources.values()
        }

    def __repr__(self):
        return f"OptimadeRester connected to: {', '.join(self.resources.values())}"

    def __str__(self):
        return self.describe()

    def describe(self):
        """
        Provides human-readable information about the resources being searched by the OptimadeRester.

        Returns:
            A multi-line string listing every provider that was successfully validated.
        """
        provider_text = "\n".join(str(provider) for provider in self._providers.values() if provider)
        return f"OptimadeRester connected to:\n{provider_text}"

    @staticmethod
    def _build_url(base_url: str, path: str) -> str:
        """
        Joins a base URL and a relative path with exactly one forward slash.

        URLs always use "/" as separator, so this is used instead of os.path.join,
        which would insert a backslash on Windows.
        """
        return f"{base_url.rstrip('/')}/{path.lstrip('/')}"

    @staticmethod
    def _get_next_link(page) -> str | None:
        """
        Extracts the pagination link from a JSON response page.

        The "next" entry may be either a plain URL string or a link object with an
        "href" key.

        Returns:
            The URL of the next page, or None if there is no further page.
        """
        next_link = (page.get("links") or {}).get("next")
        if isinstance(next_link, dict):
            next_link = next_link.get("href")
        return next_link or None

    # TODO: re-enable retrying, e.g.
    # @retry(stop_max_attempt_number=3, wait_random_min=1000, wait_random_max=2000)
    def _get_json(self, url):
        """
        Retrieves and decodes the JSON document at the given URL using the shared session.

        A single attempt is made, subject to the timeout given at construction; any
        request or decoding error propagates to the caller.
        """
        return self.session.get(url, timeout=self._timeout).json()

    @staticmethod
    def _build_filter(
        elements: str | list[str] | None = None,
        nelements: int | list[int] | None = None,
        nsites: int | list[int] | None = None,
        chemical_formula_anonymous: str | None = None,
        chemical_formula_hill: str | None = None,
    ):
        """
        Convenience method to build an OPTIMADE filter.

        Args:
            elements: A single element symbol or a list of element symbols that must all be present
            nelements: Number of elements, e.g. 4 or [2, 5] for the range >=2 and <=5
            nsites: Number of sites, e.g. 4 or [2, 5] for the range >=2 and <=5
            chemical_formula_anonymous: Anonymous chemical formula
            chemical_formula_hill: Chemical formula following Hill convention

        Returns:
            The individual clauses joined with " AND ", or an empty string if no
            criterion was supplied.
        """

        def _count_clause(name, value):
            # A list/tuple is interpreted as an inclusive range, a scalar as an exact match.
            if isinstance(value, (list, tuple)):
                return f"({name}>={min(value)} AND {name}<={max(value)})"
            return f"({name}={value})"

        filters = []

        if elements:
            if isinstance(elements, str):
                elements = [elements]
            # String values are written with double quotes, not Python's repr quoting.
            elements_str = ", ".join(f'"{el}"' for el in elements)
            filters.append(f"(elements HAS ALL {elements_str})")

        if nsites:
            filters.append(_count_clause("nsites", nsites))

        if nelements:
            filters.append(_count_clause("nelements", nelements))

        if chemical_formula_anonymous:
            filters.append(f'(chemical_formula_anonymous="{chemical_formula_anonymous}")')

        if chemical_formula_hill:
            filters.append(f'(chemical_formula_hill="{chemical_formula_hill}")')

        return " AND ".join(filters)

    def get_structures(
        self,
        elements: list[str] | str | None = None,
        nelements: int | list[int] | None = None,
        nsites: int | list[int] | None = None,
        chemical_formula_anonymous: str | None = None,
        chemical_formula_hill: str | None = None,
    ) -> dict[str, dict[str, Structure]]:
        """
        Retrieve Structures from OPTIMADE providers.

        Not all functionality of OPTIMADE is currently exposed in this convenience method. To
        use a custom filter, call get_structures_with_filter().

        Args:
            elements: List of elements
            nelements: Number of elements, e.g. 4 or [2, 5] for the range >=2 and <=5
            nsites: Number of sites, e.g. 4 or [2, 5] for the range >=2 and <=5
            chemical_formula_anonymous: Anonymous chemical formula
            chemical_formula_hill: Chemical formula following Hill convention

        Returns: Dict of (Dict Structures keyed by that database's id system) keyed by provider
        """
        optimade_filter = self._build_filter(
            elements=elements,
            nelements=nelements,
            nsites=nsites,
            chemical_formula_anonymous=chemical_formula_anonymous,
            chemical_formula_hill=chemical_formula_hill,
        )

        return self.get_structures_with_filter(optimade_filter)

    def get_snls(
        self,
        elements: list[str] | str | None = None,
        nelements: int | list[int] | None = None,
        nsites: int | list[int] | None = None,
        chemical_formula_anonymous: str | None = None,
        chemical_formula_hill: str | None = None,
        additional_response_fields: str | list[str] | set[str] | None = None,
    ) -> dict[str, dict[str, StructureNL]]:
        """
        Retrieve StructureNL from OPTIMADE providers.

        A StructureNL is an object provided by pymatgen which combines Structure with
        associated metadata, such as the URL it was downloaded from and any additional namespaced
        data.

        Not all functionality of OPTIMADE is currently exposed in this convenience method. To
        use a custom filter, call get_snls_with_filter().

        Args:
            elements: List of elements
            nelements: Number of elements, e.g. 4 or [2, 5] for the range >=2 and <=5
            nsites: Number of sites, e.g. 4 or [2, 5] for the range >=2 and <=5
            chemical_formula_anonymous: Anonymous chemical formula
            chemical_formula_hill: Chemical formula following Hill convention
            additional_response_fields: Any additional fields desired from the OPTIMADE API,
                these will be stored under the `'_optimade'` key in each `StructureNL.data` dictionary.

        Returns: Dict of (Dict of StructureNLs keyed by that database's id system) keyed by provider
        """
        optimade_filter = self._build_filter(
            elements=elements,
            nelements=nelements,
            nsites=nsites,
            chemical_formula_anonymous=chemical_formula_anonymous,
            chemical_formula_hill=chemical_formula_hill,
        )

        return self.get_snls_with_filter(optimade_filter, additional_response_fields=additional_response_fields)

    def get_structures_with_filter(self, optimade_filter: str) -> dict[str, dict[str, Structure]]:
        """
        Get structures satisfying a given OPTIMADE filter.

        Args:
            optimade_filter: An OPTIMADE-compliant filter

        Returns: Dict of (Dict of Structures keyed by that database's id system) keyed by provider
        """
        all_snls = self.get_snls_with_filter(optimade_filter)

        return {
            identifier: {k: snl.structure for k, snl in snls_dict.items()}
            for identifier, snls_dict in all_snls.items()
        }

    def get_snls_with_filter(
        self,
        optimade_filter: str,
        additional_response_fields: str | list[str] | set[str] | None = None,
    ) -> dict[str, dict[str, StructureNL]]:
        """
        Get StructureNLs satisfying a given OPTIMADE filter.

        Every configured resource is queried in turn and paginated results are followed.
        A failure for one provider is logged and does not prevent the remaining
        providers from being queried.

        Args:
            optimade_filter: An OPTIMADE-compliant filter
            additional_response_fields: Any additional fields desired from the OPTIMADE API,
                these will be stored under the `'_optimade'` key in each `StructureNL.data` dictionary.

        Returns: Dict of (Dict of StructureNLs keyed by that database's id system) keyed by provider;
            providers that returned no structures are omitted.
        """
        all_snls = {}

        response_fields = self._handle_response_fields(additional_response_fields)

        for identifier, resource in self.resources.items():
            # response_fields is interpolated as-is: the f-string "=" specifier would
            # apply repr() and put literal quotes into the URL.
            url = self._build_url(
                resource, f"v1/structures?filter={optimade_filter}&response_fields={response_fields}"
            )

            try:
                page = self._get_json(url)

                structures = self._get_snls_from_resource(page, url, identifier)

                # The context manager ensures the progress bar is closed even on error.
                with tqdm(
                    total=page["meta"].get("data_returned", 0), desc=identifier, initial=len(structures)
                ) as pbar:
                    # TODO: check spec for `more_data_available` boolean, may simplify this conditional
                    next_link = self._get_next_link(page)
                    while next_link:
                        page = self._get_json(next_link)
                        additional_structures = self._get_snls_from_resource(page, url, identifier)
                        structures.update(additional_structures)
                        pbar.update(len(additional_structures))
                        next_link = self._get_next_link(page)

                if structures:
                    all_snls[identifier] = structures

            except Exception as exc:
                # TODO: manually inspect failures to either (a) correct a bug or (b) raise more appropriate error

                _logger.error(
                    f"Could not retrieve required information from provider {identifier} and url {url}: {exc}"
                )

        return all_snls

    @staticmethod
    def _get_snls_from_resource(json, url, identifier) -> dict[str, StructureNL]:
        """
        Converts the "data" entries of an OPTIMADE structures response into StructureNLs.

        Entries that cannot be parsed are skipped; the distinct error messages are
        logged once at the end.

        Args:
            json: The decoded JSON response, must contain a "data" list
            url: The URL the query was issued to, recorded in each StructureNL's history
            identifier: The provider identifier, recorded in each StructureNL's history

        Returns: Dict of StructureNLs keyed by that database's id system
        """
        snls = {}

        exceptions = set()

        # Attributes consumed to build the Structure itself; everything else that was
        # returned (including "_"-prefixed provider-specific fields) is kept as metadata.
        structure_fields = {"lattice_vectors", "species", "cartesian_site_positions"}

        def _sanitize_symbol(symbol):
            if symbol == "vacancy":
                symbol = DummySpecies("X_vacancy", oxidation_state=None)
            elif symbol == "X":
                symbol = DummySpecies("X", oxidation_state=None)
            return symbol

        def _get_comp(sp_dict):
            return {
                _sanitize_symbol(symbol): conc
                for symbol, conc in zip(sp_dict["chemical_symbols"], sp_dict["concentration"])
            }

        def _build_snl(data, species):
            attributes = data["attributes"]
            structure = Structure(
                lattice=attributes["lattice_vectors"],
                species=species,
                coords=attributes["cartesian_site_positions"],
                coords_are_cartesian=True,
            )
            # Grab any custom fields or non-mandatory fields if they were requested
            namespaced_data = {k: v for k, v in attributes.items() if k not in structure_fields}

            # TODO: follow `references` to add reference information here
            return StructureNL(
                structure,
                authors={},
                history=[{"name": identifier, "url": url, "description": {"id": data["id"]}}],
                data={"_optimade": namespaced_data},
            )

        for data in json["data"]:
            # TODO: check the spec! and remove this try/except (are all providers following spec?)
            # e.g. can check data["type"] == "structures"
            try:
                try:
                    # e.g. COD
                    # NOTE(review): this treats the `species` list as one entry per site —
                    # confirm against the OPTIMADE spec; mismatches fall through to the
                    # `species_at_sites` attempt below.
                    snl = _build_snl(data, [_get_comp(d) for d in data["attributes"]["species"]])
                except Exception:
                    # e.g. MP (all ordered, no vacancies)
                    snl = _build_snl(data, data["attributes"]["species_at_sites"])

                snls[data["id"]] = snl

            # TODO: bare exception, remove...
            except Exception as exc:
                exceptions.add(str(exc))

        if exceptions:
            _logger.error(f'Failed to parse returned data for {url}: {", ".join(exceptions)}')

        return snls

    def _validate_provider(self, provider_url) -> Provider | None:
        """
        Checks that a given URL is indeed an OPTIMADE provider by querying its
        "v1/info" endpoint.

        TODO: careful reading of OPTIMADE specification required
        TODO: add better exception handling, intentionally permissive currently

        Args:
            provider_url: The base URL to check

        Returns:
            A Provider describing the endpoint, or None (after logging a warning) if
            the URL is malformed, cannot be queried, or returns unexpected data.
        """

        def is_url(url) -> bool:
            """
            Basic URL validation thanks to https://stackoverflow.com/a/52455972
            """
            try:
                result = urlparse(url)
                return all([result.scheme, result.netloc])
            except (ValueError, TypeError, AttributeError):
                return False

        if not is_url(provider_url):
            _logger.warning(f"An invalid url was supplied: {provider_url}")
            return None

        url = self._build_url(provider_url, "v1/info")

        try:
            provider_info_json = self._get_json(url)
        except Exception as exc:
            _logger.warning(f"Failed to parse {url} when validating: {exc}")
            return None

        try:
            provider_info = provider_info_json["meta"].get("provider", {})
            return Provider(
                name=provider_info.get("name", "Unknown"),
                base_url=provider_url,
                description=provider_info.get("description", "Unknown"),
                homepage=provider_info.get("homepage"),
                prefix=provider_info.get("prefix", "Unknown"),
            )
        except Exception as exc:
            _logger.warning(f"Failed to extract required information from {url}: {exc}")
            return None

    def _parse_provider(self, provider, provider_url) -> dict[str, Provider]:
        """
        Used internally to update the list of providers or to
        check a given URL is valid.

        It does not raise exceptions but will instead log the problem and provide
        an empty dictionary in the case of invalid data.

        In future, when the specification is sufficiently well adopted,
        we might be more strict here.

        Args:
            provider: the provider prefix
            provider_url: An OPTIMADE provider URL

        Returns:
            A dictionary of keys (in format of "provider.database") to
            Provider objects.
        """
        url = self._build_url(provider_url, "v1/links")

        try:
            provider_link_json = self._get_json(url)
        except Exception as exc:
            _logger.error(f"Failed to parse {url} when following links: {exc}")
            return {}

        # No validation attempted.
        providers = {}
        try:
            children = [d for d in provider_link_json["data"] if d["attributes"]["link_type"] == "child"]
            for link in children:
                attributes = link["attributes"]
                key = f"{provider}.{link['id']}" if provider != link["id"] else provider
                if attributes["base_url"]:
                    providers[key] = Provider(
                        name=attributes["name"],
                        base_url=attributes["base_url"],
                        description=attributes["description"],
                        homepage=attributes.get("homepage"),
                        prefix=attributes.get("prefix"),
                    )
        except Exception as exc:
            # Not all providers parse yet; keep whatever was parsed so far and only
            # record the problem at debug level so default output stays quiet.
            _logger.debug(f"Failed to parse {provider}: {exc}")

        return providers

    def _handle_response_fields(self, additional_response_fields: str | list[str] | set[str] | None = None) -> str:
        """
        Used internally to handle the mandatory and additional response fields.

        Args:
            additional_response_fields: A single field name or a collection of additional fields to request.

        Returns:
            A string of comma-separated OPTIMADE response fields, de-duplicated and
            sorted so that the resulting query URL is deterministic.
        """
        if isinstance(additional_response_fields, str):
            additional_response_fields = {additional_response_fields}
        if not additional_response_fields:
            additional_response_fields = set()
        return ",".join(sorted({*additional_response_fields} | self.mandatory_response_fields))

    def refresh_aliases(self, providers_url="https://providers.optimade.org/providers.json"):
        """
        Updates available OPTIMADE structure resources based on the current list of OPTIMADE
        providers.

        The refreshed mapping is stored on this instance's `aliases` attribute.

        Args:
            providers_url: URL of the providers.json index to read
        """
        json = self._get_json(providers_url)
        providers_from_url = {
            entry["id"]: entry["attributes"]["base_url"] for entry in json["data"] if entry["attributes"]["base_url"]
        }

        structure_providers = {}
        for provider, provider_link in providers_from_url.items():
            structure_providers.update(self._parse_provider(provider, provider_link))

        self.aliases = {alias: provider.base_url for alias, provider in structure_providers.items()}

    # TODO: revisit context manager logic here and in MPRester
    def __enter__(self):
        """
        Support for "with" context.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Support for "with" context, closes the underlying requests session.
        """
        self.session.close()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc