• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

thouska / spotpy / 7709518362

30 Jan 2024 10:11AM UTC coverage: 77.332% (-0.2%) from 77.518%
7709518362

push

github

thouska
Merge branch 'master' of https://github.com/thouska/spotpy

4162 of 5382 relevant lines covered (77.33%)

3.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.53
/src/spotpy/algorithms/_algorithm.py
1
"""
2
Copyright (c) 2018 by Tobias Houska
3
This file is part of Statistical Parameter Optimization Tool for Python(SPOTPY).
4
:author: Tobias Houska
5

6
This file holds the standards for every algorithm.
7
"""
8
import random
6✔
9
import threading
6✔
10
import time
6✔
11

12
import numpy as np
6✔
13

14
from spotpy import database, parameter
6✔
15

16
try:
6✔
17
    from queue import Queue
6✔
18
except ImportError:
×
19
    # If the running python version is 2.* we have only Queue available as a multiprocessing class
20
    # we need to stop the whole main process which this sleep for one microsecond otherwise the subprocess is not
21
    # finished and the main process can not access it and put it as garbage away (Garbage collectors cause)
22
    # However this slows down the whole simulation process and is a boring bug. Python3.x does not need this
23
    # workaround
24
    from Queue import Queue
×
25

26

27
class _RunStatistic(object):
6✔
28
    """
29
    this class checks for each run if the objectivefunction got better and holds the
30
    best parameter set.
31
    Every _algorithm has an object of this class as status.
32
    Usage:
33
    status = _RunStatistic()
34
    status(rep,like,params)
35
    """
36

37
    def __init__(self, repetitions, algorithm_name, optimization_direction, parnames):
6✔
38
        self.optimization_direction = optimization_direction  # grid, mazimize, minimize
4✔
39
        print(
4✔
40
            "Initializing the ", algorithm_name, " with ", repetitions, " repetitions"
41
        )
42
        if optimization_direction == "minimize":
4✔
43
            self.compare = self.minimizer
4✔
44
            print("The objective function will be minimized")
4✔
45
        if optimization_direction == "maximize":
4✔
46
            self.compare = self.maximizer
4✔
47
            print("The objective function will be maximized")
4✔
48
        if optimization_direction == "grid":
4✔
49
            self.compare = self.grid
4✔
50

51
        self.rep = 0
4✔
52
        self.parnames = parnames
4✔
53
        self.parameters = len(parnames)
4✔
54
        self.params_min = [np.nan] * self.parameters
4✔
55
        self.params_max = [np.nan] * self.parameters
4✔
56
        self.objectivefunction_min = 1e308
4✔
57
        self.objectivefunction_max = -1e308
4✔
58
        self.starttime = time.time()
4✔
59
        self.last_print = time.time()
4✔
60

61
        self.repetitions = repetitions
4✔
62
        self.stop = False
4✔
63

64
    def minimizer(self, objval, params):
6✔
65
        if objval < self.objectivefunction_min:
4✔
66
            self.objectivefunction_min = objval
4✔
67
            self.params_min = list(params)
4✔
68

69
    def maximizer(self, objval, params):
6✔
70
        if objval > self.objectivefunction_max:
4✔
71
            self.objectivefunction_max = objval
4✔
72
            self.params_max = list(params)
4✔
73

74
    def grid(self, objval, params):
6✔
75
        if objval < self.objectivefunction_min:
4✔
76
            self.objectivefunction_min = objval
4✔
77
            self.params_min = list(params)
4✔
78
        if objval > self.objectivefunction_max:
4✔
79
            self.objectivefunction_max = objval
4✔
80
            self.params_max = list(params)
4✔
81

82
    def __call__(self, objectivefunction, params, block_print=False):
6✔
83
        self.rep += 1
4✔
84
        if type(objectivefunction) == type([]):  # TODO: change to iterable
4✔
85
            self.compare(objectivefunction[0], params)
×
86
        elif type(objectivefunction) == type(np.array([])):
4✔
87
            pass
4✔
88
        else:
89
            self.compare(objectivefunction, params)
4✔
90

91
        if self.rep == self.repetitions:
4✔
92
            self.stop = True
4✔
93

94
        if not block_print:
4✔
95
            self.print_status()
4✔
96

97
    def print_status(self):
6✔
98
        # get str showing approximate timeleft to end of simulation in H, M, S
99
        acttime = time.time()
4✔
100
        # Refresh progressbar every two second
101
        if acttime - self.last_print >= 2:
4✔
102
            avg_time_per_run = (acttime - self.starttime) / (self.rep + 1)
4✔
103
            timestr = time.strftime(
4✔
104
                "%H:%M:%S",
105
                time.gmtime(
106
                    round(avg_time_per_run * (self.repetitions - (self.rep + 1)))
107
                ),
108
            )
109
            if self.optimization_direction == "minimize":
4✔
110
                text = "%i of %i, minimal objective function=%g, time remaining: %s" % (
4✔
111
                    self.rep,
112
                    self.repetitions,
113
                    self.objectivefunction_min,
114
                    timestr,
115
                )
116

117
            if self.optimization_direction == "maximize":
4✔
118
                text = "%i of %i, maximal objective function=%g, time remaining: %s" % (
4✔
119
                    self.rep,
120
                    self.repetitions,
121
                    self.objectivefunction_max,
122
                    timestr,
123
                )
124

125
            if self.optimization_direction == "grid":
4✔
126
                text = "%i of %i, min objf=%g, max objf=%g, time remaining: %s" % (
4✔
127
                    self.rep,
128
                    self.repetitions,
129
                    self.objectivefunction_min,
130
                    self.objectivefunction_max,
131
                    timestr,
132
                )
133

134
            print(text)
4✔
135
            self.last_print = time.time()
4✔
136

137
    def print_status_final(self):
6✔
138
        print("\n*** Final SPOTPY summary ***")
4✔
139
        print(
4✔
140
            "Total Duration: "
141
            + str(round((time.time() - self.starttime), 2))
142
            + " seconds"
143
        )
144
        print("Total Repetitions:", self.rep)
4✔
145

146
        if self.optimization_direction == "minimize":
4✔
147
            print("Minimal objective value: %g" % (self.objectivefunction_min))
4✔
148
            print("Corresponding parameter setting:")
4✔
149
            for i in range(self.parameters):
4✔
150
                text = "%s: %g" % (self.parnames[i], self.params_min[i])
4✔
151
                print(text)
4✔
152

153
        if self.optimization_direction == "maximize":
4✔
154
            print("Maximal objective value: %g" % (self.objectivefunction_max))
4✔
155
            print("Corresponding parameter setting:")
4✔
156
            for i in range(self.parameters):
4✔
157
                text = "%s: %g" % (self.parnames[i], self.params_max[i])
4✔
158
                print(text)
4✔
159

160
        if self.optimization_direction == "grid":
4✔
161
            print("Minimal objective value: %g" % (self.objectivefunction_min))
4✔
162
            print("Corresponding parameter setting:")
4✔
163
            for i in range(self.parameters):
4✔
164
                text = "%s: %g" % (self.parnames[i], self.params_min[i])
4✔
165
                print(text)
4✔
166

167
            print("Maximal objective value: %g" % (self.objectivefunction_max))
4✔
168
            print("Corresponding parameter setting:")
4✔
169
            for i in range(self.parameters):
4✔
170
                text = "%s: %g" % (self.parnames[i], self.params_max[i])
4✔
171
                print(text)
4✔
172

173
        print("******************************\n")
4✔
174

175
    def __repr__(self):
176
        return "Min objectivefunction: %g \n Max objectivefunction: %g" % (
177
            self.objectivefunction_min,
178
            self.objectivefunction_max,
179
        )
180

181

182
class _algorithm(object):
6✔
183
    """
184
    Implements an algorithm.
185

186
    Input
187
    ----------
188
    spot_setup: class
189
        model: function
190
            Should be callable with a parameter combination of the parameter-function
191
            and return an list of simulation results (as long as evaluation list)
192
        parameter: function
193
            When called, it should return a random parameter combination. Which can
194
            be e.g. uniform or Gaussian
195
        objectivefunction: function
196
            Should return the objectivefunction for a given list of a model simulation and
197
            observation.
198
        evaluation: function
199
            Should return the true values as return by the model.
200

201
    dbname: str
202
        Name of the database where parameter, objectivefunction value and simulation
203
        results will be saved.
204
    dbformat: str
205
         ram: fast suited for short sampling time. no file will be created and results are saved in an array.
206
        csv: A csv file will be created, which you can import afterwards.
207
    parallel: str
208
        seq: Sequentiel sampling (default): Normal iterations on one core of your cpu.
209
        mpc: Multi processing: Iterations on all available cores on your (single) pc
210
        mpi: Message Passing Interface: Parallel computing on high performance computing clusters, py4mpi needs to be installed
211
    save_threshold: float or list
212
        Compares the given value/list of values with return value/list of values from spot_setup.objectivefunction.
213
        If the objectivefunction value is higher, the results are saved in the database. If not they are ignored (saves storage).
214
    db_precision:np.float type
215
        set np.float16, np.float32 or np.float64 for rounding of floats in the output database
216
        Default is np.float32
217
    sim_timeout: float, int or None, default: None
218
        the defined model given in the spot_setup class can be controlled to break after 'sim_timeout' seconds if
219
        sim_timeout is not None.
220
        If the model run has been broken simlply '[nan]' will be returned.
221
    random_state: int or None, default: None
222
        the algorithms uses the number in random_state as seed for numpy. This way stochastic processes can be reproduced.
223
    """
224

225
    _unaccepted_parameter_types = (parameter.List,)
6✔
226

227
    def __init__(
6✔
228
        self,
229
        spot_setup,
230
        dbname=None,
231
        dbformat=None,
232
        dbinit=True,
233
        dbappend=False,
234
        parallel="seq",
235
        save_sim=True,
236
        breakpoint=None,
237
        backup_every_rep=100,
238
        save_threshold=-np.inf,
239
        db_precision=np.float32,
240
        sim_timeout=None,
241
        random_state=None,
242
        optimization_direction="grid",
243
        algorithm_name="",
244
    ):
245

246
        # Initialize the user defined setup class
247
        self.setup = spot_setup
4✔
248
        param_info = parameter.get_parameters_array(
4✔
249
            self.setup, unaccepted_parameter_types=self._unaccepted_parameter_types
250
        )
251
        self.all_params = param_info["random"]
4✔
252
        self.constant_positions = parameter.get_constant_indices(spot_setup)
4✔
253
        if self.constant_positions:
4✔
254
            self.non_constant_positions = []
4✔
255
            for i, val in enumerate(self.all_params):
4✔
256
                if self.all_params[i] not in self.constant_positions:
4✔
257
                    self.non_constant_positions.append(i)
4✔
258
        else:
259
            self.non_constant_positions = np.arange(0, len(self.all_params))
4✔
260
        self.parameter = self.get_parameters
4✔
261
        self.parnames = param_info["name"]
4✔
262
        self.algorithm_name = algorithm_name
4✔
263
        # Create a type to hold the parameter values using a namedtuple
264
        self.partype = parameter.ParameterSet(param_info)
4✔
265

266
        self.evaluation = self.setup.evaluation()
4✔
267
        self.save_sim = save_sim
4✔
268
        self.optimization_direction = optimization_direction
4✔
269
        self.dbname = dbname or "customDb"
4✔
270
        self.dbformat = dbformat or "ram"
4✔
271
        self.db_precision = db_precision
4✔
272
        self.breakpoint = breakpoint
4✔
273
        self.backup_every_rep = backup_every_rep
4✔
274
        # Two parameters to control the data base handling
275
        # 'dbinit' triggers the initial creation of the data base file
276
        # 'dbappend' used to append to the existing data base, after restart
277
        self.dbinit = dbinit
4✔
278
        self.dbappend = dbappend
4✔
279

280
        # Set the random state
281
        if (
4✔
282
            random_state is None
283
        ):  # ToDo: Have to discuss if these 3 lines are neccessary.
284
            random_state = np.random.randint(low=0, high=2**30)
4✔
285
        np.random.seed(random_state)  # Both numpy.random and random or used in spotpy
4✔
286
        random.seed(random_state)  # Both numpy.random and random or used in spotpy
4✔
287

288
        # If value is not None a timeout will set so that the simulation will break after sim_timeout seconds without return a value
289
        self.sim_timeout = sim_timeout
4✔
290
        self.save_threshold = save_threshold
4✔
291

292
        self._return_all_likes = False  # allows multi-objective calibration if set to True, is set by the algorithm
4✔
293

294
        if breakpoint == "read" or breakpoint == "readandwrite":
4✔
295
            print("Reading backupfile")
×
296
            try:
×
297
                open(self.dbname + ".break")
×
298
            except FileNotFoundError:
×
299
                print("Backupfile not found")
×
300
            self.dbappend = True
×
301

302
        # Now a repeater (ForEach-object) is loaded
303
        # A repeater is a convinent wrapper to repeat tasks
304
        # We have the same interface for sequential and for parallel tasks
305
        if parallel == "seq":
4✔
306
            from spotpy.parallel.sequential import ForEach
4✔
307
        elif parallel == "mpi":
4✔
308
            from spotpy.parallel.mpi import ForEach
×
309

310
        # MPC is based on pathos mutiprocessing and uses ordered map, so results are given back in the order
311
        # as the parameters are
312
        elif parallel == "mpc":
4✔
313
            from spotpy.parallel.mproc import ForEach
4✔
314

315
        # UMPC is based on pathos mutiprocessing and uses unordered map, so results are given back in the order
316
        # as the subprocesses are finished which may speed up the whole simulation process but is not recommended if
317
        # objective functions do their calculation based on the order of the data because the order of the result is chaotic
318
        # and randomized
319
        elif parallel == "umpc":
4✔
320
            from spotpy.parallel.umproc import ForEach
4✔
321
        else:
322
            raise ValueError(
×
323
                "'%s' is not a valid keyword for parallel processing" % parallel
324
            )
325

326
        # This is the repeater for the model runs. The simulate method does the work
327
        # If you need different tasks, the repeater can be pushed into a "phase" using the
328
        # setphase function. The simulate method can check the current phase and dispatch work
329
        # to other functions. This is introduced for sceua to differentiate between burn in and
330
        # the normal work on the chains
331
        self.repeat = ForEach(self.simulate)
4✔
332

333
        # method "save" needs to know whether objective function result is list or float, default is float
334
        self.like_struct_typ = type(1.1)
4✔
335

336
    def __str__(self):
337
        return "{type}({mtype}())->{dbname}".format(
338
            type=type(self).__name__,
339
            mtype=type(self.setup).__name__,
340
            dbname=self.dbname,
341
        )
342

343
    def __repr__(self):
344
        return "{type}()".format(type=type(self).__name__)
345

346
    def get_parameters(self):
6✔
347
        """
348
        Returns the parameter array from the setup
349
        """
350
        pars = parameter.get_parameters_array(self.setup)
4✔
351
        return pars[self.non_constant_positions]
4✔
352

353
    def set_repetiton(self, repetitions):
6✔
354
        self.status = _RunStatistic(
4✔
355
            repetitions, self.algorithm_name, self.optimization_direction, self.parnames
356
        )
357
        # In MPI, this command will do nothing on the master process
358
        # but the worker processes are going to wait for jobs.
359
        # Hence the workers will only receive parameters for the
360
        # simulate function, new calculation phases and the termination
361
        self.repeat.start()
4✔
362

363
    def final_call(self):
6✔
364
        self.repeat.terminate()
4✔
365
        try:
4✔
366
            self.datawriter.finalize()
4✔
367
        except AttributeError:  # Happens if no database was assigned
×
368
            pass
×
369
        self.status.print_status_final()
4✔
370

371
    def _init_database(self, like, randompar, simulations):
6✔
372
        if self.dbinit:
4✔
373
            print("Initialize database...")
4✔
374

375
            self.datawriter = database.get_datawriter(
4✔
376
                self.dbformat,
377
                self.dbname,
378
                self.parnames,
379
                like,
380
                randompar,
381
                simulations,
382
                save_sim=self.save_sim,
383
                dbappend=self.dbappend,
384
                dbinit=self.dbinit,
385
                db_precision=self.db_precision,
386
                setup=self.setup,
387
            )
388

389
            self.dbinit = False
4✔
390

391
    def __is_list_type(self, data):
6✔
392
        if type(data) == type:
4✔
393
            return data == list or data == type(np.array([]))
×
394
        else:
395
            return type(data) == list or type(data) == type(np.array([]))
4✔
396

397
    def save(self, like, randompar, simulations, chains=1):
6✔
398
        # Initialize the database if no run was performed so far
399
        self._init_database(like, randompar, simulations)
4✔
400
        # Test if like and the save threshold are float/list and compare accordingly
401
        if self.__is_list_type(like) and self.__is_list_type(self.save_threshold):
4✔
402
            if all(
×
403
                i > j for i, j in zip(like, self.save_threshold)
404
            ):  # Compares list/list
405
                self.datawriter.save(like, randompar, simulations, chains=chains)
×
406
        if (not self.__is_list_type(like)) and (
4✔
407
            not self.__is_list_type(self.save_threshold)
408
        ):
409
            if like > self.save_threshold:  # Compares float/float
4✔
410
                self.datawriter.save(like, randompar, simulations, chains=chains)
4✔
411
        if self.__is_list_type(like) and (not self.__is_list_type(self.save_threshold)):
4✔
412
            if like[0] > self.save_threshold:  # Compares list/float
4✔
413
                self.datawriter.save(like, randompar, simulations, chains=chains)
4✔
414
        if (not self.__is_list_type(like)) and self.__is_list_type(
4✔
415
            self.save_threshold
416
        ):  # Compares float/list
417
            if (like > self.save_threshold).all:
×
418
                self.datawriter.save(like, randompar, simulations, chains=chains)
×
419

420
    def read_breakdata(self, dbname):
6✔
421
        """Read data from a pickle file if a breakpoint is set.
422
        Reason: In case of incomplete optimizations, old data can be restored."""
423
        import pickle
×
424

425
        with open(dbname + ".break", "rb") as breakfile:
×
426
            work, backuptime, repos, obmin, obmax, pmin, pmax = pickle.load(breakfile)
×
427
            self.status.starttime = self.status.starttime - backuptime
×
428
            self.status.rep = repos
×
429
            self.status.objectivefunction_min = obmin
×
430
            self.status.objectivefunction_max = obmax
×
431
            self.status.params_min = pmin
×
432
            self.status.params_max = pmax
×
433
            return work
×
434

435
    def write_breakdata(self, dbname, work):
6✔
436
        """Write data to a pickle file if a breakpoint has been set."""
437
        import pickle
×
438

439
        work = (
×
440
            work,
441
            self.status.last_print - self.status.starttime,
442
            self.status.rep,
443
            self.status.objectivefunction_min,
444
            self.status.objectivefunction_max,
445
            self.status.params_min,
446
            self.status.params_max,
447
        )
448
        with open(str(dbname) + ".break", "wb") as breakfile:
×
449
            pickle.dump(work, breakfile)
×
450

451
    def getdata(self):
6✔
452
        return self.datawriter.getdata()
4✔
453

454
    def update_params(self, params):
6✔
455
        # Add potential Constant parameters
456
        self.all_params[self.non_constant_positions] = params
4✔
457
        return self.all_params
4✔
458

459
    def postprocessing(
6✔
460
        self,
461
        rep,
462
        params,
463
        simulation,
464
        chains=1,
465
        save_run=True,
466
        negativlike=False,
467
        block_print=False,
468
    ):  # TODO: rep not necessaray
469
        params = self.update_params(params)
4✔
470
        if negativlike is True:
4✔
471
            like = -self.getfitness(simulation=simulation, params=params)
4✔
472
        else:
473
            like = self.getfitness(simulation=simulation, params=params)
4✔
474

475
        # Save everything in the database, if save is True
476
        # This is needed as some algorithms just want to know the fitness,
477
        # before they actually save the run in a database (e.g. sce-ua)
478

479
        self.status(like, params, block_print=block_print)
4✔
480
        if save_run is True and simulation is not None:
4✔
481
            self.save(like, params, simulations=simulation, chains=chains)
4✔
482
        if self._return_all_likes:
4✔
483
            return like
×
484
        else:
485
            try:
4✔
486
                iter(like)
4✔
487
                return like[0]
4✔
488
            except (
4✔
489
                TypeError
490
            ):  # Happens if iter(like) fails, i.e. if like is just one value
491
                return like
4✔
492

493
    def getfitness(self, simulation, params):
6✔
494
        """
495
        Calls the user defined spot_setup objectivefunction
496
        """
497
        try:
4✔
498
            # print('Using parameters in fitness function')
499
            return self.setup.objectivefunction(
4✔
500
                evaluation=self.evaluation,
501
                simulation=simulation,
502
                params=(params, self.parnames),
503
            )
504

505
        except (
4✔
506
            TypeError
507
        ):  # Happens if the user does not allow to pass parameter in the spot_setup.objectivefunction
508
            # print('Not using parameters in fitness function')
509
            return self.setup.objectivefunction(
4✔
510
                evaluation=self.evaluation, simulation=simulation
511
            )
512

513
    def simulate(self, id_params_tuple):
6✔
514
        """This is a simple wrapper of the model, returning the result together with
515
        the run id and the parameters. This is needed, because some parallel things
516
        can mix up the ordering of runs
517
        """
518
        id, params = id_params_tuple
4✔
519
        self.all_params[
4✔
520
            self.non_constant_positions
521
        ] = params  # TODO: List parameters are not updated if not accepted for the algorithm, we may have to warn/error if list is given
522
        all_params = self.all_params
4✔
523

524
        if self.sim_timeout:
4✔
525
            # we need a layer to fetch returned data from a threaded process into a queue.
526
            def model_layer(q, all_params):
4✔
527
                # Call self.model with a namedtuple instead of another sequence
528
                q.put(self.setup.simulation(self.partype(*all_params)))
4✔
529

530
            # starting a queue, where in python2.7 this is a multiprocessing class and can cause errors because of
531
            # incompability which the main thread. Therefore only for older Python version a workaround follows
532
            que = Queue()
4✔
533

534
            sim_thread = threading.Thread(target=model_layer, args=(que, all_params))
4✔
535
            sim_thread.daemon = True
4✔
536
            sim_thread.start()
4✔
537

538
            # If self.sim_timeout is not None the self.model will break after self.sim_timeout seconds otherwise is runs as
539
            # long it needs to run
540
            sim_thread.join(self.sim_timeout)
4✔
541

542
            # If no result from the thread is given, i.e. the thread was killed from the watcher the default result is
543
            # '[nan]' and will not be saved. Otherwise get the result from the thread
544
            model_result = None
4✔
545
            if not que.empty():
4✔
546
                model_result = que.get()
4✔
547

548
        else:
549
            model_result = self.setup.simulation(self.partype(*all_params))
4✔
550

551
        return id, params, model_result
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc