• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stfc / janus-core / 12676817063

08 Jan 2025 06:31PM UTC coverage: 95.103% (-0.06%) from 95.166%
12676817063

Pull #377

github

web-flow
Merge 2cb5afaf7 into bf53f87b2
Pull Request #377: Drop Python 3.9 support

38 of 41 new or added lines in 19 files covered. (92.68%)

2 existing lines in 1 file now uncovered.

2272 of 2389 relevant lines covered (95.1%)

2.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/janus_core/processing/post_process.py
1
"""Module for post-processing trajectories."""
2

3
from __future__ import annotations
3✔
4

5
from collections.abc import Sequence
3✔
6
from itertools import combinations_with_replacement
3✔
7

8
from ase import Atoms
3✔
9
from ase.geometry.analysis import Analysis
3✔
10
import numpy as np
3✔
11
from numpy import float64
3✔
12
from numpy.typing import NDArray
3✔
13

14
from janus_core.helpers.janus_types import (
3✔
15
    MaybeSequence,
16
    PathLike,
17
    SliceLike,
18
)
19
from janus_core.helpers.utils import slicelike_to_startstopstep
3✔
20

21

22
def compute_rdf(
    data: MaybeSequence[Atoms],
    ana: Analysis | None = None,
    /,
    *,
    filenames: MaybeSequence[PathLike] | None = None,
    by_elements: bool = False,
    rmax: float = 2.5,
    nbins: int = 50,
    elements: MaybeSequence[int | str] | None = None,
    index: SliceLike = (0, None, 1),
    volume: float | None = None,
) -> NDArray[float64] | dict[tuple[str, str], NDArray[float64]]:
    """
    Compute the rdf of data.

    Parameters
    ----------
    data : MaybeSequence[Atoms]
        Dataset to compute RDF of.
    ana : Analysis | None
        ASE Analysis object for data reuse.
    filenames : MaybeSequence[PathLike] | None
        Filenames to output data to. Must match number of RDFs computed.
    by_elements : bool
        Split RDF into pairwise by elements group. Default is False.
        N.B. mixed RDFs (e.g. C-H) include all self-RDFs (e.g. C-C),
        to get the pure (C-H) RDF subtract the self-RDFs.
    rmax : float
        Maximum distance of RDF.
    nbins : int
        Number of bins to divide RDF.
    elements : MaybeSequence[int | str] | None
        Make partial RDFs. If `by_elements` is true will filter to
        only display pairs in list.
    index : SliceLike
        Images to analyze as:
        `index` if `int`,
        `start`, `stop`, `step` if `tuple`,
        `slice` if `slice` or `range`.
    volume : float | None
        Volume of cell for normalisation. Only needs to be provided
        if aperiodic cell. Default is (2*rmax)**3.

    Returns
    -------
    NDArray[float64] | dict[tuple[str, str], NDArray[float64]]
        If `by_elements` is true returns a `dict` of RDF by element pairs.
        Otherwise returns RDF of total system filtered by elements.

    Raises
    ------
    ValueError
        If the number of `filenames` does not match the number of RDFs computed.
    """
    index = slicelike_to_startstopstep(index)

    if not isinstance(data, Sequence):
        data = [data]

    if elements is not None and not isinstance(elements, Sequence):
        elements = (elements,)

    if (  # If aperiodic, assume volume of a cube encompassing rmax sphere.
        not all(data[0].get_pbc()) and volume is None
    ):
        volume = (2 * rmax) ** 3

    if ana is None:
        ana = Analysis(data)

    if by_elements:
        # Default to all pairwise combinations of species present in the cell
        elements = (
            tuple(sorted(set(data[0].get_chemical_symbols())))
            if elements is None
            else elements
        )

        rdf = {
            element: ana.get_rdf(
                rmax=rmax,
                nbins=nbins,
                elements=element,
                imageIdx=slice(*index),
                return_dists=True,
                volume=volume,
            )
            for element in combinations_with_replacement(elements, 2)
        }

        # Compute RDF average
        rdf = {
            element: (rdf[0][1], np.average([rdf_i[0] for rdf_i in rdf], axis=0))
            for element, rdf in rdf.items()
        }

        if filenames is not None:
            # A str is itself a Sequence, but must be treated as a single
            # file name rather than a sequence of one-character names.
            if isinstance(filenames, str) or not isinstance(filenames, Sequence):
                filenames = (filenames,)

            assert isinstance(filenames, Sequence)

            if len(filenames) != len(rdf):
                raise ValueError(
                    f"Different number of file names ({len(filenames)}) "
                    f"to number of samples ({len(rdf)})"
                )

            for (dists, rdfs), out_path in zip(rdf.values(), filenames, strict=True):
                with open(out_path, "w", encoding="utf-8") as out_file:
                    for dist, rdf_i in zip(dists, rdfs, strict=True):
                        print(dist, rdf_i, file=out_file)

    else:
        rdf = ana.get_rdf(
            rmax=rmax,
            nbins=nbins,
            elements=elements,
            imageIdx=slice(*index),
            return_dists=True,
            volume=volume,
        )

        assert isinstance(rdf, list)

        # Compute RDF average
        rdf = rdf[0][1], np.average([rdf_i[0] for rdf_i in rdf], axis=0)

        if filenames is not None:
            # A str is itself a Sequence, so it must be excluded here or a
            # single string path would fail the length-1 check below.
            if not isinstance(filenames, str) and isinstance(filenames, Sequence):
                if len(filenames) != 1:
                    raise ValueError(
                        f"Different number of file names ({len(filenames)}) "
                        "to number of samples (1)"
                    )
                filenames = filenames[0]

            with open(filenames, "w", encoding="utf-8") as out_file:
                for dist, rdf_i in zip(*rdf, strict=True):
                    print(dist, rdf_i, file=out_file)

    return rdf
159

160

161
def compute_vaf(
    data: Sequence[Atoms],
    filenames: MaybeSequence[PathLike] | None = None,
    *,
    use_velocities: bool = False,
    fft: bool = False,
    index: SliceLike = (0, None, 1),
    filter_atoms: MaybeSequence[MaybeSequence[int | None]] = (None,),
    time_step: float = 1.0,
) -> tuple[NDArray[float64], list[NDArray[float64]]]:
    """
    Compute the velocity autocorrelation function (VAF) of `data`.

    Parameters
    ----------
    data : Sequence[Atoms]
        Dataset to compute VAF of.
    filenames : MaybeSequence[PathLike] | None
        If present, dump resultant VAF to file.
    use_velocities : bool
        Compute VAF using velocities rather than momenta.
        Default is False.
    fft : bool
        Compute the fourier transformed VAF.
        Default is False.
    index : SliceLike
        Images to analyze as `start`, `stop`, `step`.
        Default is all images.
    filter_atoms : MaybeSequence[MaybeSequence[int | None]]
        Compute the VAF averaged over subsets of the system.
        Default is all atoms.
    time_step : float
        Time step for scaling lags to align with input data.
        Default is 1 (i.e. no scaling).

    Returns
    -------
    lags : numpy.ndarray
        Lags at which the VAFs have been computed.
    vafs : list[numpy.ndarray]
        Computed VAF(s).

    Raises
    ------
    ValueError
        If the number of `filenames` does not match the number of VAFs
        requested via `filter_atoms`.

    Notes
    -----
    `filter_atoms` is given as a series of sequences of atoms, where
    each element in the series denotes a VAF subset to calculate and
    each sequence determines the atoms (by index) to be included in that VAF.

    E.g.

    .. code-block: Python

        # Species indices in cell
        na = (1, 3, 5, 7)
        cl = (2, 4, 6, 8)

        compute_vaf(..., filter_atoms=(na, cl))

    Would compute separate VAFs for each species.

    By default, one VAF will be computed for all atoms in the structure.
    """
    # Ensure if passed scalars they are turned into correct dimensionality
    if not isinstance(filter_atoms, Sequence):
        filter_atoms = (filter_atoms,)
    if not isinstance(filter_atoms[0], Sequence):
        filter_atoms = (filter_atoms,)

    if filenames:
        # A str is itself a Sequence, but must be treated as a single
        # file name rather than a sequence of one-character names.
        if isinstance(filenames, str) or not isinstance(filenames, Sequence):
            filenames = (filenames,)

        # Validate before any computation/writing so mismatched sequences
        # fail fast rather than after files have been partially written.
        if len(filenames) != len(filter_atoms):
            raise ValueError(
                f"Different number of file names ({len(filenames)}) "
                f"to number of samples ({len(filter_atoms)})"
            )

    # Extract requested data
    index = slicelike_to_startstopstep(index)
    data = data[slice(*index)]

    if use_velocities:
        momenta = np.asarray([datum.get_velocities() for datum in data])
    else:
        momenta = np.asarray([datum.get_momenta() for datum in data])

    n_steps = len(momenta)
    n_atoms = len(momenta[0])

    # If filter_atoms not specified use all atoms
    filter_atoms = [
        atom if atom[0] is not None else range(n_atoms) for atom in filter_atoms
    ]

    # Map each distinct atom index to a row in `vafs` so atoms shared
    # between subsets are only autocorrelated once.
    used_atoms = {atom for atoms in filter_atoms for atom in atoms}
    used_atoms = {j: i for i, j in enumerate(used_atoms)}

    # Per-atom VAF: sum the x/y/z autocorrelations (non-negative lags only)
    vafs = np.sum(
        np.asarray(
            [
                [
                    np.correlate(momenta[:, j, i], momenta[:, j, i], "full")[
                        n_steps - 1 :
                    ]
                    for i in range(3)
                ]
                for j in used_atoms
            ]
        ),
        axis=1,
    )

    # Normalise each lag by the number of overlapping frames contributing
    vafs /= n_steps - np.arange(n_steps)

    lags = np.arange(n_steps) * time_step

    if fft:
        vafs = np.fft.fft(vafs, axis=0)
        lags = np.fft.fftfreq(n_steps, time_step)

    # Average the per-atom VAFs over each requested subset
    vafs = (
        lags,
        [
            np.average([vafs[used_atoms[i]] for i in atoms], axis=0)
            for atoms in filter_atoms
        ],
    )

    if filenames:
        for vaf, filename in zip(vafs[1], filenames, strict=True):
            with open(filename, "w", encoding="utf-8") as out_file:
                for lag, dat in zip(lags, vaf, strict=True):
                    print(lag, dat, file=out_file)

    return vafs
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc