SMTorg / smt / 12279380655

11 Dec 2024 03:18PM UTC coverage: 88.744% (+0.002%) from 88.742%

Pull Request #692: Anisotropic kernel for the multi-fidelity co-Kriging model (MFCK)
Merge 27e50b804 into 047f72206 (github / web-flow)

48 of 54 new or added lines in 1 file covered. (88.89%)
1 existing line in 1 file now uncovered.
6922 of 7800 relevant lines covered (88.74%)
0.89 hits per line

Source File: /smt/applications/mfck.py (89.77% covered)
# -*- coding: utf-8 -*-
"""
Created on Sat May 04 10:10:12 2024

@author: Mauricio Castano Aguirre <mauricio.castano_aguirre@onera.fr>
Multi-Fidelity co-Kriging model construction for non-nested experimental
design sets.
-------
[1] Loic Le Gratiet (2013). Multi-fidelity Gaussian process modelling
[Doctoral Thesis, Université Paris-Sud].
[2] Edwin V. Bonilla, Kian Ming A. Chai, and Christopher K. I. Williams
(2007). Multi-task Gaussian Process prediction. In International
Conference on Neural Information Processing Systems.
"""

# import warnings
import numpy as np
from scipy.linalg import solve_triangular
from scipy import optimize
from smt.sampling_methods import LHS
from smt.utils.kriging import differences, componentwise_distance
from smt.surrogate_models.krg_based import KrgBased
from smt.utils.misc import standardization


class MFCK(KrgBased):
    def _initialize(self):
        super()._initialize()
        declare = self.options.declare
        self.name = "MFCK"

        declare(
            "rho0",
            1.0,
            types=(float),
            desc="Initial rho for the autoregressive model \
                  (scalar factor between two consecutive fidelities, \
                   e.g., Y_HF = Rho * Y_LF + Gamma)",
        )
        declare(
            "rho_bounds",
            [-5.0, 5.0],
            types=(list, np.ndarray),
            desc="Bounds for the rho parameter used in the autoregressive model",
        )
        declare(
            "sigma0",
            1.0,
            types=(float),
            desc="Initial variance parameter",
        )
        declare(
            "sigma_bounds",
            [1e-2, 100],
            types=(list, np.ndarray),
            desc="Bounds for the variance parameter",
        )
        declare(
            "lambda",
            0.1,
            types=(float),
            desc="Regularization parameter",
        )

        self.options["nugget"] = (
            1e-9  # Increasing the nugget for numerical stability reasons
        )
        self.options["hyper_opt"] = (
            "Cobyla"  # MFCK doesn't support gradient-based optimizers
        )

    def train(self):
        """
        Overrides the MFK implementation.
        Trains the Multi-Fidelity co-Kriging model.

        Returns
        -------
        None.
        """
        xt = []
        yt = []
        i = 0
        while self.training_points.get(i, None) is not None:
            xt.append(self.training_points[i][0][0])
            yt.append(self.training_points[i][0][1])
            i = i + 1
        xt.append(self.training_points[None][0][0])
        yt.append(self.training_points[None][0][1])
        self.lvl = i + 1
        self.X = xt
        self.y = np.vstack(yt)
        self._check_param()

        (
            _,
            _,
            self.X_offset,
            self.y_mean,
            self.X_scale,
            self.y_std,
        ) = standardization(np.concatenate(xt, axis=0), np.concatenate(yt, axis=0))

        self.X_norma_all = [(x - self.X_offset) / self.X_scale for x in xt]
        self.y_norma_all = np.vstack([(f - self.y_mean) / self.y_std for f in yt])

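        # Build the initial hyperparameter vector and its box bounds. All
        # variances and length scales are optimized in log10 space, while the
        # rho factors stay on their natural scale.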
        if self.lvl == 1:
            # For a single level, initialize theta_ini, lower_bounds, and
            # upper_bounds with consistent shapes
            theta_ini = np.hstack(
                (self.options["sigma0"], self.options["theta0"])
            )  # Variance + initial theta values
            lower_bounds = np.hstack(
                (
                    self.options["sigma_bounds"][0],
                    np.full(self.nx, self.options["theta_bounds"][0]),
                )
            )
            upper_bounds = np.hstack(
                (
                    self.options["sigma_bounds"][1],
                    np.full(self.nx, self.options["theta_bounds"][1]),
                )
            )
            # Apply log10 to theta_ini and bounds
            nb_params = len(self.options["theta0"])
            theta_ini[: nb_params + 1] = np.log10(theta_ini[: nb_params + 1])
            lower_bounds[: nb_params + 1] = np.log10(lower_bounds[: nb_params + 1])
            upper_bounds[: nb_params + 1] = np.log10(upper_bounds[: nb_params + 1])
        else:
            for lvl in range(self.lvl):
                if lvl == 0:
                    # Initialize theta_ini for level 0
                    theta_ini = np.hstack(
                        (self.options["sigma0"], self.options["theta0"])
                    )  # Variance + initial theta values
                    lower_bounds = np.hstack(
                        (
                            self.options["sigma_bounds"][0],
                            np.full(self.nx, self.options["theta_bounds"][0]),
                        )
                    )
                    upper_bounds = np.hstack(
                        (
                            self.options["sigma_bounds"][1],
                            np.full(self.nx, self.options["theta_bounds"][1]),
                        )
                    )
                    # Apply log10 to theta_ini and bounds
                    nb_params = len(self.options["theta0"])
                    theta_ini[: nb_params + 1] = np.log10(theta_ini[: nb_params + 1])
                    lower_bounds[: nb_params + 1] = np.log10(
                        lower_bounds[: nb_params + 1]
                    )
                    upper_bounds[: nb_params + 1] = np.log10(
                        upper_bounds[: nb_params + 1]
                    )

                elif lvl > 0:
                    # For additional levels, append to theta_ini, lower_bounds, and upper_bounds
                    thetat = np.hstack((self.options["sigma0"], self.options["theta0"]))
                    lower_boundst = np.hstack(
                        (
                            self.options["sigma_bounds"][0],
                            np.full(self.nx, self.options["theta_bounds"][0]),
                        )
                    )
                    upper_boundst = np.hstack(
                        (
                            self.options["sigma_bounds"][1],
                            np.full(self.nx, self.options["theta_bounds"][1]),
                        )
                    )
                    # Apply log10 to the newly added values
                    thetat = np.log10(thetat)
                    lower_boundst = np.log10(lower_boundst)
                    upper_boundst = np.log10(upper_boundst)
                    # Append to theta_ini, lower_bounds, and upper_bounds
                    theta_ini = np.hstack([theta_ini, thetat, self.options["rho0"]])
                    lower_bounds = np.hstack([lower_bounds, lower_boundst])
                    upper_bounds = np.hstack([upper_bounds, upper_boundst])
                    # Finally, append the rho bounds
                    lower_bounds = np.hstack(
                        [lower_bounds, self.options["rho_bounds"][0]]
                    )
                    upper_bounds = np.hstack(
                        [upper_bounds, self.options["rho_bounds"][1]]
                    )

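        # Resulting packed layout (L = self.lvl levels, nx = input dimension):
        #     [log10(sigma_0), log10(theta_0), ...,                  (nx + 1 entries)
        #      log10(sigma_1), log10(theta_1), ..., rho_1,           (nx + 2 entries)
        #      ...
        #      log10(sigma_{L-1}), log10(theta_{L-1}), ..., rho_{L-1}]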
        theta_ini = theta_ini[:].T
        x_opt = theta_ini

        if self.options["hyper_opt"] == "Cobyla":
            if self.options["n_start"] > 1:
                sampling = LHS(
                    xlimits=np.stack((lower_bounds, upper_bounds), axis=1),
                    criterion="ese",
                    random_state=0,
                )
                theta_lhs_loops = sampling(self.options["n_start"])
                theta0 = np.vstack((theta_ini, theta_lhs_loops))
            else:
                # Guard: with a single start, optimize from theta_ini alone
                theta0 = np.atleast_2d(theta_ini)

            constraints = []

            for i in range(len(theta_ini)):
                constraints.append(lambda theta0, i=i: theta0[i] - lower_bounds[i])
                constraints.append(lambda theta0, i=i: upper_bounds[i] - theta0[i])

            for j in range(self.options["n_start"]):
                optimal_theta_res_loop = optimize.minimize(
                    self.neg_log_likelihood_scipy,
                    theta0[j, :],
                    method="COBYLA",
                    constraints=[{"fun": con, "type": "ineq"} for con in constraints],
                    options={
                        "rhobeg": 0.2,
                        "tol": 1e-6,
                        "maxiter": 100,
                    },
                )
                x_opt_iter = optimal_theta_res_loop.x

                if j == 0:
                    x_opt = x_opt_iter
                    nll = optimal_theta_res_loop["fun"]
                else:
                    if optimal_theta_res_loop["fun"] < nll:
                        x_opt = x_opt_iter
                        nll = optimal_theta_res_loop["fun"]

        elif self.options["hyper_opt"] == "Cobyla-nlopt":
            try:
                import nlopt
            except ImportError as err:
                raise ImportError(
                    "nlopt library is not installed or available on this system"
                ) from err

            opt = nlopt.opt(nlopt.LN_COBYLA, theta_ini.shape[0])
            opt.set_lower_bounds(lower_bounds)  # Lower bounds for each dimension
            opt.set_upper_bounds(upper_bounds)  # Upper bounds for each dimension
            opt.set_min_objective(self.neg_log_likelihood_nlopt)
            opt.set_maxeval(1000)
            opt.set_xtol_rel(1e-6)
            x0 = np.copy(theta_ini)
            x_opt = opt.optimize(x0)
        else:
            raise ValueError(
                f"The optimizer {self.options['hyper_opt']} is not available"
            )
        x_opt[0] = 10 ** (x_opt[0])  # Back-transform sigma_0
        x_opt[1 : self.nx + 1] = (
            10 ** (x_opt[1 : self.nx + 1])
        )  # Back-transform the level-0 length scales
        x_opt[self.nx + 1 :: self.nx + 2] = (
            10 ** (x_opt[self.nx + 1 :: self.nx + 2])
        )  # Back-transform the sigmas gamma

        for i in np.arange(self.nx + 2, x_opt.shape[0] - 1, self.nx + 2):
            x_opt[i : i + self.nx] = 10 ** x_opt[i : i + self.nx]
        self.optimal_theta = x_opt

    def eta(self, j, jp, rho):
        """Compute eta_{j, jp} based on the given rho values."""
        if j < jp:
            return np.prod(rho[j:jp])  # Product of the rho factors linking levels j and jp
        elif j == jp:
            return 1
        else:
            raise ValueError(
                f"The iterative variable j={j} cannot be greater than j'={jp}"
            )

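    # Under the autoregressive recursion, y_l(x) = sum_{j <= l} eta(j, l, rho) * gamma_j(x),
    # so the cross-level covariance evaluated below is
    #     Cov(y_L(x), y_Lp(x')) = sum_{j=0}^{min(L, Lp)} eta(j, L) * eta(j, Lp) * k_j(x, x'),
    # where k_0 is the base kernel and k_j (j > 0) the kernel of the j-th discrepancy gamma_j.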
    # Covariance between y_l(x) and y_l'(x')
    def compute_cross_K(self, x, xp, L, Lp, param):
        """
        Computes Cov(y_l(x), y_{l'}(x')) using the autoregressive formulation.
        Parameters:
        - x: First input for the covariance (np.ndarray)
        - xp: Second input for the covariance (np.ndarray)
        - L: Level index of the first output (scalar)
        - Lp: Level index of the second output (scalar)
        - param: Set of hyperparameters (vector)
        Returns:
        - Covariance matrix cov(y_l(x), y_{l'}(x')) (np.ndarray)
        """
        cov_value = 0.0

        sigma_0 = param[0]
        l_0 = param[1 : self.nx + 1]
        # param0 = param[0 : self.nx+1]
        sigmas_gamma = param[self.nx + 1 :: self.nx + 2]
        l_s = [
            param[i : i + self.nx].tolist()
            for i in np.arange(self.nx + 2, param.shape[0] - 1, self.nx + 2)
        ]
        # ls_gamma = param[3::3]
        rho_values = param[2 + 2 * self.nx :: self.nx + 2]

        # Sum over j = 0 to Lp
        for j in range(Lp + 1):
            eta_j_l = self.eta(j, L, rho_values)
            eta_j_lp = self.eta(j, Lp, rho_values)

            if j == 0:
                # Cov(γ_j(x), γ_j(x')) using the kernel for K_00
                cov_gamma_j = self._compute_K(x, xp, [sigma_0, l_0])
            else:
                # Cov(γ_j(x), γ_j(x')) using the kernel
                cov_gamma_j = self._compute_K(x, xp, [sigmas_gamma[j - 1], l_s[j - 1]])
            # Add to the value of the covariance
            cov_value += eta_j_l * eta_j_lp * cov_gamma_j

        return cov_value

    def predict_all_levels(self, x):
        """
        Generalized prediction function for the multi-fidelity co-Kriging
        Parameters
        ----------
        x : np.ndarray
            Array with the inputs for making the prediction.
        Returns
        -------
        means : (list, np.array)
            Returns the conditional means per level.
        covariances: (list, np.array)
            Returns the conditional covariance matrices per level.
        """
        means = []
        covariances = []
        x = (x - self.X_offset) / self.X_scale

        if self.lvl == 1:
            sigma = self.optimal_theta[0]  # sigma_0 (already back-transformed in train)
            l_s = [self.optimal_theta[1 : self.nx + 1]]  # Level-0 length scales

            k_XX = self._compute_K(
                self.X_norma_all[0], self.X_norma_all[0], [sigma, l_s[0]]
            )
            k_xX = self._compute_K(x, self.X_norma_all[0], [sigma, l_s[0]])
            k_xx = self._compute_K(x, x, [sigma, l_s[0]])

            L = np.linalg.cholesky(
                k_XX + self.options["nugget"] * np.eye(k_XX.shape[0])
            )

            beta1 = solve_triangular(L, k_xX.T, lower=True)
            alpha1 = solve_triangular(L, self.y_norma_all, lower=True)
            means.append(self.y_std * np.dot(beta1.T, alpha1) + self.y_mean)
            covariances.append(k_xx - np.dot(beta1.T, beta1))

            # k_XX_inv = np.linalg.inv(k_XX + self.options["nugget"]*np.eye(k_XX.shape[0]))
            # means.append( np.dot(k_xX, np.matmul(k_XX_inv, self.y)))
            # covariances.append(k_xx - np.matmul(k_xX,
            #                                     np.matmul(k_XX_inv,
            #                                               k_xX.transpose())))
        else:
            self.K = self.compute_blockwise_K(self.optimal_theta)
            L = np.linalg.cholesky(
                self.K + self.options["nugget"] * np.eye(self.K.shape[0])
            )
            k_xX = []
            for ind in range(self.lvl):
                k_xx = self.compute_cross_K(x, x, ind, ind, self.optimal_theta)
                for j in range(self.lvl):
                    if ind >= j:
                        k_xX.append(
                            self.compute_cross_K(
                                self.X_norma_all[j], x, ind, j, self.optimal_theta
                            )
                        )
                    else:
                        k_xX.append(
                            self.compute_cross_K(
                                self.X_norma_all[j], x, j, ind, self.optimal_theta
                            )
                        )

                beta1 = solve_triangular(L, np.vstack(k_xX), lower=True)
                alpha1 = solve_triangular(L, self.y_norma_all, lower=True)
                means.append(self.y_std * np.dot(beta1.T, alpha1) + self.y_mean)
                covariances.append(k_xx - np.dot(beta1.T, beta1))
                k_xX.clear()

        return means, covariances

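    # Conditioning identities used in predict_all_levels: with L the Cholesky
    # factor of K + nugget * I, beta1 = L^{-1} k_xX^T and alpha1 = L^{-1} y give
    #     mean = k_xX K^{-1} y     (de-normalized with y_std and y_mean)
    #     cov  = k_xx - k_xX K^{-1} k_xX^T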
    def _predict(self, x):
        """
        Prediction function for the highest fidelity level
        Parameters
        ----------
        x : array
            Array with the inputs for making the prediction.
        Returns
        -------
        mean : np.array
            Returns the conditional mean at the highest fidelity level.
        covariance: np.ndarray
            Returns the conditional covariance matrix at the highest fidelity level.
        """
        means, covariances = self.predict_all_levels(x)

        return means[self.lvl - 1], covariances[self.lvl - 1]

    def neg_log_likelihood(self, param, grad=None):
        if self.lvl == 1:
            sigma = param[0]
            l_s = [param[1 : self.nx + 1]]
            self.K = self._compute_K(self.X[0], self.X[0], [sigma, l_s[0]])
        else:
            self.K = self.compute_blockwise_K(param)

        reg_term = self.options["lambda"] * np.sum(np.power(param, 2))
        L = np.linalg.cholesky(
            self.K + self.options["nugget"] * np.eye(self.K.shape[0])
        )
        beta = solve_triangular(L, self.y_norma_all, lower=True)
        NMLL = np.sum(np.log(np.diag(L))) + np.dot(beta.T, beta) + reg_term
        (nmll,) = NMLL[0]
        return nmll

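    # In neg_log_likelihood, sum(log(diag(L))) equals log|K| / 2 and
    # beta^T beta equals y^T K^{-1} y, so up to constants and scale factors this
    # is the usual GP negative log marginal likelihood, plus an L2 penalty
    # lambda * sum(param**2) on the hyperparameters.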
    def neg_log_likelihood_scipy(self, param):
        """
        Likelihood for the Cobyla-scipy (SMT) optimizer
        """
        param = np.array(param, copy=True)
        param[0] = 10 ** (param[0])  # Back-transform sigma_0
        param[1 : self.nx + 1] = (
            10 ** (param[1 : self.nx + 1])
        )  # Back-transform the level-0 length scales
        param[self.nx + 1 :: self.nx + 2] = (
            10 ** (param[self.nx + 1 :: self.nx + 2])
        )  # Back-transform the sigmas gamma

        for i in np.arange(self.nx + 2, param.shape[0] - 1, self.nx + 2):
            param[i : i + self.nx] = 10 ** param[i : i + self.nx]
        return self.neg_log_likelihood(param)

    def neg_log_likelihood_nlopt(self, param, grad=None):
        """
        Likelihood for nlopt optimizers
        """
        param = np.array(param, copy=True)
        param[0] = 10 ** (param[0])  # Back-transform sigma_0
        param[1 : self.nx + 1] = (
            10 ** (param[1 : self.nx + 1])
        )  # Back-transform the level-0 length scales
        param[self.nx + 1 :: self.nx + 2] = (
            10 ** (param[self.nx + 1 :: self.nx + 2])
        )  # Back-transform the sigmas gamma

        for i in np.arange(self.nx + 2, param.shape[0] - 1, self.nx + 2):
            param[i : i + self.nx] = 10 ** param[i : i + self.nx]
        return self.neg_log_likelihood(param, grad)

    def compute_blockwise_K(self, param):
        K_block = {}
        n = self.y.shape[0]
        for jp in range(self.lvl):
            for j in range(self.lvl):
                if jp >= j:
                    K_block[str(jp) + str(j)] = self.compute_cross_K(
                        self.X_norma_all[j], self.X_norma_all[jp], jp, j, param
                    )

        K = np.zeros((n, n))
        row_init, col_init = 0, 0
        for j in range(self.lvl):
            col_init = row_init
            for jp in range(j, self.lvl):
                r, c = K_block[str(jp) + str(j)].shape
                K[row_init : row_init + r, col_init : col_init + c] = K_block[
                    str(jp) + str(j)
                ]
                if j != jp:
                    K[col_init : col_init + c, row_init : row_init + r] = K_block[
                        str(jp) + str(j)
                    ].T
                col_init += c
            row_init += r
        return K

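    # compute_blockwise_K returns the joint covariance of the stacked outputs
    # [y_0(X_0); y_1(X_1); ...; y_{L-1}(X_{L-1})]: one block per pair of levels,
    # with one triangle computed through compute_cross_K and the other filled in
    # by symmetry.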
    def _compute_K(self, A: np.ndarray, B: np.ndarray, param):
        """
        Compute the covariance matrix K between A and B
            Modified for the MFCK initial test (same theta for each dimension)
        """
        # Compute pairwise componentwise L1-distances between A and B
        dx = differences(A, B)
        d = componentwise_distance(
            dx,
            self.options["corr"],
            self.X[0].shape[1],
            power=self.options["pow_exp_power"],
        )
        self.corr.theta = np.asarray(param[1])
        r = self.corr(d)
        R = r.reshape(A.shape[0], B.shape[0])
        K = param[0] * R
        return K
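

# A minimal usage sketch (hypothetical data and option values; assumes the
# standard SMT training API, where set_training_values(..., name=lvl) stores
# the lower-fidelity sets and the call without a name holds the highest
# fidelity, matching how train() reads self.training_points):
#
#     import numpy as np
#     from smt.applications.mfck import MFCK
#
#     x_lf = np.linspace(0.0, 1.0, 11).reshape(-1, 1)  # low-fidelity DOE
#     x_hf = x_lf[::2]                                 # high-fidelity DOE
#     y_lf = 0.5 * np.sin(8.0 * x_lf)                  # cheap model
#     y_hf = np.sin(8.0 * x_hf)                        # expensive model
#
#     model = MFCK(theta0=[1.0])
#     model.set_training_values(x_lf, y_lf, name=0)  # fidelity level 0
#     model.set_training_values(x_hf, y_hf)          # highest fidelity level
#     model.train()
#
#     x_new = np.linspace(0.0, 1.0, 50).reshape(-1, 1)
#     mean_hf, cov_hf = model._predict(x_new)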