• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emcek / dcspy / 11846138840

14 Nov 2024 09:51PM UTC coverage: 96.044% (+0.04%) from 96.004%
11846138840

Pull #404

github

emcek
allow string or Version object as current_ver parameter
Pull Request #404: Add Asset and Release models for Github

613 of 680 branches covered (90.15%)

Branch coverage included in aggregate %.

59 of 59 new or added lines in 3 files covered. (100.0%)

7 existing lines in 2 files now uncovered.

2106 of 2151 relevant lines covered (97.91%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.29
/src/dcspy/utils.py
1
from __future__ import annotations
1✔
2

3
import json
1✔
4
import sys
1✔
5
import zipfile
1✔
6
from collections.abc import Callable, Generator, Sequence
1✔
7
from datetime import datetime
1✔
8
from functools import lru_cache
1✔
9
from glob import glob
1✔
10
from itertools import chain
1✔
11
from logging import getLogger
1✔
12
from os import chdir, environ, getcwd, makedirs, walk
1✔
13
from pathlib import Path
1✔
14
from platform import python_implementation, python_version, uname
1✔
15
from pprint import pformat
1✔
16
from re import search, sub
1✔
17
from shutil import rmtree
1✔
18
from subprocess import CalledProcessError, run
1✔
19
from tempfile import gettempdir
1✔
20
from typing import Any, ClassVar
1✔
21

22
import yaml
1✔
23
from packaging import version
1✔
24
from psutil import process_iter
1✔
25
from requests import get
1✔
26

27
from dcspy.models import (CTRL_LIST_SEPARATOR, AnyButton, BiosValue, ControlDepiction, ControlKeyData, DcsBiosPlaneData, DcspyConfigYaml, Release, RequestModel,
1✔
28
                          get_key_instance)
29

30
try:
1✔
31
    import git
1✔
32
except ImportError:
×
33
    pass
×
34

35
# Module-level logger used by the helpers in this file.
LOG = getLogger(__name__)
__version__ = '3.6.1'
CONFIG_YAML = 'config.yaml'
# Default configuration file located in the 'resources' directory next to this module.
DEFAULT_YAML_FILE = Path(__file__).parent / 'resources' / CONFIG_YAML

# Read explicitly as UTF-8 (same as load_yaml/save_yaml) so the result does not depend on the system locale.
with open(DEFAULT_YAML_FILE, encoding='utf-8') as c_file:
    defaults_cfg: DcspyConfigYaml = yaml.load(c_file, Loader=yaml.SafeLoader)
    # The DCS-BIOS location is user specific, so it is filled in at import time from the USERNAME variable.
    defaults_cfg['dcsbios'] = f'C:\\Users\\{environ.get("USERNAME", "UNKNOWN")}\\Saved Games\\DCS.openbeta\\Scripts\\DCS-BIOS'
43

44

45
def get_default_yaml(local_appdata: bool = False) -> Path:
    """
    Give the full path of the configuration file to use.

    :param local_appdata: When True, use the file in the directory returned by get_config_yaml_location(), creating it from defaults when missing
    :return: Path to the configuration file
    """
    if not local_appdata:
        return DEFAULT_YAML_FILE

    config_dir = get_config_yaml_location()
    makedirs(name=config_dir, exist_ok=True)
    user_cfg = Path(config_dir / CONFIG_YAML).resolve()
    # First run for this user: seed the file with the packaged defaults.
    if not user_cfg.exists():
        save_yaml(data=defaults_cfg, full_path=user_cfg)
    return user_cfg
60

61

62
def load_yaml(full_path: Path) -> dict[str, Any]:
    """
    Load YAML from a file into a dictionary.

    A missing file or any YAML error (parser, scanner, constructor...) results in an empty dictionary,
    the same as a document whose top-level value is not a mapping.

    :param full_path: Full path to YAML file
    :return: Dictionary with data, empty when the file can not be loaded
    """
    try:
        with open(file=full_path, encoding='utf-8') as yaml_file:
            data = yaml.load(yaml_file, Loader=yaml.SafeLoader)
    except (FileNotFoundError, yaml.YAMLError) as err:
        # yaml.YAMLError is the base class of ParserError, so previously handled errors are still handled.
        makedirs(name=full_path.parent, exist_ok=True)
        LOG.warning(f'{type(err).__name__}: {full_path}.', exc_info=True)
        LOG.debug(f'{err}')
        return {}
    return data if isinstance(data, dict) else {}
80

81

82
def save_yaml(data: dict[str, Any], full_path: Path) -> None:
    """
    Write a dictionary to a file as YAML.

    :param data: Dictionary with data
    :param full_path: Full path to the YAML file, it is overwritten when it exists
    """
    with open(file=full_path, mode='w', encoding='utf-8') as stream:
        yaml.safe_dump(data, stream)
91

92

93
def check_ver_at_github(repo: str) -> Release:
    """
    Check a version of <organization>/<package> at GitHub.

    :param repo: Format '<organization or user>/<package>'
    :return: Release object with data
    :raises ValueError: When the request fails, returns a non-OK status, or the response can not be turned into a Release
    """
    # Last path element, so a value without a slash does not raise IndexError before the try block.
    package = repo.split('/')[-1]
    try:
        response = get(url=f'https://api.github.com/repos/{repo}/releases/latest', timeout=5)
        if not response.ok:
            # Handled by the except clause below, which logs the warning exactly once.
            raise RuntimeError(f'Unable to check {package} version online. Try again later. Status={response.status_code}')
        rel = Release(**response.json())
        LOG.debug(f'Latest GitHub release: {rel}')
        return rel
    except Exception as exc:
        LOG.warning(f'Unable to check {package} version online: {exc}')
        # Chain the original error so the root cause stays visible in tracebacks.
        raise ValueError(f'Unable to check {package} version online: {exc}') from exc
113

114

115
def get_version_string(repo: str, current_ver: str | version.Version, check: bool = True) -> str:
    """
    Build a display string for the version, optionally compared with the latest GitHub release.

    :param repo: Format '<organization or user>/<package>'.
    :param current_ver: string or Version object.
    :param check: When True, the latest release is looked up online.
    :return: Formatted version as string.
    """
    if not check:
        return f'v{current_ver}'

    try:
        release = check_ver_at_github(repo=repo)
    except ValueError:
        return f'v{current_ver} (failed)'

    suffix = ''
    shown_ver = current_ver
    if release.is_latest(current_ver=current_ver):
        suffix = ' (latest)'
    elif release.version != version.parse('0.0.0'):
        # A newer release exists: show its number instead of the installed one.
        suffix = ' (update!)'
        shown_ver = release.version
    return f'v{shown_ver}{suffix}'
139

140

141
def download_file(url: str, save_path: Path) -> bool:
    """
    Download a file from URL and save to save_path.

    :param url: URL address
    :param save_path: full path to save
    :return: True when the file was downloaded and saved, False when the server response was not OK
    """
    response = get(url=url, stream=True, timeout=5)
    try:
        if not response.ok:
            LOG.warning(f'Can not download from: {url}')
            return False
        LOG.debug(f'Download file from: {url}')
        with open(save_path, 'wb+') as dl_file:
            # 8 KiB chunks keep memory use small without the overhead of thousands of tiny writes.
            for chunk in response.iter_content(chunk_size=8192):
                dl_file.write(chunk)
        LOG.debug(f'Saved as: {save_path}')
        return True
    finally:
        # A streamed response keeps its connection open until it is closed, also on the failure path.
        response.close()
159

160

161
def proc_is_running(name: str) -> int:
    """
    Check if the process is running and return its PID.

    If the process name is not found, 0 (zero) is returned.

    :param name: Process name, matched as a substring of the reported process name
    :return: PID as int
    """
    for proc in process_iter(['pid', 'name']):
        # The name can be None when psutil is not allowed to read it, treat it as no match.
        proc_name = proc.info['name'] or ''  # type: ignore[attr-defined]
        if name in proc_name:
            return proc.info['pid']  # type: ignore[attr-defined]
    return 0
173

174

175
def check_dcs_ver(dcs_path: Path) -> tuple[str, str]:
    """
    Read DCS release type and version from autoupdate.cfg.

    :param dcs_path: Path to DCS installation directory
    :return: DCS type and version as strings, both 'Unknown' when the file can not be read
    """
    try:
        with open(file=dcs_path / 'autoupdate.cfg', encoding='utf-8') as cfg_file:
            cfg_text = cfg_file.read()
    except (FileNotFoundError, PermissionError) as err:
        LOG.debug(f'{type(err).__name__}: {err.filename}')
        return 'Unknown', 'Unknown'

    branch_match = search(r'"branch":\s"([\w.]*)"', cfg_text)
    ver_match = search(r'"version":\s"([\d.]*)"', cfg_text)
    # A readable file without a branch entry is reported as the stable release.
    dcs_type = str(branch_match.group(1)) if branch_match else 'stable'
    dcs_ver = str(ver_match.group(1)) if ver_match else 'Unknown'
    return dcs_type, dcs_ver
195

196

197
def check_bios_ver(bios_path: Path | str) -> version.Version:
    """
    Read the DCS-BIOS release version from CommonData.lua.

    The newer file location is tried first, then the older one.

    :param bios_path: Path to DCS-BIOS directory in Saved Games folder
    :return: Version object, 0.0.0 when the file or the version is not found
    """
    new_location = Path(bios_path) / 'lib' / 'modules' / 'common_modules' / 'CommonData.lua'
    old_location = Path(bios_path) / 'lib' / 'CommonData.lua'

    lua_file = next((candidate for candidate in (new_location, old_location) if candidate.is_file()), None)
    if lua_file is None:
        LOG.debug(f'No `CommonData.lua` while checking DCS-BIOS version at {new_location.parent} or {old_location.parent}')
        lua_text = ''
    else:
        lua_text = lua_file.read_text(encoding='utf-8')

    ver_match = search(r'function getVersion\(\)\s*return\s*\"([\d.]*)\"', lua_text)
    if ver_match:
        return version.parse(ver_match.group(1))
    return version.parse('0.0.0')
221

222

223
def is_git_repo(dir_path: str) -> bool:
    """
    Check if dir_path is a Git repository.

    :param dir_path: Path as string
    :return: True if dir is git repo
    """
    import git

    try:
        # Accessing git_dir is only a probe, the value itself is not needed.
        _ = git.Repo(dir_path).git_dir
    except (git.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        return False
    return True
236

237

238
def _get_sha_hex_str(bios_repo: git.Repo, git_ref: str) -> str:
    """
    Check out a Git reference and describe the resulting HEAD commit.

    The description starts with the active branch name. When the checkout fails or no branch name is
    available (TypeError), the first 8 characters of the commit hash are used instead.

    :param bios_repo: A Git repository object.
    :param git_ref: A string representing the Git reference (e.g., commit, branch, tag).
    :return: A string with branch or short hash, commit date, and author.
    """
    try:
        import git
    except ImportError:
        raise OSError('Git executable is not available!')

    def _describe(prefix, commit) -> str:
        """Format the prefix with commit date and author."""
        return f'{prefix} from: {commit.committed_datetime.strftime("%d-%b-%Y %H:%M:%S")} by: {commit.author}'

    try:
        bios_repo.git.checkout(git_ref)
        branch = bios_repo.active_branch.name
        head_commit = bios_repo.head.commit
        sha = _describe(branch, head_commit)
    except (git.exc.GitCommandError, TypeError):
        head_commit = bios_repo.head.commit
        sha = _describe(head_commit.hexsha[0:8], head_commit)
    LOG.debug(f'Checkout: {head_commit.hexsha} from: {head_commit.committed_datetime} | by: {head_commit.author}\n{head_commit.message}')  # type: ignore
    return sha
260

261

262
def check_github_repo(git_ref: str, repo_dir: Path, repo: str, update: bool = True, progress: git.RemoteProgress | None = None) -> str:
    """
    Prepare the local repository, optionally pull it, and check out a reference.

    :param git_ref: Any Git reference as string
    :param repo_dir: Local directory for a repository
    :param repo: GitHub repository user/name
    :param update: Pull from the first remote before checking out git_ref
    :param progress: Progress callback
    :return: Description of the checked out commit as produced by _get_sha_hex_str()
    """
    bios_repo = _checkout_repo(repo=repo, repo_dir=repo_dir, progress=progress)
    if update:
        fetched = bios_repo.remotes[0].pull(progress=progress)
        newest = fetched[0]
        LOG.debug(f'Pulled: {newest.name} as: {newest.commit}')
    return _get_sha_hex_str(bios_repo, git_ref)
280

281

282
def _checkout_repo(repo: str, repo_dir: Path, checkout_ref: str = 'master', progress: git.RemoteProgress | None = None) -> git.Repo:
    """
    Open the local repository at checkout_ref, or clone it when repo_dir is not a Git repository.

    :param repo: Repository name
    :param repo_dir: Local repository directory
    :param checkout_ref: Git reference checked out when the repository already exists
    :param progress: Progress callback used while cloning
    :return: Repo object of the repository
    """
    import git

    makedirs(name=repo_dir, exist_ok=True)
    if not is_git_repo(str(repo_dir)):
        # Whatever is in the directory is not a repository: start from a clean clone.
        rmtree(path=repo_dir, ignore_errors=True)
        return git.Repo.clone_from(url=f'https://github.com/{repo}.git', to_path=repo_dir, progress=progress)  # type: ignore

    existing_repo = git.Repo(repo_dir)
    existing_repo.git.checkout(checkout_ref)
    return existing_repo
302

303

304
def check_dcs_bios_entry(lua_dst_data: str, lua_dst_path: Path, temp_dir: Path) -> str:
    """
    Make sure Export.lua loads DCS-BIOS, appending the entry when it is missing.

    :param lua_dst_data: Content of Export.lua
    :param lua_dst_path: Directory with the Export.lua to update
    :param temp_dir: Directory with the Export.lua template to append
    :return: Result of checks as a message
    """
    lua = 'Export.lua'
    lua_src_data = (temp_dir / lua).read_text(encoding='utf-8')
    messages = ['\n\nExport.lua exists.']

    if search(r'dofile\(lfs.writedir\(\)\s*\.\.\s*\[\[Scripts\\DCS-BIOS\\BIOS\.lua]]\)', lua_dst_data):
        messages.append('\n\nDCS-BIOS entry detected.')
    else:
        with open(file=lua_dst_path / lua, mode='a+', encoding='utf-8') as exportlua_dst:
            exportlua_dst.write(f'\n{lua_src_data}')
        LOG.debug(f'Add DCS-BIOS to Export.lua: {lua_src_data}')
        messages.append('\n\nDCS-BIOS entry added.\n\nYou verify installation at:\ngithub.com/DCS-Skunkworks/DCSFlightpanels/wiki/Installation')
    return ''.join(messages)
326

327

328
def count_files(directory: Path, extension: str) -> int:
    """
    Count files with the given extension placed directly in a directory.

    :param directory: as Path object
    :param extension: file extension without the leading dot
    :return: number of files, -1 when the directory does not exist
    """
    wanted_suffix = f'.{extension}'
    try:
        matching = [entry.name for entry in directory.iterdir() if entry.is_file() and entry.suffix == wanted_suffix]
    except FileNotFoundError:
        LOG.debug(f'Wrong directory: {directory}')
        return -1
    LOG.debug(f'In: {directory} found {matching} ')
    return len(matching)
343

344

345
def is_git_exec_present() -> bool:
    """
    Check if git can be used on this system.

    :return: Value of git.GIT_OK as bool, False when the git package can not be imported
    """
    try:
        import git
    except ImportError as err:
        LOG.debug(type(err).__name__, exc_info=True)
        return False
    return bool(git.GIT_OK)
357

358

359
def is_git_object(repo_dir: Path, git_obj: str) -> bool:
    """
    Check if git_obj can be resolved to a commit in the repository.

    The repository is switched to the master branch before the lookup.

    :param repo_dir: Directory with repository
    :param git_obj: Git reference to check
    :return: True if git_obj is git reference, False otherwise
    """
    import gitdb  # type: ignore[import-untyped]

    if not is_git_repo(str(repo_dir)):
        return False

    bios_repo = git.Repo(repo_dir)
    bios_repo.git.checkout('master')
    try:
        bios_repo.commit(git_obj)
    except gitdb.exc.BadName:
        return False
    return True
378

379

380
def get_all_git_refs(repo_dir: Path) -> list[str]:
    """
    Get a list of branches and tags for repo.

    :param repo_dir: Directory with a repository
    :return: List of git references as strings (branches first, then tags), empty when repo_dir is not a repository
    """
    if not is_git_repo(str(repo_dir)):
        return []
    # One Repo object serves both lookups instead of opening the repository twice.
    bios_repo = git.Repo(repo_dir)
    return [str(ref) for ref in chain(bios_repo.heads, bios_repo.tags)]
392

393

394
class CloneProgress(git.RemoteProgress):
    """Progress handler translating git progress reports into stage and percentage signals."""

    # Names of git.RemoteProgress attributes holding operation codes.
    OP_CODES: ClassVar[list[str]] = ['BEGIN', 'CHECKING_OUT', 'COMPRESSING', 'COUNTING', 'END', 'FINDING_SOURCES', 'RECEIVING', 'RESOLVING', 'WRITING']
    # Reverse lookup: numeric operation code -> attribute name.
    OP_CODE_MAP: ClassVar[dict[int, str]] = {getattr(git.RemoteProgress, code_name): code_name for code_name in OP_CODES}

    def __init__(self, progress, stage) -> None:
        """
        Store the signals used for reporting.

        :param progress: Progress Qt6 signal
        :param stage: Report stage Qt6 signal
        """
        super().__init__()
        self.progress_signal = progress
        self.stage_signal = stage

    def get_curr_op(self, op_code: int) -> str:
        """
        Translate an OP code into a title-cased stage name.

        :param op_code: OP code
        :return: stage name, '?' when the code is not known
        """
        stage_name = self.OP_CODE_MAP.get(op_code & self.OP_MASK, '?')
        return stage_name.title()

    def update(self, op_code: int, cur_count, max_count=None, message: str = ''):
        """
        Emit the stage name at the beginning of a stage and the percentage on every call.

        :param op_code: Integer allowing to be compared against Operation IDs and stage IDs.
        :param cur_count: A count of current absolute items
        :param max_count: The maximum count of items we expect. It may be None in case there is no maximum number of items or if it is (yet) unknown.
        :param message: Not used by this handler.
        """
        if op_code & git.RemoteProgress.BEGIN:
            self.stage_signal.emit(f'Git clone: {self.get_curr_op(op_code)}')

        # Without a known maximum no meaningful percentage exists, report zero.
        if max_count:
            percentage = int(cur_count / max_count * 100)
        else:
            percentage = 0
        self.progress_signal.emit(percentage)
434

435

436
def collect_debug_data() -> Path:
    """
    Collect and zip all data for troubleshooting.

    The archive contains the system information file, dcs.log (when it exists), dcspy log files,
    configuration YAML files and PNG screenshots. Files are stored flat, under their base names.

    :return: Path object to ZIP file
    """
    config_file = Path(get_config_yaml_location() / CONFIG_YAML).resolve()
    conf_dict = load_yaml(config_file)
    sys_data = _get_sys_file(conf_dict)
    dcs_log = _get_dcs_log(conf_dict)

    zip_file = Path(gettempdir()) / f'dcspy_debug_{str(datetime.now()).replace(" ", "_").replace(":", "")}.zip'
    with zipfile.ZipFile(file=zip_file, mode='w', compresslevel=9, compression=zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(sys_data, arcname=sys_data.name)
        # _get_dcs_log() returns an empty Path when dcs.log is missing; writing it would add
        # the current directory to the archive under an empty name.
        if dcs_log.is_file():
            zipf.write(dcs_log, arcname=dcs_log.name)
        for extra_file in chain(_get_log_files(), _get_yaml_files(config_file), _get_png_files()):
            zipf.write(extra_file, arcname=extra_file.name)

    return zip_file
459

460

461
def _get_sys_file(conf_dict: dict[str, Any]) -> Path:
    """
    Save system information to file and return its path.

    :param conf_dict: A dictionary containing configuration information.
    :return: A Path object representing the path to the system data file.
    """
    system_info = _fetch_system_info(conf_dict)
    sys_data = Path(gettempdir()) / 'system_data.txt'
    # Explicit UTF-8: the report contains paths and configuration values which may not be
    # representable in the locale encoding.
    with open(sys_data, 'w+', encoding='utf-8') as debug_file:
        debug_file.write(system_info)
    return sys_data
473

474

475
def _fetch_system_info(conf_dict: dict[str, Any]) -> str:
    """
    Build a text report about the system, DCS, DCS-BIOS, Git and configuration.

    :param conf_dict: A dictionary containing configuration information, 'dcs' and 'dcsbios' keys are read.
    :return: System data as string
    """
    # Local names below are part of the output: the '{name=}' specifiers print them.
    name = uname()
    pyver = (python_version(), python_implementation())
    pyexec = sys.executable
    dcs = check_dcs_ver(dcs_path=Path(str(conf_dict['dcs'])))
    bios_dir = str(conf_dict['dcsbios'])
    bios_ver = check_bios_ver(bios_path=bios_dir)
    git_ver, head_commit = _fetch_git_data(repo_dir=Path(bios_dir).parents[1] / 'dcs-bios')
    sdk_files = (
        str(Path(dir_path) / filename)
        for dir_path, _, filenames in walk('C:\\Program Files\\Logitech Gaming Software\\SDK')
        for filename in filenames
    )
    lgs_dir = '\n'.join(sdk_files)
    return f'{__version__=}\n{name=}\n{pyver=}\n{pyexec=}\n{dcs=}\n{bios_ver=}\n{git_ver=}\n{head_commit=}\n{lgs_dir}\ncfg={pformat(conf_dict)}'
495

496

497
def _fetch_git_data(repo_dir: Path) -> tuple[Sequence[int], str]:
1✔
498
    """
499
    Fetch Git version and SHA of HEAD commit.
500

501
    :param repo_dir: Local directory for repository
502
    :return: Tuple of (a version) and SHA of HEAD commit
503
    """
504
    try:
1✔
505
        import git
1✔
506
        git_ver = git.cmd.Git().version_info
1✔
507
        head_commit = str(git.Repo(repo_dir).head.commit)
1✔
508
    except (git.exc.NoSuchPathError, git.exc.InvalidGitRepositoryError, ImportError):
1✔
509
        git_ver = (0, 0, 0, 0)
1✔
510
        head_commit = 'N/A'
1✔
511
    return git_ver, head_commit
1✔
512

513

514
def _get_dcs_log(conf_dict: dict[str, Any]) -> Path:
1✔
515
    """
516
    Get path to dcs.log path.
517

518
    :param conf_dict: A dictionary containing configuration information.
519
    :return: A Path object representing the path to the dcs.log file.
520
    """
521
    dcs_log_file = Path(conf_dict['dcsbios']).parents[1] / 'Logs' / 'dcs.log'
1✔
522
    return dcs_log_file if dcs_log_file.is_file() else Path()
1✔
523

524

525
def _get_log_files() -> Generator[Path]:
1✔
526
    """
527
    Get a path to all logg files.
528

529
    :return: Generator of a path to log files
530
    """
531
    return (
1✔
532
        Path(gettempdir()) / logfile
533
        for logfile in glob(str(Path(gettempdir()) / 'dcspy.log*'))
534
    )
535

536

537
def _get_yaml_files(config_file: Path) -> Generator[Path]:
1✔
538
    """
539
    Get a path to all configuration YAML files.
540

541
    :param config_file: Path to the config file
542
    :return: Generator of a path to YAML files
543
    """
544
    return (
1✔
545
        Path(dirpath) / filename
546
        for dirpath, _, filenames in walk(config_file.parent)
547
        for filename in filenames
548
        if filename.endswith('yaml')
549
    )
550

551

552
def _get_png_files() -> Generator[Path]:
1✔
553
    """
554
    Get a path to png screenshots for all airplanes.
555

556
    :return: Generator of a path to png files
557
    """
558
    aircrafts = ['FA18Chornet', 'Ka50', 'Ka503', 'Mi8MT', 'Mi24P', 'F16C50', 'F15ESE',
1✔
559
                 'AH64DBLKII', 'A10C', 'A10C2', 'F14A135GR', 'F14B', 'AV8BNA']
560
    return (
1✔
561
        Path(dir_path) / filename
562
        for dir_path, _, filenames in walk(gettempdir())
563
        for filename in filenames
564
        if any(True for aircraft in aircrafts if aircraft in filename and filename.endswith('png'))
565
    )
566

567

568
def get_config_yaml_location() -> Path:
    """
    Get a location of YAML configuration files.

    :return: '<LOCALAPPDATA>/dcspy' when the LOCALAPPDATA variable is set and not empty, otherwise the directory
             of the default configuration file
    """
    localappdata = environ.get('LOCALAPPDATA', None)
    if localappdata:
        return Path(localappdata) / 'dcspy'
    return DEFAULT_YAML_FILE.parent
577

578

579
def run_pip_command(cmd: str) -> tuple[int, str, str]:
    """
    Execute pip command with the current Python interpreter.

    :param cmd: Command as string, arguments separated with whitespace
    :return: Tuple with return code, stderr and stdout
    """
    # split() without an argument ignores repeated, leading and trailing whitespace, so pip never
    # receives empty arguments.
    pip_args = cmd.split()
    try:
        result = run([sys.executable, '-m', 'pip', *pip_args], capture_output=True, check=True)
        return result.returncode, result.stderr.decode('utf-8'), result.stdout.decode('utf-8')
    except CalledProcessError as e:
        LOG.debug(f'Result: {e}')
        return e.returncode, e.stderr.decode('utf-8'), e.stdout.decode('utf-8')
592

593

594
def run_command(cmd: Sequence[str], cwd: Path | None = None) -> int:
    """
    Run a command as a subprocess, without a shell.

    :param cmd: The command to be executed as a sequence of strings
    :param cwd: current working directory
    :return: The return code of command, -1 when the command exits with a non-zero code
    """
    try:
        return run(cmd, check=True, shell=False, cwd=cwd).returncode
    except CalledProcessError as e:
        LOG.debug(f'Result: {e}')
        return -1
608

609

610
def load_json(full_path: Path) -> Any:
    """
    Read a UTF-8 encoded JSON file.

    :param full_path: Full path
    :return: Python representation of JSON
    """
    with open(full_path, encoding='utf-8') as json_file:
        return json.load(json_file)
620

621

622
@lru_cache
def get_full_bios_for_plane(plane: str, bios_dir: Path) -> DcsBiosPlaneData:
    """
    Merge all BIOS JSON files listed for the plane into one validated model.

    The list of files is taken from AircraftAliases.json; when the same key appears in several files,
    the later file wins.

    :param plane: BIOS plane name
    :param bios_dir: path to DCS-BIOS directory
    :return: DcsBiosPlaneData built from the merged data
    """
    json_dir = bios_dir / 'doc' / 'json'
    aircraft_aliases = load_json(full_path=json_dir / 'AircraftAliases.json')
    merged: dict[str, Any] = {}
    for module_name in aircraft_aliases[plane]:
        merged.update(load_json(full_path=json_dir / f'{module_name}.json'))

    return DcsBiosPlaneData.model_validate(merged)
638

639

640
@lru_cache
def get_inputs_for_plane(plane: str, bios_dir: Path) -> dict[str, dict[str, ControlKeyData]]:
    """
    Get inputs of the plane as returned by get_inputs() of its full BIOS data.

    :param plane: BIOS plane name
    :param bios_dir: path to DCS-BIOS
    :return: dict.
    """
    return get_full_bios_for_plane(plane=plane, bios_dir=bios_dir).get_inputs()
652

653

654
def get_list_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> list[str]:
    """
    Flatten sections and their controllers into a single list.

    Every section contributes a header (its name surrounded with separators) followed by the names of its controllers.

    :param inputs: Dictionary with ControlKeyData
    :return: List of string
    """
    flat_list: list[str] = []
    for section, controllers in inputs.items():
        header = f'{CTRL_LIST_SEPARATOR} {section} {CTRL_LIST_SEPARATOR}'
        flat_list += [header, *controllers]
    return flat_list
667

668

669
@lru_cache
def get_planes_list(bios_dir: Path) -> list[str]:
    """
    Get a list of planes from AircraftAliases.json, skipping the entries made only of common data.

    Planes defined as ['CommonData', 'FC3'] or ['CommonData'] are left out.

    :param bios_dir: Path to DCS-BIOS
    :return: List of all supported planes
    """
    excluded = (['CommonData', 'FC3'], ['CommonData'])
    aliases = get_plane_aliases(bios_dir=bios_dir, plane=None)
    return [plane_name for plane_name, modules in aliases.items() if modules not in excluded]
679

680

681
@lru_cache
def get_plane_aliases(bios_dir: Path, plane: str | None = None) -> dict[str, list[str]]:
    """
    Get the content of AircraftAliases.json, optionally limited to a single plane.

    :param bios_dir: path to DCS-BIOS
    :param plane: BIOS plane name, all planes are returned when it is empty or None
    :return: dictionary with a plane name as a key and a list of its definition files as a value
    """
    all_aliases = load_json(full_path=bios_dir / 'doc' / 'json' / 'AircraftAliases.json')
    if not plane:
        return all_aliases
    return {plane: all_aliases[plane]}
695

696

697
def get_depiction_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> dict[str, ControlDepiction]:
    """
    Map every control name to its depiction, regardless of the section.

    When the same control name appears in several sections, the last one wins.

    :param inputs: Dictionary with ControlKeyData
    :return: A dictionary containing the depiction of controls.
    """
    return {
        ctrl_name: ctrl.depiction
        for controllers in inputs.values()
        for ctrl_name, ctrl in controllers.items()
    }
709

710

711
def substitute_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
    """
    Apply regular expression substitutions to a string, one after another.

    :param value: The input string to be processed
    :param symbol_replacement: Pairs of a regular expression pattern and its replacement, applied in order.
    :return: The string after all substitutions.
    """
    processed = value
    for regex, new_text in symbol_replacement:
        processed = sub(regex, new_text, processed)
    return processed
722

723

724
def replace_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
    """
    Apply plain text replacements to a string, one after another.

    :param value: The string in which symbols will be replaced.
    :param symbol_replacement: Pairs of an original symbol and its replacement, applied in order.
    :return: The string with symbols replaced.
    """
    processed = value
    for old_text, new_text in symbol_replacement:
        processed = processed.replace(old_text, new_text)
    return processed
735

736

737
class KeyRequest:
    """Map LCD button or G-Key with an abstract request model."""

    def __init__(self, yaml_path: Path, get_bios_fn: Callable[[str], BiosValue]) -> None:
        """
        Load YAML with BIOS request for G-Keys and LCD buttons.

        Entries with an empty request are skipped.

        :param yaml_path: Path to the airplane YAML file.
        :param get_bios_fn: Function used to get current BIOS value.
        """
        self.buttons: dict[AnyButton, RequestModel] = {}
        plane_yaml = load_yaml(full_path=yaml_path)
        for key_str, request in plane_yaml.items():
            if not request:
                continue
            key = get_key_instance(key_str)
            self.buttons[key] = RequestModel.from_request(key=key, request=request, get_bios_fn=get_bios_fn)

    @property
    def cycle_button_ctrl_name(self) -> dict[str, int]:
        """Return a dictionary with control names of all cycle buttons, each mapped to zero as an initial value."""
        return {model.ctrl_name: 0 for model in self.buttons.values() if model.is_cycle}

    def get_request(self, button: AnyButton) -> RequestModel:
        """
        Get abstract representation for request to be sent for requested button.

        :param button: LcdButton, Gkey or MouseButton
        :return: RequestModel object, an empty one when the button has no request
        """
        fallback = RequestModel.empty(key=button)
        return self.buttons.get(button, fallback)

    def set_request(self, button: AnyButton, req: str) -> None:
        """
        Update the internal string request for the specified button.

        :param button: LcdButton, Gkey or MouseButton
        :param req: The raw request to set.
        """
        model = self.buttons[button]
        model.raw_request = req
776

777

778
def generate_bios_jsons_with_lupa(dcs_save_games: Path, local_compile='./Scripts/DCS-BIOS/test/compile/LocalCompile.lua') -> None:
    r"""
    Regenerate DCS-BIOS JSON files.

    Using the Lupa library, first it tries to use LuaJIT 2.1, if it is not available it falls back to Lua 5.1.
    When neither can be imported, nothing is done.

    The working directory is changed to dcs_save_games while the script runs and restored afterward.

    :param dcs_save_games: Full path to Saved Games\DCS.openbeta directory.
    :param local_compile: Relative path to LocalCompile.lua file.
    """
    try:
        import lupa.luajit21 as lupa  # type: ignore[import-untyped]
    except ImportError:
        try:
            import lupa.lua51 as lupa  # type: ignore[import-untyped]
        except ImportError:
            # Leave a trace in the log instead of returning silently.
            LOG.debug('Lupa library is not available, DCS-BIOS JSON files are not regenerated')
            return

    previous_dir = getcwd()
    try:
        chdir(dcs_save_games)
        LOG.debug(f"Changed to: {dcs_save_games}")
        lua = lupa.LuaRuntime()
        # Ask the runtime that executes the script, rather than creating another one just for logging.
        LOG.debug(f"Using {lua.lua_implementation} (compiled with {lupa.LUA_VERSION})")
        with open(local_compile) as lua_file:
            lua_script = lua_file.read()
        lua.execute(lua_script)
    finally:
        chdir(previous_dir)
        LOG.debug(f"Change directory back to: {getcwd()}")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc