• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pypest / pyemu / 19592938997

14 Nov 2025 05:37PM UTC coverage: 77.871% (+0.07%) from 77.802%
19592938997

push

github

web-flow
Merge pull request #637 from briochh/feat_pestppnightly

Use pestpp-nightly-builds in CI

18 of 22 new or added lines in 1 file covered. (81.82%)

3 existing lines in 1 file now uncovered.

13896 of 17845 relevant lines covered (77.87%)

8.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.39
/pyemu/utils/get_pestpp.py
1
#!/usr/bin/env python3
2
"""Download and install the PEST++ software suite.
3

4
This script originates from pyemu: https://github.com/pypest/pyemu
5
This file can be downloaded and run independently outside pyemu.
6
It requires Python 3.6 or later, and has no dependencies.
7

8
See https://developer.github.com/v3/repos/releases/ for GitHub Releases API.
9
"""
10
import json
11✔
11
import os
11✔
12
import shutil
11✔
13
import sys
11✔
14
import tarfile
11✔
15
import tempfile
11✔
16
import urllib
11✔
17
import urllib.request
11✔
18
import warnings
11✔
19
import zipfile
11✔
20
from importlib.util import find_spec
11✔
21
from pathlib import Path
11✔
22

23
__all__ = ["run_main"]
11✔
24
__license__ = "CC0"
11✔
25

26
from typing import Dict, List, Tuple
11✔
27

28
default_owner = "usgs"
11✔
29
default_repo = "pestpp"
11✔
30
# key is the repo name, value is the renamed file prefix for the download
31
renamed_prefix = {
11✔
32
    "pestpp": "pestpp",
33
    "pestpp-nightly-builds": "pestpp",
34
}
35
available_repos = list(renamed_prefix.keys())
11✔
36
available_ostags = ["linux", "mac", "win"]
11✔
37
max_http_tries = 3
11✔
38

39
# Check if this is running from pyemu
40
within_pyemu = False
11✔
41
spec = find_spec("pyemu")
11✔
42
if spec is not None:
11✔
43
    within_pyemu = (
11✔
44
        Path(spec.origin).resolve().parent in Path(__file__).resolve().parents
45
    )
46
del spec
11✔
47

48
# local pyemu install location (selected with :pyemu)
49
pyemu_appdata_path = (
11✔
50
    Path(os.path.expandvars(r"%LOCALAPPDATA%\pyemu"))
51
    if sys.platform.startswith("win")
52
    else Path.home() / ".local" / "share" / "pyemu"
53
)
54

55

56
def get_ostag() -> str:
11✔
57
    """Determine operating system tag from sys.platform."""
58
    if sys.platform.startswith("linux"):
11✔
59
        return "linux"
5✔
60
    elif sys.platform.startswith("win"):
6✔
61
        return "win"
5✔
62
    elif sys.platform.startswith("darwin") or sys.platform.startswith("mac"):
1✔
63
        return "mac"
1✔
64
    raise ValueError(f"platform {sys.platform!r} not supported")
×
65

66

67
def get_suffixes(ostag) -> Tuple[str, str]:
11✔
68
    if ostag in ["win"]:
11✔
69
        return ".exe", ".dll"
5✔
70
    elif ostag == "linux":
6✔
71
        return "", ".so"
5✔
72
    elif ostag == "mac":
1✔
73
        return "", ".dylib"
1✔
74
    else:
75
        raise KeyError(
×
76
            f"unrecognized ostag {ostag!r}; choose one of {available_ostags}"
77
        )
78

79

80
def get_request(url, params={}):
11✔
81
    """Get urllib.request.Request, with parameters and headers.
82

83
    This bears GITHUB_TOKEN if it is set as an environment variable.
84
    """
85
    if isinstance(params, dict):
11✔
86
        if len(params) > 0:
11✔
87
            url += "?" + urllib.parse.urlencode(params)
×
88
    else:
89
        raise TypeError("data must be a dict")
×
90
    headers = {}
11✔
91
    github_token = os.environ.get("GITHUB_TOKEN")
11✔
92
    if github_token:
11✔
93
        headers["Authorization"] = f"Bearer {github_token}"
11✔
94
    return urllib.request.Request(url, headers=headers)
11✔
95

96

97
def get_releases(
11✔
98
    owner=None, repo=None, quiet=False, per_page=None
99
) -> List[str]:
100
    """Get list of available releases."""
101
    owner, repo = _get_defaults(owner, repo)
11✔
102
    req_url = f"https://api.github.com/repos/{owner}/{repo}/releases"
11✔
103

104
    params = {}
11✔
105
    if per_page is not None:
11✔
106
        if per_page < 1 or per_page > 100:
11✔
107
            raise ValueError("per_page must be between 1 and 100")
11✔
108
        params["per_page"] = per_page
×
109

110
    request = get_request(req_url, params=params)
11✔
111
    num_tries = 0
11✔
112
    while True:
11✔
113
        num_tries += 1
11✔
114
        try:
11✔
115
            with urllib.request.urlopen(request, timeout=10) as resp:
11✔
116
                result = resp.read()
11✔
117
                break
11✔
118
        except urllib.error.HTTPError as err:
11✔
119
            if err.code == 401 and os.environ.get("GITHUB_TOKEN"):
11✔
120
                raise ValueError("GITHUB_TOKEN env is invalid") from err
×
121
            elif err.code == 403 and "rate limit exceeded" in err.reason:
11✔
122
                raise ValueError(
×
123
                    f"use GITHUB_TOKEN env to bypass rate limit ({err})"
124
                ) from err
125
            elif err.code in (404, 503) and num_tries < max_http_tries:
11✔
126
                # GitHub sometimes returns this error for valid URLs, so retry
127
                print(f"URL request {num_tries} did not work ({err})")
11✔
128
                continue
11✔
129
            raise RuntimeError(f"cannot retrieve data from {req_url}") from err
11✔
130

131
    releases = json.loads(result.decode())
11✔
132
    if not quiet:
11✔
133
        print(f"found {len(releases)} releases for {owner}/{repo}")
11✔
134

135
    avail_releases = ["latest"]
11✔
136
    avail_releases.extend(release["tag_name"] for release in releases)
11✔
137
    return avail_releases
11✔
138

139

140
def _get_defaults(owner=None, repo=None):
11✔
141
    """Get default owner and repo if not provided."""
142
    default_owner_dict = {'pestpp': "usgs",
11✔
143
                          'pestpp-nightly-builds': "pestpp"}
144
    default_repo_dict = {o: r for r, o in default_owner_dict.items()}
11✔
145
    # if nothing passed
146
    if owner is None and repo is None:
11✔
NEW
147
        owner = default_owner
×
148

149
    if repo is None:
11✔
NEW
150
        repo = default_repo_dict.get(owner, default_repo)
×
151
    elif owner is None:
11✔
152
        owner = default_owner_dict.get(repo, default_owner)
11✔
153
    return owner, repo
11✔
154

155
def get_release(owner=None, repo=None, tag="latest", quiet=False) -> dict:
11✔
156
    """Get info about a particular release."""
157
    owner, repo = _get_defaults(owner, repo)
11✔
158
    api_url = f"https://api.github.com/repos/{owner}/{repo}"
11✔
159
    req_url = (
11✔
160
        f"{api_url}/releases/latest"
161
        if tag == "latest"
162
        else f"{api_url}/releases/tags/{tag}"
163
    )
164
    request = get_request(req_url)
11✔
165
    releases = None
11✔
166
    num_tries = 0
11✔
167

168
    while True:
11✔
169
        num_tries += 1
11✔
170
        try:
11✔
171
            with urllib.request.urlopen(request, timeout=10) as resp:
11✔
172
                result = resp.read()
11✔
173
                remaining = resp.headers.get("x-ratelimit-remaining",None)
11✔
174
                if remaining is not None and int(remaining) <= 10:
11✔
175
                    warnings.warn(
×
176
                        f"Only {remaining} GitHub API requests remaining "
177
                        "before rate-limiting"
178
                    )
179
                break
11✔
180
        except urllib.error.HTTPError as err:
11✔
181
            if err.code == 401 and os.environ.get("GITHUB_TOKEN"):
11✔
NEW
182
                raise IOError("GITHUB_TOKEN env is invalid") from err
×
183
            elif err.code == 403 and "rate limit exceeded" in err.reason:
11✔
NEW
184
                raise IOError(
×
185
                    f"use GITHUB_TOKEN env to bypass rate limit ({err})"
186
                ) from err
187
            elif err.code == 404:
11✔
188
                if releases is None:
11✔
189
                    releases = get_releases(owner, repo, quiet)
11✔
190
                if tag not in releases:
×
191
                    raise ValueError(
×
192
                        f"Release {tag} not found (choose from {', '.join(releases)})"
193
                    )
194
            elif err.code == 503 and num_tries < max_http_tries:
×
195
                # GitHub sometimes returns this error for valid URLs, so retry
196
                warnings.warn(f"URL request {num_tries} did not work ({err})")
×
197
                continue
×
198
            raise RuntimeError(f"cannot retrieve data from {req_url}") from err
×
199

200
    release = json.loads(result.decode())
11✔
201
    tag_name = release["tag_name"]
11✔
202
    if not quiet:
11✔
203
        print(f"\nFetched release {tag_name!r} info from '{owner}/{repo}'.")
11✔
204

205
    return release
11✔
206

207

208
def columns_str(items, line_chars=79) -> str:
11✔
209
    """Return str of columns of items, similar to 'ls' command."""
210
    item_chars = max(len(item) for item in items)
11✔
211
    num_cols = line_chars // item_chars
11✔
212
    if num_cols == 0:
11✔
213
        num_cols = 1
×
214
    num_rows = len(items) // num_cols
11✔
215
    if len(items) % num_cols != 0:
11✔
216
        num_rows += 1
11✔
217
    lines = []
11✔
218
    for row_num in range(num_rows):
11✔
219
        row_items = items[row_num::num_rows]
11✔
220
        lines.append(
11✔
221
            " ".join(item.ljust(item_chars) for item in row_items).rstrip()
222
        )
223
    return "\n".join(lines)
11✔
224

225

226
def get_bindir_options(previous=None) -> Dict[str, Tuple[Path, str]]:
11✔
227
    """Generate install location options based on platform and filesystem access."""
228
    options = {}  # key is an option name, value is (optpath, optinfo)
11✔
229
    if previous is not None and os.access(previous, os.W_OK):
11✔
230
        # Make previous bindir as the first option
231
        options[":prev"] = (previous, "previously selected bindir")
×
232
    if within_pyemu:  # don't check is_dir() or access yet
11✔
233
        options[":pyemu"] = (pyemu_appdata_path / "bin", "used by pyemu")
11✔
234
    # Python bin (same for standard or conda varieties)
235
    py_bin = Path(sys.prefix) / (
11✔
236
        "Scripts" if get_ostag().startswith("win") else "bin"
237
    )
238
    if py_bin.is_dir() and os.access(py_bin, os.W_OK):
11✔
239
        options[":python"] = (py_bin, "used by Python")
11✔
240
    home_local_bin = Path.home() / ".local" / "bin"
11✔
241
    if home_local_bin.is_dir() and os.access(home_local_bin, os.W_OK):
11✔
242
        options[":home"] = (home_local_bin, "user-specific bindir")
11✔
243
    local_bin = Path("/usr") / "local" / "bin"
11✔
244
    if local_bin.is_dir() and os.access(local_bin, os.W_OK):
11✔
245
        options[":system"] = (local_bin, "system local bindir")
6✔
246
    # Windows user
247
    windowsapps_dir = Path(
11✔
248
        os.path.expandvars(r"%LOCALAPPDATA%\Microsoft\WindowsApps")
249
    )
250
    if windowsapps_dir.is_dir() and os.access(windowsapps_dir, os.W_OK):
11✔
251
        options[":windowsapps"] = (windowsapps_dir, "User App path")
5✔
252

253
    # any other possible OS-specific hard-coded locations?
254
    if not options:
11✔
255
        raise RuntimeError("could not find any installable folders")
×
256

257
    return options
11✔
258

259

260
def select_bindir(bindir, previous=None, quiet=False, is_cli=False) -> Path:
11✔
261
    """Resolve an install location if provided, or prompt interactive user to select one."""
262
    options = get_bindir_options(previous)
11✔
263

264
    if len(bindir) > 1:  # auto-select mode
11✔
265
        # match one option that starts with input, e.g. :Py -> :python
266
        sel = list(opt for opt in options if opt.startswith(bindir.lower()))
11✔
267
        if len(sel) != 1:
11✔
268
            opt_avail = ", ".join(
×
269
                f"'{opt}' for '{optpath}'"
270
                for opt, (optpath, _) in options.items()
271
            )
272
            raise ValueError(
×
273
                f"invalid option '{bindir}', choose from: {opt_avail}"
274
            )
275
        if not quiet:
11✔
276
            print(f"auto-selecting option {sel[0]!r} for 'bindir'")
11✔
277
        return Path(options[sel[0]][0]).resolve()
11✔
278
    else:
279
        if not is_cli:
×
280
            opt_avail = ", ".join(
×
281
                f"'{opt}' for '{optpath}'"
282
                for opt, (optpath, _) in options.items()
283
            )
284
            raise ValueError(f"specify the option, choose from: {opt_avail}")
×
285

286
        ioptions = dict(enumerate(options.keys(), 1))
×
287
        print("select a number to extract executables to a directory:")
×
288
        for iopt, opt in ioptions.items():
×
289
            optpath, optinfo = options[opt]
×
290
            print(f" {iopt}: '{optpath}' -- {optinfo} ('{opt}')")
×
291
        num_tries = 0
×
292
        while True:
×
293
            num_tries += 1
×
294
            res = input("> ")
×
295
            try:
×
296
                opt = ioptions[int(res)]
×
297
                print(f"selecting option {opt!r}")
×
298
                return Path(options[opt][0]).resolve()
×
299
            except (KeyError, ValueError):
×
300
                if num_tries < 2:
×
301
                    print("invalid option, try choosing option again")
×
302
                else:
303
                    raise RuntimeError(
×
304
                        "invalid option, too many attempts"
305
                    ) from None
306

307

308
def run_main(
11✔
309
    bindir,
310
    owner=default_owner,
311
    repo=default_repo,
312
    release_id="latest",
313
    ostag=None,
314
    subset=None,
315
    downloads_dir=None,
316
    force=False,
317
    quiet=False,
318
    _is_cli=False,
319
):
320
    """Run main method to get the PEST++ software suite.
321

322
    Parameters
323
    ----------
324
    bindir : str or Path
325
        Writable path to extract executables. Auto-select options start with a
326
        colon character. See error message or other documentation for further
327
        information on auto-select options.
328
    owner : str, default "usgs"
329
        Name of GitHub repository owner (user or organization).
330
    repo : str, default "pestpp"
331
        Name of GitHub PEST++ repository.
332
    release_id : str, default "latest"
333
        GitHub release ID.
334
    ostag : str, optional
335
        Operating system tag; default is to automatically choose.
336
    subset : list, set or str, optional
337
        Optional subset of executables to extract, specified as a list (e.g.)
338
        ``["pestpp-glm", "pestpp-ies"]`` or a comma-separated string
339
        "pestpp-glm,pestpp-ies".
340
    downloads_dir : str or Path, optional
341
        Manually specify directory to download archives. Default is to use
342
        home Downloads, if available, otherwise a temporary directory.
343
    force : bool, default False
344
        If True, always download archive. Default False will use archive if
345
        previously downloaded in ``downloads_dir``.
346
    quiet : bool, default False
347
        If True, show fewer messages.
348
    _is_cli : bool, default False
349
        Control behavior of method if this is run as a command-line interface
350
        or as a Python function.
351
    """
352
    meta_path = False
11✔
353
    prev_bindir = None
11✔
354
    pyemu_bin = False
11✔
355
    if within_pyemu:
11✔
356
        meta_list = []
11✔
357
        # Store metadata and possibly 'bin' in a user-writable path
358
        if not pyemu_appdata_path.exists():
11✔
359
            pyemu_appdata_path.mkdir(parents=True, exist_ok=True)
×
360
        pyemu_bin = pyemu_appdata_path / "bin"
11✔
361
        meta_path = pyemu_appdata_path / "get_pestpp.json"
11✔
362
        meta_path_exists = meta_path.exists()
11✔
363
        if meta_path_exists:
11✔
364
            del_meta_path = False
11✔
365
            try:
11✔
366
                meta_list = json.loads(meta_path.read_text())
11✔
367
            except (OSError, json.JSONDecodeError) as err:
×
368
                print(f"cannot read pyemu metadata file '{meta_path}': {err}")
×
369
                if isinstance(err, OSError):
×
370
                    meta_path = False
×
371
                if isinstance(err, json.JSONDecodeError):
×
372
                    del_meta_path = True
×
373
            try:
11✔
374
                prev_bindir = Path(meta_list[-1]["bindir"])
11✔
375
            except (KeyError, IndexError):
×
376
                del_meta_path = True
×
377
            if del_meta_path:
11✔
378
                try:
×
379
                    meta_path.unlink()
×
380
                    meta_path_exists = False
×
381
                    print(f"removed corrupt pyemu metadata file '{meta_path}'")
×
382
                except OSError as err:
×
383
                    print(f"cannot remove pyemu metadata file: {err!r}")
×
384
                    meta_path = False
×
385

386
    if ostag is None:
11✔
387
        ostag = get_ostag()
11✔
388

389
    exe_suffix, lib_suffix = get_suffixes(ostag)
11✔
390

391
    # select bindir if path not provided
392
    if isinstance(bindir, str):
11✔
393
        if bindir.startswith(":"):
11✔
394
            bindir = select_bindir(
×
395
                bindir, previous=prev_bindir, quiet=quiet, is_cli=_is_cli
396
            )  # returns resolved Path
397
        else:
398
            bindir = Path(bindir).resolve()
11✔
399
    elif isinstance(bindir, Path):
×
400
        bindir = bindir.resolve()
×
401
    else:
402
        raise ValueError("Invalid bindir option (expected string or Path)")
×
403

404
    # make sure bindir exists
405
    if bindir == pyemu_bin:
11✔
406
        if not within_pyemu:
×
407
            raise ValueError("option ':pyemu' is only for pyemu")
×
408
        elif not pyemu_bin.exists():
×
409
            # special case option that can create non-existing directory
410
            pyemu_bin.mkdir(parents=True, exist_ok=True)
×
411
    if not bindir.is_dir():
11✔
412
        raise OSError(f"extraction directory '{bindir}' does not exist")
×
413
    elif not os.access(bindir, os.W_OK):
11✔
414
        raise OSError(f"extraction directory '{bindir}' is not writable")
×
415

416
    # make sure repo option is valid
417
    if repo not in available_repos:
11✔
418
        raise KeyError(
×
419
            f"repo {repo!r} not supported; choose one of {available_repos}"
420
        )
421

422
    # get the selected release
423
    release = get_release(owner, repo, release_id, quiet)
11✔
424
    assets = release.get("assets", [])
11✔
425

426
    inconsistent_ostag_dict = {
11✔
427
        "win": "iwin",
428
        "mac": "mac",
429
        "linux": "linux",
430
    }
431

432
    for asset in assets:
11✔
433
        if ostag in asset["name"] or inconsistent_ostag_dict[ostag] in asset["name"]:
11✔
434
            break
11✔
435
    else:
436
        raise ValueError(
×
437
            f"could not find ostag {ostag!r} from release {release['tag_name']!r}; "
438
            f"see available assets here:\n{release['html_url']}"
439
        )
440
    asset_name = asset["name"]
11✔
441
    download_url = asset["browser_download_url"]
11✔
442
    asset_pth = Path(asset_name)
11✔
443
    asset_stem = asset_pth.stem
11✔
444
    if str(asset_pth).endswith("tar.gz"):
11✔
445
        asset_suffix = ".tar.gz"
6✔
446
    else:
447
        asset_suffix = asset_pth.suffix
11✔
448
    if repo == "pestpp":
11✔
449
        dst_fname = "-".join([repo, release["tag_name"], ostag]) + asset_suffix
11✔
450
    else:
451
        # change local download name so it is more unique
452
        dst_fname = "-".join(
11✔
453
            [renamed_prefix[repo], release["tag_name"], asset_name]
454
        )
455
    tmpdir = None
11✔
456
    if downloads_dir is None:
11✔
457
        downloads_dir = Path.home() / "Downloads"
×
458
        if not (downloads_dir.is_dir() and os.access(downloads_dir, os.W_OK)):
×
459
            tmpdir = tempfile.TemporaryDirectory()
×
460
            downloads_dir = Path(tmpdir.name)
×
461
    else:  # check user-defined
462
        downloads_dir = Path(downloads_dir)
11✔
463
        if not downloads_dir.is_dir():
11✔
464
            raise OSError(
×
465
                f"downloads directory '{downloads_dir}' does not exist"
466
            )
467
        elif not os.access(downloads_dir, os.W_OK):
11✔
468
            raise OSError(
×
469
                f"downloads directory '{downloads_dir}' is not writable"
470
            )
471
    download_pth = downloads_dir / dst_fname
11✔
472
    if download_pth.is_file() and not force:
11✔
473
        if not quiet:
×
474
            print(
×
475
                f"using previous download '{download_pth}' (use "
476
                f"{'--force' if _is_cli else 'force=True'!r} to re-download)"
477
            )
478
    else:
479
        if not quiet:
11✔
480
            print(f"\nDownloading '{download_url}' to '{download_pth}'.")
11✔
481
        urllib.request.urlretrieve(download_url, download_pth)
11✔
482

483
    if subset:
11✔
484
        if not isinstance(subset, (str, list, tuple)):
×
485
            raise TypeError(
×
486
                "subset but be a comma-separated string, list, or tuple"
487
            )
488
        print(subset)
×
489
        if isinstance(subset, str):
×
490
            for rep_text in ("'", '"'):
×
491
                subset = subset.replace(rep_text, "")
×
492
            subset = subset.split(sep=",")
×
493
        if ostag in ("win",):
×
494
            for idx, entry in enumerate(subset):
×
495
                if entry.startswith("pestpp") and not entry.endswith(
×
496
                    exe_suffix
497
                ):
498
                    subset[idx] = f"{entry}{exe_suffix}"
×
499
        subset = set(subset)
×
500

501
    # Open archive and extract files
502
    extract = set()
11✔
503
    chmod = set()
11✔
504
    items = []
11✔
505
    full_path = {}
11✔
506
    if meta_path:
11✔
507
        from datetime import datetime
11✔
508

509
        meta = {
11✔
510
            "bindir": str(bindir),
511
            "owner": owner,
512
            "repo": repo,
513
            "release_id": release["tag_name"],
514
            "name": asset_name,
515
            "updated_at": asset["updated_at"],
516
            "extracted_at": datetime.now().isoformat(),
517
        }
518
        if subset:
11✔
519
            meta["subset"] = sorted(subset)
×
520

521
    if str(download_pth).endswith(".tar.gz"):
11✔
522
        zip_path = Path(str(download_pth).replace(".tar.gz", ".zip"))
6✔
523
        if not quiet:
6✔
524
            print(
6✔
525
                f"\nA *.tar.gz file ('{download_pth}') has been downloaded "
526
                + f" and will be converted to a zip file ('{zip_path}')."
527
            )
528

529
        if zip_path.exists():
6✔
530
            zip_path.unlink()
×
531

532
        zipf = zipfile.ZipFile(
6✔
533
            file=zip_path, mode="a", compression=zipfile.ZIP_DEFLATED
534
        )
535
        with tarfile.open(name=download_pth, mode="r|gz") as tarf:
6✔
536
            for m in tarf:
6✔
537
                f = tarf.extractfile(m)
6✔
538
                if f is not None:
6✔
539
                    fl = f.read()
6✔
540
                    fn = m.name
6✔
541
                    zipf.writestr(fn, fl)
6✔
542
        zipf.close()
6✔
543
        download_pth = zip_path
6✔
544

545
    with zipfile.ZipFile(download_pth, "r") as zipf:
11✔
546
        # First gather files within internal directories named "bin" or "dist/*/"
547
        for pth in zipf.namelist():
11✔
548
            p = Path(pth)
11✔
549
            if p.parent.name == "bin":
11✔
550
                full_path[p.name] = pth
11✔
551
            elif p.parent.parent.name == "dist":
11✔
552
                full_path[p.name] = pth
11✔
553
        files = set(full_path.keys())
11✔
554

555
        if not files:
11✔
556
            # there was no internal "bin", so assume all files to be extracted
557
            files = set(zipf.namelist())
×
558

559
        code = False
11✔
560
        if "code.json" in files and repo == "pestpp":
11✔
561
            code_bytes = zipf.read("code.json")
×
562
            code = json.loads(code_bytes.decode())
×
563
            if meta_path:
×
564
                import hashlib
×
565

566
                code_md5 = hashlib.md5(code_bytes).hexdigest()
×
567
                meta["code_json_md5"] = code_md5
×
568

569
        if "code.json" in files:
11✔
570
            # don't extract this file
571
            files.remove("code.json")
×
572

573
        if subset:
11✔
574
            nosub = False
×
575
            subset_keys = files
×
576
            if code:
×
577
                subset_keys |= set(code.keys())
×
578
            not_found = subset.difference(subset_keys)
×
579
            if not_found:
×
580
                raise ValueError(
×
581
                    f"subset item{'s' if len(not_found) != 1 else ''} "
582
                    f"not found: {', '.join(sorted(not_found))}\n"
583
                    f"available items are:\n{columns_str(sorted(subset_keys))}"
584
                )
585
        else:
586
            nosub = True
11✔
587
            subset = set()
11✔
588

589
        if code:
11✔
590

591
            def add_item(key, fname, do_chmod):
×
592
                if fname in files:
×
593
                    extract.add(fname)
×
594
                    items.append(f"{fname} ({code[key]['version']})")
×
595
                    if do_chmod:
×
596
                        chmod.add(fname)
×
597
                else:
598
                    print(f"file {fname} does not exist")
×
599
                return
×
600

601
            for key in sorted(code):
×
602
                if code[key].get("shared_object"):
×
603
                    fname = f"{key}{lib_suffix}"
×
604
                    if nosub or (
×
605
                        subset and (key in subset or fname in subset)
606
                    ):
607
                        add_item(key, fname, do_chmod=False)
×
608
                else:
609
                    fname = f"{key}{exe_suffix}"
×
610
                    if nosub or (
×
611
                        subset and (key in subset or fname in subset)
612
                    ):
613
                        add_item(key, fname, do_chmod=True)
×
614
                    # check if double version exists
615
                    fname = f"{key}dbl{exe_suffix}"
×
616
                    if (
×
617
                        code[key].get("double_switch", True)
618
                        and fname in files
619
                        and (
620
                            nosub
621
                            or (subset and (key in subset or fname in subset))
622
                        )
623
                    ):
624
                        add_item(key, fname, do_chmod=True)
×
625

626
        else:
627
            # releases without code.json
628
            for fname in sorted(files):
11✔
629
                if nosub or (subset and fname in subset):
11✔
630
                    if full_path:
11✔
631
                        extract.add(full_path[fname])
11✔
632
                    else:
633
                        extract.add(fname)
×
634
                    items.append(fname)
11✔
635
                    if not fname.endswith(lib_suffix):
11✔
636
                        chmod.add(fname)
11✔
637
        if not quiet:
11✔
638
            print(
11✔
639
                f"\nExtracting {len(extract)} "
640
                f"file{'s' if len(extract) != 1 else ''} to '{bindir}'"
641
            )
642

643
        zipf.extractall(bindir, members=extract)
11✔
644

645
    # If this is a TemporaryDirectory, then delete the directory and files
646
    del tmpdir
11✔
647

648
    if full_path:
11✔
649
        # move files that used a full path to bindir
650
        rmdirs = set()
11✔
651
        for fpath in extract:
11✔
652
            fpath = Path(fpath)
11✔
653
            bindir_path = bindir / fpath
11✔
654
            bindir_path.replace(bindir / fpath.name)
11✔
655
            rmdirs.add(fpath.parent)
11✔
656
        # clean up directories, starting with the longest
657
        for rmdir in reversed(sorted(rmdirs)):
11✔
658
            bindir_path = bindir / rmdir
11✔
659
            bindir_path.rmdir()
11✔
660
            for subdir in rmdir.parents:
11✔
661
                bindir_path = bindir / subdir
11✔
662
                if bindir_path == bindir:
11✔
663
                    break
11✔
664
                shutil.rmtree(str(bindir_path))
11✔
665

666
    if ostag in ["linux", "mac"]:
11✔
667
        # similar to "chmod +x fname" for each executable
668
        for fname in chmod:
6✔
669
            pth = bindir / fname
6✔
670
            pth.chmod(pth.stat().st_mode | 0o111)
6✔
671

672
    # Show listing
673
    if not quiet:
11✔
674
        if any(items):
11✔
675
            print(columns_str(items))
11✔
676

677
        if not subset:
11✔
678
            if full_path:
11✔
679
                extract = {Path(fpth).name for fpth in extract}
11✔
680
            unexpected = extract.difference(files)
11✔
681
            if unexpected:
11✔
682
                print(f"unexpected remaining {len(unexpected)} files:")
×
683
                print(columns_str(sorted(unexpected)))
×
684

685
    # Save metadata, only for pyemu
686
    if meta_path:
11✔
687
        if "pytest" in str(bindir) or "pytest" in sys.modules:
11✔
688
            # Don't write metadata if this is part of pytest
689
            print("skipping writing pyemu metadata for pytest")
11✔
690
            return
11✔
691
        meta_list.append(meta)
×
692
        if not pyemu_appdata_path.exists():
×
693
            pyemu_appdata_path.mkdir(parents=True, exist_ok=True)
×
694
        try:
×
695
            meta_path.write_text(json.dumps(meta_list, indent=4) + "\n")
×
696
        except OSError as err:
×
697
            print(f"cannot write pyemu metadata file: '{meta_path}': {err!r}")
×
698
        if not quiet:
×
699
            if meta_path_exists:
×
700
                print(f"\nUpdated pyemu metadata file: '{meta_path}'")
×
701
            else:
702
                print(f"\nWrote new pyemu metadata file: '{meta_path}'")
×
703

704

705
def cli_main():
11✔
706
    """Command-line interface."""
707
    import argparse
×
708

709
    # Show meaningful examples at bottom of help
710
    prog = Path(sys.argv[0]).stem
×
711
    if sys.platform.startswith("win"):
×
712
        drv = Path("c:/")
×
713
    else:
714
        drv = Path("/")
×
715
    example_bindir = drv / "path" / "to" / "bin"
×
716
    examples = f"""\
×
717
Examples:
718

719
  Install executables into an existing '{example_bindir}' directory:
720
    $ {prog} {example_bindir}
721

722
  Install a development snapshot of PEST ++ by choosing a repo:
723
    $ {prog} --repo pestpp {example_bindir}
724
    """
725
    if within_pyemu:
×
726
        examples += f"""\
×
727

728
  PpyEMU users can install executables using a special option:
729
    $ {prog} :pyemu
730
    """
731

732
    parser = argparse.ArgumentParser(
×
733
        description=__doc__.split("\n")[0],
734
        formatter_class=argparse.RawDescriptionHelpFormatter,
735
        epilog=examples,
736
    )
737

738
    bindir_help = (
×
739
        "Directory to extract executables. Use ':' to interactively select an "
740
        "option of paths. Other auto-select options are only available if the "
741
        "current user can write files. "
742
    )
743
    if within_pyemu:
×
744
        bindir_help += (
×
745
            "Option ':prev' is the previously used 'bindir' path selection. "
746
            "Option ':pyemu' will create and install programs for pyEMY. "
747
        )
748
    if sys.platform.startswith("win"):
×
749
        bindir_help += (
×
750
            "Option ':python' is Python's Scripts directory. "
751
            "Option ':windowsapps' is "
752
            "'%%LOCALAPPDATA%%\\Microsoft\\WindowsApps'."
753
        )
754
    else:
755
        bindir_help += (
×
756
            "Option ':python' is Python's bin directory. "
757
            "Option ':home' is '$HOME/.local/bin'. "
758
            "Option ':system' is '/usr/local/bin'."
759
        )
760
    parser.add_argument("bindir", help=bindir_help)
×
761
    parser.add_argument(
×
762
        "--owner",
763
        type=str,
764
        default=default_owner,
765
        help=f"GitHub repository owner; default is '{default_owner}'.",
766
    )
767
    parser.add_argument(
×
768
        "--repo",
769
        choices=available_repos,
770
        default=default_repo,
771
        help=f"Name of GitHub repository; default is '{default_repo}'.",
772
    )
773
    parser.add_argument(
×
774
        "--release-id",
775
        default="latest",
776
        help="GitHub release ID; default is 'latest'.",
777
    )
778
    parser.add_argument(
×
779
        "--ostag",
780
        choices=available_ostags,
781
        help="Operating system tag; default is to automatically choose.",
782
    )
783
    parser.add_argument(
×
784
        "--subset",
785
        help="Subset of executables to extract, specified as a "
786
        "comma-separated string, e.g. 'pestpp-glm,pestpp-ies'.",
787
    )
788
    parser.add_argument(
×
789
        "--downloads-dir",
790
        help="Manually specify directory to download archives.",
791
    )
792
    parser.add_argument(
×
793
        "--force",
794
        action="store_true",
795
        help="Force re-download archive. Default behavior will use archive if "
796
        "previously downloaded in downloads-dir.",
797
    )
798
    parser.add_argument(
×
799
        "--quiet", action="store_true", help="Show fewer messages."
800
    )
801
    args = vars(parser.parse_args())
×
802
    try:
×
803
        run_main(**args, _is_cli=True)
×
804
    except (EOFError, KeyboardInterrupt):
×
805
        sys.exit(f" cancelling '{sys.argv[0]}'")
×
806

807

808
if __name__ == "__main__":
11✔
809
    """Run command-line interface, if run as a script."""
×
810
    cli_main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc