• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tonegas / nnodely / 13056267505

30 Jan 2025 04:04PM UTC coverage: 94.525% (+0.6%) from 93.934%
13056267505

push

github

web-flow
Merge pull request #48 from tonegas/develop

Develop merge on main release 1.0.0

1185 of 1215 new or added lines in 21 files covered. (97.53%)

3 existing lines in 2 files now uncovered.

9426 of 9972 relevant lines covered (94.52%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.68
/nnodely/modeldef.py
1
import copy
1✔
2

3
import numpy as np
1✔
4

5
from nnodely.input import closedloop_name, connect_name
1✔
6
from nnodely.utils import check, merge
1✔
7
from nnodely.relation import MAIN_JSON, Stream
1✔
8
from nnodely.output import Output
1✔
9

10
from nnodely.logger import logging, nnLogger
1✔
11
log = nnLogger(__name__, logging.INFO)
1✔
12

13
class ModelDef():
1✔
14
    def __init__(self, model_def = MAIN_JSON):
        """Create a model definition seeded from ``model_def``.

        Two independent deep copies are taken: ``json_base`` is kept as the
        untouched template and ``json`` is the working definition.

        Args:
            model_def: Dictionary-like definition; must contain an 'Info' entry.
        """
        # Untouched template of the model definition
        self.json_base = copy.deepcopy(model_def)

        # Initialize the working model definition from the template
        self.json = copy.deepcopy(self.json_base)
        info = self.json['Info']
        self.sample_time = info["SampleTime"] if "SampleTime" in info else None

        # Registries filled by addModel / addMinimize / addConnect / addClosedLoop
        self.model_dict = {}
        self.minimize_dict = {}
        self.update_state_dict = {}
1✔
27

28
    def __contains__(self, key):
1✔
29
        return key in self.json
1✔
30

31
    def __getitem__(self, key):
1✔
32
        return self.json[key]
1✔
33

34
    def __setitem__(self, key, value):
1✔
35
        self.json[key] = value
×
36

37
    def isDefined(self):
1✔
38
        return self.json is not None
1✔
39

40
    def update(self, model_def = None, model_dict = None, minimize_dict = None, update_state_dict = None):
        """Rebuild ``self.json`` from a base definition and the registered items.

        Each argument that is None falls back to the corresponding value stored
        on the instance (``json_base``, ``model_dict``, ``minimize_dict``,
        ``update_state_dict``); supplied arguments are deep-copied before use
        and are not stored on the instance.

        Args:
            model_def: Base definition to start from.
            model_dict: Mapping model name -> list of streams.
            minimize_dict: Mapping name -> {'A': stream, 'B': stream, 'loss': ...}.
            update_state_dict: Mapping name -> object exposing a ``json`` attribute.
        """
        base_def = self.json_base if model_def is None else model_def
        self.json = copy.deepcopy(base_def)
        if model_dict is None:
            model_dict = self.model_dict
        else:
            model_dict = copy.deepcopy(model_dict)
        if minimize_dict is None:
            minimize_dict = self.minimize_dict
        else:
            minimize_dict = copy.deepcopy(minimize_dict)
        if update_state_dict is None:
            update_state_dict = self.update_state_dict
        else:
            update_state_dict = copy.deepcopy(update_state_dict)

        # Merge the JSON of every stream of every model into the definition
        for streams in model_dict.values():
            for stream in streams:
                self.json = merge(self.json, stream.json)

        model_count = len(model_dict)
        if model_count > 1:
            # Several models: 'Models' maps each name to the entries it uses
            if 'Models' not in self.json:
                self.json['Models'] = {}
            for model_name, streams in model_dict.items():
                entry = {'Inputs': [], 'States': [], 'Outputs': [], 'Parameters': [],
                         'Constants': []}
                self.json['Models'][model_name] = entry
                parameters, constants, inputs, states = set(), set(), set(), set()
                for stream in streams:
                    entry['Outputs'].append(stream.name)
                    parameters |= set(stream.json['Parameters'].keys())
                    constants |= set(stream.json['Constants'].keys())
                    inputs |= set(stream.json['Inputs'].keys())
                    states |= set(stream.json['States'].keys())
                entry['Parameters'] = list(parameters)
                entry['Constants'] = list(constants)
                entry['Inputs'] = list(inputs)
                entry['States'] = list(states)
        elif model_count == 1:
            # Single model: 'Models' is just that model's name
            self.json['Models'] = next(iter(model_dict))

        if 'Minimizers' not in self.json:
            self.json['Minimizers'] = {}
        for key, minimize in minimize_dict.items():
            for side in ('A', 'B'):
                self.json = merge(self.json, minimize[side].json)
            self.json['Minimizers'][key] = {
                'A': minimize['A'].name,
                'B': minimize['B'].name,
                'loss': minimize['loss'],
            }

        for update_state in update_state_dict.values():
            self.json = merge(self.json, update_state.json)

        # Pick up a sample time carried by the rebuilt definition, if any
        info = self.json['Info']
        if "SampleTime" in info:
            self.sample_time = info["SampleTime"]
×
85

86

87
    def __update_state(self, stream_out, state_list_in, UpdateState):
        """Register an update relation from ``stream_out`` to one or more states.

        Args:
            stream_out: Stream or Output feeding the state(s).
            state_list_in: A State or a list of State objects.
            UpdateState: Callable invoked as ``UpdateState(stream_out, state_in)``;
                its result is stored in ``update_state_dict`` under the state name.
        """
        from nnodely.input import State

        targets = state_list_in if type(state_list_in) is list else [state_list_in]
        for state_in in targets:
            check(isinstance(stream_out, (Output, Stream)), TypeError,
                  f"The {stream_out} must be a Stream or Output and not a {type(stream_out)}.")
            check(type(state_in) is State, TypeError,
                  f"The {state_in} must be a State and not a {type(state_in)}.")
            out_dim, in_dim = stream_out.dim['dim'], state_in.dim['dim']
            check(out_dim == in_dim, ValueError,
                  f"The dimension of {stream_out.name} is not equal to the dimension of {state_in.name} ({stream_out.dim['dim']}!={state_in.dim['dim']}).")
            if type(stream_out) is Output:
                # Replace the Output by a Stream named after the entry stored for it in the JSON
                stream_name = self.json['Outputs'][stream_out.name]
                stream_out = Stream(stream_name, stream_out.json, stream_out.dim, 0)
            self.update_state_dict[state_in.name] = UpdateState(stream_out, state_in)
1✔
102

103
    def addConnect(self, stream_out, state_list_in):
        """Register a Connect update from ``stream_out`` to the given state(s) and rebuild the JSON."""
        from nnodely.input import Connect as update_cls
        self.__update_state(stream_out, state_list_in, update_cls)
        self.update()
1✔
107

108
    def addClosedLoop(self, stream_out, state_list_in):
        """Register a ClosedLoop update from ``stream_out`` to the given state(s) and rebuild the JSON."""
        from nnodely.input import ClosedLoop as update_cls
        self.__update_state(stream_out, state_list_in, update_cls)
        self.update()
1✔
112

113
    def addModel(self, name, stream_list):
        """Register a model under ``name`` and rebuild the JSON.

        Args:
            name: Key used in ``model_dict``.
            stream_list: An Output/Stream or a list of them; a single object is
                wrapped in a list. The list is deep-copied before being stored.

        Raises:
            TypeError: If ``stream_list`` is neither an Output/Stream nor a list.
        """
        streams = [stream_list] if isinstance(stream_list, (Output, Stream)) else stream_list
        if type(streams) is not list:
            raise TypeError(f'stream_list is type {type(stream_list)} but must be an Output or Stream or a list of them')
        self.model_dict[name] = copy.deepcopy(streams)
        self.update()
1✔
121

122
    def removeModel(self, name_list):
        """Remove one or more registered models and rebuild the JSON.

        Args:
            name_list: A model name or a list of names. Any other type is
                ignored and only the rebuild is performed.
        """
        names = [name_list] if type(name_list) is str else name_list
        if type(names) is list:
            for name in names:
                check(name in self.model_dict, IndexError, f"The name {name} is not part of the available models")
                del self.model_dict[name]
        self.update()
×
130

131
    def addMinimize(self, name, streamA, streamB, loss_function='mse'):
        """Register a minimization target between two streams and rebuild the JSON.

        Args:
            name: Key used in ``minimize_dict``.
            streamA: First Output or Stream.
            streamB: Second Output or Stream; its ``dim`` must equal ``streamA.dim``.
            loss_function: Stored unchanged under the 'loss' key.

        The validations go through ``check`` (imported from nnodely.utils, not
        visible here); it is expected to raise the given exception type
        (TypeError / ValueError) when the condition is false.
        """
        check(isinstance(streamA, (Output, Stream)), TypeError, 'streamA must be an instance of Output or Stream')
        # The message names streamB: the original text blamed streamA for a bad streamB
        check(isinstance(streamB, (Output, Stream)), TypeError, 'streamB must be an instance of Output or Stream')
        check(streamA.dim == streamB.dim, ValueError, f'Dimension of streamA={streamA.dim} and streamB={streamB.dim} are not equal.')
        # Deep copies keep the stored entry independent of later changes to the caller's objects
        self.minimize_dict[name]={'A':copy.deepcopy(streamA), 'B': copy.deepcopy(streamB), 'loss':loss_function}
        self.update()
1✔
137

138
    def removeMinimize(self, name_list):
        """Remove one or more registered minimizers and rebuild the JSON.

        Args:
            name_list: A minimizer name or a list of names. Any other type is
                ignored and only the rebuild is performed.
        """
        names = [name_list] if type(name_list) is str else name_list
        if type(names) is list:
            for name in names:
                check(name in self.minimize_dict, IndexError, f"The name {name} is not part of the available minimuzes")
                del self.minimize_dict[name]
        self.update()
1✔
146

147
    def setBuildWindow(self, sample_time = None):
        """Fix the sample time and compute the sample windows of every input and state.

        The sample time is ``sample_time`` when given, otherwise the value
        already stored on the instance, otherwise 1. ``self.json['Info']`` is
        replaced by a new dict holding "SampleTime", then 'ns' (backward and
        forward sample counts) and 'ntot' are written on every input/state
        entry and on 'Info'. Finally, parameters/constants carrying 'values'
        are validated against their 'tw' and the "SampleTime" placeholder is
        replaced by the actual sample time.

        Args:
            sample_time: Strictly positive sample time, or None.

        Raises:
            AssertionError: If an input has neither a time nor a sample window.

        Other validations go through ``check`` (imported from nnodely.utils,
        not visible here); it is expected to raise the given exception type
        (RuntimeError / ValueError) when the condition is false.
        """
        check(self.json is not None, RuntimeError, "No model is defined!")
        if sample_time is not None:
            check(sample_time > 0, RuntimeError, 'Sample time must be strictly positive!')
            self.sample_time = sample_time
        elif self.sample_time is None:
            self.sample_time = 1

        self.json['Info'] = {"SampleTime": self.sample_time}

        # The merged dict is new, but its values are the same objects stored in
        # self.json, so the 'ns'/'ntot' written below land in the definition.
        json_inputs = self.json['Inputs'] | self.json['States']
        check(json_inputs != {}, RuntimeError, "No model is defined!")

        # for key,value in self.json['States'].items():
        #     check(closedloop_name in self.json['States'][key].keys() or connect_name in self.json['States'][key].keys(),
        #           KeyError, f'Update function is missing for state {key}. Use Connect or ClosedLoop to update the state.')

        # self.sample_time is never None from here on (set above), so no
        # "sample time undefined" branch is needed in the loop.
        input_ns_backward, input_ns_forward = {}, {}
        for key, value in json_inputs.items():
            if value['sw'] == [0,0] and value['tw'] == [0,0]:
                # Explicit raise: an `assert` statement is removed under `python -O`
                raise AssertionError(f"Input {key} has no time window or sample window")
            if value['sw'] == [0, 0]:
                ## check if value['tw'] is a multiple of sample_time
                absolute_tw = abs(value['tw'][0]) + abs(value['tw'][1])
                # NOTE(review): round() of the remainder is 0 for any remainder
                # below 0.5, so with small sample times this check always passes;
                # confirm whether a tolerance-based multiple test is intended.
                check(round(absolute_tw % self.sample_time) == 0, ValueError,
                      f"Time window of input {key} is not a multiple of sample time. This network cannot be neuralized")
                input_ns_backward[key] = round(-value['tw'][0] / self.sample_time)
                input_ns_forward[key] = round(value['tw'][1] / self.sample_time)
            else:
                # Both windows may be present: keep the larger extent on each side
                input_ns_backward[key] = max(round(-value['tw'][0] / self.sample_time),-value['sw'][0])
                input_ns_forward[key] = max(round(value['tw'][1] / self.sample_time),value['sw'][1])
            value['ns'] = [input_ns_backward[key], input_ns_forward[key]]
            value['ntot'] = sum(value['ns'])

        self.json['Info']['ns'] = [max(input_ns_backward.values()), max(input_ns_forward.values())]
        self.json['Info']['ntot'] = sum(self.json['Info']['ns'])
        if self.json['Info']['ns'][0] < 0:
            log.warning(
                f"The input is only in the far past the max_samples_backward is: {self.json['Info']['ns'][0]}")
        if self.json['Info']['ns'][1] < 0:
            log.warning(
                f"The input is only in the far future the max_sample_forward is: {self.json['Info']['ns'][1]}")

        for k, v in (self.json['Parameters']|self.json['Constants']).items():
            if 'values' in v:
                if 'tw' in v:
                    # Number of value rows must match the time window expressed in samples
                    check(np.array(v['values']).shape[0] == v['tw']/self.sample_time, ValueError,
                      f"{k} has a different number of values for this sample time.")
                if v['values'] == "SampleTime":
                    # Placeholder string: substitute the actual sample time
                    v['values'] = self.sample_time
×
203

204

205
    def updateParameters(self, model, recurrent=True):
1✔
206
        if model is not None:
1✔
207
            for key in self.json['Parameters'].keys():
1✔
208
                # if recurrent:
209
                #     if key in model.Cell.all_parameters:
210
                #         self.json['Parameters'][key]['values'] = model.Cell.all_parameters[key].tolist()
211
                #         if 'init_fun' in self.json['Parameters'][key]:
212
                #             del self.json['Parameters'][key]['init_fun']
213
                # else:
214
                if key in model.all_parameters:
1✔
215
                    self.json['Parameters'][key]['values'] = model.all_parameters[key].tolist()
1✔
216
                    if 'init_fun' in self.json['Parameters'][key]:
1✔
217
                        del self.json['Parameters'][key]['init_fun']
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc