• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

materialsproject / pymatgen / 4075885785

pending completion
4075885785

push

github

Shyue Ping Ong
Merge branch 'master' of github.com:materialsproject/pymatgen

96 of 96 new or added lines in 27 files covered. (100.0%)

81013 of 102710 relevant lines covered (78.88%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.27
/pymatgen/analysis/chemenv/connectivity/structure_connectivity.py
1
"""
2
Structure connectivity class.
3
"""
4

5
from __future__ import annotations
1✔
6

7
import collections
1✔
8
import logging
1✔
9

10
import networkx as nx
1✔
11
import numpy as np
1✔
12
from monty.json import MSONable, jsanitize
1✔
13

14
from pymatgen.analysis.chemenv.connectivity.connected_components import (
1✔
15
    ConnectedComponent,
16
)
17
from pymatgen.analysis.chemenv.connectivity.environment_nodes import (
1✔
18
    get_environment_node,
19
)
20
from pymatgen.analysis.chemenv.coordination_environments.structure_environments import (
1✔
21
    LightStructureEnvironments,
22
)
23

24
__author__ = "David Waroquiers"
1✔
25
__copyright__ = "Copyright 2012, The Materials Project"
1✔
26
__credits__ = "Geoffroy Hautier"
1✔
27
__version__ = "1.0"
1✔
28
__maintainer__ = "David Waroquiers"
1✔
29
__email__ = "david.waroquiers@gmail.com"
1✔
30
__date__ = "June 25, 2019"
1✔
31

32

33
def get_delta_image(isite1, isite2, data1, data2):
1✔
34
    """
35
    Helper method to get the delta image between one environment and another
36
    from the ligand's delta images.
37
    """
38
    if data1["start"] == isite1:
1✔
39
        if data2["start"] == isite2:
1✔
40
            return np.array(data1["delta"]) - np.array(data2["delta"])
1✔
41
        return np.array(data1["delta"]) + np.array(data2["delta"])
×
42
    if data2["start"] == isite2:
×
43
        return -np.array(data1["delta"]) - np.array(data2["delta"])
×
44
    return -np.array(data1["delta"]) + np.array(data2["delta"])
×
45

46

47
class StructureConnectivity(MSONable):
1✔
48
    """
49
    Main class containing the connectivity of a structure.
50
    """
51

52
    def __init__(
1✔
53
        self,
54
        light_structure_environment,
55
        connectivity_graph=None,
56
        environment_subgraphs=None,
57
    ):
58
        """
59
        Constructor for the StructureConnectivity object.
60

61
        Args:
62
            light_structure_environment: a LightStructureEnvironments object
63
                                         containing the relevant local environments
64
                                         for the sites in the structure.
65
            connectivity_graph: the networkx MultiGraph if it has already been computed,
66
                                e.g. stored in a file or dict and StructureConnectivity
67
                                is reconstructed from that file or dict.
68
            environment_subgraphs: the different subgraphs of environments that have
69
                                   been computed if any (as for connectivity_graph, only
70
                                   if it is reconstructed from a file or dict).
71
        """
72
        self.light_structure_environments = light_structure_environment
1✔
73
        if connectivity_graph is None:
1✔
74
            self._graph = nx.MultiGraph()
1✔
75
        else:
76
            self._graph = connectivity_graph
1✔
77
        if environment_subgraphs is None:
1✔
78
            self.environment_subgraphs = {}
1✔
79
        else:
80
            self.environment_subgraphs = environment_subgraphs
×
81

82
    def environment_subgraph(self, environments_symbols=None, only_atoms=None):
1✔
83
        """
84
        Args:
85
            environments_symbols ():
86
            only_atoms ():
87

88
        Returns:
89
        """
90
        if environments_symbols is not None:
1✔
91
            self.setup_environment_subgraph(environments_symbols=environments_symbols, only_atoms=only_atoms)
1✔
92
        try:
1✔
93
            return self._environment_subgraph
1✔
94
        except AttributeError:
1✔
95
            all_envs = self.light_structure_environments.environments_identified()
1✔
96
            self.setup_environment_subgraph(environments_symbols=all_envs, only_atoms=only_atoms)
1✔
97
        return self._environment_subgraph
1✔
98

99
    def add_sites(self):
1✔
100
        """
101
        Add the sites in the structure connectivity graph.
102
        """
103
        self._graph.add_nodes_from(list(range(len(self.light_structure_environments.structure))))
1✔
104

105
    def add_bonds(self, isite, site_neighbors_set):
1✔
106
        """
107
        Add the bonds for a given site index to the structure connectivity graph.
108

109
        Args:
110
            isite: Index of the site for which the bonds have to be added.
111
            site_neighbors_set: site_neighbors_set: Neighbors set of the site
112
        """
113
        existing_edges = self._graph.edges(nbunch=[isite], data=True)
1✔
114
        for nb_index_and_image in site_neighbors_set.neighb_indices_and_images:
1✔
115
            nb_index_unitcell = nb_index_and_image["index"]
1✔
116
            nb_image_cell = nb_index_and_image["image_cell"]
1✔
117
            exists = False
1✔
118
            if np.allclose(nb_image_cell, np.zeros(3)):
1✔
119
                for _, ineighb1, data1 in existing_edges:
1✔
120
                    if np.allclose(data1["delta"], np.zeros(3)) and nb_index_unitcell == ineighb1:
1✔
121
                        exists = True
×
122
                        break
×
123
            else:
124
                if isite == nb_index_unitcell:
1✔
125
                    for isite1, ineighb1, data1 in existing_edges:
×
126
                        if isite1 == ineighb1:
×
127
                            if np.allclose(data1["delta"], nb_image_cell) or np.allclose(
×
128
                                data1["delta"], -nb_image_cell
129
                            ):
130
                                exists = True
×
131
                                break
×
132
                else:
133
                    for _, ineighb1, data1 in existing_edges:
1✔
134
                        if nb_index_unitcell == ineighb1:
1✔
135
                            if data1["start"] == isite:
1✔
136
                                if np.allclose(data1["delta"], nb_image_cell):
1✔
137
                                    exists = True
×
138
                                    break
×
139
                            elif data1["end"] == isite:
×
140
                                if np.allclose(data1["delta"], -nb_image_cell):
×
141
                                    exists = True
×
142
                                    break
×
143
                            else:
144
                                raise ValueError("SHOULD NOT HAPPEN ???")
×
145
            if not exists:
1✔
146
                self._graph.add_edge(
1✔
147
                    isite,
148
                    nb_index_unitcell,
149
                    start=isite,
150
                    end=nb_index_unitcell,
151
                    delta=nb_image_cell,
152
                )
153

154
    def setup_environment_subgraph(self, environments_symbols, only_atoms=None):
1✔
155
        """
156
        Set up the graph for predefined environments and optionally atoms.
157

158
        Args:
159
            environments_symbols: Symbols of the environments for the environment subgraph.
160
            only_atoms: Atoms to be considered.
161
        """
162
        logging.info(f"Setup of environment subgraph for environments {', '.join(environments_symbols)}")
1✔
163
        if not isinstance(environments_symbols, collections.abc.Iterable):
1✔
164
            environments_symbols = [environments_symbols]
×
165
        environments_symbols = sorted(environments_symbols)
1✔
166
        envs_string = "-".join(environments_symbols)
1✔
167
        if only_atoms is not None:
1✔
168
            envs_string += "#" + "-".join(sorted(only_atoms))
1✔
169
        # Get it directly if it was already computed
170
        if envs_string in self.environment_subgraphs:
1✔
171
            self._environment_subgraph = self.environment_subgraphs[envs_string]
×
172
            return
×
173

174
        # Initialize graph for a subset of environments
175
        self._environment_subgraph = nx.MultiGraph()
1✔
176
        # Add the sites with the required environment(s)
177
        for isite, ce_this_site_all in enumerate(self.light_structure_environments.coordination_environments):
1✔
178
            if ce_this_site_all is None:
1✔
179
                continue
1✔
180
            if len(ce_this_site_all) == 0:
1✔
181
                continue
×
182
            ce_this_site = ce_this_site_all[0]["ce_symbol"]
1✔
183
            if ce_this_site in environments_symbols:
1✔
184
                if only_atoms is None:
1✔
185
                    env_node = get_environment_node(
1✔
186
                        self.light_structure_environments.structure[isite],
187
                        isite,
188
                        ce_this_site,
189
                    )
190
                    self._environment_subgraph.add_node(env_node)
1✔
191
                else:
192
                    if self.light_structure_environments.structure.is_ordered:
1✔
193
                        if self.light_structure_environments.structure[isite].specie.symbol in only_atoms:
1✔
194
                            env_node = get_environment_node(
1✔
195
                                self.light_structure_environments.structure[isite],
196
                                isite,
197
                                ce_this_site,
198
                            )
199
                            self._environment_subgraph.add_node(env_node)
1✔
200
                    else:
201
                        #  TODO: add the possibility of a "constraint" on the minimum percentage
202
                        #        of the atoms on the site
203
                        this_site_elements = [
×
204
                            sp.symbol for sp in self.light_structure_environments.structure[isite].species_and_occu
205
                        ]
206
                        for elem_symbol in this_site_elements:
×
207
                            if elem_symbol in only_atoms:
×
208
                                env_node = get_environment_node(
×
209
                                    self.light_structure_environments.structure[isite],
210
                                    isite,
211
                                    ce_this_site,
212
                                )
213
                                self._environment_subgraph.add_node(env_node)
×
214
                                break
×
215
        # Find the connections between the environments
216
        nodes = list(self._environment_subgraph.nodes())
1✔
217
        for inode1, node1 in enumerate(nodes):
1✔
218
            isite1 = node1.isite
1✔
219
            links_node1 = self._graph.edges(isite1, data=True)
1✔
220
            for node2 in nodes[inode1:]:
1✔
221
                isite2 = node2.isite
1✔
222
                links_node2 = self._graph.edges(isite2, data=True)
1✔
223
                # We look for ligands that are common to both site1 and site2
224
                connections_site1_site2 = {}
1✔
225
                for _, ilig_site1, d1 in links_node1:
1✔
226
                    for _, ilig_site2, d2 in links_node2:
1✔
227
                        if ilig_site1 == ilig_site2:
1✔
228
                            delta_image = get_delta_image(isite1, isite2, d1, d2)
1✔
229
                            if isite1 == isite2 and np.all(delta_image == 0):
1✔
230
                                continue
1✔
231
                            tuple_delta_image = tuple(delta_image)
1✔
232
                            if tuple_delta_image in connections_site1_site2:
1✔
233
                                connections_site1_site2[tuple_delta_image].append((ilig_site1, d1, d2))
1✔
234
                            else:
235
                                connections_site1_site2[tuple_delta_image] = [(ilig_site1, d1, d2)]
1✔
236
                # Remove the double self-loops ...
237
                if isite1 == isite2:
1✔
238
                    remove_deltas = []
1✔
239
                    alldeltas = list(connections_site1_site2)
1✔
240
                    alldeltas2 = list(connections_site1_site2)
1✔
241
                    if (0, 0, 0) in alldeltas:
1✔
242
                        alldeltas.remove((0, 0, 0))
×
243
                        alldeltas2.remove((0, 0, 0))
×
244
                    for current_delta in alldeltas:
1✔
245
                        opp_current_delta = tuple(-dd for dd in current_delta)
1✔
246
                        if opp_current_delta in alldeltas2:
1✔
247
                            remove_deltas.append(current_delta)
1✔
248
                            alldeltas2.remove(current_delta)
1✔
249
                            alldeltas2.remove(opp_current_delta)
1✔
250
                    for remove_delta in remove_deltas:
1✔
251
                        connections_site1_site2.pop(remove_delta)
1✔
252
                # Add all the edges
253
                for conn, ligands in list(connections_site1_site2.items()):
1✔
254
                    self._environment_subgraph.add_edge(
1✔
255
                        node1,
256
                        node2,
257
                        start=node1.isite,
258
                        end=node2.isite,
259
                        delta=conn,
260
                        ligands=ligands,
261
                    )
262
        self.environment_subgraphs[envs_string] = self._environment_subgraph
1✔
263

264
    def setup_connectivity_description(self):
1✔
265
        """
266
        Returns:
267
        """
268

269
    def get_connected_components(self, environments_symbols=None, only_atoms=None):
1✔
270
        """
271
        Args:
272
            environments_symbols ():
273
            only_atoms ():
274

275
        Returns:
276
        """
277
        connected_components = []
1✔
278
        env_subgraph = self.environment_subgraph(environments_symbols=environments_symbols, only_atoms=only_atoms)
1✔
279
        for component_nodes in nx.connected_components(env_subgraph):
1✔
280
            graph = env_subgraph.subgraph(component_nodes).copy()
1✔
281
            connected_components.append(ConnectedComponent.from_graph(graph))
1✔
282
        return connected_components
1✔
283

284
    def setup_atom_environment_subgraph(self, atom_environment):
1✔
285
        """
286
        Args:
287
            atom_environment ():
288

289
        Returns:
290
        """
291
        raise NotImplementedError()
×
292

293
    def setup_environments_subgraph(self, environments_symbols):
1✔
294
        """
295
        Args:
296
            environments_symbols ():
297

298
        Returns:
299
        """
300
        raise NotImplementedError()
×
301

302
    def setup_atom_environments_subgraph(self, atoms_environments):
1✔
303
        """
304
        Args:
305
            atoms_environments ():
306

307
        Returns:
308
        """
309
        raise NotImplementedError()
×
310

311
    def print_links(self):
1✔
312
        """
313
        Returns:
314
        """
315
        nodes = self.environment_subgraph().nodes()
×
316
        print("Links in graph :")
×
317
        for node in nodes:
×
318
            print(node.isite, " is connected with : ")
×
319
            for n1, n2, data in self.environment_subgraph().edges(node, data=True):
×
320
                if n1.isite == data["start"]:
×
321
                    print(
×
322
                        f"  - {n2.isite} by {len(data['ligands'])} ligands ({data['delta'][0]} "
323
                        f"{data['delta'][1]} {data['delta'][2]})"
324
                    )
325
                else:
326
                    print(
×
327
                        f"  - {n2.isite} by {len(data['ligands'])} ligands ({-data['delta'][0]} "
328
                        f"{-data['delta'][1]} {-data['delta'][2]})"
329
                    )
330

331
    def as_dict(self):
1✔
332
        """
333
        Returns:
334
        """
335
        return {
1✔
336
            "@module": type(self).__module__,
337
            "@class": type(self).__name__,
338
            "light_structure_environments": self.light_structure_environments.as_dict(),
339
            "connectivity_graph": jsanitize(nx.to_dict_of_dicts(self._graph)),
340
            "environment_subgraphs": {
341
                env_key: jsanitize(nx.to_dict_of_dicts(subgraph))
342
                for env_key, subgraph in self.environment_subgraphs.items()
343
            },
344
        }
345

346
    @classmethod
1✔
347
    def from_dict(cls, d):
1✔
348
        """
349
        Args:
350
            d ():
351

352
        Returns:
353
        """
354
        # Reconstructs the graph with integer as nodes (json's as_dict replaces integer keys with str keys)
355
        cgraph = nx.from_dict_of_dicts(d["connectivity_graph"], create_using=nx.MultiGraph, multigraph_input=True)
1✔
356
        cgraph = nx.relabel_nodes(cgraph, int)  # Just relabel the nodes using integer casting (maps str->int)
1✔
357
        # Relabel multiedges (removes multiedges with str keys and adds them back with int keys)
358
        edges = set(cgraph.edges())
1✔
359
        for n1, n2 in edges:
1✔
360
            new_edges = {int(iedge): edata for iedge, edata in cgraph[n1][n2].items()}
1✔
361
            cgraph.remove_edges_from([(n1, n2, iedge) for iedge, edata in cgraph[n1][n2].items()])
1✔
362
            cgraph.add_edges_from([(n1, n2, iedge, edata) for iedge, edata in new_edges.items()])
1✔
363
        return cls(
1✔
364
            LightStructureEnvironments.from_dict(d["light_structure_environments"]),
365
            connectivity_graph=cgraph,
366
            environment_subgraphs=None,
367
        )
368
        # TODO: also deserialize the environment_subgraphs
369
        #            environment_subgraphs={env_key: nx.from_dict_of_dicts(subgraph, multigraph_input=True)
370
        #                                   for env_key, subgraph in d['environment_subgraphs'].items()})
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc