• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

GeoStat-Framework / welltestpy / 10691143420

03 Sep 2024 09:48PM UTC coverage: 76.813% (+0.8%) from 76.01%
10691143420

Pull #35

github

web-flow
Merge 1ebfe99fb into 81a2299af
Pull Request #35: Bump actions/download-artifact from 2 to 4.1.7 in /.github/workflows

1928 of 2510 relevant lines covered (76.81%)

10.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
/src/welltestpy/data/data_io.py
1
"""welltestpy subpackage providing input-output routines."""
2
import csv
14✔
3
import numbers
14✔
4
import os
14✔
5
import shutil
14✔
6
import tempfile
14✔
7
import zipfile
14✔
8
from io import BytesIO as BytIO
14✔
9
from io import TextIOWrapper as TxtIO
14✔
10

11
import numpy as np
14✔
12
from packaging.version import parse as version_parse
14✔
13

14
from . import campaignlib, testslib, varlib
14✔
15

16
try:
14✔
17
    from .._version import __version__
14✔
18
except ImportError:  # pragma: nocover
×
19
    # package is not installed
20
    __version__ = "0.0.0.dev0"
×
21

22

23
# TOOLS ###
24

25

26
class LoadError(Exception):
    """Error raised by the reading routines when loading fails."""
14✔
30

31

32
def _formstr(string):
14✔
33
    # remove spaces, tabs, linebreaks and other separators
34
    return "".join(str(string).split())
14✔
35

36

37
def _formname(string):
    """Turn ``string`` into a text usable as a plain file name.

    Path separators and all whitespace are removed from ``str(string)``.
    """
    text = str(string)
    # remove slashes: besides os.path.sep also strip os.path.altsep, which is
    # "/" on Windows (and None on POSIX), so a name can never point into a
    # sub-directory once it is joined with a path
    for sep in (os.path.sep, os.path.altsep):
        if sep:
            text = text.replace(sep, "")
    # remove spaces, tabs, linebreaks and other separators
    return _formstr(text)
14✔
42

43

44
def _nextr(data):
14✔
45
    return tuple(filter(None, next(data)))
14✔
46

47

48
def _check_version(version):
    """Raise ``ValueError`` if ``version`` has a newer major than this package.

    Only the major component is compared.
    """
    file_major = version.major
    package_major = version_parse(__version__).major
    if file_major > package_major:
        raise ValueError(f"Unknown version '{version.public}'")
×
52

53

54
# SAVE ###
55

56

57
def save_var(var, path="", name=None):
    """Save a variable to file.

    This writes the variable to a csv file.

    Parameters
    ----------
    var : object
        Variable to save. Its ``name``, ``symbol``, ``units``,
        ``description``, ``value`` and ``scalar`` attributes are written.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Var_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".var"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing (exist_ok avoids the race between a
    # separate existence check and the creation)
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Var_" + var.name
    # ensure the name ends with '.var'
    if not name.endswith(".var"):
        name += ".var"
    name = _formname(name)
    file_path = os.path.join(path, name)
    # write the csv-file; newline="" lets the csv writer control the line
    # endings itself, as the csv module documentation requires
    with open(file_path, "w", newline="") as csvf:
        writer = csv.writer(
            csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
        )
        writer.writerow(["wtp-version", __version__])
        writer.writerow(["Variable"])
        writer.writerow(["name", var.name])
        writer.writerow(["symbol", var.symbol])
        writer.writerow(["units", var.units])
        writer.writerow(["description", var.description])
        # the data type row tells the loader whether to parse int or float
        if issubclass(np.asanyarray(var.value).dtype.type, numbers.Integral):
            writer.writerow(["integer"])
        else:
            writer.writerow(["float"])
        if var.scalar:
            writer.writerow(["scalar"])
            writer.writerow(["value", var.value])
        else:
            # arrays are stored flattened, one value per row, after the shape
            writer.writerow(["shape"] + list(np.shape(var.value)))
            tmpvalue = np.reshape(var.value, -1)
            writer.writerow(["values", len(tmpvalue)])
            for val in tmpvalue:
                writer.writerow([val])
    return file_path
14✔
111

112

113
def save_obs(obs, path="", name=None):
    """Save an observation to file.

    This writes the observation to a zip archive that holds an ``info.csv``
    and the variable files it refers to.

    Parameters
    ----------
    obs : object
        Observation to save. Its ``name``, ``state`` and ``description``
        are written, and ``_observation`` (plus ``_time`` if the state is
        not ``"steady"``) are stored through their own ``save`` methods.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Obs_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".obs"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing (race-free)
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Obs_" + obs.name
    # ensure the name ends with '.obs'
    if not name.endswith(".obs"):
        name += ".obs"
    name = _formname(name)
    steady = obs.state == "steady"
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        # archive members in writing order
        members = ["info.csv"]
        # write the csv-file (newline="" as required by the csv module)
        with open(os.path.join(patht, "info.csv"), "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Observation"])
            writer.writerow(["name", obs.name])
            writer.writerow(["state", obs.state])
            writer.writerow(["description", obs.description])
            if not steady:
                # every time file announced in info.csv is also archived
                timname = name[:-4] + "_TimVar.var"
                writer.writerow(["time", timname])
                obs._time.save(patht, timname)
                members.append(timname)
            obsname = name[:-4] + "_ObsVar.var"
            writer.writerow(["observation", obsname])
            obs._observation.save(patht, obsname)
            members.append(obsname)
        # compress everything to one zip-file
        file_path = os.path.join(path, name)
        with zipfile.ZipFile(file_path, "w") as zfile:
            for member in members:
                zfile.write(os.path.join(patht, member), member)
    finally:
        # delete the temporary directory, even if writing failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
14✔
173

174

175
def save_well(well, path="", name=None):
    """Save a well to file.

    This writes the well to a zip archive that holds an ``info.csv`` and
    one variable file per well property.

    Parameters
    ----------
    well : object
        Well to save. Its ``name`` is written, and ``wellradius``,
        ``coordinates``, ``welldepth``, ``aquiferdepth`` and ``screensize``
        are stored through their own ``save`` methods.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Well_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".wel"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing (race-free)
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Well_" + well.name
    # ensure the name ends with '.wel'
    if not name.endswith(".wel"):
        name += ".wel"
    name = _formname(name)
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        # (info.csv key, file name suffix, variable) in writing order
        parts = (
            ("radius", "_RadVar.var", well.wellradius),
            ("coordinates", "_CooVar.var", well.coordinates),
            ("welldepth", "_WedVar.var", well.welldepth),
            ("aquiferdepth", "_AqdVar.var", well.aquiferdepth),
            ("screensize", "_ScrVar.var", well.screensize),
        )
        members = ["info.csv"]
        # write the csv-file (newline="" as required by the csv module)
        with open(os.path.join(patht, "info.csv"), "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Well"])
            writer.writerow(["name", well.name])
            # save variable-files
            for key, suffix, variable in parts:
                varname = name[:-4] + suffix
                writer.writerow([key, varname])
                variable.save(patht, varname)
                members.append(varname)
        # compress everything to one zip-file
        file_path = os.path.join(path, name)
        with zipfile.ZipFile(file_path, "w") as zfile:
            for member in members:
                zfile.write(os.path.join(patht, member), member)
    finally:
        # delete the temporary directory, even if writing failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
14✔
242

243

244
def save_campaign(campaign, path="", name=None):
    """Save the campaign to file.

    This writes the campaign to a zip archive that holds an ``info.csv``
    and the files of the field site, the wells and the tests.

    Parameters
    ----------
    campaign : object
        Campaign to save. Its ``name``, ``description`` and ``timeframe``
        are written; ``fieldsite`` (if not ``None``) and every entry of the
        ``wells`` and ``tests`` mappings are stored through their own
        ``save`` methods.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Cmp_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".cmp"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing (race-free)
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Cmp_" + campaign.name
    # ensure the name ends with '.cmp'
    if not name.endswith(".cmp"):
        name += ".cmp"
    name = _formname(name)
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        # archive members in writing order
        members = ["info.csv"]
        # write the csv-file (newline="" as required by the csv module)
        with open(os.path.join(patht, "info.csv"), "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Campaign"])
            writer.writerow(["name", campaign.name])
            writer.writerow(["description", campaign.description])
            writer.writerow(["timeframe", campaign.timeframe])
            if campaign.fieldsite is not None:
                fieldsname = name[:-4] + "_Fieldsite.fds"
                writer.writerow(["fieldsite", fieldsname])
                campaign.fieldsite.save(patht, fieldsname)
                members.append(fieldsname)
            else:
                # the literal "None" marks a missing field site for the loader
                writer.writerow(["fieldsite", "None"])

            wkeys = tuple(campaign.wells.keys())
            writer.writerow(["Wells", len(wkeys)])
            for k in wkeys:
                wellname = name[:-4] + "_" + k + "_Well.wel"
                writer.writerow([k, wellname])
                campaign.wells[k].save(patht, wellname)
                members.append(wellname)

            tkeys = tuple(campaign.tests.keys())
            writer.writerow(["Tests", len(tkeys)])
            for k in tkeys:
                testname = name[:-4] + "_" + k + "_Test.tst"
                writer.writerow([k, testname])
                campaign.tests[k].save(patht, testname)
                members.append(testname)

        # compress everything to one zip-file
        file_path = os.path.join(path, name)
        with zipfile.ZipFile(file_path, "w") as zfile:
            for member in members:
                zfile.write(os.path.join(patht, member), member)
    finally:
        # delete the temporary directory, even if writing failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
14✔
322

323

324
def save_fieldsite(fieldsite, path="", name=None):
    """Save a field site to file.

    This writes the field site to a zip archive that holds an ``info.csv``
    and, if present, the coordinates file.

    Parameters
    ----------
    fieldsite : object
        Field site to save. Its ``name`` and ``description`` are written;
        ``coordinates`` (if not ``None``) is stored through its own
        ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Field_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".fds"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing (race-free)
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Field_" + fieldsite.name
    # ensure the name ends with '.fds'
    if not name.endswith(".fds"):
        name += ".fds"
    name = _formname(name)
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        # archive members in writing order
        members = ["info.csv"]
        # write the csv-file (newline="" as required by the csv module)
        with open(os.path.join(patht, "info.csv"), "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Fieldsite"])
            writer.writerow(["name", fieldsite.name])
            writer.writerow(["description", fieldsite.description])
            if fieldsite.coordinates is not None:
                coordname = name[:-4] + "_CooVar.var"
                writer.writerow(["coordinates", coordname])
                fieldsite.coordinates.save(patht, coordname)
                members.append(coordname)
            else:
                # the literal "None" marks missing coordinates for the loader
                writer.writerow(["coordinates", "None"])
        # compress everything to one zip-file
        file_path = os.path.join(path, name)
        with zipfile.ZipFile(file_path, "w") as zfile:
            for member in members:
                zfile.write(os.path.join(patht, member), member)
    finally:
        # delete the temporary directory, even if writing failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
14✔
380

381

382
def save_pumping_test(pump_test, path="", name=None):
    """Save a pumping test to file.

    This writes the pumping test to a zip archive that holds an
    ``info.csv`` and the files of its variables and observations.

    Parameters
    ----------
    pump_test : object
        Pumping test to save. Its ``name``, ``description``, ``timeframe``
        and ``pumpingwell`` are written; ``pumpingrate``, ``aquiferdepth``,
        ``aquiferradius`` and every entry of the ``observations`` mapping
        are stored through their own ``save`` methods.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Test_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".tst"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing (race-free)
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Test_" + pump_test.name
    # ensure the name ends with '.tst'
    if not name.endswith(".tst"):
        name += ".tst"
    name = _formname(name)
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        # write the csv-file (newline="" as required by the csv module)
        with open(os.path.join(patht, "info.csv"), "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Testtype", "PumpingTest"])
            writer.writerow(["name", pump_test.name])
            writer.writerow(["description", pump_test.description])
            writer.writerow(["timeframe", pump_test.timeframe])
            writer.writerow(["pumpingwell", pump_test.pumpingwell])
            # define names for the variable-files; they carry no extension
            # here, the base name of the path returned by save() is recorded
            pumprname = name[:-4] + "_PprVar"
            aquidname = name[:-4] + "_AqdVar"
            aquirname = name[:-4] + "_AqrVar"
            # save variable-files
            pumpr_path = pump_test.pumpingrate.save(patht, pumprname)
            pumpr_base = os.path.basename(pumpr_path)
            writer.writerow(["pumpingrate", pumpr_base])
            aquid_path = pump_test.aquiferdepth.save(patht, aquidname)
            aquid_base = os.path.basename(aquid_path)
            writer.writerow(["aquiferdepth", aquid_base])
            aquir_path = pump_test.aquiferradius.save(patht, aquirname)
            aquir_base = os.path.basename(aquir_path)
            writer.writerow(["aquiferradius", aquir_base])
            okeys = tuple(pump_test.observations.keys())
            writer.writerow(["Observations", len(okeys)])
            obsname = {}
            for k in okeys:
                obsname[k] = name[:-4] + "_" + k + "_Obs.obs"
                writer.writerow([k, obsname[k]])
                pump_test.observations[k].save(patht, obsname[k])
        # compress everything to one zip-file
        file_path = os.path.join(path, name)
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(os.path.join(patht, "info.csv"), "info.csv")
            zfile.write(pumpr_path, pumpr_base)
            zfile.write(aquir_path, aquir_base)
            zfile.write(aquid_path, aquid_base)
            for k in okeys:
                zfile.write(os.path.join(patht, obsname[k]), obsname[k])
    finally:
        # delete the temporary directory, even if writing failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
14✔
456

457

458
# LOAD ###
459

460

461
def _load_var_data(data):
    """Build a variable from the rows delivered by a csv reader.

    Parameters
    ----------
    data : iterator
        Iterator yielding the rows of a variable file.

    Raises
    ------
    ValueError
        If the header row is not ``"Variable"`` or the version check fails.
    """
    first_row = _nextr(data)
    if first_row[0] == "wtp-version":
        version_string = first_row[1]
        header = _nextr(data)
    else:
        # files without a version row are treated as version 1.0.0
        version_string = "1.0.0"
        header = first_row
    _check_version(version_parse(version_string))
    if header[0] != "Variable":
        raise ValueError(
            f"load_var: expected 'Variable' but got '{header[0]}'"
        )
    # four (key, value) rows in fixed order
    name, symbol, units, description = [next(data)[1] for __ in range(4)]
    # the data type row selects the converter for the stored values
    dtype = int if next(data)[0] == "integer" else float
    shapenfo = _nextr(data)
    if shapenfo[0] == "scalar":
        value = dtype(next(data)[1])
    else:
        shape = tuple(np.array(shapenfo[1:], dtype=int))
        count = int(next(data)[1])
        # the flattened values follow, one per row
        flat = [next(data)[0] for __ in range(count)]
        value = np.array(flat, dtype=dtype).reshape(shape)

    return varlib.Variable(name, value, symbol, units, description)
14✔
499

500

501
def load_var(varfile):
    """Load a variable from file.

    This reads a variable from a csv file.

    Parameters
    ----------
    varfile : :class:`str` or text stream
        Path to the file. If ``open`` rejects the argument with a
        ``TypeError`` (e.g. for a ``TextIOWrapper``), it is read directly as
        a stream of csv lines; such a stream is not closed here.

    Raises
    ------
    LoadError
        If the content can not be read as a variable.
    """
    data_file = None
    try:
        # read file (newline="" as required by the csv module)
        data_file = open(varfile, "r", newline="")
    except TypeError:  # if it is an instance of TextIOWrapper
        try:
            # read stream
            data = csv.reader(varfile)
        except Exception as exc:
            raise LoadError(
                f"load_var: couldn't read file '{varfile}'"
            ) from exc
    else:
        data = csv.reader(data_file)

    try:
        var = _load_var_data(data)
    except Exception as exc:
        raise LoadError(
            f"load_var: couldn't load variable '{varfile}'"
        ) from exc
    finally:
        # close a file opened here, also when parsing failed
        if data_file is not None:
            data_file.close()

    return var
14✔
538

539

540
def load_obs(obsfile):
    """Load an observation from file.

    This reads an observation from a zip archive holding an ``info.csv``.

    Parameters
    ----------
    obsfile : :class:`str`
        Path to the file (handed to :class:`zipfile.ZipFile`).

    Raises
    ------
    LoadError
        If anything goes wrong while reading the observation.
    """
    try:
        with zipfile.ZipFile(obsfile, "r") as zfile:
            data = csv.reader(TxtIO(zfile.open("info.csv")))
            first_row = _nextr(data)
            if first_row[0] == "wtp-version":
                version_string = first_row[1]
                header = _nextr(data)
            else:
                # files without a version row are treated as version 1.0.0
                version_string = "1.0.0"
                header = first_row
            _check_version(version_parse(version_string))
            if header[0] != "Observation":
                raise ValueError(
                    f"load_obs: expected 'Observation' but got '{header[0]}'"
                )
            name = next(data)[1]
            steady = next(data)[1] == "steady"
            description = next(data)[1]
            # a time file is only listed (and read) if not steady
            time = None
            if steady:
                obsf = next(data)[1]
            else:
                timef = next(data)[1]
                obsf = next(data)[1]
                time = load_var(TxtIO(zfile.open(timef)))
            # read observation
            obs = load_var(TxtIO(zfile.open(obsf)))
        # generate observation object
        observation = varlib.Observation(name, obs, time, description)
    except Exception as exc:
        raise LoadError(
            f"load_obs: couldn't load observation '{obsfile}'"
        ) from exc
    return observation
14✔
587

588

589
def load_well(welfile):
    """Load a well from file.

    This reads a well from a zip archive holding an ``info.csv``.

    Parameters
    ----------
    welfile : :class:`str`
        Path to the file (handed to :class:`zipfile.ZipFile`).

    Raises
    ------
    LoadError
        If anything goes wrong while reading the well.
    """
    try:
        with zipfile.ZipFile(welfile, "r") as zfile:
            data = csv.reader(TxtIO(zfile.open("info.csv")))

            def next_var():
                # load the variable file named in the next info.csv row
                member = next(data)[1]
                return load_var(TxtIO(zfile.open(member)))

            first_row = _nextr(data)
            if first_row[0] == "wtp-version":
                version_string = first_row[1]
                header = _nextr(data)
            else:
                # files without a version row are treated as version 1.0.0
                version_string = "1.0.0"
                header = first_row
            version = version_parse(version_string)
            _check_version(version)
            if header[0] != "Well":
                raise ValueError(
                    f"load_well: expected 'Well' but got '{header[0]}'"
                )
            name = next(data)[1]
            rad = next_var()  # radius
            coord = next_var()  # coordinates
            welld = next_var()  # well depth
            aquid = next_var()  # aquifer depth
            # read screensize implemented in v1.1
            screend = next_var() if version.release >= (1, 1) else None

        well = varlib.Well(name, rad, coord, welld, aquid, screend)
    except Exception as exc:
        raise LoadError(f"load_well: couldn't load well '{welfile}'") from exc
    return well
14✔
640

641

642
def load_campaign(cmpfile):
    """Load a campaign from file.

    This reads a campaign from a zip archive holding an ``info.csv``.

    Parameters
    ----------
    cmpfile : :class:`str`
        Path to the file (handed to :class:`zipfile.ZipFile`).

    Raises
    ------
    LoadError
        If anything goes wrong while reading the campaign.
    """
    try:
        with zipfile.ZipFile(cmpfile, "r") as zfile:
            data = csv.reader(TxtIO(zfile.open("info.csv")))
            first_row = _nextr(data)
            if first_row[0] == "wtp-version":
                version_string = first_row[1]
                header = _nextr(data)
            else:
                # files without a version row are treated as version 1.0.0
                version_string = "1.0.0"
                header = first_row
            _check_version(version_parse(version_string))
            if header[0] != "Campaign":
                raise ValueError(
                    f"load_campaign: expected 'Campaign' but got '{header[0]}'"
                )
            name = next(data)[1]
            description = next(data)[1]
            timeframe = next(data)[1]
            # the field site row holds a member name or the literal "None"
            site_row = _nextr(data)
            fieldsite = None
            if site_row[1] != "None":
                fieldsite = load_fieldsite(BytIO(zfile.read(site_row[1])))
            # nested archives are passed on as in-memory byte buffers
            wells = {}
            for __ in range(int(next(data)[1])):
                entry = _nextr(data)
                wells[entry[0]] = load_well(BytIO(zfile.read(entry[1])))

            tests = {}
            for __ in range(int(next(data)[1])):
                entry = _nextr(data)
                tests[entry[0]] = load_test(BytIO(zfile.read(entry[1])))

        campaign = campaignlib.Campaign(
            name, fieldsite, wells, tests, timeframe, description
        )
    except Exception as exc:
        raise LoadError(
            f"load_campaign: couldn't load campaign '{cmpfile}'"
        ) from exc
    return campaign
14✔
698

699

700
def load_fieldsite(fdsfile):
    """Load a field site from file.

    This reads a field site from a zip archive holding an ``info.csv``.

    Parameters
    ----------
    fdsfile : :class:`str`
        Path to the file (handed to :class:`zipfile.ZipFile`).

    Raises
    ------
    LoadError
        If anything goes wrong while reading the field site.
    """
    try:
        with zipfile.ZipFile(fdsfile, "r") as zfile:
            data = csv.reader(TxtIO(zfile.open("info.csv")))
            first_row = _nextr(data)
            if first_row[0] == "wtp-version":
                version_string = first_row[1]
                header = _nextr(data)
            else:
                # files without a version row are treated as version 1.0.0
                version_string = "1.0.0"
                header = first_row
            _check_version(version_parse(version_string))
            if header[0] != "Fieldsite":
                raise ValueError(
                    "load_fieldsite: expected 'Fieldsite' "
                    f"but got '{header[0]}'"
                )
            name = next(data)[1]
            description = next(data)[1]
            # the coordinates row holds a member name or the literal "None"
            coordinfo = next(data)[1]
            coordinates = None
            if coordinfo != "None":
                coordinates = load_var(TxtIO(zfile.open(coordinfo)))
        fieldsite = campaignlib.FieldSite(name, description, coordinates)
    except Exception as exc:
        raise LoadError(
            f"load_fieldsite: couldn't load fieldsite '{fdsfile}'"
        ) from exc
    return fieldsite
14✔
742

743

744
def load_test(tstfile):
    """Load a test from file.

    This reads the test type from the ``info.csv`` of a zip archive and
    hands the file over to the loading routine for that type.

    Parameters
    ----------
    tstfile : :class:`str`
        Path to the file (handed to :class:`zipfile.ZipFile`).

    Raises
    ------
    LoadError
        If the header can not be read or names an unknown test type.
    """
    try:
        with zipfile.ZipFile(tstfile, "r") as zfile:
            data = csv.reader(TxtIO(zfile.open("info.csv")))
            first_row = _nextr(data)
            if first_row[0] == "wtp-version":
                version_string = first_row[1]
                header = _nextr(data)
            else:
                # files without a version row are treated as version 1.0.0
                version_string = "1.0.0"
                header = first_row
            _check_version(version_parse(version_string))
            if header[0] != "Testtype":
                raise ValueError(
                    f"load_test: expected 'Testtype' but got '{header[0]}'"
                )
            # only pumping tests are known
            if header[1] != "PumpingTest":
                raise ValueError(f"load_test: unknown test type '{header[1]}'")
            routine = _load_pumping_test
    except Exception as exc:
        raise LoadError(f"load_test: couldn't load test '{tstfile}'") from exc
    # the selected routine reads the file again from the start
    return routine(tstfile)
14✔
779

780

781
def _load_pumping_test(tstfile):
    """Load a pumping test from file.

    This reads a pumping test from a zip archive holding an ``info.csv``.

    Parameters
    ----------
    tstfile : :class:`str`
        Path to the file (handed to :class:`zipfile.ZipFile`).

    Raises
    ------
    LoadError
        If anything goes wrong while reading the pumping test.
    """
    # default version string
    version_string = "1.0.0"
    try:
        with zipfile.ZipFile(tstfile, "r") as zfile:
            info = TxtIO(zfile.open("info.csv"))
            data = csv.reader(info)
            first_line = _nextr(data)
            if first_line[0] == "wtp-version":
                version_string = first_line[1]
                header = _nextr(data)
            else:
                header = first_line
            version = version_parse(version_string)
            _check_version(version)
            if header[1] != "PumpingTest":
                raise ValueError(
                    f"load_test: expected 'PumpingTest' but got '{header[1]}'"
                )
            name = next(data)[1]
            description = next(data)[1]
            timeframe = next(data)[1]
            pumpingwell = next(data)[1]
            # the pumping rate is either a variable (csv) or an observation
            # (nested zip); try the variable first
            rate_file = next(data)[1]
            try:
                pumpingrate = load_var(TxtIO(zfile.open(rate_file)))
            except Exception:
                # re-read the member as bytes: load_obs opens it as a zip
                # archive, which a used-up text wrapper can not provide
                pumpingrate = load_obs(BytIO(zfile.read(rate_file)))
            aquiferdepth = load_var(TxtIO(zfile.open(next(data)[1])))
            aquiferradius = load_var(TxtIO(zfile.open(next(data)[1])))
            obscnt = int(next(data)[1])
            observations = {}
            for __ in range(obscnt):
                row = _nextr(data)
                observations[row[0]] = load_obs(BytIO(zfile.read(row[1])))

        pumpingtest = testslib.PumpingTest(
            name,
            pumpingwell,
            pumpingrate,
            observations,
            aquiferdepth,
            aquiferradius,
            description,
            timeframe,
        )
    except Exception as exc:
        raise LoadError(
            f"load_test: couldn't load pumpingtest '{tstfile}'"
        ) from exc
    return pumpingtest
14✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc