• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

thouska / spotpy / 7709518362

30 Jan 2024 10:11AM UTC coverage: 77.332% (-0.2%) from 77.518%
7709518362

push

github

thouska
Merge branch 'master' of https://github.com/thouska/spotpy

4162 of 5382 relevant lines covered (77.33%)

3.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.59
/src/spotpy/algorithms/dds.py
1
import numpy as np
6✔
2

3
from spotpy.parameter import ParameterSet
6✔
4

5
from . import _algorithm
6✔
6

7

8
class DDSGenerator:
    """
    This class is used by the DDS algorithm to generate a new sample of parameters based on the current one.
    Current parameter are exchanged in `ParameterSet` objects.
    """

    def __init__(self, np_random):
        # `np_random` is any object exposing numpy's random API
        # (`normal`, `rand`, `randint`), e.g. `np.random` or a seeded
        # `np.random.RandomState` for reproducible runs.
        self.np_random = np_random

    def neigh_value_continuous(self, s, x_min, x_max, r):
        """
        select a RANDOM neighbouring real value of a SINGLE decision variable
        CEE 509, HW 5 by Bryan Tolson, Mar 5, 2003 AND ALSO CEE PROJECT
        variables:
        x_range is the range of the real variable (s_max-s_min)

        :param s: is a current SINGLE decision variable VALUE
        :param x_min: is the min of variable s
        :param x_max: is the max of variable s
        :param r: is the neighbourhood parameter (replaces V parameter~see notes)
                  It is defined as the ratio of the std deviation of the desired
                  normal random number/x_range.  Eg:
                      std dev desired = r * x_range
                      for comparison:  variance (V) = (r * x_range)^2
        :return: x_new, a new sample of values in between a given range
        """

        x_range = x_max - x_min

        # Perturb the current value with a normal deviate whose std dev is
        # the fraction `r` of the parameter range.
        x_new = s + self.np_random.normal(0, 1) * r * x_range

        # NEED to deal with variable upper and lower bounds:
        # Originally bounds in DDS were 100% reflective
        # But some times DVs are right on the boundary and with 100% reflective
        # boundaries it is hard to detect them. Therefore, we decided to make the
        # boundaries reflective with 50% chance and absorptive with 50% chance.
        # M. Asadzadeh and B. Tolson Dec 2008

        p_abs_or_ref = self.np_random.rand()

        if x_new < x_min:  # works for any pos or neg x_min
            if p_abs_or_ref <= 0.5:  # with 50% chance reflect
                x_new = x_min + (x_min - x_new)
            else:  # with 50% chance absorb
                x_new = x_min

            # if reflection goes past x_max then value should be x_min since without reflection
            # the approach goes way past lower bound.  This keeps X close to lower bound when X current
            # is close to lower bound:
            if x_new > x_max:
                x_new = x_min

        elif x_new > x_max:  # works for any pos or neg x_max
            if p_abs_or_ref <= 0.5:  # with 50% chance reflect
                x_new = x_max - (x_new - x_max)
            else:  # with 50% chance absorb
                x_new = x_max

            # if reflection goes past x_min then value should be x_max for same reasons as above
            if x_new < x_min:
                x_new = x_max

        return x_new

    def neigh_value_discrete(self, s, s_min, s_max, r):
        """
        Created by B.Tolson and B.Yung, June 2006
        Modified by B. Tolson & M. Asadzadeh, Sept 2008
        Modification: 1- Boundary for reflection at (s_min-0.5) & (s_max+0.5)
                      2- Round the new value at the end of generation.
        select a RANDOM neighbouring integer value of a SINGLE decision variable
        discrete distribution is approximately normal
        alternative to this approach is reflecting triangular distribution (see Azadeh work)

        :param s: is a current SINGLE decision variable VALUE
        :param s_min: is the min of variable s
        :param s_max: is the max of variable s
        :param r: r is the neighbourhood parameter (replaces V parameter~see notes)
                  It is defined as the ratio of the std deviation of the desired
                  normal random number/s_range.  Eg:
                      std dev desired = r * s_range
                      for comparison:  variance (V) = (r * s_range)^2
        :return: s_new, a new sample of values in between a given range
        """

        s_range = s_max - s_min
        delta = self.np_random.normal(0, 1) * r * s_range
        s_new = s + delta

        p_abs_or_ref = self.np_random.rand()

        if s_new < s_min - 0.5:  # works for any pos or neg s_min
            if p_abs_or_ref <= 0.5:  # with 50% chance reflect
                s_new = (s_min - 0.5) + ((s_min - 0.5) - s_new)
            else:  # with 50% chance absorb
                s_new = s_min

            # BUGFIX: this check used to be nested inside the `else` (absorb)
            # branch above, where it could never fire (s_new was already
            # s_min), so a reflection past (s_max+0.5) went uncorrected and an
            # out-of-range value could be returned. It now mirrors the upper
            # branch below and `neigh_value_continuous`:
            # if reflection goes past (s_max+0.5) then value should be s_min since without reflection
            # the approach goes way past lower bound.  This keeps X close to lower bound when X current
            # is close to lower bound:
            if s_new > s_max + 0.5:
                s_new = s_min

        elif s_new > s_max + 0.5:  # works for any pos or neg s_max
            if p_abs_or_ref <= 0.5:  # with 50% chance reflect
                s_new = (s_max + 0.5) - (s_new - (s_max + 0.5))
            else:  # with 50% chance absorb
                s_new = s_max

            # if reflection goes past (s_min-0.5) then value should be s_max for same reasons as above
            if s_new < s_min - 0.5:
                s_new = s_max

        s_new = np.round(s_new)  # New value must be integer
        if (
            s_new == s
        ):  # pick a number between s_max and s_min by a Uniform distribution
            sample = s_min - 1 + np.ceil((s_max - s_min) * self.np_random.rand())
            if sample < s:
                s_new = sample
            else:  # must increment option number by one
                s_new = sample + 1
        return s_new

    def neigh_value_mixed(self, x_curr, r, j, x_min, x_max):
        """
        Dispatch to the continuous or discrete neighbourhood generator
        depending on whether decision variable `j` is flagged as integer.

        :param x_curr: current parameter configuration
        :type x_curr: ParameterSet
        :param r: neighbourhood size perturbation parameter
        :param j: index of the decision variable to perturb
        :param x_min: lower bound of variable j
        :param x_max: upper bound of variable j
        :return: a new value for decision variable j
        """
        s = x_curr[j]

        if not x_curr.as_int[j]:
            return self.neigh_value_continuous(s, x_min, x_max, r)
        else:
            return self.neigh_value_discrete(s, x_min, x_max, r)
147

148

149
class dds(_algorithm):
    """
    Implements the Dynamically dimensioned search algorithm for computationally efficient watershed model
    calibration
    by
    Tolson, B. A. and C. A. Shoemaker (2007), Dynamically dimensioned search algorithm for computationally efficient
     watershed model calibration, Water Resources Research, 43, W01413, 10.1029/2005WR004723.
    Asadzadeh, M. and B. A. Tolson (2013), Pareto archived dynamically dimensioned search with hypervolume-based
    selection for multi-objective optimization, Engineering Optimization. 10.1080/0305215X.2012.748046.

    http://www.civil.uwaterloo.ca/btolson/software.aspx

    Method:
    "The DDS algorithm is a novel and simple stochastic single-solution based heuristic global search
    algorithm that was developed for the purpose of finding good global solutions
    (as opposed to globally optimal solutions) within a specified maximum function (or model) evaluation limit."
    (Page 3)

    The DDS algorithm is a simple greedy algorithm, always using the best solution (min or max) from the current
    point of view. This may not lead to the global optimization.
    """

    def __init__(self, *args, **kwargs):
        """
        Input
        ----------
        spot_setup: class
            model: function
                Should be callable with a parameter combination of the parameter-function
                and return a list of simulation results (as long as evaluation list)
            parameter: function
                When called, it should return a random parameter combination. Which can
                be e.g. uniform or Gaussian
            objectivefunction: function
                Should return the objectivefunction for a given list of a model simulation and
                observation.
            evaluation: function
                Should return the true values as returned by the model.

        dbname: str
            * Name of the database where parameter, objectivefunction value and simulation results will be saved.

        dbformat: str
            * ram: fast suited for short sampling time. no file will be created and results are saved in an array.
            * csv: A csv file will be created, which you can import afterwards.

        parallel: str
            * seq: Sequential sampling (default): Normal iterations on one core of your cpu.
            * mpi: Message Passing Interface: Parallel computing on cluster pcs (recommended for unix os).

        save_sim: boolean
            * True:  Simulation results will be saved
            * False: Simulation results will not be saved
        :param r: neighborhood size perturbation parameter (r) that defines the random perturbation size standard
                  deviation as a fraction of the decision variable range. Default is 0.2.
        :type r: float
        """

        try:
            self.r = kwargs.pop("r")
        except KeyError:
            self.r = 0.2  # default value
        # DDS maximizes the objective function; the base class is told so
        # before initialization.
        kwargs["optimization_direction"] = "maximize"
        kwargs["algorithm_name"] = "Dynamically Dimensioned Search (DDS) algorithm"
        super(dds, self).__init__(*args, **kwargs)

        # Random number source; swapped out via `_set_np_random` (e.g. with a
        # seeded RandomState for reproducible tests).
        self.np_random = np.random

        # self.generator_repetitions will be set in `sample` and is needed to generate a
        # generator which sends back actual parameter s_test
        self.generator_repetitions = -1

        self.dds_generator = DDSGenerator(self.np_random)

    def _set_np_random(self, f_rand):
        # Keep the algorithm and the neighbourhood generator on the SAME
        # random source, otherwise runs are not reproducible.
        self.np_random = f_rand
        self.dds_generator.np_random = f_rand

    def get_next_x_curr(self):
        """
        Fake a generator to run self.repeat to use multiprocessing
        """
        # We need to shift position and length of the sampling process
        for rep in range(self.generator_repetitions):
            yield rep, self.calculate_next_s_test(
                self.params_max, rep, self.generator_repetitions, self.r
            )

    def sample(self, repetitions, trials=1, x_initial=np.array([])):
        """
        Samples from the DDS Algorithm.

        DDS is a greedy type of algorithm since the current solution, also the best solution identified so far,
        is never updated with a solution that has an inferior value of the objective function.

        That means in detail:
        The DDS Algorithm starts with an initial phase:
        If the user does not define an own initial configuration, the DDS algorithm starts with searching a parameter
        configuration in between the given parameter bounds.

        The next phase is the dds algorithm itself which runs in a loop `repetition` times:
        Based on the parameter configuration x_new the algorithm runs the model and simulation with the current
        parameter set and calculates the objective function value called F_curr.

        If F_curr > F_best, where F_best is the current max value objective function value, we set x_best = x_curr and
        F_best = F_curr.

        Select k of all parameters to include them in the neighborhood calculation. This is performed by calculating a
        threshold probability_neighborhood (probability in neighbourhood).

        The neighbourhood calculation perturbs x_best on a standard normal distribution and reflects the result if it
        breaks the parameter boundary.
        The updated parameter configuration is called x_curr

        :param repetitions:  Maximum number of runs.
        :type repetitions: int
        :param trials: amount of runs DDS algorithm will be performed
        :param x_initial: set an initial trial set as a first parameter configuration. If the set is empty the algorithm
                         selects an own initial parameter configuration
        :return: a key-value set of all parameter combinations which have been used. May change in future.
        """

        # every iteration a map of all relevant values is stored, only for debug purpose.
        # Spotpy will not need these values.
        debug_results = []

        self.set_repetiton(repetitions)
        self.min_bound, self.max_bound = (
            self.parameter()["minbound"],
            self.parameter()["maxbound"],
        )
        # NOTE(review): fixed user-facing typo ("algotrithm" -> "algorithm").
        print(
            "Starting the DDS algorithm with " + str(repetitions) + " repetitions..."
        )

        number_of_parameters = (
            self.status.parameters
        )  # number_of_parameters is the amount of parameters

        if len(x_initial) == 0:
            # Warm-up budget: 0.5% of the repetitions, but at least 5 samples.
            initial_iterations = int(np.max([5, round(0.005 * repetitions)]))
        elif len(x_initial) != number_of_parameters:
            raise ValueError(
                "User specified 'x_initial' has not the same length as available parameters"
            )
        else:
            initial_iterations = 1
            x_initial = np.array(x_initial)
            if not (
                np.all(x_initial <= self.max_bound)
                and np.all(x_initial >= self.min_bound)
            ):
                raise ValueError(
                    "User specified 'x_initial' but the values are not within the parameter range"
                )

        # Users can define trial runs in within "repetition" times the algorithm will be executed
        for trial in range(trials):
            params_max = x_initial
            # repitionno_best saves on which iteration the best parameter configuration has been found.
            # NOTE(review): it is initialized here but never updated inside the
            # loop below, so the final print always reports the warm-up count.
            repitionno_best = initial_iterations  # needed to initialize variable and avoid code failure when small # iterations
            (
                params_max,
                repetions_left,
                objectivefunction_max,
            ) = self.calc_initial_para_configuration(
                initial_iterations, trial, repetitions, x_initial
            )
            params_max = self.fix_status_params_format(params_max)
            trial_best_value = list(params_max)  # self.status.params_max.copy()

            # important to set this field `generator_repetitions` so that
            # method `get_next_s_test` can generate exact parameters
            self.generator_repetitions = repetions_left
            self.params_max = params_max
            for rep, x_curr, simulations in self.repeat(self.get_next_x_curr()):

                like = self.postprocessing(rep, x_curr, simulations, chains=trial)
                # Greedy acceptance: only keep strictly better configurations.
                if like > objectivefunction_max:
                    objectivefunction_max = like
                    self.params_max = list(x_curr)
                    self.params_max = self.fix_status_params_format(self.params_max)

            print(
                "Best solution found has obj function value of "
                + str(objectivefunction_max)
                + " at "
                + str(repitionno_best)
                + "\n\n"
            )
            debug_results.append(
                {
                    "sbest": self.params_max,
                    "trial_initial": trial_best_value,
                    "objfunc_val": objectivefunction_max,
                }
            )
        self.final_call()
        return debug_results

    def fix_status_params_format(self, params_max):
        """Wrap a plain sequence of parameter values in a `ParameterSet`."""
        start_params = ParameterSet(self.parameter())
        start_params.set_by_array(list(params_max))
        return start_params

    def calc_initial_para_configuration(
        self, initial_iterations, trial, repetitions, x_initial
    ):
        """
        Find the starting solution for a trial.

        If `initial_iterations` > 1, draw that many random configurations
        (uniform within bounds; integers for discrete parameters) and keep the
        best. Otherwise evaluate the user-supplied `x_initial` once.

        :return: (params_max, repetions_left, objectivefunction_max)
        """
        parameter_bound_range = self.max_bound - self.min_bound
        number_of_parameters = len(parameter_bound_range)
        discrete_flag = ParameterSet(self.parameter()).as_int
        params_max = x_initial
        objectivefunction_max = -1e308
        # Calculate the initial Solution, if `initial_iterations` > 1 otherwise the user defined an own one.
        # If we need to find an initial solution we iterate initial_iterations times to warm up the algorithm
        # by trying which randomized generated input matches best
        # initial_iterations is the number of function evaluations to initialize the DDS algorithm solution
        if initial_iterations > 1:
            print(
                "Finding best starting point for trial "
                + str(trial + 1)
                + " using "
                + str(initial_iterations)
                + " random samples."
            )
            repetions_left = (
                repetitions - initial_iterations
            )  # use this to reduce number of fevals in DDS loop
            if repetions_left <= 0:
                raise ValueError(
                    "# Initialization samples >= Max # function evaluations."
                )

            # Lazy generator of (rep, random parameter vector) pairs:
            # discrete parameters are drawn as integers within bounds,
            # continuous ones uniformly within bounds.
            starting_generator = (
                (
                    rep,
                    [
                        self.np_random.randint(
                            int(self.min_bound[j]), int(self.max_bound[j]) + 1
                        )
                        if discrete_flag[j]
                        else self.min_bound[j]
                        + parameter_bound_range[j] * self.np_random.rand()
                        for j in range(number_of_parameters)
                    ],
                )
                for rep in range(int(initial_iterations))
            )

            for rep, x_curr, simulations in self.repeat(starting_generator):
                like = self.postprocessing(
                    rep, x_curr, simulations
                )  # get obj function value
                # status setting update
                if like > objectivefunction_max:
                    objectivefunction_max = like
                    params_max = list(x_curr)
                    params_max = self.fix_status_params_format(params_max)

        else:  # now initial_iterations=1, using a user supplied initial solution.  Calculate obj func value.
            repetions_left = (
                repetitions - 1
            )  # use this to reduce number of fevals in DDS loop
            rep, x_test_param, simulations = self.simulate(
                (0, x_initial)
            )  # get from the inputs
            like = self.postprocessing(rep, x_test_param, simulations)
            if like > objectivefunction_max:
                objectivefunction_max = like
                params_max = list(x_test_param)
                params_max = self.fix_status_params_format(params_max)
        return params_max, repetions_left, objectivefunction_max

    def calculate_next_s_test(self, previous_x_curr, rep, rep_limit, r):
        """
        Needs to run inside `sample` method. Calculate the next set of parameters based on a given set.
        This is the greedy step of the DDS algorithm.

        `probability_neighborhood` is a threshold at which level a parameter is added to neighbourhood calculation.

        `dvn_count` counts how many parameter configurations have been exchanged with neighbourhood values.
        If no parameter has been exchanged, one is selected at random and exchanged with its neighbourhood value.

        :param previous_x_curr: A set of parameters
        :param rep: Position in DDS loop
        :param rep_limit: total number of DDS repetitions (normalizes the probability decay)
        :param r: neighbourhood size perturbation parameter
        :return: next parameter set
        """
        amount_params = len(previous_x_curr)
        new_x_curr = (
            previous_x_curr.copy()
        )  # define new_x_curr initially as current (previous_x_curr for greedy)

        randompar = self.np_random.rand(amount_params)
        # Probability of perturbing each variable decays logarithmically with
        # the repetition counter (Tolson & Shoemaker 2007, Eq. step 3).
        probability_neighborhood = 1.0 - np.log(rep + 1) / np.log(rep_limit)
        dvn_count = 0  # counter for how many decision variables vary in neighbour

        for j in range(amount_params):
            if (
                randompar[j] < probability_neighborhood
            ):  # then j th DV selected to vary in neighbour
                dvn_count = dvn_count + 1
                new_value = self.dds_generator.neigh_value_mixed(
                    previous_x_curr, r, j, self.min_bound[j], self.max_bound[j]
                )
                new_x_curr[j] = new_value  # change relevant dec var value in x_curr

        if dvn_count == 0:  # no DVs selected at random, so select ONE
            dec_var = int(np.ceil(amount_params * self.np_random.rand())) - 1
            new_value = self.dds_generator.neigh_value_mixed(
                previous_x_curr,
                r,
                dec_var,
                self.min_bound[dec_var],
                self.max_bound[dec_var],
            )
            new_x_curr[
                dec_var
            ] = new_value  # change relevant decision variable value in s_test

        return new_x_curr
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc