PrincetonUniversity / PsyNeuLink, build 15917088825
05 Jun 2025 04:18AM UTC. Coverage: 84.482% (+0.5%) from 84.017%
push (github, web-flow): Merge pull request #3271 from PrincetonUniversity/devel ("Devel")
9909 of 12966 branches covered (76.42%); branch coverage is included in the aggregate %.
1708 of 1908 new or added lines in 54 files covered (89.52%).
25 existing lines in 14 files are now uncovered.
34484 of 39581 relevant lines covered (87.12%), at 0.87 hits per line.
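
These figures are mutually consistent: the aggregate percentage pools line and branch hits (a quick check, assuming Coveralls computes the aggregate this way):

    covered = 34484 + 9909    # covered lines + covered branches
    relevant = 39581 + 12966  # relevant lines + total branches
    print(round(100 * covered / relevant, 3))  # -> 84.482, the reported aggregate percentage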

Source File: /psyneulink/library/compositions/compositionrunner.py (84.55% covered)
# Princeton University licenses this file to You under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.  You may obtain a copy of the License at:
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.


# ********************************************* AutodiffComposition *************************************************

import numpy as np
from types import GeneratorType

from psyneulink._typing import Mapping, Optional
from psyneulink.core.llvm import ExecutionMode
from psyneulink.core.compositions.composition import Composition
from psyneulink.core.compositions.report import Report, ReportProgress, ReportDevices, LEARN_REPORT, PROGRESS_REPORT
from psyneulink.core.components.mechanisms.modulatory.learning.learningmechanism import LearningMechanism
from psyneulink.core.globals.keywords import (EPOCH, MATRIX_WEIGHTS, MINIBATCH, OBJECTIVE_MECHANISM, OPTIMIZATION_STEP,
                                              RUN, TRAINING_SET, TRIAL, NODE_VALUES, NODE_VARIABLES)
from psyneulink.core.globals.context import Context
from psyneulink.core.globals.parameters import copy_parameter_value
from inspect import isgeneratorfunction

__all__ = ["CompositionRunner"]

def inf_yield_val(val=None):
    while True:
        yield val

class CompositionRunner():

    def __init__(self, composition: Composition):
        self._composition = composition

    def _calculate_loss(self, num_trials:int, execution_mode:ExecutionMode, context):
        """
        Returns a value that is the sum of all the losses from the last iteration
        """
        from psyneulink.library.compositions import AutodiffComposition

        if isinstance(self._composition, AutodiffComposition):
            return self._composition._get_total_loss(num_trials, context)

        total_loss = 0
        for terminal_sequence in self._composition._terminal_backprop_sequences.values():
            comparator = terminal_sequence[OBJECTIVE_MECHANISM]
            total_loss += comparator.value[0][0]
        return total_loss

    def convert_to_torch(self, v):
        """Convert a list of numpy arrays to a list of PyTorch tensors"""

        import torch
        torch_dtype = self._composition.torch_dtype if hasattr(self._composition, 'torch_dtype') else torch.float64

        # If the inner elements of the list are numpy arrays, convert to np.array first since PyTorch says
        # converting directly to tensors for lists of np.ndarrays is slow.
        if type(v[0]) == np.ndarray:
            t = torch.from_numpy(np.array(v)).to(dtype=torch_dtype)
        else:
            try:
                t = torch.tensor(v, dtype=torch_dtype)
            except ValueError:
                # We probably have a ragged array, so we need to convert to a list of tensors
                t = [[torch.tensor(y, dtype=torch_dtype) for y in x] for x in v]

        # Assume that the input port dimension is singleton and add it for 2D inputs
        if isinstance(t, torch.Tensor) and t.ndim < 3:
            t = t.unsqueeze(dim=1)

        # Convert integer types to double
        if type(t) is not list and t.dtype in [torch.int64, torch.int32, torch.int16, torch.int8]:
            t = t.double()

        return t

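    # Illustrative note (added; not in the original source): with the default
    # torch.float64 dtype, convert_to_torch([np.zeros(3), np.ones(3)]) returns
    # a tensor of shape (2, 1, 3) -- two trials, with a singleton input-port
    # dimension inserted by unsqueeze(dim=1), and three elements per input.
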
    def convert_input_to_arrays(self, inputs, execution_mode):
        """If the inputs are not numpy arrays or torch tensors, convert them"""

        array_inputs = {}
        for k, v in inputs.items():
            if type(v) is list:
                if execution_mode is ExecutionMode.PyTorch:
                    array_inputs[k] = self.convert_to_torch(v)
                else:
                    array_inputs[k] = np.array(v)
            else:
                array_inputs[k] = v

        return array_inputs

    def _batch_inputs(self,
                      inputs: dict,
                      epochs: int,
                      num_trials: int,
                      minibatch_size: int = 1,
                      optimizations_per_minibatch: int = 1,
                      randomize: bool = True,
                      synch_with_pnl_options:Optional[Mapping] = None,
                      retain_in_pnl_options:Optional[Mapping] = None,
                      call_before_minibatch=None,
                      call_after_minibatch=None,
                      early_stopper=None,
                      execution_mode:ExecutionMode=ExecutionMode.Python,
                      context=None)->GeneratorType:
        """
        Execute inputs and update pytorch parameters for one minibatch at a time.
        Partition the inputs dict into ones of length minibatch_size (or, for the last set, the remainder).
        Execute all inputs in that dict, then update weights (parameters), and repeat for all batches
        within an epoch. Synchronize weights, values and results with PsyNeuLink as specified in the
        synch_with_pnl_options and retain_in_pnl_options dicts.

        The generator should always yield a dict containing torch.Tensors or lists of torch.Tensors (when ragged).
        Each torch.Tensor or list should have its minibatch_size as the first dimension, even if minibatch_size=1.
        """
        assert early_stopper is None or not self._is_llvm_mode, "Early stopper doesn't work in compiled mode"
        assert call_before_minibatch is None or not self._is_llvm_mode, "minibatch calls don't work in compiled mode"
        assert call_after_minibatch is None or not self._is_llvm_mode, "minibatch calls don't work in compiled mode"

        if type(minibatch_size) == np.ndarray:
            minibatch_size = minibatch_size.item()

        if type(minibatch_size) == list:
            minibatch_size = np.array(minibatch_size).item()

        if minibatch_size > 1 and optimizations_per_minibatch != 1:
            raise ValueError("Cannot optimize multiple times per batch if minibatch size is greater than 1.")

        inputs = self.convert_input_to_arrays(inputs, execution_mode)

        # This is a generator for performance reasons,
        #    since we don't want to copy any data (especially for very large inputs or epoch counts!)
        for epoch in range(epochs):
            indices_of_all_trials = list(range(0, num_trials))
            if randomize:
                np.random.shuffle(indices_of_all_trials)

            # Cycle over minibatches
            for i in range(0, num_trials, minibatch_size):
                if call_before_minibatch:
                    call_before_minibatch()

                # Cycle over trials (stimuli) within a minibatch
                indices_of_trials_in_batch = indices_of_all_trials[i:i + minibatch_size]

                inputs_for_minibatch = {}
                for k, v in inputs.items():
                    modded_indices = [i % len(v) for i in indices_of_trials_in_batch]

                    if type(v) is not list:
                        inputs_for_minibatch[k] = v[modded_indices]
                    else: # list because ragged
                        inputs_for_minibatch[k] = [v[i] for i in modded_indices]

                # Cycle over optimizations per trial (stimulus)
                for optimization_num in range(optimizations_per_minibatch):
                    # Return current set of stimuli for minibatch
                    yield copy_parameter_value(inputs_for_minibatch)

                    # Update weights if in PyTorch execution_mode;
                    #  handled by Composition.execute in Python mode and in compiled version in LLVM mode
                    if execution_mode is ExecutionMode.PyTorch:
                        self._composition.do_gradient_optimization(retain_in_pnl_options, context, optimization_num)
                        from torch import no_grad
                        pytorch_rep = self._composition.parameters.pytorch_representation.get(context)
                        with no_grad():
                            for node, variable in pytorch_rep._nodes_to_execute_after_gradient_calc.items():
                                node.execute(variable, optimization_num, synch_with_pnl_options, context)

                        # Synchronize after every optimization step for a given stimulus (i.e., trial) if specified
                        pytorch_rep.synch_with_psyneulink(synch_with_pnl_options, OPTIMIZATION_STEP, context,
                                                          [MATRIX_WEIGHTS, NODE_VARIABLES, NODE_VALUES])

                if execution_mode is ExecutionMode.PyTorch:
                    # Synchronize specified outcomes after every stimulus (i.e., trial)
                    pytorch_rep.synch_with_psyneulink(synch_with_pnl_options, TRIAL, context)

                if execution_mode is ExecutionMode.PyTorch:
                    # Synchronize specified outcomes after every minibatch
                    pytorch_rep.synch_with_psyneulink(synch_with_pnl_options, MINIBATCH, context)

                if call_after_minibatch:
                    try:
                        # Try with the hope that the function uses **kwargs (or these args)
                        call_after_minibatch(epoch=epoch,
                                             minibatch=i // minibatch_size,
                                             num_minibatches=num_trials // minibatch_size,
                                             context=context)
                    except TypeError:
                        # If not, try without the args
                        call_after_minibatch()

            if execution_mode is ExecutionMode.PyTorch:
                pytorch_rep.synch_with_psyneulink(synch_with_pnl_options, EPOCH, context)

            # Compiled mode does not need more identical inputs.
            # number_of_runs will be set appropriately to cycle over the set
            if self._is_llvm_mode and not randomize:
                return
            if (not self._is_llvm_mode and early_stopper is not None
                    and early_stopper.step(self._calculate_loss(num_trials, execution_mode, context))):
                # end early if patience exceeded
                return

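    # Illustrative note (added; not in the original source): with num_trials=5,
    # minibatch_size=2 and randomize=False, the loop
    # `range(0, num_trials, minibatch_size)` above yields trial-index batches
    # [0, 1], [2, 3], and [4] -- the last minibatch holds the remainder,
    # as described in the docstring.
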
    def _batch_function_inputs(self,
                               inputs: dict,
                               epochs: int,
                               num_trials: int,
                               minibatch_size: int = 1,
                               optimizations_per_minibatch: int = 1,
                               synch_with_pnl_options: Optional[Mapping] = None,
                               retain_in_pnl_options: Optional[Mapping] = None,
                               call_before_minibatch=None,
                               call_after_minibatch=None,
                               early_stopper=None,
                               execution_mode:ExecutionMode=ExecutionMode.Python,
                               context=None)->GeneratorType:

        assert early_stopper is None or not self._is_llvm_mode, "Early stopper doesn't work in compiled mode"
        assert call_before_minibatch is None or not self._is_llvm_mode, "minibatch calls don't work in compiled mode"
        assert call_after_minibatch is None or not self._is_llvm_mode, "minibatch calls don't work in compiled mode"

        if type(minibatch_size) == np.ndarray:
            minibatch_size = minibatch_size.item()

        if type(minibatch_size) == list:
            minibatch_size = np.array(minibatch_size).item()

        if minibatch_size > 1 and optimizations_per_minibatch != 1:
            raise ValueError("Cannot optimize multiple times per batch if minibatch size is greater than 1.")

        for epoch in range(epochs):
            for i in range(0, num_trials, minibatch_size):
                batch_ran = False

                if call_before_minibatch:
                    call_before_minibatch()

                for idx in range(i, i + minibatch_size):
                    try:
                        input_batch, _ = self._composition._parse_learning_spec(inputs=inputs(idx),
                                                                                targets=None,
                                                                                execution_mode=execution_mode,
                                                                                context=context)
                    except Exception:
                        break
                    if input_batch is None:
                        break
                    batch_ran = True

                    input_batch = self.convert_input_to_arrays(input_batch, execution_mode)

                    yield input_batch

                if batch_ran:
                    if call_after_minibatch:
                        call_after_minibatch()

                    # 7/10/24 - FIX: REVISE TO ACCOMMODATE optimizations_per_minibatch
                    #                AND ADD HANDLING OF synch_with_pnl_options AND retain_in_pnl_options
                    # Update weights if in PyTorch execution_mode;
                    #  handled by Composition.execute in Python mode and in compiled version in LLVM mode
                    if execution_mode is ExecutionMode.PyTorch:
                        self._composition.do_gradient_optimization(retain_in_pnl_options, context)
                else:
                    break

            if (not self._is_llvm_mode
                    and early_stopper is not None
                    and early_stopper.step(self._calculate_loss(num_trials, execution_mode, context))):
                # end early if patience exceeded
                return

    def run_learning(self,
                     inputs: dict,
                     targets: dict = None,
                     num_trials: int = None,
                     epochs: int = 1,
                     learning_rate = None,
                     minibatch_size: int = 1,
                     optimizations_per_minibatch: int = 1,
                     patience: int = None,
                     min_delta: int = 0,
                     randomize_minibatches: bool = True,
                     synch_with_pnl_options:Optional[Mapping] = None,
                     retain_in_pnl_options:Optional[Mapping] = None,
                     call_before_minibatch = None,
                     call_after_minibatch = None,
                     context=None,
                     execution_mode:ExecutionMode = ExecutionMode.Python,
                     skip_initialization=False,
                     **kwargs)->np.ndarray:
        """
        Runs the composition repeatedly with the specified parameters.

        Returns
        ---------
        Outputs from the final execution
        """

        if execution_mode.is_compiled():
            assert execution_mode.is_cpu_compiled()
            self._is_llvm_mode = True
        else:
            self._is_llvm_mode = False

        if execution_mode is ExecutionMode.Python and learning_rate is not None:
            # The user specified learning_rate in the call to learn(), so use it by passing it in runtime_params,
            #   excluding any LearningMechanisms for which learning_rate has been individually specified
            runtime_params = {learning_mechanism:{'learning_rate':learning_rate}
                              for learning_mechanism in self._composition.nodes
                              if isinstance(learning_mechanism, LearningMechanism) and
                              learning_mechanism.parameters.learning_rate.get() ==  # include only mechanisms whose
                              learning_mechanism.defaults.learning_rate}            # learning_rate is still the default
            if 'runtime_params' in kwargs:
                kwargs['runtime_params'].update(runtime_params)
            else:
                kwargs['runtime_params'] = runtime_params
        else:
            # This is used by local learning-related methods to override the default learning_rate set at construction.
            self._composition._runtime_learning_rate = learning_rate

        # Handle function and generator inputs
        if isgeneratorfunction(inputs):
            inputs = inputs()

        if isinstance(inputs, dict) or callable(inputs):
            inputs = [inputs]

        if isgeneratorfunction(targets):
            targets = targets()

        if isinstance(targets, dict) or callable(targets):
            targets = [targets]
        elif targets is None:
            targets = inf_yield_val(targets)

        if isgeneratorfunction(epochs):
            epochs = epochs()

        if (not isinstance(epochs, list) and not isinstance(epochs, tuple)):
            epochs = inf_yield_val(epochs)
        elif epochs is None:
            epochs = inf_yield_val(1)

        # FIX JDC 12/10/22: PUT with Report HERE, TREATING OUTER LOOP AS RUN, AND RUN AS TRIAL

        for stim_input, stim_target, stim_epoch in zip(inputs, targets, epochs):
            if not callable(stim_input) and 'epochs' in stim_input:
                stim_epoch = stim_input['epochs']

            stim_input, num_input_trials = self._composition._parse_learning_spec(inputs=stim_input,
                                                                                  targets=stim_target,
                                                                                  execution_mode=execution_mode,
                                                                                  context=context)
            if num_trials is None:
                num_trials = num_input_trials

            if minibatch_size == TRAINING_SET:
                minibatch_size = num_trials

            if minibatch_size > num_trials:
                raise Exception("The minibatch size cannot be greater than the number of trials.")

            early_stopper = None
            if patience is not None and not self._is_llvm_mode:
                early_stopper = EarlyStopping(min_delta=min_delta, patience=patience)

            if callable(stim_input) and not isgeneratorfunction(stim_input):
                minibatched_input = self._batch_function_inputs(stim_input,
                                                                stim_epoch,
                                                                num_trials,
                                                                minibatch_size,
                                                                optimizations_per_minibatch=optimizations_per_minibatch,
                                                                synch_with_pnl_options=synch_with_pnl_options,
                                                                retain_in_pnl_options=retain_in_pnl_options,
                                                                call_before_minibatch=call_before_minibatch,
                                                                call_after_minibatch=call_after_minibatch,
                                                                early_stopper=early_stopper,
                                                                execution_mode=execution_mode,
                                                                context=context)
            else:
                minibatched_input = self._batch_inputs(inputs=stim_input,
                                                       epochs=stim_epoch,
                                                       num_trials=num_trials,
                                                       minibatch_size=minibatch_size,
                                                       optimizations_per_minibatch=optimizations_per_minibatch,
                                                       randomize=randomize_minibatches,
                                                       synch_with_pnl_options=synch_with_pnl_options,
                                                       retain_in_pnl_options=retain_in_pnl_options,
                                                       call_before_minibatch=call_before_minibatch,
                                                       call_after_minibatch=call_after_minibatch,
                                                       early_stopper=early_stopper,
                                                       execution_mode=execution_mode,
                                                       context=context)

            # The above generators generate:
            # num_trials / batch_size * batch_size * stim_epoch entries
            # unless 'early_stopper' stops the iteration sooner.
            # 'early_stopper' is not allowed in compiled mode.
            # FIXME: Passing the number to Python execution fails several tests.
            # Those tests rely on the extra iteration that exits the iterator.
            # (Passing num_trials * stim_epoch + 1 works)
            run_trials = num_trials * stim_epoch if self._is_llvm_mode else None

            # IMPLEMENTATION NOTE: for autodiff composition, the following executes a MINIBATCH's worth of training
            self._composition.run(inputs=minibatched_input,
                                  num_trials=run_trials,
                                  skip_initialization=skip_initialization,
                                  skip_analyze_graph=True,
                                  optimizations_per_minibatch=optimizations_per_minibatch,
                                  synch_with_pnl_options=synch_with_pnl_options,
                                  retain_in_pnl_options=retain_in_pnl_options,
                                  execution_mode=execution_mode,
                                  context=context,
                                  **kwargs)
            skip_initialization = True

            if execution_mode is ExecutionMode.PyTorch:
                pytorch_rep = self._composition.parameters.pytorch_representation._get(context)
                if pytorch_rep and synch_with_pnl_options[MATRIX_WEIGHTS] == MINIBATCH:
                    pytorch_rep._copy_weights_to_psyneulink(context)

        num_epoch_results = num_trials // minibatch_size  # number of results expected from final epoch

        # assign results from last *epoch* to learning_results
        self._composition.parameters.learning_results._set(
            self._composition.parameters.results.get(context)[-1 * num_epoch_results:], context)

        if execution_mode is ExecutionMode.PyTorch and synch_with_pnl_options[MATRIX_WEIGHTS] == EPOCH:
            # Copy weights at end of learning run
            pytorch_rep._copy_weights_to_psyneulink(context)

        # return result of last *trial* (as usual for a call to run)
        return self._composition.parameters.results.get(context)[-1]

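# Illustrative note (added; not in the original source): run_learning()
# normalizes its arguments so the zip() loop can treat every case uniformly --
# a dict of stimuli or a callable becomes a one-element list, a generator
# function is called to obtain a generator, and a scalar `epochs` is wrapped
# by inf_yield_val() so it pairs with every input set; e.g.,
# list(zip(['a', 'b'], inf_yield_val(3))) == [('a', 3), ('b', 3)].
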
class EarlyStopping(object):
    def __init__(self, mode='min', min_delta=0, patience=10):
        self.mode = mode
        self.min_delta = min_delta
        self.patience = patience
        self.best = None
        self.num_bad_epochs = 0
        self.is_better = None
        self._init_is_better(mode, min_delta)

        if patience == 0:
            self.is_better = lambda a, b: True

    def step(self, metrics):
        if self.best is None:
            self.best = metrics
            return False

        if np.isnan(metrics):
            return True

        if self.is_better(metrics, self.best):
            self.num_bad_epochs = 0
            self.best = metrics
        else:
            self.num_bad_epochs += 1

        if self.num_bad_epochs >= self.patience:
            return True

        return False

    def _init_is_better(self, mode, min_delta):
        if mode not in {'min', 'max'}:
            raise ValueError('mode ' + mode + ' is unknown!')
        if mode == 'min':
            self.is_better = lambda a, best: a < best - min_delta
        if mode == 'max':
            self.is_better = lambda a, best: a > best + min_delta
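
A minimal sketch of the EarlyStopping protocol defined above (the loss values are invented for illustration): step() returns True once the metric has failed to improve on the best value by more than min_delta for `patience` consecutive calls.

    stopper = EarlyStopping(mode='min', min_delta=0.01, patience=2)
    for loss in [1.0, 0.5, 0.495, 0.494]:
        if stopper.step(loss):
            print('patience exceeded -- stopping')  # triggers on the 4th loss
            break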