• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tonegas / nnodely / 18676707115

16 Oct 2025 04:28PM UTC coverage: 96.644% (+0.001%) from 96.643%
18676707115

push

github

MisterMandarino
Merge branch 'develop' of https://github.com/tonegas/nnodely into develop

55 of 55 new or added lines in 3 files covered. (100.0%)

13 existing lines in 3 files now uncovered.

12727 of 13169 relevant lines covered (96.64%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.8
/nnodely/basic/modeldef.py
1
import copy
1✔
2

3
import numpy as np
1✔
4

5
from nnodely.support.utils import check, check_and_get_list
1✔
6
from nnodely.support.jsonutils import merge, subjson_from_model,subjson_from_relation, check_model, get_models_json
1✔
7
from nnodely.basic.relation import MAIN_JSON, Stream, check_names
1✔
8
from nnodely.layers.output import Output
1✔
9
from nnodely.layers.input import Input
1✔
10

11
from nnodely.support.logger import logging, nnLogger
1✔
12
# Module-level logger for this file, created with the INFO level
log = nnLogger(__name__, logging.INFO)
1✔
13

14

15
class ModelDef:
1✔
16
    def __init__(self, model_def = MAIN_JSON):
1✔
17
        # Models definition
18
        self.__json_base = copy.deepcopy(model_def)
1✔
19

20
        # Initialize the model definition
21
        self.__json = copy.deepcopy(self.__json_base)
1✔
22
        if "SampleTime" in self.__json['Info']:
1✔
23
            self.__sample_time = self.__json['Info']["SampleTime"]
1✔
24
        else:
25
            self.__sample_time = None
1✔
26

27
    def __contains__(self, key):
1✔
28
        return key in self.__json
1✔
29

30
    def __getitem__(self, key):
1✔
31
        if key in self.__json:
1✔
32
            return self.__json[key]
1✔
33
        else:
34
            return None
1✔
35

36
    def __setitem__(self, key, value):
1✔
UNCOV
37
        self.__json[key] = value
×
38

39
    def __rebuild_json(self, models_names, minimizers):
        """Build a new definition restricted to the given models and minimizers.

        :param models_names: iterable with the names of the models to keep.
        :param minimizers: iterable with the names of the minimizers to keep;
            it is only read when the definition has a 'Minimizers' section.
        :return: a deep copy of the rebuilt definition.
        """
        current = self.__json
        rebuilt = subjson_from_model(current, list(models_names))

        if 'Minimizers' in current:
            stored = current['Minimizers']
            # Relations referenced by either side of the minimizers to keep
            relations_name = {stored[key]['A'] for key in minimizers} | {stored[key]['B'] for key in minimizers}
            if relations_name:
                minimizers_json = subjson_from_relation(current, list(relations_name))
                rebuilt = merge(rebuilt, minimizers_json)

        return copy.deepcopy(rebuilt)
1✔
51

52
    def recurrentInputs(self):
1✔
53
        return {key:value for key, value in self.__json['Inputs'].items() if ('closedLoop' in value.keys() or 'connect' in value.keys())}
1✔
54

55
    def getJson(self, models:list|str|None = None) -> dict:
1✔
56
        if models is None:
1✔
57
            return copy.deepcopy(self.__json)
1✔
58
        else:
59
            json = subjson_from_model(self.__json, models)
1✔
60
            check_model(json)
1✔
61
            return copy.deepcopy(json)
1✔
62

63
    def getSampleTime(self):
        """Return the stored sample time.

        check() is called with AttributeError when no sample time has been set yet.
        """
        sample_time = self.__sample_time
        check(sample_time is not None, AttributeError, "Sample time is not defined the model is not neuralized!")
        return sample_time
1✔
66

67
    def isDefined(self):
1✔
68
        return self.__json is not None
1✔
69

70
    def addConnection(self, stream_out:str|Output|Stream, input_in:str|Input, type:str, local:bool = False):
        """Link a relation to an input by writing the connection into the input entry.

        :param stream_out: an Output/Stream object, or the name of an Output.
        :param input_in: an Input object or the name of an Input.
        :param type: key written in the input entry (removeConnection looks for
            'closedLoop' and 'connect').
        :param local: flag stored as an int under the 'local' key of the input.
        """
        outputs = self.__json['Outputs']

        # Resolve the relation name behind the stream
        if isinstance(stream_out, (Output, Stream)):
            # A known output maps to its relation, otherwise the stream's own name is used
            stream_name = outputs.get(stream_out.name, stream_out.name)
        else:
            output_name = check_and_get_list(stream_out, set(outputs.keys()),
                                             lambda name: f"The name {name} is not part of the available Outputs")[0]
            stream_name = outputs[output_name]

        input_name = input_in.name if isinstance(input_in, Input) else input_in #TODO Add tests

        # Both ends must exist in the definition before they get linked
        inputs = self.__json['Inputs']
        input_name = check_and_get_list(input_name, set(inputs.keys()),
                                       lambda name: f"The name {name} is not part of the available Inputs")[0]
        stream_name = check_and_get_list(stream_name, set(self.__json['Relations'].keys()),
                                        lambda name: f"The name {name} is not part of the available Relations")[0]

        target = inputs[input_name]
        target[type] = stream_name
        target['local'] = int(local)
1✔
91

92
    def removeConnection(self, name_list:str|list[str]):
        """Remove the closed loop or the connection from the given inputs.

        :param name_list: one input name or a list of input names.
        :raises ValueError: if an input has neither a 'closedLoop' nor a 'connect' entry.
        """
        inputs = self.__json['Inputs']
        name_list = check_and_get_list(name_list, set(inputs.keys()), lambda name: f"The name {name} is not part of the available Inputs")
        for input_in in name_list:
            spec = inputs[input_in]
            # 'closedLoop' takes precedence; only one of the two is removed per call
            if 'closedLoop' in spec.keys():
                kind = 'closedLoop'
            elif 'connect' in spec.keys():
                kind = 'connect'
            else:
                raise ValueError(f"The input '{input_in}' has no connection or closed loop defined")
            del spec[kind]
            del spec['local']
1✔
103

104
    def addModel(self, name:str, stream_list):
        """Add a named model built from one or more output streams.

        :param name: name of the new model; validated with check_names against
            the models already stored.
        :param stream_list: a single Output or an iterable of objects exposing ``json``.
        """
        streams = [stream_list] if isinstance(stream_list, Output) else stream_list

        # Merge all the streams into one definition and validate it
        model_json = MAIN_JSON
        for stream in streams:
            model_json = merge(model_json, stream.json)
        check_model(model_json)

        if 'Models' in self.__json:
            stored = self.__json['Models']
            single_model = type(stored) is str
            models_names = {stored} if single_model else set(stored.keys())
            check_names(name, models_names, 'Models')
            if single_model:
                # Switch from the single-name form to the dictionary form
                self.__json['Models'] = {stored: get_models_json(self.__json)}
            self.__json = merge(self.__json, model_json)
            self.__json['Models'][name] = get_models_json(model_json)
        else:
            # First model: 'Models' simply holds its name
            self.__json = merge(self.__json, model_json)
            self.__json['Models'] = name
1✔
123

124
    def removeModel(self, name_list):
        """Remove the given models and rebuild the definition from the remaining ones.

        :param name_list: one model name or a list of model names.
        :raises ValueError: if the definition has no 'Models' section.
        """
        if 'Models' not in self.__json:
            raise ValueError("No Models are defined")

        # 'Models' is either a single name or a dictionary keyed by name
        stored = self.__json['Models']
        if type(stored) is str:
            models_names = {stored}
        else:
            models_names = set(stored.keys())

        name_list = check_and_get_list(name_list, models_names, lambda name: f"The name {name} is not part of the available models")
        models_names -= set(name_list)

        # Every existing minimizer is kept
        if 'Minimizers' in self.__json:
            minimizers = set(self.__json['Minimizers'].keys())
        else:
            minimizers = None
        self.__json = self.__rebuild_json(models_names, minimizers)
1✔
132

133
    def addMinimize(self, name, streamA, streamB, loss_function='mse'):
        """Register a minimizer that pairs two streams with a loss function.

        :param name: name of the minimizer; validated with check_names against
            the minimizers already stored.
        :param streamA: first term, either a relation name (str) or an
            Output/Stream object whose json is merged into the definition.
        :param streamB: second term, same accepted types as streamA.
        :param loss_function: value stored under the 'loss' key (default 'mse').

        check() is called with TypeError when a non-string stream is neither an
        Output nor a Stream.
        """
        if 'Minimizers' not in self.__json:
            self.__json['Minimizers'] = {}
        check_names(name, set(self.__json['Minimizers'].keys()), 'Minimizers')

        if isinstance(streamA, str):
            streamA_name = streamA
        else:
            check(isinstance(streamA, (Output, Stream)), TypeError, 'streamA must be an instance of Output or Stream')
            # An Output is resolved to the relation it points to
            streamA_name = streamA.json['Outputs'][streamA.name] if isinstance(streamA, Output) else streamA.name
            self.__json = merge(self.__json, streamA.json)

        if isinstance(streamB, str):
            streamB_name = streamB
        else:
            # The message names streamB (it used to say streamA, a copy-paste slip)
            check(isinstance(streamB, (Output, Stream)), TypeError, 'streamB must be an instance of Output or Stream')
            streamB_name = streamB.json['Outputs'][streamB.name] if isinstance(streamB, Output) else streamB.name
            self.__json = merge(self.__json, streamB.json)
        # NOTE: the dimensions of the two streams are not compared here.

        self.__json['Minimizers'][name] = {
            'A': streamA_name,
            'B': streamB_name,
            'loss': loss_function,
        }
1✔
157

158
    def removeMinimize(self, name_list):
        """Remove the given minimizers and rebuild the definition without them.

        :param name_list: one minimizer name or a list of minimizer names.
        :raises ValueError: if the definition has no 'Minimizers' section.
        """
        if 'Minimizers' not in self.__json:
            raise ValueError("No Minimizers are defined")

        stored_minimizers = self.__json['Minimizers']
        name_list = check_and_get_list(name_list, stored_minimizers.keys(), lambda name: f"The name {name} is not part of the available minimizers")

        # All the models are kept; 'Models' is either a single name or a dictionary
        stored_models = self.__json['Models']
        if type(stored_models) is str:
            models_names = {stored_models}
        else:
            models_names = set(stored_models.keys())

        # 'Minimizers' is known to exist at this point, so no fallback is needed
        remaining_minimizers = set(stored_minimizers.keys()) - set(name_list)
        self.__json = self.__rebuild_json(models_names, remaining_minimizers)
1✔
165

166
    def setBuildWindow(self, sample_time = None):
        """Fix the sample time and express every input window as a number of samples.

        For each input, 'ns' ([backward, forward] samples) and 'ntot' (their sum)
        are written into the input entry; the maxima over all inputs are written
        into 'Info'. Parameters and constants with a time window are checked
        against the sample time, and the placeholder value "SampleTime" is
        replaced by the actual sample time.

        :param sample_time: sample time to use. When None, the sample time
            already stored is kept, or 1 is used if none was ever set.
        :raises AssertionError: if an input has neither a 'tw' nor a 'sw' window.

        check() is called with RuntimeError when no input is defined or the
        sample time is not positive, and with ValueError when a window does not
        match the sample time.
        """
        check(self.__json is not None, RuntimeError, "No model is defined!")
        if sample_time is not None:
            check(sample_time > 0, RuntimeError, 'Sample time must be strictly positive!')
            self.__sample_time = sample_time
        elif self.__sample_time is None:
            # Nothing given now and nothing stored: fall back to 1
            self.__sample_time = 1
        # From here on the sample time is always set, so no None handling is needed below
        ts = self.__sample_time

        # 'Info' is rebuilt from scratch on every call
        self.__json['Info'] = {"SampleTime": ts}

        check(self.__json['Inputs'] != {}, RuntimeError, "No model is defined!")
        json_inputs = self.__json['Inputs']

        input_ns_backward, input_ns_forward = {}, {}
        for key, value in json_inputs.items():
            if 'sw' not in value and 'tw' not in value:
                # Raised explicitly: an assert statement would be skipped under -O
                raise AssertionError(f"Input '{key}' has no time window or sample window")
            if 'sw' not in value:
                ## check if value['tw'] is a multiple of sample_time
                absolute_tw = abs(value['tw'][0]) + abs(value['tw'][1])
                # NOTE(review): rounding the remainder accepts any remainder below 0.5,
                # so some non-multiples pass this check - confirm whether it should be stricter
                check(round(absolute_tw % ts) == 0, ValueError,
                      f"Time window of input '{key}' is not a multiple of sample time. This network cannot be neuralized")
                input_ns_backward[key] = round(-value['tw'][0] / ts)
                input_ns_forward[key] = round(value['tw'][1] / ts)
            elif 'tw' in value:
                # Both windows are present: keep the larger extent on each side
                input_ns_backward[key] = max(round(-value['tw'][0] / ts), -value['sw'][0])
                input_ns_forward[key] = max(round(value['tw'][1] / ts), value['sw'][1])
            else:
                input_ns_backward[key] = -value['sw'][0]
                input_ns_forward[key] = value['sw'][1]
            value['ns'] = [input_ns_backward[key], input_ns_forward[key]]
            value['ntot'] = sum(value['ns'])

        self.__json['Info']['ns'] = [max(input_ns_backward.values()), max(input_ns_forward.values())]
        self.__json['Info']['ntot'] = sum(self.__json['Info']['ns'])
        if self.__json['Info']['ns'][0] < 0:
            log.warning(
                f"The input is only in the far past the max_samples_backward is: {self.__json['Info']['ns'][0]}")
        if self.__json['Info']['ns'][1] < 0:
            log.warning(
                f"The input is only in the far future the max_sample_forward is: {self.__json['Info']['ns'][1]}")

        for k, v in (self.__json['Parameters'] | self.__json['Constants']).items():
            if 'values' in v:
                if 'tw' in v.keys():
                    # Compare with a tolerance: a float division such as 0.3 / 0.1
                    # gives 2.9999999999999996, which would fail an exact comparison
                    expected = v['tw'] / ts
                    n_values = np.array(v['values']).shape[0]
                    check(bool(np.isclose(n_values, expected, rtol=0.0, atol=1e-6)), ValueError,
                      f"{k} has a different number of values for this sample time.")
                if v['values'] == "SampleTime":
                    v['values'] = ts
1✔
222

223
    def updateParameters(self, model = None, *, clear_model = False):
1✔
224
        if clear_model:
1✔
225
            for key in self.__json['Parameters'].keys():
1✔
226
                if 'init_values' in self.__json['Parameters'][key]:
1✔
227
                    self.__json['Parameters'][key]['values'] = self.__json['Parameters'][key]['init_values']
1✔
228
                elif 'values' in self.__json['Parameters'][key]:
1✔
229
                    del self.__json['Parameters'][key]['values']
1✔
230
        elif model is not None:
1✔
231
            for key in self.__json['Parameters'].keys():
1✔
232
                if key in model.all_parameters:
1✔
233
                    self.__json['Parameters'][key]['values'] = model.all_parameters[key].tolist()
1✔
234

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc