
tonegas / nnodely, build 19576012316 (push, via GitHub)

21 Nov 2025 03:56PM UTC. Coverage: 96.605% (-0.04% from 96.644%)
Commit by MisterMandarino: Merge branch 'develop' of https://github.com/tonegas/nnodely into develop

50 of 50 new or added lines in 7 files covered (100.0%).
71 existing lines in 10 files are now uncovered.
12947 of 13402 relevant lines covered (96.6%).
0.97 hits per line.

Source File: /nnodely/operators/validator.py (file coverage: 82.01%)
import torch, warnings
import numpy as np

from nnodely.support.utils import ReadOnlyDict, get_batch_size

from nnodely.basic.loss import CustomLoss
from nnodely.operators.network import Network
from nnodely.support.utils import check, TORCH_DTYPE, enforce_types

class Validator(Network):
    @enforce_types
    def __init__(self):
        check(type(self) is not Validator, TypeError, "Validator class cannot be instantiated directly")
        super().__init__()

        # Validation Parameters
        self.__performance = {}
        self.__prediction = {}

    @property
    def performance(self):
        return ReadOnlyDict(self.__performance)

    @property
    def prediction(self):
        return ReadOnlyDict(self.__prediction)

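    # Layout of the dictionaries filled in by _analyze (see below), per dataset tag
    # and per minimizer name:
    #   performance[tag][name][<loss>]      mean loss value
    #   performance[tag][name]['fvu']       {'A', 'B', 'total'}
    #   performance[tag][name]['aic']       {'value', 'total_params', 'log_likelihood'}
    #   performance[tag]['total']           {'mean_error', 'fvu', 'aic'}
    #   prediction[tag][name]['A' | 'B']    the two compared signals, as lists
    #   prediction[tag]['idxs']             sample indexes (recurrent inference only)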
    @enforce_types
    def _analyze(self,
                  dataset: dict,
                  dataset_tag: str,
                  indexes: list = None,
                  minimize_gain: dict = {},
                  closed_loop: dict = {},
                  connect: dict = {},
                  prediction_samples: int | str = 0,
                  step: int = 0,
                  batch_size: int | None = None
                ) -> None:
        with torch.enable_grad() if self._get_gradient_on_inference() else torch.inference_mode():
            self._model.eval()
            self.__performance[dataset_tag] = {}
            self.__prediction[dataset_tag] = {}
            A = {}
            B = {}
            idxs = None
            total_losses = {}

            # Create the losses
            losses = {}
            for name, values in self._model_def['Minimizers'].items():
                losses[name] = CustomLoss(values['loss'])

            data = self._get_data(dataset)
            n_samples = len(data[list(data.keys())[0]])

            batch_size = get_batch_size(n_samples, batch_size, prediction_samples)
            prediction_samples = self._setup_recurrent_variables(prediction_samples, closed_loop, connect)

            if prediction_samples >= 0:
                mandatory_inputs, non_mandatory_inputs = self._get_mandatory_inputs(connect, closed_loop)

                idxs = []
                for horizon_idx in range(prediction_samples + 1):
                    idxs.append([])
                for key, value in self._model_def['Minimizers'].items():
                    total_losses[key], A[key], B[key] = [], [], []
                    for horizon_idx in range(prediction_samples + 1):
                        A[key].append([])
                        B[key].append([])

                ## Update with virtual states
                self._model.update(closed_loop = closed_loop, connect = connect)
                self._recurrent_inference(data, indexes, batch_size, minimize_gain, prediction_samples,
                                          step, non_mandatory_inputs, mandatory_inputs, losses,
                                          total_losses = total_losses, A = A, B = B, idxs = idxs)

                for horizon_idx in range(prediction_samples + 1):
                    idxs[horizon_idx] = np.concatenate(idxs[horizon_idx])
                for key, value in self._model_def['Minimizers'].items():
                    for horizon_idx in range(prediction_samples + 1):
                        if A is not None:
                            A[key][horizon_idx] = np.concatenate(A[key][horizon_idx])
                        if B is not None:
                            B[key][horizon_idx] = np.concatenate(B[key][horizon_idx])
                    if total_losses is not None:
                        total_losses[key] = np.mean(total_losses[key])
            else:
                for key, value in self._model_def['Minimizers'].items():
                    total_losses[key], A[key], B[key] = [], [], []

                self._model.update(disconnect=True)
                self._inference(data, n_samples, batch_size, minimize_gain, losses,
                                total_losses = total_losses, A = A, B = B)

                for key, value in self._model_def['Minimizers'].items():
                    A[key] = np.concatenate(A[key])
                    B[key] = np.concatenate(B[key])
                    total_losses[key] = np.mean(total_losses[key])

            for ind, (key, value) in enumerate(self._model_def['Minimizers'].items()):
                A_np = np.array(A[key])
                B_np = np.array(B[key])
                self.__performance[dataset_tag][key] = {}
                self.__performance[dataset_tag][key][value['loss']] = np.mean(total_losses[key]).item()
                self.__performance[dataset_tag][key]['fvu'] = {}
                # Compute FVU
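                # FVU (fraction of variance unexplained) for the pair (A, B):
                #     FVU_A = Var(A - B) / Var(A),   FVU_B = Var(A - B) / Var(B)
                # and 'total' is the mean of the two. A value near 0 means the two
                # signals match closely; NaN is stored when both variances are zero.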
                residual = A_np - B_np
                error_var = np.var(residual)
                error_mean = np.mean(residual)
                #error_var_manual = np.sum((residual-error_mean) ** 2) / (len(self.__prediction['B'][ind]) - 0)
                #print(f"{key} var np:{new_error_var} and var manual:{error_var_manual}")
                with warnings.catch_warnings(record=True) as w:
                    self.__performance[dataset_tag][key]['fvu']['A'] = (error_var / np.var(A_np)).item()
                    self.__performance[dataset_tag][key]['fvu']['B'] = (error_var / np.var(B_np)).item()
                    if w and np.var(A_np) == 0.0 and np.var(B_np) == 0.0:
                        self.__performance[dataset_tag][key]['fvu']['A'] = np.nan
                        self.__performance[dataset_tag][key]['fvu']['B'] = np.nan
                self.__performance[dataset_tag][key]['fvu']['total'] = np.mean([self.__performance[dataset_tag][key]['fvu']['A'], self.__performance[dataset_tag][key]['fvu']['B']]).item()
                # Compute AIC
                #normal_dist = norm(0, error_var ** 0.5)
                #probability_of_residual = normal_dist.pdf(residual)
                #log_likelihood_first = sum(np.log(probability_of_residual))
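                # AIC = 2*k - 2*ln(L), with k = number of trainable parameters and
                # ln(L) the Gaussian log-likelihood of the residuals:
                #     ln(L) = -n/2*ln(2*pi) - n/2*ln(sigma^2) - sum(residual^2)/(2*sigma^2)
                # computed below as p1 + p2 + p3; p2 and p3 are zeroed when the residual
                # variance is zero and the two terms degenerate to +inf and -inf.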
                p1 = -len(residual)/2.0*np.log(2*np.pi)
                with warnings.catch_warnings(record=True) as w:
                    p2 = -len(residual)/2.0*np.log(error_var)
                    p3 = -1 / (2.0 * error_var) * np.sum(residual ** 2)
                    if w and p2 == np.float32(np.inf) and p3 == np.float32(-np.inf):
                        p2 = p3 = 0.0
                log_likelihood = p1+p2+p3
                #print(f"{key} log likelihood second mode:{log_likelihood} = {p1}+{p2}+{p3} first mode: {log_likelihood_first}")
                total_params = sum(p.numel() for p in self._model.parameters() if p.requires_grad)
                #print(f"{key} total_params:{total_params}")
                aic = - 2 * log_likelihood + 2 * total_params
                #print(f"{key} aic:{aic}")
                self.__performance[dataset_tag][key]['aic'] = {'value': aic, 'total_params': total_params, 'log_likelihood': log_likelihood}
                # Prediction and target
                self.__prediction[dataset_tag][key] = {}
                self.__prediction[dataset_tag][key]['A'] = A_np.tolist()
                self.__prediction[dataset_tag][key]['B'] = B_np.tolist()

            if idxs is not None:
                self.__prediction[dataset_tag]['idxs'] = np.array(idxs).tolist()
            self.__performance[dataset_tag]['total'] = {}
            self.__performance[dataset_tag]['total']['mean_error'] = np.mean([value for key, value in total_losses.items()])
            self.__performance[dataset_tag]['total']['fvu'] = np.mean([self.__performance[dataset_tag][key]['fvu']['total'] for key in self._model_def['Minimizers'].keys()])
            self.__performance[dataset_tag]['total']['aic'] = np.mean([self.__performance[dataset_tag][key]['aic']['value'] for key in self._model_def['Minimizers'].keys()])

        self.visualizer.showResult(dataset_tag)

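    # NOTE: two definitions of analyzeModel follow. Python binds the second definition
    # over the first, so the splits/name-based variant directly below is effectively dead
    # code at runtime, consistent with the coverage report marking its body as uncovered.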
    @enforce_types
    def analyzeModel(self,
                       dataset: str | list | dict | None = None, *,
                       splits: list | None = None,
                       name: str | None = None,
                       minimize_gain: dict = {},
                       closed_loop: dict = {},
                       connect: dict = {},
                       prediction_samples: int | str = 0,
                       step: int = 0,
                       batch_size: int | None = None
                       ) -> None:
        """
        Analyzes the performance of the model on the provided dataset.

        Parameters
        ----------
        dataset : str | list | dict
            Dataset on which to analyze the performance of the model.
        splits : list or None, optional
            A list of 3 elements specifying the percentage split for training, validation, and testing.
            The three elements must sum to 100. Default is [100, 0, 0].
        name : str or None
            Label to be used in the plots.
        minimize_gain : dict
            A dictionary specifying the gain for each minimization loss function.
        closed_loop : dict or None, optional
            A dictionary specifying closed-loop connections. The keys are input names and the values are output names. Default is {}.
        connect : dict or None, optional
            A dictionary specifying connections. The keys are input names and the values are output names. Default is {}.
        step : int or None, optional
            The step size used to analyze the model on the provided dataset. A larger value uses less data per epoch and runs faster. Default is 0.
        prediction_samples : int or None, optional
            The size of the prediction horizon, i.e. the number of samples in each recurrent window. Default is 0.
        batch_size : int or None, optional
            The batch size used to analyze the performance of the model on the provided dataset.
        """
        # Get the dataset; if None, take all datasets
        if dataset is None:
            dataset = list(self._data.keys())

        data = self._get_data(dataset)
        n_samples = len(data[list(data.keys())[0]])
        data_tag = self._get_tag(dataset) if name is None else name
        indexes = list(range(n_samples))
        dataset_name = data_tag.replace('_train', '').replace('_val', '').replace('_test', '')
        if prediction_samples > 0 and dataset_name in self._multifile.keys():
            forbidden_idxs = []
            for i in self._multifile[dataset_name]:
                if i < indexes[-1]:
                    forbidden_idxs.extend(range(i - prediction_samples, i, 1))
            indexes = [idx for idx in indexes if idx not in forbidden_idxs]
            indexes = indexes[:-prediction_samples]

        # If splits is None, use the whole dataset
        if splits is None:
            data = self._get_data(dataset)
            self._analyze(data, data_tag, indexes, minimize_gain, closed_loop, connect, prediction_samples, step, batch_size)
        else:
            data_train, data_val, data_test = self._setup_dataset(None, None, None, dataset, splits)
            n_samples_val = next(iter(data_val.values())).size(0) if data_val else 0
            n_samples_test = next(iter(data_test.values())).size(0) if data_test else 0

            self._analyze(data_train, f"{data_tag}_train", indexes, minimize_gain, closed_loop, connect, prediction_samples, step, batch_size)
            if n_samples_val > 0:
                self._analyze(data_val, f"{data_tag}_val", indexes, minimize_gain, closed_loop, connect, prediction_samples, step, batch_size)
            if n_samples_test > 0:
                self._analyze(data_test, f"{data_tag}_test", indexes, minimize_gain, closed_loop, connect, prediction_samples, step, batch_size)


    @enforce_types
    def analyzeModel(self,
                dataset: str | list | dict | None = None, *,
                tag: str | None = None,
                minimize_gain: dict = {},
                closed_loop: dict = {},
                connect: dict = {},
                prediction_samples: int | str = 0,
                step: int = 0,
                batch_size: int | None = None
                ) -> None:
        """
        Analyzes the performance of the model on the provided dataset.

        Parameters
        ----------
        dataset : str | list | dict
            Dataset on which to analyze the performance of the model.
        tag : str or None
            Label to be used in the plots.
        minimize_gain : dict
            A dictionary specifying the gain for each minimization loss function.
        closed_loop : dict or None, optional
            A dictionary specifying closed-loop connections. The keys are input names and the values are output names. Default is {}.
        connect : dict or None, optional
            A dictionary specifying connections. The keys are input names and the values are output names. Default is {}.
        step : int or None, optional
            The step size used to analyze the model on the provided dataset. A larger value uses less data per epoch and runs faster. Default is 0.
        prediction_samples : int or None, optional
            The size of the prediction horizon, i.e. the number of samples in each recurrent window. Default is 0.
        batch_size : int or None, optional
            The batch size used to analyze the performance of the model on the provided dataset.
        """
        # Resolve the tag and the dataset; if dataset is None, take all datasets
        if tag is None:
            tag = dataset if isinstance(dataset, str) else 'default'
        if dataset is None:
            dataset = list(self._data.keys())

        data = self._get_data(dataset)
        n_samples = next(iter(data.values())).size(0)
        indexes = self._get_batch_indexes(dataset, n_samples, prediction_samples)
        self._analyze(data, tag, indexes, minimize_gain, closed_loop, connect, prediction_samples, step, batch_size)
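
Usage sketch (not part of the repository): assuming `model` is an nnodely model object that inherits this Validator mixin and already has its datasets loaded, the public entry point and the read-only result dictionaries defined in this file could be used roughly as follows; the names `model` and 'test_set' are illustrative placeholders.

    # Analyze a previously loaded dataset; 'test_set' labels the stored results and plots
    model.analyzeModel(dataset='test_set', tag='test_set', batch_size=32)

    # Aggregated metrics filled in by _analyze
    perf = model.performance['test_set']
    print(perf['total']['mean_error'], perf['total']['fvu'], perf['total']['aic'])

    # Per-minimizer compared signals ('A' and 'B') as plain Python lists
    pred = model.prediction['test_set']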