• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

QuantEcon / QuantEcon.py / 16234517863

12 Jul 2025 04:53AM UTC coverage: 92.72%. Remained the same
16234517863

Pull #775

github

web-flow
Merge 73f5eb9fe into 6fd8955b8
Pull Request #775: CI: Add Python 3.13, drop 3.10

7438 of 8022 relevant lines covered (92.72%)

2.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.06
/quantecon/_kalman.py
1
"""
2
Implements the Kalman filter for a linear Gaussian state space model.
3

4
References
5
----------
6

7
https://python.quantecon.org/kalman.html
8

9
"""
10
from textwrap import dedent
3✔
11
import numpy as np
3✔
12
from scipy.linalg import inv
3✔
13
from ._lss import LinearStateSpace
3✔
14
from ._matrix_eqn import solve_discrete_riccati
3✔
15

16

17
class Kalman:
3✔
18
    r"""
19
    Implements the Kalman filter for the Gaussian state space model
20

21
    .. math::
22

23
        x_{t+1} = A x_t + C w_{t+1} \\
24
        y_t = G x_t + H v_t
25

26
    Here :math:`x_t` is the hidden state and :math:`y_t` is the measurement.
27
    The shocks :math:`w_t` and :math:`v_t` are iid standard normals. Below
28
    we use the notation
29

30
    .. math::
31

32
        Q := CC'
33
        R := HH'
34

35

36
    Parameters
37
    ----------
38
    ss : instance of LinearStateSpace
39
        An instance of the quantecon.lss.LinearStateSpace class
40
    x_hat : scalar(float) or array_like(float), optional(default=None)
41
        An n x 1 array representing the mean x_hat of the
42
        prior/predictive density.  Set to zero if not supplied.
43
    Sigma : scalar(float) or array_like(float), optional(default=None)
44
        An n x n array representing the covariance matrix Sigma of
45
        the prior/predictive density.  Must be positive definite.
46
        Set to the identity if not supplied.
47

48
    Attributes
49
    ----------
50
    Sigma, x_hat : as above
51
    Sigma_infinity : array_like or scalar(float)
52
        The infinite limit of Sigma_t
53
    K_infinity : array_like or scalar(float)
54
        The stationary Kalman gain.
55

56
    References
57
    ----------
58

59
    https://python.quantecon.org/kalman.html
60

61
    """
62

63
    def __init__(self, ss, x_hat=None, Sigma=None):
3✔
64
        self.ss = ss
3✔
65
        self.set_state(x_hat, Sigma)
3✔
66
        self._K_infinity = None
3✔
67
        self._Sigma_infinity = None
3✔
68

69
    def set_state(self, x_hat, Sigma):
3✔
70
        if Sigma is None:
3✔
71
            self.Sigma = np.identity(self.ss.n)
3✔
72
        else:
73
            self.Sigma = np.atleast_2d(Sigma)
3✔
74
        if x_hat is None:
3✔
75
            self.x_hat = np.zeros((self.ss.n, 1))
3✔
76
        else:
77
            self.x_hat = np.atleast_2d(x_hat)
3✔
78
            self.x_hat.shape = self.ss.n, 1
3✔
79

80
    def __repr__(self):
81
        return self.__str__()
82

83
    def __str__(self):
3✔
84
        m = """\
×
85
        Kalman filter:
86
          - dimension of state space          : {n}
87
          - dimension of observation equation : {k}
88
        """
89
        return dedent(m.format(n=self.ss.n, k=self.ss.k))
×
90

91
    @property
3✔
92
    def Sigma_infinity(self):
3✔
93
        if self._Sigma_infinity is None:
×
94
            self.stationary_values()
×
95
        return self._Sigma_infinity
×
96

97
    @property
3✔
98
    def K_infinity(self):
3✔
99
        if self._K_infinity is None:
×
100
            self.stationary_values()
×
101
        return self._K_infinity
×
102

103
    def whitener_lss(self):
3✔
104
        r"""
105
        This function takes the linear state space system
106
        that is an input to the Kalman class and it converts
107
        that system to the time-invariant whitener represenation
108
        given by
109

110
        .. math::
111

112
            \tilde{x}_{t+1}^* = \tilde{A} \tilde{x} + \tilde{C} v
113
            a = \tilde{G} \tilde{x}
114

115
        where
116

117
        .. math::
118

119
            \tilde{x}_t = [x+{t}, \hat{x}_{t}, v_{t}]
120

121
        and
122

123
        .. math::
124

125
            \tilde{A} =
126
            \begin{bmatrix}
127
            A  & 0    & 0  \\
128
            KG & A-KG & KH \\
129
            0  & 0    & 0 \\
130
            \end{bmatrix}
131

132
        .. math::
133

134
            \tilde{C} =
135
            \begin{bmatrix}
136
            C & 0 \\
137
            0 & 0 \\
138
            0 & I \\
139
            \end{bmatrix}
140

141
        .. math::
142

143
            \tilde{G} =
144
            \begin{bmatrix}
145
            G & -G & H \\
146
            \end{bmatrix}
147

148
        with :math:`A, C, G, H` coming from the linear state space system
149
        that defines the Kalman instance
150

151
        Returns
152
        -------
153
        whitened_lss : LinearStateSpace
154
            This is the linear state space system that represents
155
            the whitened system
156

157
        """
158
        K = self.K_infinity
×
159

160
        # Get the matrix sizes
161
        n, k, m, l = self.ss.n, self.ss.k, self.ss.m, self.ss.l
×
162
        A, C, G, H = self.ss.A, self.ss.C, self.ss.G, self.ss.H
×
163

164
        Atil = np.vstack([np.hstack([A, np.zeros((n, n)), np.zeros((n, l))]),
×
165
                          np.hstack([np.dot(K, G),
166
                                     A-np.dot(K, G),
167
                                     np.dot(K, H)]),
168
                          np.zeros((l, 2*n + l))])
169

170
        Ctil = np.vstack([np.hstack([C, np.zeros((n, l))]),
×
171
                          np.zeros((n, m+l)),
172
                          np.hstack([np.zeros((l, m)), np.eye(l)])])
173

174
        Gtil = np.hstack([G, -G, H])
×
175

176
        whitened_lss = LinearStateSpace(Atil, Ctil, Gtil)
×
177
        self.whitened_lss = whitened_lss
×
178

179
        return whitened_lss
×
180

181
    def prior_to_filtered(self, y):
3✔
182
        r"""
183
        Updates the moments (x_hat, Sigma) of the time t prior to the
184
        time t filtering distribution, using current measurement :math:`y_t`.
185

186
        The updates are according to
187

188
        .. math::
189

190
            \hat{x}^F = \hat{x} + \Sigma G' (G \Sigma G' + R)^{-1}
191
                (y - G \hat{x})
192
            \Sigma^F = \Sigma - \Sigma G' (G \Sigma G' + R)^{-1} G
193
                \Sigma
194

195
        Parameters
196
        ----------
197
        y : scalar or array_like(float)
198
            The current measurement
199

200
        """
201
        # === simplify notation === #
202
        G, H = self.ss.G, self.ss.H
3✔
203
        R = np.dot(H, H.T)
3✔
204

205
        # === and then update === #
206
        y = np.atleast_2d(y)
3✔
207
        y.shape = self.ss.k, 1
3✔
208
        E = np.dot(self.Sigma, G.T)
3✔
209
        F = np.dot(np.dot(G, self.Sigma), G.T) + R
3✔
210
        M = np.dot(E, inv(F))
3✔
211
        self.x_hat = self.x_hat + np.dot(M, (y - np.dot(G, self.x_hat)))
3✔
212
        self.Sigma = self.Sigma - np.dot(M, np.dot(G,  self.Sigma))
3✔
213

214
    def filtered_to_forecast(self):
3✔
215
        """
216
        Updates the moments of the time t filtering distribution to the
217
        moments of the predictive distribution, which becomes the time
218
        t+1 prior
219

220
        """
221
        # === simplify notation === #
222
        A, C = self.ss.A, self.ss.C
3✔
223
        Q = np.dot(C, C.T)
3✔
224

225
        # === and then update === #
226
        self.x_hat = np.dot(A, self.x_hat)
3✔
227
        self.Sigma = np.dot(A, np.dot(self.Sigma, A.T)) + Q
3✔
228

229
    def update(self, y):
3✔
230
        """
231
        Updates x_hat and Sigma given k x 1 ndarray y.  The full
232
        update, from one period to the next
233

234
        Parameters
235
        ----------
236
        y : np.ndarray
237
            A k x 1 ndarray y representing the current measurement
238

239
        """
240
        self.prior_to_filtered(y)
3✔
241
        self.filtered_to_forecast()
3✔
242

243
    def stationary_values(self, method='doubling'):
3✔
244
        r"""
245
        Computes the limit of :math:`\Sigma_t` as t goes to infinity by
246
        solving the associated Riccati equation. The outputs are stored in the
247
        attributes `K_infinity` and `Sigma_infinity`. Computation is via the
248
        doubling algorithm (default) or a QZ decomposition method (see the
249
        documentation in `matrix_eqn.solve_discrete_riccati`).
250

251
        Parameters
252
        ----------
253
        method : str, optional(default="doubling")
254
            Solution method used in solving the associated Riccati
255
            equation, str in {'doubling', 'qz'}.
256

257
        Returns
258
        -------
259
        Sigma_infinity : array_like or scalar(float)
260
            The infinite limit of :math:`\Sigma_t`
261
        K_infinity : array_like or scalar(float)
262
            The stationary Kalman gain.
263

264
        """
265

266
        # === simplify notation === #
267
        A, C, G, H = self.ss.A, self.ss.C, self.ss.G, self.ss.H
3✔
268
        Q, R = np.dot(C, C.T), np.dot(H, H.T)
3✔
269

270
        # === solve Riccati equation, obtain Kalman gain === #
271
        Sigma_infinity = solve_discrete_riccati(A.T, G.T, Q, R, method=method)
3✔
272
        temp1 = np.dot(np.dot(A, Sigma_infinity), G.T)
3✔
273
        temp2 = inv(np.dot(G, np.dot(Sigma_infinity, G.T)) + R)
3✔
274
        K_infinity = np.dot(temp1, temp2)
3✔
275

276
        # == record as attributes and return == #
277
        self._Sigma_infinity, self._K_infinity = Sigma_infinity, K_infinity
3✔
278
        return Sigma_infinity, K_infinity
3✔
279

280
    def stationary_coefficients(self, j, coeff_type='ma'):
3✔
281
        """
282
        Wold representation moving average or VAR coefficients for the
283
        steady state Kalman filter.
284

285
        Parameters
286
        ----------
287
        j : int
288
            The lag length
289
        coeff_type : string, either 'ma' or 'var' (default='ma')
290
            The type of coefficent sequence to compute.  Either 'ma' for
291
            moving average or 'var' for VAR.
292
            
293
        """
294
        # == simplify notation == #
295
        A, G = self.ss.A, self.ss.G
×
296
        K_infinity = self.K_infinity
×
297
        # == compute and return coefficients == #
298
        coeffs = []
×
299
        i = 1
×
300
        if coeff_type == 'ma':
×
301
            coeffs.append(np.identity(self.ss.k))
×
302
            P_mat = A
×
303
            P = np.identity(self.ss.n)  # Create a copy
×
304
        elif coeff_type == 'var':
×
305
            coeffs.append(np.dot(G, K_infinity))
×
306
            P_mat = A - np.dot(K_infinity, G)
×
307
            P = np.copy(P_mat)  # Create a copy
×
308
        else:
309
            raise ValueError("Unknown coefficient type")
×
310
        while i <= j:
×
311
            coeffs.append(np.dot(np.dot(G, P), K_infinity))
×
312
            P = np.dot(P, P_mat)
×
313
            i += 1
×
314
        return coeffs
×
315

316
    def stationary_innovation_covar(self):
3✔
317
        # == simplify notation == #
318
        H, G = self.ss.H, self.ss.G
×
319
        R = np.dot(H, H.T)
×
320
        Sigma_infinity = self.Sigma_infinity
×
321

322
        return np.dot(G, np.dot(Sigma_infinity, G.T)) + R
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc