• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

GeoStat-Framework / welltestpy / 4730966836

pending completion
4730966836

push

github

Sebastian Müller
update rtd config

1844 of 2426 relevant lines covered (76.01%)

11.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.38
/src/welltestpy/process/processlib.py
1
"""welltestpy subpackage providing functions to pre process data."""
2
from copy import deepcopy as dcopy
15✔
3

4
import numpy as np
15✔
5
from scipy import signal
15✔
6

7
from ..data import testslib
15✔
8

9
__all__ = [
15✔
10
    "normpumptest",
11
    "combinepumptest",
12
    "filterdrawdown",
13
    "cooper_jacob_correction",
14
    "smoothing_derivative",
15
]
16

17

18
def normpumptest(pumptest, pumpingrate=-1.0, factor=1.0):
    """Normalize the pumping rate of a pumping test.

    The test is modified in place: its pumping rate is set to
    ``pumpingrate`` and every observation is multiplied by
    ``factor * new_rate / old_rate``.

    Parameters
    ----------
    pumptest : :class:`welltestpy.data.testslib.PumpingTest`
        The constant rate pumping test to be normalized.
    pumpingrate : :class:`float`, optional
        Pumping rate. Default: ``-1.0``
    factor : :class:`float`, optional
        Scaling factor that can be used for unit conversion. Default: ``1.0``

    Raises
    ------
    ValueError
        If `pumptest` is no pumping test, is no constant rate pumping test,
        or has a pumping rate of zero (the observations can not be rescaled
        relative to a zero rate).
    """
    if not isinstance(pumptest, testslib.PumpingTest):
        raise ValueError(str(pumptest) + " is no pumping test")

    if not pumptest.constant_rate:
        raise ValueError(str(pumptest) + " is no constant rate pumping test")

    oldprate = dcopy(pumptest.rate)
    # validate before touching the test, so a failure leaves it unmodified
    if np.any(np.asarray(oldprate) == 0):
        raise ValueError(
            str(pumptest) + " has a pumping rate of zero and can not be "
            "normalized"
        )

    pumptest.pumpingrate = pumpingrate

    # the ratio is the same for every observation, so compute it once
    scale = factor * pumptest.rate / oldprate
    for obs in pumptest.observations:
        pumptest.observations[obs].observation *= scale
41

42

43
def combinepumptest(
    campaign,
    test1,
    test2,
    pumpingrate=None,
    finalname=None,
    factor1=1.0,
    factor2=1.0,
    infooftest1=True,
    replace=True,
):
    """Combine two pumping tests to one.

    They need to have the same pumping well and, apart from the pumping
    well itself, no common observation wells. Both tests are copied,
    normalized to a common pumping rate and merged into a new test that is
    added to the campaign.

    Parameters
    ----------
    campaign : :class:`welltestpy.data.Campaign`
        The pumping test campaign which should be used.
        It is modified in place.
    test1 : :class:`str`
        Name of test 1.
    test2 : :class:`str`
        Name of test 2.
    pumpingrate : :class:`float`, optional
        Pumping rate both tests are normalized to. If ``None``, the rate of
        test 1 is used when `infooftest1` is `True`, else the rate of
        test 2. Default: ``None``
    finalname : :class:`str`, optional
        Name of the final test. If `replace` is `True` and `finalname` is
        `None`, it will get the name of test 1. Else it will get a combined
        name of test 1 and test 2.
        Default: ``None``
    factor1 : :class:`float`, optional
        Scaling factor for test 1 that can be used for unit conversion.
        Default: ``1.0``
    factor2 : :class:`float`, optional
        Scaling factor for test 2 that can be used for unit conversion.
        Default: ``1.0``
    infooftest1 : :class:`bool`, optional
        State if the final test should take the information from test 1.
        Default: ``True``
    replace : :class:`bool`, optional
        State if the original tests should be erased.
        Default: ``True``

    Raises
    ------
    ValueError
        If one of the tests is not part of the campaign, is no pumping
        test, the pumping wells differ, the tests share observation wells,
        or the final name collides with another test of the campaign.
    """
    for name in (test1, test2):
        if name not in campaign.tests:
            raise ValueError(
                "combinepumptest: "
                + str(name)
                + " not a test in "
                + "campaign "
                + str(campaign.name)
            )

    if finalname is None:
        finalname = test1 if replace else test1 + "+" + test2

    for name in (test1, test2):
        if campaign.tests[name].testtype != "PumpingTest":
            raise ValueError(
                "combinepumptest:" + str(name) + " is no pumpingtest"
            )

    if campaign.tests[test1].pumpingwell != campaign.tests[test2].pumpingwell:
        raise ValueError(
            "combinepumptest: The Pumpingtests do not have the "
            + "same pumping-well"
        )

    pwell = campaign.tests[test1].pumpingwell

    commonwells = set(campaign.tests[test1].wells) & set(
        campaign.tests[test2].wells
    )
    # only the pumping well itself may be observed in both tests
    if commonwells != {pwell} and commonwells != set():
        raise ValueError(
            "combinepumptest: The Pumpingtests shouldn't have "
            + "common observation-wells"
        )

    # the final name may only reuse a name that is about to be erased
    name_gets_freed = replace and finalname in (test1, test2)
    if finalname in campaign.tests and not name_gets_freed:
        raise ValueError(
            "combinepumptest: a test named "
            + str(finalname)
            + " is already present in campaign "
            + str(campaign.name)
        )

    # work on copies so the originals stay untouched until the very end
    temptest1 = dcopy(campaign.tests[test1])
    temptest2 = dcopy(campaign.tests[test2])

    # 'infotest' provides rate and meta data, 'othertest' gives way
    if infooftest1:
        infotest, othertest = temptest1, temptest2
    else:
        infotest, othertest = temptest2, temptest1

    if pumpingrate is None:
        pumpingrate = infotest.rate

    normpumptest(temptest1, pumpingrate, factor1)
    normpumptest(temptest2, pumpingrate, factor2)

    prate = temptest1.rate

    # keep the pumping well observation of the info giving test only
    if pwell in temptest1.observations and pwell in temptest2.observations:
        othertest.del_observations(pwell)

    observations = dcopy(temptest1.observations)
    observations.update(temptest2.observations)

    finalpt = testslib.PumpingTest(
        finalname,
        pwell,
        prate,
        observations,
        infotest.depth,
        infotest.radius,
        infotest.description,
        infotest.timeframe,
    )

    # erase the originals first: the final test may reuse one of their
    # names and would otherwise be removed again right after being added
    if replace:
        campaign.deltests([test1, test2])

    campaign.addtests(finalpt)
×
195

196

197
def filterdrawdown(observation, tout=None, dxscale=2):
    """Smooth the drawdown data of an observation well.

    The data is interpolated to an equally spaced time axis, filtered
    forward and backward with a first order Butterworth low-pass filter and
    interpolated to the requested output times. If the filter can not be
    applied (too few data points), the unfiltered data is interpolated.

    Parameters
    ----------
    observation : :class:`welltestpy.data.Observation`
        The observation to be smoothed.
    tout : :class:`numpy.ndarray`, optional
        Time points to evaluate the smoothed observation at. If ``None``,
        the original time points of the observation are taken.
        Default: ``None``
    dxscale : :class:`int`, optional
        Scale of time-steps used for smoothing.
        Default: ``2``

    Returns
    -------
    The result of calling ``observation`` with the output times and the
    smoothed values (``observation(time=tout, observation=...)``).

    Raises
    ------
    ValueError
        If the observation contains no data points.
    """
    head, time = observation()
    head = np.array(head, dtype=float).reshape(-1)
    time = np.array(time, dtype=float).reshape(-1)

    if time.size == 0:
        raise ValueError("filterdrawdown: observation contains no data")

    # np.array copies, so 'tout' never aliases the observation's time axis
    tout = np.array(time if tout is None else tout, dtype=float).reshape(-1)

    if len(time) == 1:
        # a single value can not be filtered: use it for all output times
        return observation(time=tout, observation=np.full_like(tout, head[0]))

    # make the data equal-spaced to use filter with
    # a fraction of the minimal timestep (time steps below 1.0 count as 1.0)
    min_step = max(np.diff(time).min(), 1.0)
    dxv = dxscale * int((time[-1] - time[0]) / min_step)
    tequal = np.linspace(time[0], time[-1], dxv)
    hequal = np.interp(tequal, time, head)

    try:
        # numerator and denominator of a first order low-pass filter
        numer, denom = signal.butter(1, 0.025)
        hfilt = signal.filtfilt(numer, denom, hequal, padlen=150)
        hout = np.interp(tout, tequal, hfilt)
    except ValueError:  # in this case there are too few data points
        hout = np.interp(tout, time, head)

    return observation(time=tout, observation=hout)
15✔
238

239

240
def cooper_jacob_correction(observation, sat_thickness):
    """Correct the observed drawdown for unconfined aquifers.

    Applies the correction ``s - s**2 / (2 * sat_thickness)`` to the
    drawdown ``s`` stored in the observation. The observation is updated
    in place.

    Parameters
    ----------
    observation : :class:`welltestpy.data.Observation`
        The observation to be corrected.
    sat_thickness : :class:`float`
        Vertical length of the aquifer in which its pores are filled with water.

    Returns
    -------
    The given observation, now holding the corrected drawdown.

    Raises
    ------
    ValueError
        If `sat_thickness` is not positive.
    """
    # a zero thickness would divide by zero, a negative one is meaningless
    if np.any(np.asarray(sat_thickness) <= 0):
        raise ValueError(
            "cooper_jacob_correction: 'sat_thickness' needs to be positive"
        )

    head = np.asarray(observation.observation, dtype=float)

    # cooper and jacob correction
    corrected = head - (head**2) / (2 * sat_thickness)

    # store the corrected drawdown in the observation
    observation(observation=corrected)

    return observation
15✔
265

266

267
def smoothing_derivative(head, time, method="bourdet"):
    """Calculate the derivative of the drawdown curve.

    The derivative is taken with respect to the logarithm of time. The
    first and the last entry of the result are left at zero, since the
    approximation needs a neighbor on both sides.

    Parameters
    ----------
    head : :class:`numpy.ndarray`
        An array with the observed head values.
    time : :class:`numpy.ndarray`
        An array with the time values for the observed head values.
        The logarithm of these values is taken.
    method : :class:`str`, optional
        Method to calculate the time derivative.
        Default: "bourdet"

    Returns
    -------
    :class:`numpy.ndarray`
        The derivative of the observed heads.

    Raises
    ------
    ValueError
        If `method` is unknown or `head` and `time` differ in length.
    """
    if method != "bourdet":
        raise ValueError(
            f"smoothing_derivative: method '{method}' is unknown!"
        )

    head = np.asarray(head, dtype=float).reshape(-1)
    time = np.asarray(time, dtype=float).reshape(-1)
    if head.shape != time.shape:
        raise ValueError(
            "smoothing_derivative: 'head' and 'time' need the same length"
        )

    derhead = np.zeros(len(head))
    if len(head) < 3:
        # no interior points to evaluate
        return derhead

    # increments between consecutive points in head and in log-time
    dhead = np.diff(head)
    dlogt = np.diff(np.log(time))
    dh_left, dh_right = dhead[:-1], dhead[1:]
    dx_left, dx_right = dlogt[:-1], dlogt[1:]

    # derivative approximation by Bourdet (1989): left and right slope,
    # each weighted with the log-time increment of the opposite side
    derhead[1:-1] = (
        dh_left / dx_left * dx_right + dh_right / dx_right * dx_left
    ) / (dx_left + dx_right)
    return derhead
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc