• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tonegas / nnodely / 14360588343

09 Apr 2025 03:06PM UTC coverage: 97.602% (+0.6%) from 97.035%
14360588343

Pull #86

github

web-flow
Merge fec81b82f into e9c323c4f
Pull Request #86: Smallclasses

2292 of 2419 new or added lines in 54 files covered. (94.75%)

3 existing lines in 1 file now uncovered.

11683 of 11970 relevant lines covered (97.6%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.81
/nnodely/basic/modeldef.py
1
import copy
1✔
2

3
import numpy as np
1✔
4

5
from nnodely.support.utils import check, merge
1✔
6
from nnodely.basic.relation import MAIN_JSON, Stream
1✔
7
from nnodely.layers.output import Output
1✔
8

9
from nnodely.support.logger import logging, nnLogger
1✔
10
# Module-level logger for this file, created through nnLogger with logging.INFO as its level argument.
log = nnLogger(__name__, logging.INFO)
1✔
11

12
class ModelDef():
1✔
13
    def __init__(self, model_def = MAIN_JSON):
        """Create a model definition starting from ``model_def``.

        A private deep copy of ``model_def`` is kept as the pristine base and a
        second deep copy becomes the working JSON edited by the other methods.

        :param model_def: dictionary describing the model; it must contain an
            ``'Info'`` entry. Defaults to ``MAIN_JSON``.
        """
        # Registries of models, minimizers and state updates start empty.
        self.__update_state_dict = {}
        self.__minimize_dict = {}
        self.__model_dict = {}

        # Pristine copy of the definition, detached from the caller's object.
        self.__json_base = copy.deepcopy(model_def)

        # Initialize the working model definition from the pristine copy.
        self.__json = copy.deepcopy(self.__json_base)

        # The sample time is known only when the definition already carries it.
        info = self.__json['Info']
        self.__sample_time = info["SampleTime"] if "SampleTime" in info else None
1✔
26

27
    def __contains__(self, key):
        """Return whether ``key`` is a top-level key of the working JSON."""
        definition = self.__json
        return key in definition
1✔
29

30
    def __getitem__(self, key):
        """Return the entry stored under ``key`` in the working JSON (the stored object, not a copy)."""
        definition = self.__json
        return definition[key]
1✔
32

33
    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` in the working JSON."""
        definition = self.__json
        definition[key] = value
×
35

36
    #TODO to remove when getJson takes a model list as argument
37
    def getModelDict(self):
        """Return a deep copy of the registered models, keyed by model name."""
        snapshot = copy.deepcopy(self.__model_dict)
        return snapshot
1✔
39

40
    def getJson(self):
        """Return a deep copy of the working JSON, so callers cannot alter the stored one."""
        snapshot = copy.deepcopy(self.__json)
        return snapshot
1✔
42

43
    def getSampleTime(self):
        """Return the sample time of the model.

        ``check`` is asked to report an ``AttributeError`` when no sample time
        has been stored yet.
        """
        sample_time_known = self.__sample_time is not None
        check(sample_time_known, AttributeError, "Sample time is not defined the model is not neuralized!")
        return self.__sample_time
1✔
46

47
    def isDefined(self):
        """Tell whether a working JSON is currently held, i.e. it is not ``None``."""
        return not (self.__json is None)
1✔
49

50
    def update(self, model_def = None, model_dict = None, minimize_dict = None, update_state_dict = None):
        """Rebuild the working JSON from a base definition plus models, minimizers and state updates.

        Each argument left to ``None`` falls back to what the instance already
        holds; explicit arguments are deep-copied and used for this rebuild only
        (they are not stored in the registries).

        :param model_def: definition used as starting point; ``None`` selects the
            base definition kept since construction.
        :param model_dict: mapping ``model name -> list of streams``.
        :param minimize_dict: mapping ``name -> {'A': stream, 'B': stream, 'loss': ...}``.
        :param update_state_dict: mapping whose values expose a ``json`` attribute.

        With more than one model, ``'Models'`` maps every model name to the names
        of its inputs, states, outputs, parameters and constants; with exactly one
        model, ``'Models'`` is just that model's name.
        """
        # Starting point of the rebuild.
        if model_def is None:
            self.__json = copy.deepcopy(self.__json_base)
        else:
            self.__json = copy.deepcopy(model_def)

        # Resolve every optional dictionary against the stored registries.
        if model_dict is None:
            model_dict = self.__model_dict
        else:
            model_dict = copy.deepcopy(model_dict)
        if minimize_dict is None:
            minimize_dict = self.__minimize_dict
        else:
            minimize_dict = copy.deepcopy(minimize_dict)
        if update_state_dict is None:
            update_state_dict = self.__update_state_dict
        else:
            update_state_dict = copy.deepcopy(update_state_dict)

        # Add models to the model_def
        for model_streams in model_dict.values():
            for stream in model_streams:
                self.__json = merge(self.__json, stream.json)

        if len(model_dict) > 1:
            self.__json.setdefault('Models', {})
            for model_name, model_streams in model_dict.items():
                summary = {'Inputs': [], 'States': [], 'Outputs': [], 'Parameters': [],
                           'Constants': []}
                self.__json['Models'][model_name] = summary
                parameters, constants, inputs, states = set(), set(), set(), set()
                collected = (('Parameters', parameters), ('Constants', constants),
                             ('Inputs', inputs), ('States', states))
                for stream in model_streams:
                    summary['Outputs'].append(stream.name)
                    for section, names in collected:
                        # In-place union: ``names`` is one of the four sets above.
                        names |= set(stream.json[section].keys())
                for section, names in collected:
                    summary[section] = list(names)
        elif len(model_dict) == 1:
            self.__json['Models'] = next(iter(model_dict))

        # Add minimizers: merge both terms, then record their names and the loss.
        self.__json.setdefault('Minimizers', {})
        for minimize_name, minimize in minimize_dict.items():
            self.__json = merge(self.__json, minimize['A'].json)
            self.__json = merge(self.__json, minimize['B'].json)
            self.__json['Minimizers'][minimize_name] = {
                'A': minimize['A'].name,
                'B': minimize['B'].name,
                'loss': minimize['loss'],
            }

        # Add the state updates.
        for update_state in update_state_dict.values():
            self.__json = merge(self.__json, update_state.json)

        # Keep the cached sample time aligned with the rebuilt definition.
        info = self.__json['Info']
        if "SampleTime" in info:
            self.__sample_time = info["SampleTime"]
×
95

96

97
    def __update_state(self, stream_out, state_list_in, UpdateState):
        """Register, for every given state, an update object built from ``stream_out``.

        :param stream_out: ``Output`` or ``Stream`` feeding the states. An
            ``Output`` is replaced by a ``Stream`` named after the entry stored
            for it in the ``'Outputs'`` section of the working JSON.
        :param state_list_in: a ``State`` or a list of ``State`` objects.
        :param UpdateState: callable invoked as ``UpdateState(stream_out, state)``;
            its result is stored in the state-update registry under the state name.

        ``check`` is asked to report ``TypeError`` for wrong argument types and
        ``ValueError`` when the ``'dim'`` of stream and state differ.
        """
        from nnodely.layers.input import  State
        targets = state_list_in if type(state_list_in) is list else [state_list_in]
        for target in targets:
            check(isinstance(stream_out, (Output, Stream)), TypeError,
                  f"The {stream_out} must be a Stream or Output and not a {type(stream_out)}.")
            check(type(target) is State, TypeError,
                  f"The {target} must be a State and not a {type(target)}.")
            check(stream_out.dim['dim'] == target.dim['dim'], ValueError,
                  f"The dimension of {stream_out.name} is not equal to the dimension of {target.name} ({stream_out.dim['dim']}!={target.dim['dim']}).")
            if type(stream_out) is Output:
                # Swap the Output for a Stream named after its entry in 'Outputs'.
                relation_name = self.__json['Outputs'][stream_out.name]
                stream_out = Stream(relation_name, stream_out.json, stream_out.dim, 0)
            self.__update_state_dict[target.name] = UpdateState(stream_out, target)
1✔
112

113
    def addConnect(self, stream_out, state_list_in):
        """Register a ``Connect`` update from ``stream_out`` to one or more states, then rebuild the JSON."""
        from nnodely.layers.input import Connect as connect_cls
        self.__update_state(stream_out, state_list_in, UpdateState=connect_cls)
        self.update()
1✔
117

118
    def addClosedLoop(self, stream_out, state_list_in):
        """Register a ``ClosedLoop`` update from ``stream_out`` to one or more states, then rebuild the JSON."""
        from nnodely.layers.input import ClosedLoop as closed_loop_cls
        self.__update_state(stream_out, state_list_in, UpdateState=closed_loop_cls)
        self.update()
1✔
122

123
    def addModel(self, name, stream_list):
        """Register a model under ``name`` and rebuild the JSON.

        :param name: model name; ``check`` is asked to report a ``ValueError``
            when the name is already registered.
        :param stream_list: an ``Output``, a ``Stream`` or a list; a deep copy
            of the list is what gets stored.
        :raises TypeError: if ``stream_list`` is neither an ``Output``/``Stream`` nor a list.
        """
        # A single stream is treated as a one-element list.
        streams = [stream_list] if isinstance(stream_list, (Output, Stream)) else stream_list
        if type(streams) is not list:
            raise TypeError(f'stream_list is type {type(streams)} but must be an Output or Stream or a list of them')
        check(name not in self.__model_dict, ValueError, f"The name '{name}' of the model is already used")
        self.__model_dict[name] = copy.deepcopy(streams)
        self.update()
1✔
132

133
    def removeModel(self, name_list):
        """Remove one or more registered models and rebuild the JSON.

        The removal is all-or-nothing: every name is validated before anything
        is deleted, so a bad name cannot leave some models removed from the
        registry while the working JSON still lists them.

        :param name_list: a model name or a list of model names. Any other type
            removes nothing and only triggers the rebuild.

        ``check`` is asked to report an ``IndexError`` for a name that is not
        registered (or that appears twice in ``name_list``).
        """
        names = [name_list] if type(name_list) is str else name_list
        if type(names) is list:
            # First pass: validation only. Names are struck off as they are seen,
            # so a repeated name is reported like an unknown one.
            still_registered = set(self.__model_dict)
            for name in names:
                check(name in still_registered, IndexError, f"The name {name} is not part of the available models")
                still_registered.discard(name)
            # Second pass: every name is known to be present and unique.
            for name in names:
                del self.__model_dict[name]
        self.update()
1✔
141

142
    def addMinimize(self, name, streamA, streamB, loss_function='mse'):
        """Register a minimization term between two streams and rebuild the JSON.

        Deep copies of both streams are stored under ``name``; a term already
        registered with the same name is replaced.

        :param name: key identifying the minimizer.
        :param streamA: first term, an ``Output`` or a ``Stream``.
        :param streamB: second term, an ``Output`` or a ``Stream``.
        :param loss_function: loss identifier stored with the term (default ``'mse'``).

        ``check`` is asked to report a ``TypeError`` when either stream has the
        wrong type; each message names the argument being checked.
        """
        check(isinstance(streamA, (Output, Stream)), TypeError, 'streamA must be an instance of Output or Stream')
        check(isinstance(streamB, (Output, Stream)), TypeError, 'streamB must be an instance of Output or Stream')
        #check(streamA.dim == streamB.dim, ValueError, f'Dimension of streamA={streamA.dim} and streamB={streamB.dim} are not equal.')
        self.__minimize_dict[name]={'A':copy.deepcopy(streamA), 'B': copy.deepcopy(streamB), 'loss':loss_function}
        self.update()
1✔
148

149
    def removeMinimize(self, name_list):
        """Remove one or more registered minimizers and rebuild the JSON.

        The removal is all-or-nothing: every name is validated before anything
        is deleted, so a bad name cannot leave some minimizers removed from the
        registry while the working JSON still lists them.

        :param name_list: a minimizer name or a list of names. Any other type
            removes nothing and only triggers the rebuild.

        ``check`` is asked to report an ``IndexError`` for a name that is not
        registered (or that appears twice in ``name_list``).
        """
        names = [name_list] if type(name_list) is str else name_list
        if type(names) is list:
            # First pass: validation only. Names are struck off as they are seen,
            # so a repeated name is reported like an unknown one.
            still_registered = set(self.__minimize_dict)
            for name in names:
                check(name in still_registered, IndexError, f"The name {name} is not part of the available minimuzes")
                still_registered.discard(name)
            # Second pass: every name is known to be present and unique.
            for name in names:
                del self.__minimize_dict[name]
        self.update()
1✔
157

158
    def setBuildWindow(self, sample_time = None):
        """Fix the sample time and convert every input/state window into samples.

        For each entry of ``'Inputs'`` and ``'States'`` the method stores
        ``'ns'`` (``[backward, forward]`` samples) and ``'ntot'`` (their sum) in
        that entry, then stores the maxima over all entries in ``Info['ns']`` and
        their sum in ``Info['ntot']``. ``'Info'`` is replaced by a new dictionary
        holding the sample time. Parameters and constants whose ``'values'`` is
        the string ``"SampleTime"`` receive the numeric sample time instead.

        :param sample_time: strictly positive sample time. When ``None`` the
            sample time already stored is kept, or ``1`` is used if none is stored.
        :raises AssertionError: if an entry has neither a time window nor a
            sample window (``'tw'`` and ``'sw'`` both ``[0, 0]``).

        ``check`` is asked to report a ``RuntimeError`` when there is no JSON, no
        input/state, or a non-positive ``sample_time``, and a ``ValueError`` when
        a pure time window is not a multiple of the sample time or when a
        parameter/constant with a ``'tw'`` has a number of values different from
        ``tw / sample_time``.
        """
        check(self.__json is not None, RuntimeError, "No model is defined!")
        if sample_time is not None:
            check(sample_time > 0, RuntimeError, 'Sample time must be strictly positive!')
            self.__sample_time = sample_time
        elif self.__sample_time is None:
            self.__sample_time = 1
        # From here on the sample time is always defined, so no branch below
        # has to deal with a missing one.
        step = self.__sample_time

        self.__json['Info'] = {"SampleTime": step}

        # Shallow union: the values are the very dictionaries stored in the JSON,
        # so the 'ns'/'ntot' written below end up in the model definition.
        json_inputs = self.__json['Inputs'] | self.__json['States']
        check(json_inputs != {}, RuntimeError, "No model is defined!")

        # for key,value in self.json['States'].items():
        #     check(closedloop_name in self.json['States'][key].keys() or connect_name in self.json['States'][key].keys(),
        #           KeyError, f'Update function is missing for state {key}. Use Connect or ClosedLoop to update the state.')

        input_ns_backward, input_ns_forward = {}, {}
        for key, value in json_inputs.items():
            only_time_window = value['sw'] == [0, 0]
            if only_time_window and value['tw'] == [0, 0]:
                # Raised explicitly: an ``assert`` statement is removed under ``python -O``.
                raise AssertionError(f"Input '{key}' has no time window or sample window")
            if only_time_window:
                ## check if value['tw'] is a multiple of sample_time
                absolute_tw = abs(value['tw'][0]) + abs(value['tw'][1])
                # Compare the window length, expressed in samples, with the nearest
                # integer. A small relative tolerance absorbs floating-point noise
                # (0.3 / 0.1 is 2.9999999999999996), while genuine non-multiples
                # such as 0.15 with a sample time of 0.1 are rejected.
                tw_in_samples = absolute_tw / step
                is_multiple = abs(tw_in_samples - round(tw_in_samples)) <= 1e-9 * max(1.0, abs(tw_in_samples))
                check(is_multiple, ValueError,
                      f"Time window of input '{key}' is not a multiple of sample time. This network cannot be neuralized")
                input_ns_backward[key] = round(-value['tw'][0] / step)
                input_ns_forward[key] = round(value['tw'][1] / step)
            else:
                # Both windows may be present: keep the larger extent on each side.
                input_ns_backward[key] = max(round(-value['tw'][0] / step), -value['sw'][0])
                input_ns_forward[key] = max(round(value['tw'][1] / step), value['sw'][1])
            value['ns'] = [input_ns_backward[key], input_ns_forward[key]]
            value['ntot'] = sum(value['ns'])

        self.__json['Info']['ns'] = [max(input_ns_backward.values()), max(input_ns_forward.values())]
        self.__json['Info']['ntot'] = sum(self.__json['Info']['ns'])
        if self.__json['Info']['ns'][0] < 0:
            log.warning(
                f"The input is only in the far past the max_samples_backward is: {self.__json['Info']['ns'][0]}")
        if self.__json['Info']['ns'][1] < 0:
            log.warning(
                f"The input is only in the far future the max_sample_forward is: {self.__json['Info']['ns'][1]}")

        for k, v in (self.__json['Parameters'] | self.__json['Constants']).items():
            if 'values' in v:
                if 'tw' in v:
                    # Tolerant comparison: exact float equality would reject valid
                    # definitions (3 values with tw=0.3 and a sample time of 0.1).
                    expected_len = v['tw'] / step
                    actual_len = np.array(v['values']).shape[0]
                    check(abs(actual_len - expected_len) <= 1e-9 * max(1.0, abs(expected_len)), ValueError,
                      f"{k} has a different number of values for this sample time.")
                if v['values'] == "SampleTime":
                    v['values'] = step
1✔
214

215
    def updateParameters(self, model):
        """Copy the current parameter values of ``model`` into the working JSON.

        For every parameter of the JSON that is also found in
        ``model.all_parameters``, ``'values'`` is set to ``.tolist()`` of the
        model's entry and any ``'init_fun'`` entry is dropped.

        :param model: object exposing ``all_parameters``; ``None`` makes the call a no-op.
        """
        if model is None:
            return
        for key, entry in self.__json['Parameters'].items():
            if key not in model.all_parameters:
                continue
            entry['values'] = model.all_parameters[key].tolist()
            # The stored values make the initializer entry redundant.
            entry.pop('init_fun', None)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc