WenjieDu / PyPOTS / build 10736679932

06 Sep 2024 10:20AM UTC. Coverage: 83.273% (+0.2%) from 83.123%.

Pull Request #505: Add TEFN model (merge bfc8a18e1 into 66da59c96)

129 of 132 new or added lines in 8 files covered (97.73%).
2 existing lines in 2 files now uncovered.
11261 of 13523 relevant lines covered (83.27%), 4.99 hits per line.

Source file: /pypots/clustering/vader/model.py (85.93% covered)
"""
The implementation of VaDER for the partially-observed time-series clustering task.

"""

# Created by Wenjie Du <wenjay.du@gmail.com>
# License: BSD-3-Clause


import os
from typing import Union, Optional

import numpy as np
import torch
from scipy.stats import multivariate_normal
from sklearn.mixture import GaussianMixture
from torch.utils.data import DataLoader

from .data import DatasetForVaDER
from .core import inverse_softplus, _VaDER
from ..base import BaseNNClusterer
from ...optim.adam import Adam
from ...optim.base import Optimizer
from ...utils.logging import logger

try:
    import nni
except ImportError:
    pass


class VaDER(BaseNNClusterer):
    """The PyTorch implementation of the VaDER model :cite:`dejong2019VaDER`.

    Parameters
    ----------
    n_steps :
        The number of time steps in the time-series data sample.

    n_features :
        The number of features in the time-series data sample.

    n_clusters :
        The number of clusters in the clustering task.

    rnn_hidden_size :
        The size of the RNN hidden state, also the number of hidden units in the RNN cell.

    d_mu_stddev :
        The dimension of the mean and standard deviation of the Gaussian distribution.

    batch_size :
        The batch size for training and evaluating the model.

    pretrain_epochs :
        The number of epochs for pretraining the model.

    epochs :
        The number of epochs for training the model.

    patience :
        The patience for the early-stopping mechanism. Given a positive integer, the training process will be
        stopped when the model does not perform better after that number of epochs.
        Leaving it default as None will disable the early-stopping.

    optimizer :
        The optimizer for model training.
        If not given, will use a default Adam optimizer.

    num_workers :
        The number of subprocesses to use for data loading.
        `0` means data loading will be in the main process, i.e. there won't be subprocesses.

    device :
        The device for the model to run on.
        If not given, will try to use CUDA devices first (will use the GPU with device number 0 only by default),
        then CPUs, considering CUDA and CPU are so far the main devices for people to train ML models.
        Other devices like Google TPU and Apple Silicon accelerator MPS may be added in the future.

    saving_path :
        The path for automatically saving model checkpoints and tensorboard files (i.e. loss values recorded during
        training into a tensorboard file). Will not save if not given.

    model_saving_strategy :
        The strategy to save model checkpoints. It has to be one of [None, "best", "better", "all"].
        No model will be saved when it is set as None.
        The "best" strategy will only automatically save the best model after the training finished.
        The "better" strategy will automatically save the model during training whenever the model performs
        better than in previous epochs.
        The "all" strategy will save every model after each epoch training.

    verbose :
        Whether to print out the training logs during the training process.
    """

    def __init__(
        self,
        n_steps: int,
        n_features: int,
        n_clusters: int,
        rnn_hidden_size: int,
        d_mu_stddev: int,
        batch_size: int = 32,
        epochs: int = 100,
        pretrain_epochs: int = 10,
        patience: Optional[int] = None,
        optimizer: Optional[Optimizer] = Adam(),
        num_workers: int = 0,
        device: Optional[Union[str, torch.device, list]] = None,
        saving_path: str = None,
        model_saving_strategy: Optional[str] = "best",
        verbose: bool = True,
    ):
        super().__init__(
            n_clusters,
            batch_size,
            epochs,
            patience,
            num_workers,
            device,
            saving_path,
            model_saving_strategy,
            verbose,
        )

        assert (
            pretrain_epochs > 0
        ), f"pretrain_epochs must be a positive integer, but got {pretrain_epochs}"

        self.n_steps = n_steps
        self.n_features = n_features
        self.pretrain_epochs = pretrain_epochs

        # set up the model
        self.model = _VaDER(
            n_steps, n_features, n_clusters, rnn_hidden_size, d_mu_stddev
        )
        self._send_model_to_given_device()
        self._print_model_size()

        # set up the optimizer
        self.optimizer = optimizer
        self.optimizer.init_optimizer(self.model.parameters())

    def _assemble_input_for_training(self, data: list) -> dict:
        # fetch data
        indices, X, missing_mask = self._send_data_to_given_device(data)

        inputs = {
            "X": X,
            "missing_mask": missing_mask,
        }

        return inputs

    def _assemble_input_for_validating(self, data: list) -> dict:
        return self._assemble_input_for_training(data)

    def _assemble_input_for_testing(self, data: list) -> dict:
        return self._assemble_input_for_validating(data)

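    # Training proceeds in two stages: the network is first pretrained, then a scikit-learn
    # GaussianMixture is fit on collected latent samples to initialize the GMM layer, and finally
    # the full VaDER objective is optimized with optional validation-based early stopping.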
    def _train_model(
        self,
        training_loader: DataLoader,
        val_loader: DataLoader = None,
    ) -> None:
        # each training starts from the very beginning, so reset the loss and model dict here
        self.best_loss = float("inf")
        self.best_model_dict = None

        # pretrain to initialize parameters of GMM layer
        pretraining_step = 0
        for epoch in range(self.pretrain_epochs):
            self.model.train()
            for idx, data in enumerate(training_loader):
                pretraining_step += 1
                inputs = self._assemble_input_for_training(data)
                self.optimizer.zero_grad()
                results = self.model.forward(inputs, pretrain=True)
                results["loss"].sum().backward()
                self.optimizer.step()

                # save pre-training loss logs into the tensorboard file for every step if in need
                if self.summary_writer is not None:
                    self._save_log_into_tb_file(
                        pretraining_step, "pretraining", results
                    )

        with torch.no_grad():
            sample_collector = []
            for _ in range(10):  # sampling 10 times
                for idx, data in enumerate(training_loader):
                    inputs = self._assemble_input_for_validating(data)
                    results = self.model.forward(inputs, pretrain=True)
                    sample_collector.append(results["z"])
            samples = torch.cat(sample_collector).cpu().detach().numpy()

            # leverage the below loop to automatically fix the exception ValueError raised by gmm.fit()
            flag = 0
            reg_covar = 1e-04
            while flag <= 0:
                try:
                    gmm = GaussianMixture(
                        n_components=self.n_clusters,
                        covariance_type="diag",
                        reg_covar=reg_covar,
                        # reg_covar is set as 1e-04 in the official implementation, but may cause ValueError: Fitting
                        # the mixture model failed because some components have ill-defined empirical covariance
                        # (for instance caused by singleton or collapsed samples). Try to decrease the number
                        # of components, or increase reg_covar.
                    )
                    gmm.fit(samples)
                    flag = 1
                except ValueError as e:
                    logger.error(f"❌ Exception: {e}")
                    logger.warning(
                        "‼️ Met with ValueError, double `reg_covar` to re-train the GMM model."
                    )

                    flag -= 1
                    if flag == -5:
                        logger.error(
                            f"❌ Doubled `reg_covar` for 4 times, its current value is {reg_covar}, but still failed.\n"
                            f"Now quit to let you check your model training.\n"
                            "Please raise an issue https://github.com/WenjieDu/PyPOTS/issues if you have questions."
                        )
                        raise RuntimeError
                    else:
                        reg_covar *= 2

                    continue

            # get GMM parameters
            mu = gmm.means_
            var = inverse_softplus(gmm.covariances_)
            phi = np.log(gmm.weights_ + 1e-9)  # inverse softmax
            device = results["z"].device

            # use trained GMM's parameters to init GMM layer's
            if isinstance(self.device, list):  # if using multi-GPU
                self.model.module.backbone.gmm_layer.set_values(
                    torch.from_numpy(mu).to(device),
                    torch.from_numpy(var).to(device),
                    torch.from_numpy(phi).to(device),
                )
            else:
                self.model.backbone.gmm_layer.set_values(
                    torch.from_numpy(mu).to(device),
                    torch.from_numpy(var).to(device),
                    torch.from_numpy(phi).to(device),
                )

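        # With the GMM layer initialized from the fitted mixture, run the main VaDER training loop,
        # tracking the best (validation) loss for early stopping and checkpointing.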
        try:
            training_step = 0
            for epoch in range(1, self.epochs + 1):
                self.model.train()
                epoch_train_loss_collector = []
                for idx, data in enumerate(training_loader):
                    training_step += 1
                    inputs = self._assemble_input_for_training(data)
                    self.optimizer.zero_grad()
                    results = self.model.forward(inputs)
                    results["loss"].sum().backward()
                    self.optimizer.step()
                    epoch_train_loss_collector.append(results["loss"].sum().item())

                    # save training loss logs into the tensorboard file for every step if in need
                    if self.summary_writer is not None:
                        self._save_log_into_tb_file(training_step, "training", results)

                # mean training loss of the current epoch
                mean_train_loss = np.mean(epoch_train_loss_collector)

                if val_loader is not None:
                    self.model.eval()
                    epoch_val_loss_collector = []
                    with torch.no_grad():
                        for idx, data in enumerate(val_loader):
                            inputs = self._assemble_input_for_validating(data)
                            results = self.model.forward(inputs)
                            epoch_val_loss_collector.append(
                                results["loss"].sum().item()
                            )

                    mean_val_loss = np.mean(epoch_val_loss_collector)

                    # save validation loss logs into the tensorboard file for every epoch if in need
                    if self.summary_writer is not None:
                        val_loss_dict = {
                            "loss": mean_val_loss,
                        }
                        self._save_log_into_tb_file(epoch, "validating", val_loss_dict)

                    logger.info(
                        f"Epoch {epoch:03d} - "
                        f"training loss: {mean_train_loss:.4f}, "
                        f"validation loss: {mean_val_loss:.4f}"
                    )
                    mean_loss = mean_val_loss
                else:
                    logger.info(
                        f"Epoch {epoch:03d} - training loss: {mean_train_loss:.4f}"
                    )
                    mean_loss = mean_train_loss

                if np.isnan(mean_loss):
                    logger.warning(
                        f"‼️ Attention: got NaN loss in Epoch {epoch}. This may lead to unexpected errors."
                    )

                if mean_loss < self.best_loss:
                    self.best_epoch = epoch
                    self.best_loss = mean_loss
                    self.best_model_dict = self.model.state_dict()
                    self.patience = self.original_patience
                else:
                    self.patience -= 1

                # save the model if necessary
                self._auto_save_model_if_necessary(
                    confirm_saving=self.best_epoch == epoch,
                    saving_name=f"{self.__class__.__name__}_epoch{epoch}_loss{mean_loss}",
                )

                if os.getenv("enable_tuning", False):
                    nni.report_intermediate_result(mean_loss)
                    if epoch == self.epochs - 1 or self.patience == 0:
                        nni.report_final_result(self.best_loss)

                if self.patience == 0:
                    logger.info(
                        "Exceeded the training patience. Terminating the training procedure..."
                    )
                    break

        except KeyboardInterrupt:  # if keyboard interrupt, only warning
            logger.warning("‼️ Training got interrupted by the user. Exiting now ...")
        except Exception as e:  # other kind of exception follows below processing
            logger.error(f"❌ Exception: {e}")
            if self.best_model_dict is None:  # if no best model, raise error
                raise RuntimeError(
                    "Training got interrupted. Model was not trained. Please investigate the error printed above."
                )
            else:
                logger.warning(
                    "Training got interrupted. Please investigate the error printed above.\n"
                    "Model got trained and will load the best checkpoint so far for testing.\n"
                    "If you don't want it, please try fit() again."
                )

        if np.isnan(self.best_loss):
            raise ValueError("Something is wrong. best_loss is NaN after training.")

        logger.info(
            f"Finished training. The best model is from epoch#{self.best_epoch}."
        )

    def fit(
        self,
        train_set: Union[dict, str],
        val_set: Optional[Union[dict, str]] = None,
        file_type: str = "hdf5",
    ) -> None:
        # Step 1: wrap the input data with classes Dataset and DataLoader
        training_set = DatasetForVaDER(train_set, return_y=False, file_type=file_type)
        training_loader = DataLoader(
            training_set,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=self.num_workers,
        )

        val_loader = None
        if val_set is not None:
            val_set = DatasetForVaDER(val_set, return_y=False, file_type=file_type)
            val_loader = DataLoader(
                val_set,
                batch_size=self.batch_size,
                shuffle=False,
                num_workers=self.num_workers,
            )

        # Step 2: train the model and freeze it
        self._train_model(training_loader, val_loader)
        self.model.load_state_dict(self.best_model_dict)
        self.model.eval()  # set the model as eval status to freeze it.

        # Step 3: save the model if necessary
        self._auto_save_model_if_necessary(confirm_saving=True)

    def predict(
        self,
        test_set: Union[dict, str],
        file_type: str = "hdf5",
        return_latent_vars: bool = False,
    ) -> dict:
        """Make predictions for the input data with the trained model.

        Parameters
        ----------
        test_set : dict or str
            The dataset for model testing, should be a dictionary including the key 'X',
            or a path string locating a data file supported by PyPOTS (e.g. an h5 file).
            If it is a dict, X should be array-like of shape [n_samples, sequence length (n_steps), n_features],
            which is the time-series data for testing and can contain missing values.
            If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
            key-value pairs like a dict, and it has to include the key 'X'.

        file_type :
            The type of the given file if test_set is a path string.

        return_latent_vars : bool
            Whether to return the latent variables in VaDER, e.g. mu and phi, etc.

        Returns
        -------
        result_dict : dict
            The dictionary containing the clustering results, and the latent variables if requested.

        """
        self.model.eval()  # set the model as eval status to freeze it.
        test_set = DatasetForVaDER(test_set, return_y=False, file_type=file_type)
        test_loader = DataLoader(
            test_set,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers,
        )
        mu_tilde_collector = []
        stddev_tilde_collector = []
        mu_collector = []
        var_collector = []
        phi_collector = []
        z_collector = []
        imputation_latent_collector = []
        clustering_results_collector = []

        with torch.no_grad():
            for idx, data in enumerate(test_loader):
                inputs = self._assemble_input_for_testing(data)
                results = self.model.forward(inputs, training=False)

                mu_tilde = results["mu_tilde"].cpu().numpy()
                mu_tilde_collector.append(mu_tilde)
                mu = results["mu"].cpu().numpy()
                mu_collector.append(mu)
                var = results["var"].cpu().numpy()
                var_collector.append(var)
                phi = results["phi"].cpu().numpy()
                phi_collector.append(phi)

                def func_to_apply(
                    mu_t_: np.ndarray,
                    mu_: np.ndarray,
                    stddev_: np.ndarray,
                    phi_: np.ndarray,
                ) -> np.ndarray:
                    # the covariance matrix is diagonal, so we can just take the product
                    return np.log(1e-9 + phi_) + np.log(
                        1e-9
                        + multivariate_normal.pdf(mu_t_, mean=mu_, cov=np.diag(stddev_))
                    )

                # per-cluster log joint probability: log mixture weight plus log Gaussian density of mu_tilde
                p = np.array(
                    [
                        func_to_apply(mu_tilde, mu[i], var[i], phi[i])
                        for i in np.arange(mu.shape[0])
                    ]
                )
                # assign each sample to the cluster with the largest log joint probability
                clustering_results = np.argmax(p, axis=0)
                clustering_results_collector.append(clustering_results)

                if return_latent_vars:
                    stddev_tilde = results["stddev_tilde"].cpu().numpy()
                    stddev_tilde_collector.append(stddev_tilde)
                    z = results["z"].cpu().numpy()
                    z_collector.append(z)
                    imputation_latent = results["imputation_latent"].cpu().numpy()
                    imputation_latent_collector.append(imputation_latent)

        clustering = np.concatenate(clustering_results_collector)
        result_dict = {
            "clustering": clustering,
        }

        if return_latent_vars:
            latent_var_collector = {
                "mu_tilde": np.concatenate(mu_tilde_collector),
                "stddev_tilde": np.concatenate(stddev_tilde_collector),
                "mu": np.concatenate(mu_collector),
                "var": np.concatenate(var_collector),
                "phi": np.concatenate(phi_collector),
                "z": np.concatenate(z_collector),
                "imputation_latent": np.concatenate(imputation_latent_collector),
            }
            result_dict["latent_vars"] = latent_var_collector

        return result_dict

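    # cluster() is a thin convenience wrapper around predict() that returns only the cluster assignments.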
    def cluster(
        self,
        test_set: Union[dict, str],
        file_type: str = "hdf5",
    ) -> np.ndarray:
        """Cluster the input with the trained model.

        Parameters
        ----------
        test_set :
            The data samples for testing, should be array-like of shape [n_samples, sequence length (n_steps),
            n_features], or a path string locating a data file, e.g. h5 file.

        file_type :
            The type of the given file if test_set is a path string.

        Returns
        -------
        array-like
            Clustering results.

        """

        result_dict = self.predict(test_set, file_type=file_type)
        return result_dict["clustering"]
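
For orientation, below is a minimal usage sketch of the class above. It is illustrative only and not taken from the PyPOTS documentation: it assumes VaDER is re-exported as pypots.clustering.VaDER (the class itself lives in pypots/clustering/vader/model.py) and it runs on random toy data with artificially injected missing values.

import numpy as np
from pypots.clustering import VaDER  # assumed public import path

# toy dataset: 200 samples, 24 time steps, 5 features, roughly 10% of values missing (NaN)
X = np.random.randn(200, 24, 5)
X[np.random.rand(*X.shape) < 0.1] = np.nan

vader = VaDER(
    n_steps=24,
    n_features=5,
    n_clusters=3,
    rnn_hidden_size=64,
    d_mu_stddev=2,
    pretrain_epochs=5,
    epochs=20,
)
vader.fit(train_set={"X": X})

# predict() returns a dict with "clustering" and, on request, the latent variables
results = vader.predict({"X": X}, return_latent_vars=True)
labels = results["clustering"]
mu_tilde = results["latent_vars"]["mu_tilde"]

# cluster() returns only the assignments
labels_again = vader.cluster({"X": X})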