• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

materialsproject / pymatgen / 4075885785

pending completion
4075885785

push

github

Shyue Ping Ong
Merge branch 'master' of github.com:materialsproject/pymatgen

96 of 96 new or added lines in 27 files covered. (100.0%)

81013 of 102710 relevant lines covered (78.88%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.72
/pymatgen/command_line/mcsqs_caller.py
1
"""
2
Module to call mcsqs, distributed with AT-AT
3
https://www.brown.edu/Departments/Engineering/Labs/avdw/atat/
4
"""
5

6
from __future__ import annotations
1✔
7

8
import os
1✔
9
import tempfile
1✔
10
import warnings
1✔
11
from collections import namedtuple
1✔
12
from pathlib import Path
1✔
13
from shutil import which
1✔
14
from subprocess import Popen, TimeoutExpired
1✔
15

16
from monty.dev import requires
1✔
17

18
from pymatgen.core.structure import Structure
1✔
19

20
# Field order matters: instances are plain tuples and may be unpacked positionally.
Sqs = namedtuple(
    "Sqs",
    "bestsqs objective_function allsqs clusters directory",
)
"""
Result container returned by run_mcsqs.

bestsqs: Structure read from bestsqs.out
objective_function: float, or the string "Perfect_match"
allsqs: list of dicts with "structure" and "objective_function" keys, one per
    detected mcsqs instance (empty when no numbered output files are found)
clusters: list of dicts parsed from clusters.out
directory: str, resolved path of the directory the output was parsed from
"""
29

30

31
@requires(
    which("mcsqs") and which("str2cif"),
    "run_mcsqs requires first installing AT-AT, see https://www.brown.edu/Departments/Engineering/Labs/avdw/atat/",
)
def run_mcsqs(
    structure: Structure,
    clusters: dict[int, float],
    scaling: int | list[int] = 1,
    search_time: float = 60,
    directory: str | None = None,
    instances: int | None = None,
    temperature: int | float = 1,
    wr: float = 1,
    wn: float = 1,
    wd: float = 0.5,
    tol: float = 1e-3,
) -> Sqs:
    """
    Helper function for calling mcsqs with different arguments.

    The calculation is run with ``directory`` as the working directory. The
    caller's working directory is restored before this function returns or
    raises, whatever the outcome.

    Args:
        structure (Structure): Disordered pymatgen Structure object
        clusters (dict): Dictionary of cluster interactions with entries in the form
            number of atoms: cutoff in angstroms
        scaling (int or list): Scaling factor to determine supercell. Two options are possible:
                a. (preferred) Scales number of atoms, e.g., for a structure with 8 atoms,
                   scaling=4 would lead to a 32 atom supercell
                b. A sequence of three scaling factors, e.g., [2, 1, 1], which
                   specifies that the supercell should have dimensions 2a x b x c
            Defaults to 1.
        search_time (float): Time spent looking for the ideal SQS in minutes (default: 60)
        directory (str): Directory to run mcsqs calculation and store files (default: None
            runs calculations in a temp directory)
        instances (int): Specifies the number of parallel instances of mcsqs to run
            (default: number of cpu cores detected by Python)
        temperature (int or float): Monte Carlo temperature (default: 1), "T" in atat code
        wr (int or float): Weight assigned to range of perfect correlation match in objective
            function (default = 1)
        wn (int or float): Multiplicative decrease in weight per additional point in cluster (default: 1)
        wd (int or float): Exponent of decay in weight as function of cluster diameter (default: 0.5)
        tol (int or float): Tolerance for matching correlations (default: 1e-3)

    Returns:
        Sqs named tuple holding the best SQS structure, the mcsqs objective function,
            the list of all SQS structures, the parsed clusters, and the directory
            where calculations are run

    Raises:
        ValueError: if the structure is ordered, or a numeric scaling is not a whole number
        RuntimeError: if mcsqs exits before the timeout without producing bestsqs.out and
            bestcorr.out, or if a parallel run times out before bestcorr1.out exists
        TimeoutError: if the search times out and no best-SQS output files are available
    """
    num_atoms = len(structure)

    if structure.is_ordered:
        raise ValueError("Pick a disordered structure")

    if instances is None:
        # os.cpu_count() can return None if detection fails
        instances = os.cpu_count()

    original_directory = os.getcwd()
    if not directory:
        directory = tempfile.mkdtemp()
    os.chdir(directory)

    # Everything below runs inside `directory`; the finally clause guarantees the
    # process-wide working directory is put back on success and on every error.
    try:
        if isinstance(scaling, (int, float)):
            if scaling % 1:
                raise ValueError(f"Scaling should be an integer, not {scaling}")
            mcsqs_find_sqs_cmd = ["mcsqs", f"-n {scaling * num_atoms}"]

        else:
            # Set supercell to identity (will make supercell with pymatgen)
            with open("sqscell.out", "w") as f:
                f.write("1\n1 0 0\n0 1 0\n0 0 1\n")
            structure = structure * scaling
            mcsqs_find_sqs_cmd = ["mcsqs", "-rc", f"-n {num_atoms}"]

        structure.to(filename="rndstr.in")

        # Generate clusters
        mcsqs_generate_clusters_cmd = ["mcsqs"]
        for num, cutoff in clusters.items():
            mcsqs_generate_clusters_cmd.append("-" + str(num) + "=" + str(cutoff))

        # Run mcsqs to find clusters
        with Popen(mcsqs_generate_clusters_cmd) as process:
            process.communicate()

        # Generate SQS structures
        add_ons = [
            f"-T {temperature}",
            f"-wr {wr}",
            f"-wn {wn}",
            f"-wd {wd}",
            f"-tol {tol}",
        ]

        # instances may still be None here (cpu detection failed): run a single search
        run_parallel = bool(instances and instances > 1)

        mcsqs_find_sqs_processes = []
        if run_parallel:
            # if multiple instances, run a range of commands using "-ip"
            for i in range(instances):
                instance_cmd = [f"-ip {i + 1}"]
                cmd = mcsqs_find_sqs_cmd + add_ons + instance_cmd
                process = Popen(cmd)  # pylint: disable=R1732
                mcsqs_find_sqs_processes.append(process)
        else:
            # run normal mcsqs command
            cmd = mcsqs_find_sqs_cmd + add_ons
            process = Popen(cmd)  # pylint: disable=R1732
            mcsqs_find_sqs_processes.append(process)

        try:
            for process in mcsqs_find_sqs_processes:
                process.communicate(timeout=search_time * 60)

            if run_parallel:
                process = Popen(["mcsqs", "-best"])  # pylint: disable=R1732
                process.communicate()

            if os.path.exists("bestsqs.out") and os.path.exists("bestcorr.out"):
                return _parse_sqs_path(".")

            raise RuntimeError("mcsqs exited before timeout reached")

        except TimeoutExpired:
            # Reaching the timeout is the expected way for a search to end:
            # stop every worker, then read whatever it has written so far.
            for process in mcsqs_find_sqs_processes:
                process.kill()
                process.communicate()

            # Find the best sqs structures
            if run_parallel:
                if not os.path.exists("bestcorr1.out"):
                    raise RuntimeError(
                        "mcsqs did not generate output files, "
                        "is search_time sufficient or are number of instances too high?"
                    )

                process = Popen(["mcsqs", "-best"])  # pylint: disable=R1732
                process.communicate()

            if os.path.exists("bestsqs.out") and os.path.exists("bestcorr.out"):
                return _parse_sqs_path(".")

            raise TimeoutError("Cluster expansion took too long.")

    finally:
        os.chdir(original_directory)
×
172

173

174
def _parse_sqs_path(path) -> Sqs:
    """
    Private function to parse mcsqs output directory
    Args:
        path: directory to perform parsing

    Returns:
        Sqs named tuple holding the best SQS structure, the mcsqs objective function,
            the list of all SQS structures, the parsed clusters, and the resolved
            directory that was parsed
    """
    path = Path(path)

    # detected instances will be 0 if mcsqs was run in series, or number of instances
    detected_instances = len(list(path.glob("bestsqs*[0-9]*.out")))

    # Convert best SQS structure to cif file and pymatgen Structure
    bestsqs = _load_sqs_structure(path, "bestsqs")

    # Get best SQS objective function
    objective_function = _read_objective_function(path / "bestcorr.out")

    # Get all SQS structures and objective functions.
    # Per-instance files are numbered from 1: bestsqs1.out / bestcorr1.out, ...
    allsqs = []
    for index in range(1, detected_instances + 1):
        allsqs.append(
            {
                "structure": _load_sqs_structure(path, f"bestsqs{index}"),
                "objective_function": _read_objective_function(path / f"bestcorr{index}.out"),
            }
        )

    clusters = _parse_clusters(path / "clusters.out")

    return Sqs(
        bestsqs=bestsqs,
        objective_function=objective_function,
        allsqs=allsqs,
        directory=str(path.resolve()),
        clusters=clusters,
    )


def _load_sqs_structure(path, stem):
    """
    Write <stem>.cif from <stem>.out with str2cif and load <stem>.out as a Structure.

    Args:
        path (Path): directory containing the mcsqs output files
        stem (str): file name without extension, e.g. "bestsqs" or "bestsqs1"

    Returns:
        Structure read from <stem>.out
    """
    # File names are generated internally, so the shell redirection is not
    # exposed to external input.
    with Popen(f"str2cif < {stem}.out > {stem}.cif", shell=True, cwd=path) as p:
        p.communicate()

    # Warnings emitted while reading the file are deliberately silenced
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        return Structure.from_file(path / f"{stem}.out")


def _read_objective_function(corr_file) -> float | str:
    """
    Read the objective function from the last non-blank line of a bestcorr file.

    Args:
        corr_file: path of the bestcorr*.out file

    Returns:
        The value after the last "=" on that line, as a float, or the string
        "Perfect_match" when mcsqs reports a perfect match

    Raises:
        ValueError: if the file has no non-blank line, or the value is not a number
    """
    with open(corr_file) as f:
        lines = [line for line in f if line.strip()]

    if not lines:
        raise ValueError(f"No objective function found in {corr_file}")

    objective_function_str = lines[-1].split("=")[-1].strip()
    if objective_function_str == "Perfect_match":
        return "Perfect_match"
    return float(objective_function_str)
241

242

243
def _parse_clusters(filename):
    """
    Private function to parse clusters.out file
    Args:
        filename: path of the clusters.out file to parse

    Returns:
        List of dicts, one per cluster, with keys "multiplicity",
        "longest_pair_length", "num_points_in_cluster" and "coordinates".
        "coordinates" is a list with one dict per point, holding "coordinates",
        "num_possible_species" and "cluster_function".
    """
    # Clusters are separated by blank lines. Collect the non-blank lines of each
    # cluster, tolerating repeated blank lines and a missing blank line at the
    # end of the file (the final cluster must not be dropped).
    clusters = []
    cluster_block = []
    with open(filename) as f:
        for raw_line in f:
            stripped = raw_line.strip()
            if stripped:
                cluster_block.append(stripped)
            elif cluster_block:
                clusters.append(cluster_block)
                cluster_block = []
    if cluster_block:
        clusters.append(cluster_block)

    cluster_dicts = []
    for cluster in clusters:
        # Block layout: multiplicity, longest pair length, number of points,
        # then one line per point.
        cluster_dict = {
            "multiplicity": int(cluster[0]),
            "longest_pair_length": float(cluster[1]),
            "num_points_in_cluster": int(cluster[2]),
        }
        points = []
        for offset in range(cluster_dict["num_points_in_cluster"]):
            # split() without arguments copes with repeated spaces and tabs
            fields = cluster[3 + offset].split()
            points.append(
                {
                    "coordinates": [float(value) for value in fields[0:3]],
                    "num_possible_species": int(fields[3]) + 2,  # see ATAT manual for why +2
                    "cluster_function": float(fields[4]),  # see ATAT manual for what "function" is
                }
            )

        cluster_dict["coordinates"] = points
        cluster_dicts.append(cluster_dict)

    return cluster_dicts
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc