• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WassimTenachi / PhySO / #13

10 Jun 2024 12:28AM UTC coverage: 52.052% (-30.3%) from 82.385%
#13

push

coveralls-python

WassimTenachi
Update requirements.txt

2980 of 5725 relevant lines covered (52.05%)

0.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.82
/physo/physym/tests/execute_UnitTest.py
1
import unittest
1✔
2
import numpy as np
1✔
3
import time as time
1✔
4
import torch as torch
1✔
5
import sympy as sympy
1✔
6
import matplotlib.pyplot as plt
×
7

8
# Internal imports
9
from physo.physym import execute as Exec
1✔
10
from physo.physym import library as Lib
1✔
11
from physo.physym.functions import data_conversion, data_conversion_inv
×
12

13
class ExecuteProgramTest(unittest.TestCase):
×
14

15
    # Test program execution on a complicated function
16
    def test_ExecuteProgram (self):
        """
        Runs a tokenized program through Exec.ExecuteProgram and checks that its output is exactly equal to
        the same expression written directly with torch. The average run time of both routes is printed.
        """
        # Use the GPU when torch reports one, the CPU otherwise
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

        def on_device (value):
            # Goes through data_conversion, then moves the result to the selected device
            return data_conversion(value).to(device)

        # ------------------------------ DATA ------------------------------
        n_samples = int(1e6)
        # input var
        x = on_device(np.linspace(0.04, 4, n_samples))
        v = on_device(np.linspace(0.10, 10, n_samples))
        t = on_device(np.linspace(0.06, 6, n_samples))
        # Stacked in the same order as the "input_var_ids" declared in the library config below
        data = torch.stack((x, v, t), dim=0)

        # consts
        pi     = on_device(np.pi)
        const1 = on_device(1.)
        c      = on_device(3e8)
        M      = on_device(1e6)

        # ------------------------------ LIBRARY CONFIG ------------------------------
        args_make_tokens = {
            # operations
            "op_names"             : "all",  # or ["mul", "neg", "inv", "sin"]
            "use_protected_ops"    : False,
            # input variables
            "input_var_ids"        : {"x" : 0, "v" : 1, "t" : 2},
            "input_var_units"      : {"x" : [1, 0, 0], "v" : [1, -1, 0], "t" : [0, 1, 0]},
            "input_var_complexity" : {"x" : 0., "v" : 1., "t" : 0.},
            # constants
            "constants"            : {"pi" : pi, "c" : c, "M" : M, "1" : const1},
            "constants_units"      : {"pi" : [0, 0, 0], "c" : [1, -1, 0], "M" : [0, 0, 1], "1" : [0, 0, 0]},
            "constants_complexity" : {"pi" : 0., "c" : 0., "M" : 1., "1" : 1.},
        }
        my_lib = Lib.Library(args_make_tokens  = args_make_tokens,
                             superparent_units = [1, -2, 1],
                             superparent_name  = "y")

        # ------------------------------ PROGRAM ------------------------------
        # Token names of the program, listed in the order the library tokens are fed to ExecuteProgram
        program_names = [
            "mul", "mul", "M", "n2", "c", "sub", "inv", "sqrt", "sub", "1", "div", "n2", "v", "n2", "c",
            "cos", "div", "sub", "1", "div", "v", "c", "div", "div", "x", "t", "c",
        ]
        test_program = [my_lib.lib_name_to_token[name] for name in program_names]

        def reference ():
            # Same expression as the program above, written directly with torch
            return M*(c**2)*(1./torch.sqrt(1.-(v**2)/(c**2))-torch.cos((1.-(v/c))/((x/t)/c)))

        # EXPECTED RES
        expected_res = reference()

        n_runs = 100
        # ------------------------------ EXECUTION ------------------------------
        start = time.perf_counter()
        for _ in range(n_runs):
            res = Exec.ExecuteProgram(input_var_data = data, program_tokens = test_program, )
        elapsed = time.perf_counter() - start
        print("\nExecuteProgram time = %.3f ms"%(elapsed*1e3/n_runs))

        # ------------------------------ EXECUTION (wo tokens) ------------------------------
        start = time.perf_counter()
        for _ in range(n_runs):
            expected_res = reference()
        elapsed = time.perf_counter() - start
        print("\nExecuteProgram time (wo tokens) = %.3f ms"%(elapsed*1e3/n_runs))

        # ------------------------------ TEST ------------------------------
        # Exact (element-wise) equality is required between both routes
        computed = data_conversion_inv(res.cpu())
        expected = data_conversion_inv(expected_res.cpu())
        self.assertTrue(np.array_equal(computed, expected))
×
79

80
    # Test program execution on a complicated function with free constants (values passed at execution time)
81
    def test_ExecuteProgram_with_free_consts (self):
        """
        Same check as the fixed-constants execution test, except that M and c are declared as free constants
        in the library and their values are handed to Exec.ExecuteProgram at execution time. The output must
        be exactly equal to the same expression written directly with torch. Average run times are printed.
        """
        # Use the GPU when torch reports one, the CPU otherwise
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

        def on_device (value):
            # Goes through data_conversion, then moves the result to the selected device
            return data_conversion(value).to(device)

        # ------------------------------ DATA ------------------------------
        n_samples = int(1e6)

        # input var
        x = on_device(np.linspace(0.04, 4, n_samples))
        v = on_device(np.linspace(0.10, 10, n_samples))
        t = on_device(np.linspace(0.06, 6, n_samples))
        # Stacked in the same order as the "input_var_ids" declared in the library config below
        data = torch.stack((x, v, t), dim=0)

        # consts
        pi     = on_device(np.pi)
        const1 = on_device(1.)

        # free consts
        c = on_device(3e8)
        M = on_device(1e6)
        # (M, c) in alphabetical order as library will give them ids based on that order
        free_const_values = torch.stack((M, c), dim=0)

        # ------------------------------ LIBRARY CONFIG ------------------------------
        args_make_tokens = {
            # operations
            "op_names"             : "all",  # or ["mul", "neg", "inv", "sin"]
            "use_protected_ops"    : False,
            # input variables
            "input_var_ids"        : {"x" : 0, "v" : 1, "t" : 2},
            "input_var_units"      : {"x" : [1, 0, 0], "v" : [1, -1, 0], "t" : [0, 1, 0]},
            "input_var_complexity" : {"x" : 0., "v" : 1., "t" : 0.},
            # constants
            "constants"            : {"pi" : pi, "1" : const1},
            "constants_units"      : {"pi" : [0, 0, 0], "1" : [0, 0, 0]},
            "constants_complexity" : {"pi" : 0., "1" : 1.},
            # free constants
            "free_constants"            : {"c", "M"},
            "free_constants_init_val"   : {"c" : 1., "M" : 1.},
            "free_constants_units"      : {"c" : [1, -1, 0], "M" : [0, 0, 1]},
            "free_constants_complexity" : {"c" : 0., "M" : 1.},
        }
        my_lib = Lib.Library(args_make_tokens  = args_make_tokens,
                             superparent_units = [1, -2, 1],
                             superparent_name  = "y")

        # ------------------------------ PROGRAM ------------------------------
        # Token names of the program, listed in the order the library tokens are fed to ExecuteProgram
        program_names = [
            "mul", "mul", "M", "n2", "c", "sub", "inv", "sqrt", "sub", "1", "div", "n2", "v", "n2", "c",
            "cos", "div", "sub", "1", "div", "v", "c", "div", "div", "x", "t", "c",
        ]
        test_program = [my_lib.lib_name_to_token[name] for name in program_names]

        def reference ():
            # Same expression as the program above, written directly with torch
            return M*(c**2)*(1./torch.sqrt(1.-(v**2)/(c**2))-torch.cos((1.-(v/c))/((x/t)/c)))

        # EXPECTED RES
        expected_res = reference()

        n_runs = 100
        # ------------------------------ EXECUTION ------------------------------
        start = time.perf_counter()
        for _ in range(n_runs):
            res = Exec.ExecuteProgram(input_var_data = data, class_free_consts_vals = free_const_values, program_tokens = test_program, )
        elapsed = time.perf_counter() - start
        print("\nExecuteProgram time = %.3f ms"%(elapsed*1e3/n_runs))

        # ------------------------------ EXECUTION (wo tokens) ------------------------------
        start = time.perf_counter()
        for _ in range(n_runs):
            expected_res = reference()
        elapsed = time.perf_counter() - start
        print("\nExecuteProgram time (wo tokens) = %.3f ms"%(elapsed*1e3/n_runs))

        # ------------------------------ TEST ------------------------------
        # Exact (element-wise) equality is required between both routes
        computed = data_conversion_inv(res.cpu())
        expected = data_conversion_inv(expected_res.cpu())
        self.assertTrue(np.array_equal(computed, expected))
×
154

155
    # Test program execution in Class SR scenario
156
    def test_ExecuteProgram_with_class_and_spe_free_consts (self):
        """
        Checks program execution in the Class SR scenario: one program evaluated over several datasets
        (realizations), with class free constants (c0, c1) given one value for all datasets and
        dataset-specific free constants (k0, k1, k2) given one value per dataset.

        The program is executed
            (i)  once on all datasets flattened into a single one,
            (ii) dataset by dataset,
        and both results must be exactly equal to the same expression written directly with torch
        (trial_func below). Average run times of both routes are printed.
        """
        DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

        # -------------------------------------- Making fake datasets --------------------------------------

        multi_X = []
        for n_samples in [90, 100, 110]:
            x1 = np.linspace(0, 10, n_samples)
            x2 = np.linspace(0, 1 , n_samples)
            X  = torch.tensor(np.stack((x1, x2), axis=0)).to(DEVICE)
            multi_X.append(X)
        multi_X = multi_X*10                         # (n_realizations,) of (n_dim, [n_samples depends on dataset],)

        # Plain Python ints so they can be handed to torch.tile / torch.split without conversion
        n_samples_per_dataset = [int(X.shape[1]) for X in multi_X]
        n_all_samples  = sum(n_samples_per_dataset)
        n_realizations = len(multi_X)

        multi_X_flatten = torch.cat(multi_X, dim=-1)  # (n_dim, n_all_samples)

        # Making fake ideal parameters
        # n_spe_params   = 3
        # n_class_params = 2
        # Seeded generator so that every run of the test uses the same parameters
        rng = np.random.default_rng(seed=0)
        random_shift = (rng.random((n_realizations, 3))-0.5)*0.8
        # Parameters are sent to DEVICE like the data: mixing (non-scalar) CPU parameters with CUDA data
        # fails in the flattened evaluation below when a GPU is available.
        ideal_spe_params   = torch.tensor(np.array([1.123, 0.345, 0.116]) + random_shift).to(DEVICE) # (n_realizations, n_spe_params,)
        ideal_class_params = torch.tensor(np.array([1.389, 1.005])).to(DEVICE)                       # (n_class_params, )

        # Each dataset's parameters are repeated once per sample of that dataset
        ideal_spe_params_flatten = torch.cat(
            [torch.tile(ideal_spe_params[i], (n, 1)).transpose(0, 1)  # (n_spe_params, [n_samples depends on dataset],)
             for i, n in enumerate(n_samples_per_dataset)],
            dim = 1,
        ) # (n_spe_params, n_all_samples)

        ideal_class_params_flatten = torch.tile(ideal_class_params, (n_all_samples, 1)).transpose(0, 1) # (n_class_params, n_all_samples)

        def trial_func (X, params, class_params):
            # Reference expression, written with the same operations in the same order as the test program
            y = params[0]*torch.exp(-params[1]*X[0])*torch.cos(class_params[0]*X[0]+params[2]) + class_params[1]*X[1]
            return y

        y_ideals_flatten = trial_func (multi_X_flatten, ideal_spe_params_flatten, ideal_class_params_flatten) # (n_all_samples,)
        # Back to one tensor per dataset
        multi_y_ideals   = list(torch.split(y_ideals_flatten, n_samples_per_dataset, dim=-1))                # (n_realizations,) of (n_samples depends on dataset,)

        # params[0]*torch.exp(-params[1]*X[0])*torch.cos(class_params[0]*X[0]+params[2]) + class_params[1]*X[1]
        # k0 * exp(-k1 * t) * cos(c0 * t + k2) + c1 * l
        # "add", "mul", "mul", "k0", "exp", "mul", "neg", "k1", "t", "cos", "add", "mul", "c0", "t", "k2", "mul", "c1", "l"

        k0_init = [9,10,11]*10 # one initial value per realization
        # consts
        pi     = data_conversion (np.pi) .to(DEVICE)
        const1 = data_conversion (1.)    .to(DEVICE)

        # LIBRARY CONFIG
        args_make_tokens = {
                        # operations
                        "op_names"             : "all",
                        "use_protected_ops"    : True,
                        # input variables
                        "input_var_ids"        : {"t" : 0         , "l" : 1          },
                        "input_var_units"      : {"t" : [1, 0, 0] , "l" : [0, 1, 0]  },
                        "input_var_complexity" : {"t" : 0.        , "l" : 1.         },
                        # constants
                        "constants"            : {"pi" : pi        , "const1" : const1    },
                        "constants_units"      : {"pi" : [0, 0, 0] , "const1" : [0, 0, 0] },
                        "constants_complexity" : {"pi" : 1.        , "const1" : 1.        },
                        # class free constants
                        "class_free_constants"            : {"c0"              , "c1"               },
                        "class_free_constants_init_val"   : {"c0" : 21.        , "c1"  : 22.         },
                        "class_free_constants_units"      : {"c0" : [-1, 0, 0] , "c1"  : [0, -1, 0] },
                        "class_free_constants_complexity" : {"c0" : 1.         , "c1"  : 1.         },
                        # dataset-specific free constants
                        "spe_free_constants"            : {"k0"              , "k1"               , "k2"               },
                        "spe_free_constants_init_val"   : {"k0" : k0_init    , "k1"  : 2.         , "k2"  : 3.         },
                        "spe_free_constants_units"      : {"k0" : [0, 0, 0]  , "k1"  : [-1, 0, 0] , "k2"  : [0, 0, 0]  },
                        "spe_free_constants_complexity" : {"k0" : 1.         , "k1"  : 1.         , "k2"  : 1.         },
                           }
        my_lib = Lib.Library(args_make_tokens = args_make_tokens,
                             superparent_units = [0, 0, 0], superparent_name = "y")

        # TEST PROGRAM
        test_prog_str_0 = ["add", "mul", "mul", "k0"  , "exp", "mul", "neg", "k1", "t", "cos", "add", "mul", "c0", "t", "k2", "mul", "c1", "l", ]
        test_tokens_0 = [my_lib.lib_name_to_token[name] for name in test_prog_str_0]

        # Test execution on all datasets in flattened form
        N = 1000
        t0 = time.perf_counter()
        for _ in range (N):
            y_computed_flatten = Exec.ExecuteProgram(input_var_data         = multi_X_flatten,
                                                     class_free_consts_vals = ideal_class_params_flatten,
                                                     spe_free_consts_vals   = ideal_spe_params_flatten,
                                                     program_tokens         = test_tokens_0, )
        t1 = time.perf_counter()
        print("\nExecuteProgram time (flattened class SR) = %.3f ms"%((t1-t0)*1e3/N))
        works_bool = bool((y_computed_flatten == y_ideals_flatten).all())
        self.assertTrue(works_bool)

        # Test execution on all datasets but one by one
        t0 = time.perf_counter()
        for _ in range (N):
            multi_y_computed = []
            for i in range(n_realizations):
                y_computed = Exec.ExecuteProgram(input_var_data         = multi_X[i],
                                                 class_free_consts_vals = ideal_class_params,
                                                 spe_free_consts_vals   = ideal_spe_params[i],
                                                 program_tokens         = test_tokens_0, )
                multi_y_computed.append(y_computed)
        t1 = time.perf_counter()
        print("\nExecuteProgram time (one-by-one class SR) = %.3f ms"%((t1-t0)*1e3/N))

        for i, (y_computed, y_ideal) in enumerate(zip(multi_y_computed, multi_y_ideals)):
            # subTest reports which realization failed instead of stopping at the first failure
            with self.subTest(realization = i):
                works_bool = bool((y_computed == y_ideal).all())
                self.assertTrue(works_bool)

        # Sanity plot (debugging aid, kept disabled)
        # fig, ax = plt.subplots(1,1,figsize=(10,5))
        # for i in range(n_realizations):
        #     ax.plot(multi_X[i][0].cpu(), multi_y_ideals   [i].cpu().detach().numpy(), 'o', )
        #     ax.plot(multi_X[i][0].cpu(), multi_y_computed [i].cpu().detach().numpy(), 'r-',)
        # ax.legend()
        # plt.show()
        # for i in range(n_realizations):
        #     mse = torch.mean((multi_y_computed[i] - multi_y_ideals[i])**2)
        #     print("%i, mse = %f"%(i, mse))

        return None
×
318

319
    # Test program infix notation on a complicated function
320
    def test_ComputeInfixNotation(self):
        """
        Checks that the infix string returned by Exec.ComputeInfixNotation for a tokenized program is
        symbolically equal (through sympy) to the expected expression. The average run time is printed.
        """
        # ------------------------------ LIBRARY CONFIG ------------------------------
        args_make_tokens = {
            # operations
            "op_names"             : "all",  # or ["mul", "neg", "inv", "sin"]
            "use_protected_ops"    : True,
            # input variables
            "input_var_ids"        : {"x" : 0, "v" : 1, "t" : 2},
            "input_var_units"      : {"x" : [1, 0, 0], "v" : [1, -1, 0], "t" : [0, 1, 0]},
            "input_var_complexity" : {"x" : 0., "v" : 1., "t" : 0.},
            # constants
            "constants"            : {"pi" : np.pi, "c" : 3e8, "M" : 1e6, "1" : 1},
            "constants_units"      : {"pi" : [0, 0, 0], "c" : [1, -1, 0], "M" : [0, 0, 1], "1" : [0, 0, 0]},
            "constants_complexity" : {"pi" : 0., "c" : 0., "M" : 1., "1" : 1.},
        }
        my_lib = Lib.Library(args_make_tokens  = args_make_tokens,
                             superparent_units = [1, -2, 1],
                             superparent_name  = "y")

        # ------------------------------ TEST PROGRAM ------------------------------
        program_names = [
            "mul", "mul", "M", "n2", "c", "sub", "inv", "sqrt", "sub", "1", "div", "n2", "v", "n2", "c",
            "cos", "div", "sub", "1", "div", "v", "c", "pi",
        ]
        test_program = np.array([my_lib.lib_name_to_token[name] for name in program_names])

        # ------------------------------ Infix output ------------------------------
        n_runs = 100
        start = time.perf_counter()
        for _ in range(n_runs):
            infix_str = Exec.ComputeInfixNotation(test_program)
        elapsed = time.perf_counter() - start
        print("\nComputeInfixNotation time = %.3f ms"%(elapsed*1e3/n_runs))

        parse = sympy.parsing.sympy_parser.parse_expr
        infix = parse(infix_str)
        # Expected infix output
        expected = parse("M*(c**2.)*(1./((1.-(v**2)/(c**2))**0.5)-cos((1.-(v/c))/pi))")

        # Both expressions are considered equal when their difference simplifies to zero
        residual = sympy.simplify(infix - expected, rational = True)
        self.assertTrue(residual == 0)
×
358

359
if __name__ == "__main__":
    # Direct execution of this file: run all test cases with one report line per test
    unittest.main(verbosity = 2)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc