• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

GeoStat-Framework / welltestpy / 10691143420

03 Sep 2024 09:48PM UTC coverage: 76.813% (+0.8%) from 76.01%
10691143420

Pull #35

github

web-flow
Merge 1ebfe99fb into 81a2299af
Pull Request #35: Bump actions/download-artifact from 2 to 4.1.7 in /.github/workflows

1928 of 2510 relevant lines covered (76.81%)

10.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.38
/src/welltestpy/process/processlib.py
1
"""welltestpy subpackage providing functions to pre process data."""
2
from copy import deepcopy as dcopy
14✔
3

4
import numpy as np
14✔
5
from scipy import signal
14✔
6

7
from ..data import testslib
14✔
8

9
__all__ = [
14✔
10
    "normpumptest",
11
    "combinepumptest",
12
    "filterdrawdown",
13
    "cooper_jacob_correction",
14
    "smoothing_derivative",
15
]
16

17

18
def normpumptest(pumptest, pumpingrate=-1.0, factor=1.0):
    """Rescale a constant rate pumping test to a new pumping rate.

    The pumping rate of the test is replaced by ``pumpingrate`` and every
    observation of the test is multiplied in place by
    ``factor * new_rate / old_rate``. Nothing is returned.

    Parameters
    ----------
    pumptest : :class:`welltestpy.data.testslib.PumpingTest`
        The pumping test to be normalized. It is modified in place.
    pumpingrate : :class:`float`, optional
        Pumping rate. Default: ``-1.0``
    factor : :class:`float`, optional
        Scaling factor that can be used for unit conversion. Default: ``1.0``

    Raises
    ------
    ValueError
        If ``pumptest`` is no pumping test or has no constant rate.
    """
    is_pumping_test = isinstance(pumptest, testslib.PumpingTest)
    if not is_pumping_test:
        raise ValueError(f"{pumptest!s} is no pumping test")
    if not pumptest.constant_rate:
        raise ValueError(f"{pumptest!s} is no constant rate pumping test")

    # remember the rate before it gets overwritten
    previous_rate = dcopy(pumptest.rate)
    pumptest.pumpingrate = pumpingrate

    for well in pumptest.observations:
        # the ratio is evaluated per observation, so a test without any
        # observations never divides by the previous rate
        pumptest.observations[well].observation *= (
            factor * pumptest.rate / previous_rate
        )
41

42

43
def combinepumptest(
    campaign,
    test1,
    test2,
    pumpingrate=None,
    finalname=None,
    factor1=1.0,
    factor2=1.0,
    infooftest1=True,
    replace=True,
):
    """Combine two pumping tests to one.

    They need to have the same pumping well and must not share any
    observation well apart from the pumping well itself. Both tests are
    normalized to a common pumping rate before their observations are
    merged. The combined test is added to the campaign.

    Parameters
    ----------
    campaign : :class:`welltestpy.data.Campaign`
        The pumping test campaign which should be used.
    test1 : :class:`str`
        Name of test 1.
    test2 : :class:`str`
        Name of test 2.
    pumpingrate : :class:`float`, optional
        Pumping rate both tests are normalized to. If ``None``, the rate of
        the test selected by `infooftest1` is used.
        Default: ``None``
    finalname : :class:`str`, optional
        Name of the final test. If `replace` is `True` and `finalname` is
        `None`, it will get the name of test 1. Else it will get a combined
        name of test 1 and test 2.
        Default: ``None``
    factor1 : :class:`float`, optional
        Scaling factor for test 1 that can be used for unit conversion.
        Default: ``1.0``
    factor2 : :class:`float`, optional
        Scaling factor for test 2 that can be used for unit conversion.
        Default: ``1.0``
    infooftest1 : :class:`bool`, optional
        State if the final test should take the information from test 1.
        Default: ``True``
    replace : :class:`bool`, optional
        State if the original tests should be erased.
        Default: ``True``

    Raises
    ------
    ValueError
        If one of the tests is not part of the campaign, is no pumping test,
        if the pumping wells differ or if the tests share observation wells.
    """
    for name in (test1, test2):
        if name not in campaign.tests:
            raise ValueError(
                f"combinepumptest: {name!s} not a test in "
                f"campaign {campaign.name!s}"
            )

    if finalname is None:
        finalname = test1 if replace else test1 + "+" + test2

    for name in (test1, test2):
        if campaign.tests[name].testtype != "PumpingTest":
            raise ValueError(f"combinepumptest:{name!s} is no pumpingtest")

    if campaign.tests[test1].pumpingwell != campaign.tests[test2].pumpingwell:
        raise ValueError(
            "combinepumptest: The Pumpingtests do not have the "
            + "same pumping-well"
        )

    pwell = campaign.tests[test1].pumpingwell

    # only the pumping well itself may be part of both tests
    commonwells = set(campaign.tests[test1].wells) & set(
        campaign.tests[test2].wells
    )
    if commonwells and commonwells != {pwell}:
        raise ValueError(
            "combinepumptest: The Pumpingtests shouldn't have "
            + "common observation-wells"
        )

    # work on copies so the tests stored in the campaign stay untouched
    temptest1 = dcopy(campaign.tests[test1])
    temptest2 = dcopy(campaign.tests[test2])

    # "primary" provides the meta information of the combined test
    if infooftest1:
        primary, secondary = temptest1, temptest2
    else:
        primary, secondary = temptest2, temptest1

    if pumpingrate is None:
        pumpingrate = primary.rate

    normpumptest(temptest1, pumpingrate, factor1)
    normpumptest(temptest2, pumpingrate, factor2)

    prate = temptest1.rate

    # if both tests observed the pumping well, keep the primary record
    if pwell in temptest1.observations and pwell in temptest2.observations:
        secondary.del_observations(pwell)

    observations = dcopy(temptest1.observations)
    observations.update(temptest2.observations)

    finalpt = testslib.PumpingTest(
        finalname,
        pwell,
        prate,
        observations,
        primary.depth,
        primary.radius,
        primary.description,
        primary.timeframe,
    )

    # The originals have to be removed *before* the combined test is added:
    # by default the combined test re-uses the name of test 1, so adding it
    # first would collide with (or afterwards be deleted together with)
    # the original entry.
    if replace:
        campaign.deltests([test1, test2])

    campaign.addtests(finalpt)
×
195

196

197
def filterdrawdown(observation, tout=None, dxscale=2):
    """Smooth the drawdown data of an observation well.

    The data is resampled on an equally spaced time axis, low-pass filtered
    forward and backward and interpolated to the output times. If there are
    too few resampled points for the filter, the unfiltered data is
    interpolated to the output times instead.

    Parameters
    ----------
    observation : :class:`welltestpy.data.Observation`
        The observation to be smoothed.
    tout : :class:`numpy.ndarray`, optional
        Time points to evaluate the smoothed observation at. If ``None``,
        the original time points of the observation are taken.
        Default: ``None``
    dxscale : :class:`int`, optional
        Scale of time-steps used for smoothing.
        Default: ``2``

    Returns
    -------
    The result of calling ``observation`` with the new time and
    observation values.

    Raises
    ------
    ValueError
        If the observation holds no data points.
    """
    head, time = observation()
    head = np.array(head, dtype=float).reshape(-1)
    time = np.array(time, dtype=float).reshape(-1)

    if len(time) == 0:
        raise ValueError("filterdrawdown: observation holds no data points")

    # np.array always copies, so tout never aliases the original time axis
    tout = np.array(time if tout is None else tout, dtype=float).reshape(-1)

    if len(time) == 1:
        return observation(time=tout, observation=np.full_like(tout, head[0]))

    # make the data equal-spaced to use the filter with; the time-step is
    # the minimal one found in the data, but at least 1.0
    step = max(np.diff(time).min(), 1.0)
    # np.linspace needs an integer number of samples, also when a
    # non-integer dxscale is given
    nequal = int(dxscale * int((time[-1] - time[0]) / step))
    tequal = np.linspace(time[0], time[-1], nequal)
    hequal = np.interp(tequal, time, head)

    para1, para2 = signal.butter(1, 0.025)
    try:
        hfilt = signal.filtfilt(para1, para2, hequal, padlen=150)
    except ValueError:  # in this case there are too few data points
        hout = np.interp(tout, time, head)
    else:
        hout = np.interp(tout, tequal, hfilt)

    return observation(time=tout, observation=hout)
14✔
238

239

240
def cooper_jacob_correction(observation, sat_thickness):
    """
    Correction method for observed drawdown for unconfined aquifers.

    The observed values ``s`` are replaced by
    ``s - s**2 / (2 * sat_thickness)``.

    Parameters
    ----------
    observation : :class:`welltestpy.data.Observation`
        The observation to be corrected. It is modified in place.
    sat_thickness : :class:`float`
        Vertical length of the aquifer in which its pores are filled with water.
        Needs to be positive.

    Returns
    -------
    The given observation, now holding the corrected drawdown.

    Raises
    ------
    ValueError
        If ``sat_thickness`` is not positive.
    """
    # "not all > 0" also rejects NaN, which "any <= 0" would let through
    if not np.all(np.asarray(sat_thickness, dtype=float) > 0):
        raise ValueError(
            "cooper_jacob_correction: 'sat_thickness' needs to be positive"
        )

    head = observation.observation

    # cooper and jacob correction, written back to the same observation
    observation(observation=head - (head**2) / (2 * sat_thickness))

    return observation
14✔
266

267

268
def smoothing_derivative(head, time, method="bourdet"):
    """Calculate the derivative of the drawdown curve.

    The derivative is taken with respect to the logarithm of time. At every
    interior point the slopes towards the left and the right neighbour are
    averaged, each weighted with the logarithmic time-step of the opposite
    side (Bourdet, 1989). The first and the last value are left at zero.

    Parameters
    ----------
    head : :class:`numpy.ndarray`
        An array with the observed head values.
    time : :class:`numpy.ndarray`
        An array with the time values for the observed head values.
        The logarithm of these values is taken.
    method : :class:`str`, optional
        Method to calculate the time derivative.
        Default: "bourdet"

    Returns
    -------
    :class:`numpy.ndarray`
        The derivative of the observed heads.

    Raises
    ------
    ValueError
        If ``method`` is unknown or ``head`` and ``time`` differ in length.
    """
    if method != "bourdet":
        raise ValueError(
            f"smoothing_derivative: method '{method}' is unknown!"
        )

    head = np.asarray(head, dtype=float).reshape(-1)
    time = np.asarray(time, dtype=float).reshape(-1)
    if head.shape != time.shape:
        raise ValueError(
            "smoothing_derivative: 'head' and 'time' need the same length"
        )

    derhead = np.zeros(len(head))
    if len(head) < 3:
        # no interior points to evaluate
        return derhead

    # logarithmic time-steps and slopes between neighbouring points
    dlogt = np.diff(np.log(time))
    slope = np.diff(head) / dlogt

    # derivative approximation by Bourdet (1989)
    left, right = dlogt[:-1], dlogt[1:]
    derhead[1:-1] = (slope[:-1] * right + slope[1:] * left) / (left + right)
    return derhead
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc