
WassimTenachi / PhySO / build #13 (push, via coveralls-python)
10 Jun 2024 12:28AM UTC. Coverage: 52.052% (-30.3%) from 82.385%.
Commit: "Update requirements.txt" (WassimTenachi)
2980 of 5725 relevant lines covered (52.05%), 0.52 hits per line.

Source file: /physo/physym/batch.py (file coverage: 39.85%)
import numpy as np

# Internal imports
from physo.physym import token
from physo.physym import library
from physo.physym import prior
from physo.physym import dataset
from physo.physym import vect_programs as VProg

# Embedding output in SR interface
INTERFACE_UNITS_AVAILABLE   = 1.
INTERFACE_UNITS_UNAVAILABLE = 0.
INTERFACE_UNITS_UNAVAILABLE_FILLER = lambda shape: np.random.uniform(size=shape, low=-4, high=4)
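
# Illustration of the availability encoding used by the units observation helpers below: each row of a
# units observation holds token.UNITS_VECTOR_SIZE unit values followed by one availability flag,
#   units available   -> [ u_1, ..., u_n, INTERFACE_UNITS_AVAILABLE (= 1.) ]
#   units unavailable -> [ uniform random filler in [-4, 4), INTERFACE_UNITS_UNAVAILABLE (= 0.) ]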

class Batch:
    """
    Batch containing symbolic function programs with interfaces for symbolic regression.
    Input  :
        ----- per step -----
        - new tokens can be appended by their int idx in the library of choosable tokens.
    Output :
        ----- per step -----
        - prior values for choice of next token.
        - environment of the next token to guess (parent/sibling one-hots etc.).
        ----- per epoch -----
        - reward values of programs
        - physicality of programs
        - lengths of programs
    """
    def __init__(self,
                library_args,
                priors_config,
                multi_X,
                multi_y,
                rewards_computer,
                batch_size,
                max_time_step,
                multi_y_weights      = 1.,
                free_const_opti_args = None,
                candidate_wrapper    = None,
                observe_units        = True,
                ):
        """
        Parameters
        ----------
        library_args : dict
            Arguments passed to library.__init__
        priors_config : list of couples (str : dict)
            List of priors: couples containing the prior name as first item (see prior.PRIORS_DICT for the list
            of available priors) and, as second item, additional arguments (besides library and programs) to be
            passed to the prior; leave None for priors that do not require arguments.
        multi_X : list of len (n_realizations,) of torch.tensor of shape (n_dim, ?,) of float
            List of X (one per realization), X being the values of the input variables of the problem, with
            n_dim = number of input variables.
        multi_y : list of len (n_realizations,) of torch.tensor of shape (?,) of float
            List of y (one per realization), y being the values of the target symbolic function on the input
            variables contained in X.
        multi_y_weights : list of len (n_realizations,) of torch.tensor of shape (?,) of float
                           or array_like of (n_realizations,) of float
                           or float, optional
            List of y_weights (one per realization), y_weights being the weights to apply to y data.
            Or a list of one weight per entire realization.
            Or a single float applied to all data points (default = 1., i.e. uniform weighting).
        rewards_computer : callable
            Custom reward computing function taking programs (vect_programs.VectPrograms), X (torch.tensor of shape (n_dim,?,)
            of float), y_target (torch.tensor of shape (?,) of float), y_weights (torch.tensor of shape (?,) of float),
            n_samples_per_dataset (array_like of shape (n_realizations,) of int) and free_const_opti_args as key arguments
            and returning a reward for each program (array_like of float).
        batch_size : int
            Number of programs in batch.
        max_time_step : int
            Max number of tokens programs can contain.
        free_const_opti_args : dict or None, optional
            Arguments to pass to free_const.optimize_free_const for free constants optimization. By default,
            free_const.DEFAULT_OPTI_ARGS arguments are used.
        candidate_wrapper : callable or None, optional
            Wrapper to apply to a candidate program's output, candidate_wrapper taking (func, X) as arguments where func
            is a candidate program callable (taking X as arg). By default = None, no wrapper is applied (identity).
        observe_units : bool, optional
            Should units be included in the "in situ" observation vector (True) or should this information be zeroed
            out (False).
        """

        # Batch
        self.batch_size    = batch_size
        self.max_time_step = max_time_step
        # Library
        self.library  = library.Library(**library_args)

        # Dataset (detects device and n_realizations)
        self.dataset = dataset.Dataset(multi_X         = multi_X,
                                       multi_y         = multi_y,
                                       multi_y_weights = multi_y_weights,
                                       library         = self.library)

        # Programs
        self.programs = VProg.VectPrograms(  batch_size        = self.batch_size,
                                             max_time_step     = self.max_time_step,
                                             library           = self.library,
                                             candidate_wrapper = candidate_wrapper,
                                             n_realizations    = self.dataset.detected_n_realizations,
                                             )

        # Sending free const table to same device as dataset
        self.programs.free_consts.to(self.dataset.detected_device)

        # Prior
        self.prior   = prior.make_PriorCollection(programs      = self.programs,
                                                  library       = self.library,
                                                  priors_config = priors_config,)

        # Reward func
        self.rewards_computer = rewards_computer

        # Free constants optimizer args
        self.free_const_opti_args = free_const_opti_args

        # Observations
        self.observe_units = observe_units

    # ---------------------------- INTERFACE FOR SYMBOLIC REGRESSION ----------------------------

    def get_sibling_one_hot (self, step = None):
        """
        Get sibling one-hots of tokens at step. Zero one-hot vectors for dummies.
        Parameters
        ----------
        step : int
            Step of token from which sibling one hot should be returned.
            By default, step = current step.
        Returns
        -------
        one_hot : numpy.array of shape (batch_size, n_choices) of int
            One hot.
        """
        if step is None:
            step = self.programs.curr_step
        # Idx of siblings
        siblings_idx      = self.programs.get_sibling_idx_of_step(step = step)      # (batch_size,)
        # Do tokens have siblings : mask
        has_siblings_mask = np.logical_and(                                         # (batch_size,)
            self.programs.tokens.has_siblings_mask[:, step],
            siblings_idx < self.programs.library.n_choices) # gets rid of dummy tokens which count as valid siblings
        # Initialize one hot result
        one_hot = np.zeros((self.batch_size, self.library.n_choices))               # (batch_size, n_choices)
        # Assigning only valid siblings and leaving zero vectors where there are no siblings
        one_hot[has_siblings_mask, :] = np.eye(self.library.n_choices)[siblings_idx[has_siblings_mask]]
        return one_hot

    def get_parent_one_hot (self, step = None):
        """
        Get parent one-hots of tokens at step.
        Parameters
        ----------
        step : int
            Step of token from which parent one hot should be returned.
            By default, step = current step.
        Returns
        -------
        one_hot : numpy.array of shape (batch_size, n_choices) of int
            One hot.
        """
        if step is None:
            step = self.programs.curr_step
        # Idx of parents
        parents_idx      = self.programs.get_parent_idx_of_step(step = step)         # (batch_size,)
        # Do tokens have parents : mask
        has_parents_mask = self.programs.tokens.has_parent_mask[:, step]             # (batch_size,)
        # Initialize one hot result
        one_hot = np.zeros((self.batch_size, self.library.n_choices))                 # (batch_size, n_choices)
        # Assigning only valid parents and leaving zero vectors where there are no parents
        one_hot[has_parents_mask, :] = np.eye(self.library.n_choices)[parents_idx[has_parents_mask]]
        return one_hot
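
    # Note on the one-hot construction used in the getters above (illustrative): np.eye(n_choices)[idx]
    # selects one-hot rows for an integer index array idx, e.g. with n_choices = 3,
    #   np.eye(3)[np.array([2, 0])] -> [[0., 0., 1.], [1., 0., 0.]]
    # Rows of programs without a valid parent/sibling are simply left as zero vectors.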

    def get_previous_tokens_one_hot(self):
        """
        Get previous step tokens as one hot.
        Returns
        -------
        one_hot : numpy.array of shape (batch_size, n_choices) of int
            One hot.
        """
        # Return 0 if 0th step
        if self.programs.curr_step == 0:
            one_hot = np.zeros((self.batch_size, self.library.n_choices))
        else:
            # Idx of tokens at previous step
            tokens_idx = self.programs.tokens.idx[:, self.programs.curr_step - 1]  # (batch_size,)
            # Are these tokens outside of library (void tokens)
            valid_mask = self.programs.tokens.idx[:,
                         self.programs.curr_step - 1] < self.library.n_choices     # (batch_size,)
            # Initialize one hot result
            one_hot = np.zeros((self.batch_size, self.library.n_choices))          # (batch_size, n_choices)
            # Assigning only valid tokens and leaving zero vectors where the previous token has no meaning
            one_hot[valid_mask, :] = np.eye(self.library.n_choices)[tokens_idx[valid_mask]]

        return one_hot

    def get_sibling_units_obs (self, step = None):
        """
        Get (required) units of the sibling of tokens at step. Filled using INTERFACE_UNITS_UNAVAILABLE_FILLER where
        units are not available. In addition to the units, a component indicates whether units are available (equal to
        INTERFACE_UNITS_AVAILABLE where units are available and to INTERFACE_UNITS_UNAVAILABLE where no units info is
        available).
        Parameters
        ----------
        step : int
            Step of the token whose sibling's (required) units should be returned.
            By default, step = current step.
        Returns
        -------
        units_obs : numpy.array of shape (batch_size, token.UNITS_VECTOR_SIZE + 1) of float
            Units and info availability mask.
        """
        if step is None:
            step = self.programs.curr_step

        # Coords
        coords = self.programs.coords_of_step(step)                                                     # (2, batch_size)

        # Initialize result with filler (unavailable units everywhere)
        units_obs = np.zeros((self.batch_size, token.UNITS_VECTOR_SIZE + 1 ), dtype=float)              # (batch_size, UNITS_VECTOR_SIZE + 1)
        # filling units
        units_obs[:, :-1] = INTERFACE_UNITS_UNAVAILABLE_FILLER(                                         # (batch_size, UNITS_VECTOR_SIZE)
            shape=(self.batch_size, token.UNITS_VECTOR_SIZE))
        # availability mask
        units_obs[:, -1] = INTERFACE_UNITS_UNAVAILABLE                                                  # (batch_size,)

        # Sibling
        has_sibling    = self.programs.tokens.has_siblings_mask[tuple(coords)]                          # (batch_size,)
        n_has_sibling  = has_sibling.sum()
        coords_sibling = self.programs.get_siblings(coords)[:, has_sibling]                             # (2, n_has_sibling)

        # Units
        # mask : are units of available siblings available ?
        is_available  = self.programs.tokens.is_constraining_phy_units[tuple(coords_sibling)]           # (n_has_sibling,)
        n_is_available = is_available.sum()
        # Coordinates of available siblings having available units
        coords_sibling_and_units_available = coords_sibling[:, is_available]                            # (2, n_is_available)
        # Units of available siblings having available units
        phy_units = self.programs.tokens.phy_units[tuple(coords_sibling_and_units_available)]           # (n_is_available, UNITS_VECTOR_SIZE)

        # Putting units of available siblings having available units in units_obs
        units_obs[coords_sibling_and_units_available[0], :-1] = phy_units                               # (n_is_available, UNITS_VECTOR_SIZE)
        units_obs[coords_sibling_and_units_available[0],  -1] = INTERFACE_UNITS_AVAILABLE               # (n_is_available,)

        return units_obs

    def get_parent_units_obs (self, step = None):
        """
        Get (required) units of the parent of tokens at step. Filled using INTERFACE_UNITS_UNAVAILABLE_FILLER where
        units are not available. In addition to the units, a component indicates whether units are available (equal to
        INTERFACE_UNITS_AVAILABLE where units are available and to INTERFACE_UNITS_UNAVAILABLE where no units info is
        available).
        Parameters
        ----------
        step : int
            Step of the token whose parent's (required) units should be returned.
            By default, step = current step.
        Returns
        -------
        units_obs : numpy.array of shape (batch_size, token.UNITS_VECTOR_SIZE + 1) of float
            Units and info availability mask.
        """
        if step is None:
            step = self.programs.curr_step

        # Coords
        coords = self.programs.coords_of_step(step)                                                     # (2, batch_size)

        # Initialize result with filler (unavailable units everywhere)
        units_obs = np.zeros((self.batch_size, token.UNITS_VECTOR_SIZE + 1 ), dtype=float)              # (batch_size, UNITS_VECTOR_SIZE + 1)
        # filling units
        units_obs[:, :-1] = INTERFACE_UNITS_UNAVAILABLE_FILLER(                                         # (batch_size, UNITS_VECTOR_SIZE)
            shape=(self.batch_size, token.UNITS_VECTOR_SIZE))
        # availability mask
        units_obs[:, -1] = INTERFACE_UNITS_UNAVAILABLE                                                  # (batch_size,)

        # If 0-th step, units are those of superparent
        if step == 0:
            units_obs[:, :-1] = self.library.superparent.phy_units                                      # (batch_size, UNITS_VECTOR_SIZE)
            units_obs[:,  -1] = INTERFACE_UNITS_AVAILABLE                                               # (batch_size,)

        # If 0-th step, this part does nothing as n_is_available = 0 in this case
        # parent
        has_parent    = self.programs.tokens.has_parent_mask[tuple(coords)]                             # (batch_size,)
        n_has_parent  = has_parent.sum()
        coords_parent = self.programs.get_parent(coords)[:, has_parent]                                 # (2, n_has_parent)

        # Units
        # mask : are units of available parents available ?
        is_available  = self.programs.tokens.is_constraining_phy_units[tuple(coords_parent)]           # (n_has_parent,)
        n_is_available = is_available.sum()
        # Coordinates of available parents having available units
        coords_parent_and_units_available = coords_parent[:, is_available]                             # (2, n_is_available)
        # Units of available parents having available units
        phy_units = self.programs.tokens.phy_units[tuple(coords_parent_and_units_available)]           # (n_is_available, UNITS_VECTOR_SIZE)

        # Putting units of available parents having available units in units_obs
        units_obs[coords_parent_and_units_available[0], :-1] = phy_units                               # (n_is_available, UNITS_VECTOR_SIZE)
        units_obs[coords_parent_and_units_available[0],  -1] = INTERFACE_UNITS_AVAILABLE               # (n_is_available,)

        return units_obs

    def get_previous_tokens_units_obs (self, step = None):
        """
        Get (required) units of tokens before step. Filled using INTERFACE_UNITS_UNAVAILABLE_FILLER where units are
        not available. In addition to the units, a component indicates whether units are available (equal to
        INTERFACE_UNITS_AVAILABLE where units are available and to INTERFACE_UNITS_UNAVAILABLE where no units info is
        available).
        Parameters
        ----------
        step : int
            Step of the token whose previous tokens' (required) units should be returned.
            By default, step = current step.
        Returns
        -------
        units_obs : numpy.array of shape (batch_size, token.UNITS_VECTOR_SIZE + 1) of float
            Units and info availability mask.
        """
        if step is None:
            step = self.programs.curr_step

        # Initialize result with filler (unavailable units everywhere)
        units_obs = np.zeros((self.batch_size, token.UNITS_VECTOR_SIZE + 1 ), dtype=float)              # (batch_size, UNITS_VECTOR_SIZE + 1)
        # filling units
        units_obs[:, :-1] = INTERFACE_UNITS_UNAVAILABLE_FILLER(                                         # (batch_size, UNITS_VECTOR_SIZE)
            shape=(self.batch_size, token.UNITS_VECTOR_SIZE))
        # availability mask
        units_obs[:, -1] = INTERFACE_UNITS_UNAVAILABLE                                                  # (batch_size,)

        # If step == 0, leave empty unavailable units filling
        if step > 0:
            units_obs = self.get_tokens_units_obs(step = step - 1)                                      # (batch_size, UNITS_VECTOR_SIZE + 1)

        return units_obs

    def get_tokens_units_obs (self, step = None):
        """
        Get (required) units of tokens at step. Filled using INTERFACE_UNITS_UNAVAILABLE_FILLER where units are not
        available. In addition to the units, a component indicates whether units are available (equal to
        INTERFACE_UNITS_AVAILABLE where units are available and to INTERFACE_UNITS_UNAVAILABLE where no units info is
        available).
        Parameters
        ----------
        step : int
            Step of the token whose (required) units should be returned.
            By default, step = current step.
        Returns
        -------
        units_obs : numpy.array of shape (batch_size, token.UNITS_VECTOR_SIZE + 1) of float
            Units and info availability mask.
        """
        if step is None:
            step = self.programs.curr_step

        # Coords
        coords = self.programs.coords_of_step(step)                                                     # (2, batch_size)

        # Initialize result
        units_obs = np.zeros((self.batch_size, token.UNITS_VECTOR_SIZE + 1 ), dtype=float)              # (batch_size, UNITS_VECTOR_SIZE + 1)

        # mask : is units information available
        is_available  = self.programs.tokens.is_constraining_phy_units[tuple(coords)]                   # (batch_size,)
        n_available   = is_available.sum()
        n_unavailable = self.batch_size - n_available
        # Coords of tokens whose units are available
        coords_available = coords[:, is_available]                                                      # (2, n_available)

        # Result : units (where available)
        units_obs[is_available,  :-1] = self.programs.tokens.phy_units[tuple(coords_available)]         # (n_available,   UNITS_VECTOR_SIZE)
        # Result : filler units (where unavailable)
        units_obs[~is_available, :-1] = INTERFACE_UNITS_UNAVAILABLE_FILLER(                             # (n_unavailable, UNITS_VECTOR_SIZE)
            shape=(n_unavailable, token.UNITS_VECTOR_SIZE))
        # Result : availability mask
        units_obs[is_available , -1] = INTERFACE_UNITS_AVAILABLE                                        # (n_available,)
        units_obs[~is_available, -1] = INTERFACE_UNITS_UNAVAILABLE                                      # (n_unavailable,)

        return units_obs

    def get_obs(self):
        """
        Computes the observation of the current step for the symbolic regression task.
        Returns
        -------
        obs : numpy.array of shape (batch_size, obs_size,) of float
            See the obs_size property for the size of the observation vector.
        """
        # Relatives one-hots
        parent_one_hot   = self.get_parent_one_hot()                         # (batch_size, n_choices,)
        sibling_one_hot  = self.get_sibling_one_hot()                        # (batch_size, n_choices,)
        previous_one_hot = self.get_previous_tokens_one_hot()                # (batch_size, n_choices,)
        # Number of dangling dummies
        n_dangling       = self.programs.n_dangling                          # (batch_size,)
        # Units obs
        do_obs = int(self.observe_units)
        units_obs_current  = do_obs * self.get_tokens_units_obs()            # (batch_size, UNITS_VECTOR_SIZE + 1)
        units_obs_sibling  = do_obs * self.get_sibling_units_obs()           # (batch_size, UNITS_VECTOR_SIZE + 1)
        units_obs_parent   = do_obs * self.get_parent_units_obs()            # (batch_size, UNITS_VECTOR_SIZE + 1)
        units_obs_previous = do_obs * self.get_previous_tokens_units_obs()   # (batch_size, UNITS_VECTOR_SIZE + 1)

        obs = np.concatenate((                                               # (batch_size, obs_size,)
            # Relatives one-hots
            parent_one_hot,
            sibling_one_hot,
            previous_one_hot,
            # Dangling
            n_dangling[:, np.newaxis],
            # Units obs
            units_obs_current,
            units_obs_sibling,
            units_obs_parent,
            units_obs_previous,
            ), axis = 1).astype(np.float32)

        return obs
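
    # Layout of the observation vector built by get_obs() above (illustrative):
    #   [ parent one-hot | sibling one-hot | previous one-hot | n_dangling | units obs : current, sibling, parent, previous ]
    #       n_choices        n_choices         n_choices           1          4 x (token.UNITS_VECTOR_SIZE + 1)
    # which is consistent with the obs_size property below :
    #   obs_size = 3*n_choices + 1 + 4*(token.UNITS_VECTOR_SIZE + 1)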

    @property
    def obs_size(self):
        """
        Size of observation vector.
        Returns
        -------
        obs_size : int
        """
        return (3*self.n_choices) + 1 + 4*(token.UNITS_VECTOR_SIZE+1)

    @property
    def n_choices (self):
        return self.library.n_choices

    def get_rewards (self):
        """
        Computes rewards of programs contained in batch.
        Returns
        -------
        rewards : numpy.array of shape (batch_size,) of float
            Rewards of programs.
        """
        rewards = self.rewards_computer(programs              = self.programs,
                                        X                     = self.dataset.multi_X_flatten,
                                        y_target              = self.dataset.multi_y_flatten,
                                        y_weights             = self.dataset.multi_y_weights_flatten,
                                        n_samples_per_dataset = self.dataset.n_samples_per_dataset,
                                        free_const_opti_args  = self.free_const_opti_args,
                                            )
        return rewards


    def __repr__(self):
        s = ""
        s += "-------------------------- Library -------------------------\n%s\n"%(self.library )
        s += "--------------------------- Prior --------------------------\n%s\n"%(self.prior   )
        s += "-------------------------- Dataset -------------------------\n%s\n"%(self.dataset )
        s += "-------------------------- Programs ------------------------\n%s\n"%(self.programs)
        return s
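
For orientation, here is a minimal usage sketch of the per-step / per-epoch interface described in the Batch docstring above. The configuration objects (my_library_args, my_priors_config, my_multi_X, my_multi_y, my_rewards_computer) are hypothetical placeholders for task-specific setup, and the token-appending step is only indicated as a comment because it is performed on the underlying programs object (see vect_programs.VectPrograms), not on Batch itself.

from physo.physym.batch import Batch

# Hypothetical placeholders for the task-specific configuration (not defined in this file).
batch = Batch(library_args     = my_library_args,
              priors_config    = my_priors_config,
              multi_X          = my_multi_X,
              multi_y          = my_multi_y,
              rewards_computer = my_rewards_computer,
              batch_size       = 256,
              max_time_step    = 32,
              )

for step in range(batch.max_time_step):
    obs = batch.get_obs()        # (batch_size, obs_size) float32 observation of the current step
    # ... sample the next token idx for each program (constrained by batch.prior)
    #     and append it to batch.programs via the vect_programs API ...

rewards = batch.get_rewards()    # (batch_size,) rewards of the completed programs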
