• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

materialsproject / pymatgen / 3968792566

pending completion
3968792566

push

github

Shyue Ping Ong
Merge branch 'master' of github.com:materialsproject/pymatgen

600 of 600 new or added lines in 370 files covered. (100.0%)

80392 of 102009 relevant lines covered (78.81%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pymatgen/optimization/linear_assignment_numpy.py
1
# Copyright (c) Pymatgen Development Team.
2
# Distributed under the terms of the MIT License.
3

4
"""
×
5
This module contains an algorithm to solve the Linear Assignment Problem.
6
It has the same functionality as linear_assignment.pyx, but is much slower
7
as it is vectorized in numpy rather than cython
8
"""
9

10
from __future__ import annotations
×
11

12
__author__ = "Will Richards"
×
13
__copyright__ = "Copyright 2011, The Materials Project"
×
14
__version__ = "1.0"
×
15
__maintainer__ = "Will Richards"
×
16
__email__ = "wrichards@mit.edu"
×
17
__date__ = "Jan 28, 2013"
×
18

19
import numpy as np
×
20

21

22
class LinearAssignment:
×
23
    """
24
    This class finds the solution to the Linear Assignment Problem.
25
    It finds a minimum cost matching between two sets, given a cost
26
    matrix.
27

28
    This class is an implementation of the LAPJV algorithm described in:
29
    R. Jonker, A. Volgenant. A Shortest Augmenting Path Algorithm for
30
    Dense and Sparse Linear Assignment Problems. Computing 38, 325-340
31
    (1987)
32

33
    .. attribute: min_cost:
34

35
        The minimum cost of the matching
36

37
    .. attribute: solution:
38

39
        The matching of the rows to columns. i.e solution = [1, 2, 0]
40
        would match row 0 to column 1, row 1 to column 2 and row 2
41
        to column 0. Total cost would be c[0, 1] + c[1, 2] + c[2, 0]
42
    """
43

44
    def __init__(self, costs, epsilon=1e-6):
×
45
        """
46
        Args:
47
            costs: The cost matrix of the problem. cost[i,j] should be the
48
                cost of matching x[i] to y[j]. The cost matrix may be
49
                rectangular
50
            epsilon: Tolerance for determining if solution vector is < 0
51
        """
52
        self.orig_c = np.array(costs, dtype=np.float64)
×
53
        self.nx, self.ny = self.orig_c.shape
×
54
        self.n = self.ny
×
55
        self._inds = np.arange(self.n)
×
56

57
        self.epsilon = abs(epsilon)
×
58

59
        # check that cost matrix is square
60
        if self.nx > self.ny:
×
61
            raise ValueError("cost matrix must have at least as many columns as rows")
×
62

63
        if self.nx == self.ny:
×
64
            self.c = self.orig_c
×
65
        else:
66
            # Can run into precision issues if np.max is used as the fill value (since a
67
            # value of this size doesn't necessarily end up in the solution). A value
68
            # at least as large as the maximin is, however, guaranteed to appear so it
69
            # is a safer choice. The fill value is not zero to avoid choosing the extra
70
            # rows in the initial column reduction step
71
            self.c = np.full((self.n, self.n), np.max(np.min(self.orig_c, axis=1)))
×
72
            self.c[: self.nx] = self.orig_c
×
73

74
        # initialize solution vectors
75
        self._x = np.zeros(self.n, dtype=int) - 1
×
76
        self._y = self._x.copy()
×
77

78
        # if column reduction doesn't find a solution, augment with shortest
79
        # paths until one is found
80
        if self._column_reduction():
×
81
            self._augmenting_row_reduction()
×
82
            # initialize the reduced costs
83
            self._update_cred()
×
84
            while -1 in self._x:
×
85
                self._augment()
×
86

87
        self.solution = self._x[: self.nx]
×
88
        self._min_cost = None
×
89

90
    @property
×
91
    def min_cost(self):
×
92
        """
93
        Returns the cost of the best assignment
94
        """
95
        if self._min_cost:
×
96
            return self._min_cost
×
97
        self._min_cost = np.sum(self.c[np.arange(self.nx), self.solution])
×
98
        return self._min_cost
×
99

100
    def _column_reduction(self):
×
101
        """
102
        Column reduction and reduction transfer steps from LAPJV algorithm
103
        """
104
        # assign each column to its lowest cost row, ensuring that only row
105
        # or column is assigned once
106
        i1, j = np.unique(np.argmin(self.c, axis=0), return_index=True)
×
107
        self._x[i1] = j
×
108

109
        # if problem is solved, return
110
        if len(i1) == self.n:
×
111
            return False
×
112

113
        self._y[j] = i1
×
114

115
        # reduction_transfer
116
        # tempc is array with previously assigned matchings masked
117
        self._v = np.min(self.c, axis=0)
×
118
        tempc = self.c.copy()
×
119
        tempc[i1, j] = np.inf
×
120
        mu = np.min(tempc[i1, :] - self._v[None, :], axis=1)
×
121
        self._v[j] -= mu
×
122
        return True
×
123

124
    def _augmenting_row_reduction(self):
×
125
        """
126
        Augmenting row reduction step from LAPJV algorithm
127
        """
128
        unassigned = np.where(self._x == -1)[0]
×
129
        for i in unassigned:
×
130
            for _ in range(self.c.size):
×
131
                # Time in this loop can be proportional to 1/epsilon
132
                # This step is not strictly necessary, so cutoff early
133
                # to avoid near-infinite loops
134

135
                # find smallest 2 values and indices
136
                temp = self.c[i] - self._v
×
137
                j1 = np.argmin(temp)
×
138
                u1 = temp[j1]
×
139
                temp[j1] = np.inf
×
140
                j2 = np.argmin(temp)
×
141
                u2 = temp[j2]
×
142

143
                if u1 < u2:
×
144
                    self._v[j1] -= u2 - u1
×
145
                elif self._y[j1] != -1:
×
146
                    j1 = j2
×
147
                k = self._y[j1]
×
148
                if k != -1:
×
149
                    self._x[k] = -1
×
150
                    self._x[i] = j1
×
151
                    self._y[j1] = i
×
152
                    i = k
×
153
                if k == -1 or abs(u1 - u2) < self.epsilon:
×
154
                    break
×
155

156
    def _update_cred(self):
×
157
        """
158
        Updates the reduced costs with the values from the
159
        dual solution
160
        """
161
        ui = self.c[self._inds, self._x] - self._v[self._x]
×
162
        self.cred = self.c - ui[:, None] - self._v[None, :]
×
163

164
    def _augment(self):
×
165
        """
166
        Finds a minimum cost path and adds it to the matching
167
        """
168
        # build a minimum cost tree
169
        _pred, _ready, istar, j, mu = self._build_tree()
×
170

171
        # update prices
172
        self._v[_ready] += self._d[_ready] - mu
×
173

174
        # augment the solution with the minimum cost path from the
175
        # tree. Follows an alternating path along matched, unmatched
176
        # edges from X to Y
177
        while True:
178
            i = _pred[j]
×
179
            self._y[j] = i
×
180
            k = j
×
181
            j = self._x[i]
×
182
            self._x[i] = k
×
183
            if i == istar:
×
184
                break
×
185
        self._update_cred()
×
186

187
    def _build_tree(self):
×
188
        """
189
        Builds the tree finding an augmenting path. Alternates along
190
        matched and unmatched edges between X and Y. The paths are
191
        stored in _pred (new predecessor of nodes in Y), and
192
        self._x and self._y
193
        """
194
        # find unassigned i*
195
        istar = np.argmin(self._x)
×
196

197
        # compute distances
198
        self._d = self.c[istar] - self._v
×
199
        _pred = np.zeros(self.n, dtype=int) + istar
×
200

201
        # initialize sets
202
        # READY: set of nodes visited and in the path (whose price gets
203
        # updated in augment)
204
        # SCAN: set of nodes at the bottom of the tree, which we need to
205
        # look at
206
        # T0DO: unvisited nodes
207
        _ready = np.zeros(self.n, dtype=bool)
×
208
        _scan = np.zeros(self.n, dtype=bool)
×
209
        _todo = np.zeros(self.n, dtype=bool) + True
×
210

211
        while True:
212
            # populate scan with minimum reduced distances
213
            if True not in _scan:
×
214
                mu = np.min(self._d[_todo])
×
215
                _scan[self._d == mu] = True
×
216
                _todo[_scan] = False
×
217
                j = np.argmin(self._y * _scan)
×
218
                if self._y[j] == -1 and _scan[j]:
×
219
                    return _pred, _ready, istar, j, mu
×
220

221
            # pick jstar from scan (scan always has at least 1)
222
            _jstar = np.argmax(_scan)
×
223

224
            # pick i associated with jstar
225
            i = self._y[_jstar]
×
226

227
            _scan[_jstar] = False
×
228
            _ready[_jstar] = True
×
229

230
            # find shorter distances
231
            newdists = mu + self.cred[i, :]
×
232
            shorter = np.logical_and(newdists < self._d, _todo)
×
233

234
            # update distances
235
            self._d[shorter] = newdists[shorter]
×
236

237
            # update predecessors
238
            _pred[shorter] = i
×
239

240
            for j in np.nonzero(np.logical_and(self._d == mu, _todo))[0]:
×
241
                if self._y[j] == -1:
×
242
                    return _pred, _ready, istar, j, mu
×
243
                _scan[j] = True
×
244
                _todo[j] = False
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc