thouska / spotpy / build 13290613367 (github, push)
12 Feb 2025 04:52PM UTC coverage: 67.775% (-0.02%) from 67.793%
Commit: Update pyproject.toml
3752 of 5536 relevant lines covered (67.77%), 2.03 hits per line

Source File: /src/spotpy/algorithms/rope.py (93.51% covered)
# -*- coding: utf-8 -*-
"""
Copyright (c) 2018 by Tobias Houska
This file is part of Statistical Parameter Optimization Tool for Python (SPOTPY).
:author: Tobias Houska and Alejandro Chamorro-Chavez
"""
import random
import time

import numpy as np

from . import _algorithm


class rope(_algorithm):
    """
    This class holds the Robust Parameter Estimation (ROPE) algorithm based on
    Bárdossy and Singh (2008).

    Bárdossy, A. and Singh, S. K.:
    Robust estimation of hydrological model parameters,
    Hydrol. Earth Syst. Sci. Discuss., 5(3), 1641–1675, 2008.
    """

    def __init__(self, *args, **kwargs):

        """
        Input
        ----------
        :param spot_setup: class
            model: function
                Should be callable with a parameter combination of the
                parameter-function and return a list of simulation results (as
                long as the evaluation list)
            parameter: function
                When called, it should return a random parameter combination,
                which can be e.g. uniform or Gaussian
            objectivefunction: function
                Should return the objective function value for a given list of model
                simulations and observations.
            evaluation: function
                Should return the true values as returned by the model.

        :param dbname: str
            * Name of the database where parameters, objective function values and
            simulation results will be saved.

        :param dbformat: str
            * ram: fast, suited for short sampling times. No file will be created and
            results are saved in an array.
            * csv: A csv file will be created, which you can import afterwards.

        :param parallel: str
            * seq: Sequential sampling (default): Normal iterations on one core
            of your cpu.
            * mpc: Multi processing: Iterations on all available cores on your cpu
            (recommended for windows os).
            * mpi: Message Passing Interface: Parallel computing on cluster pcs
            (recommended for unix os).

        :param save_sim: boolean
            * True:  Simulation results will be saved
            * False: Simulation results will not be saved
        """
        kwargs["optimization_direction"] = "maximize"
        kwargs["algorithm_name"] = "RObust Parameter Estimation (ROPE) algorithm"
        super(rope, self).__init__(*args, **kwargs)
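
    # Editor's note (not part of the original source): ROPE proceeds in `subsets`
    # stages: a Latin Hypercube burn-in over the full parameter ranges, followed by
    # repeated generation of new parameter vectors that have non-zero data depth
    # with respect to the best runs of the previous stage. A minimal, hedged usage
    # sketch is given after the end of this file.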

    def get_best_runs(self, likes, pars, runs, percentage):
        """
        Returns the best xx% of the runs"""
        return [
            new_pars
            for (new_likes, new_pars) in sorted(zip(likes, pars))[
                int(len(likes) * (1 - percentage)) :
            ]
        ]
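
    # Editor's note (illustration, not in the original source): get_best_runs sorts
    # the runs by objective function value and keeps the top slice. For example,
    # with likes = [0.2, 0.9, 0.5] and percentage = 0.34, the slice starts at
    # int(3 * (1 - 0.34)) = 1, so the parameter sets of the two best runs
    # (likes 0.5 and 0.9) are returned.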

    def sample(
        self,
        repetitions=None,
        repetitions_first_run=None,
        subsets=5,
        percentage_first_run=0.10,
        percentage_following_runs=0.10,
        NDIR=None,
    ):
        """
        Samples from the ROPE algorithm.

        Input
        ----------
        repetitions = Number of runs overall; only used if the user does
        not specify the other arguments, otherwise it is overwritten.
        repetitions_first_run = Number of runs in the first run
        repetitions_following_runs = Number of runs for all following runs
        subsets = number of times the rope algorithm creates a smaller search
        window for the parameters
        percentage_first_run = fraction of runs that will be used for the next
        step after the first subset
        percentage_following_runs = fraction of runs that will be used for the
        next step in all following subsets
        NDIR = Number of random directions used in the depth estimation
        """
        # Reported behaviour:
        # Takes way too long for npar > 8
        # If more parameters are to be generated than are put in, the computation
        # effectively never finishes (ngen > n)
        # Subsets < 5 sometimes leads to a crash
        print(
            "Starting the ROPE algorithm with " + str(repetitions) + " repetitions..."
        )
        self.set_repetiton(repetitions)

        if repetitions_first_run is None:
            # Take the first half of the repetitions as burn-in
            first_run = int(repetitions / 2.0)

        else:
            # Make user defined number of burn-in repetitions
            first_run = repetitions_first_run

        repetitions_following_runs = int((repetitions - first_run) / (subsets - 1))
        # Needed to avoid an error in integer division somewhere in the depth function
        if repetitions_following_runs % 2 != 0:
            print(
                "Warning: Burn-in samples and total number of repetitions are not compatible.\n"
                "SPOTPY will automatically adjust the number of total repetitions."
            )
            repetitions_following_runs += 1

        if NDIR is None:
            NDIR = int(repetitions_following_runs / 100.0)
        self.NDIR = NDIR
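
        # Editor's note (worked example, not in the original source): with
        # repetitions=1000 and the default subsets=5, first_run = 500 burn-in runs
        # and repetitions_following_runs = int(500 / 4) = 125; since 125 is odd it
        # is increased to 126, and NDIR defaults to int(126 / 100.0) = 1 random
        # projection per depth evaluation.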

        starttime = time.time()
        intervaltime = starttime
        parset = self.parameter()
        self.min_bound, self.max_bound = parset["minbound"], parset["maxbound"]

        # Init ROPE with one subset
        likes = []
        pars = []
        segment = 1.0 / float(first_run)
        # Get the minimum and maximum value for each parameter from the
        # parameter function (min_bound/max_bound above)

        # Create a matrix to store the parameter sets
        matrix = np.empty((first_run, len(self.min_bound)))
        # Create the LatinHypercube matrix as in McKay et al. (1979):
        for i in range(int(first_run)):
            segmentMin = i * segment
            pointInSegment = segmentMin + (random.random() * segment)
            parset = pointInSegment * (self.max_bound - self.min_bound) + self.min_bound
            matrix[i] = parset
        for i in range(len(self.min_bound)):
            random.shuffle(matrix[:, i])
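
        # Editor's note (illustration, not in the original source): with
        # first_run = 4 the unit interval is split into the segments
        # [0, 0.25), [0.25, 0.5), [0.5, 0.75), [0.75, 1.0); one random point is
        # drawn from each segment and scaled to [min_bound, max_bound], and the
        # per-column shuffle decorrelates the parameter dimensions, giving a
        # Latin Hypercube sample.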

        # A generator that produces the parameters
        param_generator = ((rep, matrix[rep]) for rep in range(int(first_run) - 1))
        for rep, randompar, simulations in self.repeat(param_generator):
            # A function that calculates the fitness of the run and manages the database
            like = self.postprocessing(rep, randompar, simulations)
            likes.append(like)
            pars.append(randompar)
            # Progress bar
            acttime = time.time()
            # Refresh the progress bar every two seconds
            if self.status.stop:
                break
            if acttime - intervaltime >= 2:
                text = "1 Subset: Run %i of %i (best like=%g)" % (
                    rep,
                    first_run,
                    self.status.objectivefunction_max,
                )
                print(text)
                intervaltime = time.time()

        for subset in range(subsets - 1):
            if subset == 0:
                best_pars = self.get_best_runs(
                    likes, pars, repetitions_following_runs, percentage_first_run
                )
            else:
                best_pars = self.get_best_runs(
                    likes, pars, repetitions_following_runs, percentage_following_runs
                )
            valid = False
            trials = 0
            while valid is False and trials < 10 and repetitions_following_runs > 1:
                new_pars = self.programm_depth(best_pars, repetitions_following_runs)
                if len(new_pars) == repetitions_following_runs:
                    valid = True
                else:
                    trials += 1
            pars = []
            likes = []
            if int(repetitions_following_runs) > len(new_pars):
                repetitions_following_runs = len(new_pars)
            param_generator = (
                (rep, new_pars[rep]) for rep in range(int(repetitions_following_runs))
            )
            for rep, ropepar, simulations in self.repeat(param_generator):
                # Calculate the objective function
                like = self.postprocessing(
                    first_run + rep + repetitions_following_runs * subset,
                    ropepar,
                    simulations,
                )
                likes.append(like)
                pars.append(ropepar)
                if self.status.stop:
                    print("Stopping sampling")
                    break

                # Progress bar
                acttime = time.time()
                if repetitions_following_runs is not None:
                    # Refresh the progress bar every two seconds
                    if acttime - intervaltime >= 2:
                        text = "%i Subset: Run %i of %i (best like=%g)" % (
                            subset + 2,
                            rep,
                            repetitions_following_runs,
                            self.status.objectivefunction_max,
                        )
                        print(text)
                        intervaltime = time.time()
            if self.status.stop:
                break

        self.final_call()
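
    # Editor's note (not in the original source): programm_depth generates `runs`
    # new parameter vectors that lie inside the region spanned by the best runs of
    # the previous subset. Candidates are drawn uniformly from the bounding box of
    # `pars` and kept only if their estimated halfspace depth with respect to
    # `pars` is at least 1 (see fDEP below).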

    def programm_depth(self, pars, runs):
        X = np.array(pars)

        N, NP = X.shape
        text = str(N) + " input vectors with " + str(NP) + " parameters"
        print(text)

        Ngen = int(runs)  # int(N*(1/self.percentage))
        print(("Generating " + str(Ngen) + " parameters:"))

        NPOSI = Ngen  # Number of points to generate

        EPS = 0.00001

        # Find max and min values
        XMIN = np.zeros(NP)
        XMAX = np.zeros(NP)
        for j in range(NP):
            XMIN[j] = min(X[:, j])
            XMAX[j] = max(X[:, j])

        # Begin to generate
        ITRY = 1
        IPOS = 1
        LLEN = N

        CL = np.zeros(NP)
        TL = np.zeros(shape=(LLEN, NP))

        while IPOS < NPOSI:
            for IM in range(LLEN):  # LLEN random vectors of dim NP
                for j in range(NP):
                    DRAND = np.random.rand()
                    TL[IM, j] = XMIN[j] + DRAND * (XMAX[j] - XMIN[j])
            LNDEP = self.fHDEPTHAB(N, NP, X, TL, EPS, LLEN)
            for L in range(LLEN):
                ITRY = ITRY + 1
                if LNDEP[L] >= 1:
                    CL = np.vstack((CL, TL[L, :]))
                    IPOS = IPOS + 1
            print((IPOS, ITRY))

        CL = np.delete(CL, 0, 0)
        CL = CL[:NPOSI]
        return CL
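
    # Editor's note (not in the original source): fHDEPTHAB/fDEP approximate the
    # halfspace depth of each candidate in TL with respect to the reference set X.
    # In each of the NDIR iterations, NP rows of X are sampled, the eigenvector of
    # their covariance matrix with the smallest eigenvalue defines a projection
    # direction, and LNDEP[L] is updated with the smaller of the counts of
    # reference points below and above the projected candidate.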

    def fHDEPTHAB(self, N, NP, X, TL, EPS, LLEN):
        LNDEP = self.fDEP(N, NP, X, TL, EPS, LLEN)
        return LNDEP

    def fDEP(self, N, NP, X, TL, EPS, LLEN):
        LNDEP = np.array([N for i in range(N)])
        for NRAN in range(int(self.NDIR)):

            #       Random sample of size NP
            JSAMP = np.zeros(N)
            I = np.random.randint(1, N)
            if I > N:
                I = N
            JSAMP[0] = I
            NSAMP = 1

            for index in range(NP - 1):
                dirac = 0
                while dirac == 0:
                    dirac = 1
                    L = np.random.randint(1, N + 1)
                    if L > N:
                        L = N
                    for j in range(NSAMP):
                        if L == JSAMP[j]:
                            dirac = 0
                NSAMP = NSAMP + 1
                JSAMP[index + 1] = L

            #       Covariance matrix of the sample
            S = np.zeros(shape=(NP, NP))
            for i in range(NP):
                row = JSAMP[i]

                # too complicated
                # S[i, :] = [X[int(row) - 1,j] for j in range(NP)]
                # S: random sample from X
                S[i, :] = X[int(row) - 1]

            nx, NP = S.shape
            C = np.zeros(shape=(NP, NP))
            y = np.zeros(shape=(2, 2))
            for j in range(NP):
                for k in range(j + 1):
                    y = np.cov(S[:, k], S[:, j])
                    C[j, k] = y[0, 1]
                    C[k, j] = y[0, 1]
            COV = C

            EVALS, EVE = np.linalg.eig(COV)
            arg = np.argsort(EVALS)
            # Eigenvector in the direction of the smallest eigenvalue
            EVECT = EVE[:, arg[0]]

            #       Project all points on the line through theta with direction
            # given by the eigenvector of the smallest eigenvalue, i.e.
            # the direction orthogonal to the hyperplane given by the np-subset
            #       Compute the one-dimensional halfspace depth of theta on this line
            HELP = []
            for L in range(N):
                k = np.dot(EVECT, X[L, :])
                HELP.append(k)
            HELP = np.array(HELP)
            HELP = sorted(HELP)

            ICROSS = 0
            for LU in range(LLEN):
                if LNDEP[LU] > ICROSS:
                    EKT = 0.0
                    NT = 0
                    EKT = EKT + np.dot(TL[LU, :], EVECT)
                    if (EKT < (HELP[0] - EPS)) or (EKT > (HELP[N - 1] + EPS)):
                        N1 = 0
                    else:
                        N1 = 1
                        N2 = N
                        dirac = 0
                        while dirac == 0:
                            dirac = 1
                            N3 = (N1 + N2) / 2.0
                            if HELP[int(N3)] < EKT:
                                N1 = N3
                            else:
                                N2 = N3
                            if (N2 - N1) > 1:
                                dirac = 0
                    NUMH = N1
                    LNDEP[LU] = min(LNDEP[LU], min(NUMH + NT, N - NUMH))
        return LNDEP
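
For orientation, a minimal usage sketch (an editor's addition, not part of rope.py or of this coverage report). It assumes the Rosenbrock example setup and the analyser helpers that ship with SPOTPY; those names are taken from the SPOTPY tutorials, not from this file, and should be treated as assumptions:

import spotpy
from spotpy.examples.spot_setup_rosenbrock import spot_setup

# Build the ROPE sampler around the example setup; results are kept in memory.
sampler = spotpy.algorithms.rope(spot_setup(), dbname="rope_rosenbrock", dbformat="ram")

# 1000 repetitions split into one burn-in subset and four following subsets.
sampler.sample(1000, subsets=5)

# Retrieve the sampled parameter sets and their objective function values.
results = sampler.getdata()
print(spotpy.analyser.get_best_parameterset(results))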