• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emcek / dcspy / 17507661560

06 Sep 2025 12:47AM UTC coverage: 97.61% (-0.08%) from 97.686%
17507661560

Pull #512

github

emcek
add and update tests resources
Pull Request #512: Check hash of downloaded files

386 of 392 branches covered (98.47%)

Branch coverage included in aggregate %.

35 of 38 new or added lines in 2 files covered. (92.11%)

13 existing lines in 2 files now uncovered.

4678 of 4796 relevant lines covered (97.54%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.07
/src/dcspy/utils.py
1
from __future__ import annotations
1✔
2

3
import hashlib
1✔
4
import json
1✔
5
import sys
1✔
6
import zipfile
1✔
7
from collections.abc import Callable, Generator, Sequence
1✔
8
from datetime import datetime
1✔
9
from functools import lru_cache
1✔
10
from glob import glob
1✔
11
from itertools import chain
1✔
12
from logging import getLogger
1✔
13
from os import chdir, environ, getcwd, makedirs, walk
1✔
14
from pathlib import Path
1✔
15
from platform import python_implementation, python_version, uname
1✔
16
from pprint import pformat
1✔
17
from re import search, sub
1✔
18
from shutil import rmtree
1✔
19
from subprocess import CalledProcessError, run
1✔
20
from tempfile import gettempdir
1✔
21
from typing import Any, ClassVar
1✔
22

23
import yaml
1✔
24
from packaging import version
1✔
25
from PIL import ImageColor
1✔
26
from psutil import process_iter
1✔
27
from requests import get
1✔
28

29
from dcspy.models import (CONFIG_YAML, CTRL_LIST_SEPARATOR, DEFAULT_YAML_FILE, AnyButton, BiosValue, Color, ControlDepiction, ControlKeyData, DcsBiosPlaneData,
1✔
30
                          DcspyConfigYaml, Gkey, LcdButton, LcdMode, MouseButton, Release, RequestModel, __version__)
31

32
try:
    import git
except ImportError:
    # Deliberate best-effort: the module must stay importable without a usable Git.
    # NOTE(review): helpers in this file report a failing `import git` as
    # "Git executable is not available" - confirm this is the only expected cause.
    pass

LOG = getLogger(__name__)

# Explicit UTF-8 keeps loading of the bundled defaults independent of the system locale,
# consistent with the other YAML/JSON readers in this module.
with open(DEFAULT_YAML_FILE, encoding='utf-8') as c_file:
    defaults_cfg: DcspyConfigYaml = yaml.load(c_file, Loader=yaml.SafeLoader)
    # The DCS-BIOS location depends on the current user, so it is computed at import time.
    defaults_cfg['dcsbios'] = f'C:\\Users\\{environ.get("USERNAME", "UNKNOWN")}\\Saved Games\\DCS\\Scripts\\DCS-BIOS'
42

43

44
def get_default_yaml(local_appdata: bool = False) -> Path:
    """
    Return the full path of the configuration file to use.

    With ``local_appdata`` set, the user configuration directory is created when missing
    and the configuration file is seeded with default values on first use.

    :param local_appdata: If True, the file from the user configuration directory is used
    :return: Path to the configuration YAML file
    """
    if not local_appdata:
        return DEFAULT_YAML_FILE

    config_dir = get_config_yaml_location()
    makedirs(name=config_dir, exist_ok=True)
    user_cfg = Path(config_dir / CONFIG_YAML).resolve()
    if not user_cfg.exists():
        save_yaml(data=defaults_cfg, full_path=user_cfg)
    return user_cfg
59

60

61
def load_yaml(full_path: Path) -> DcspyConfigYaml:
    """
    Read a YAML file and return its content as a dictionary.

    A missing or unparsable file yields an empty dictionary (the parent directory is
    created in that case). Content which is not a mapping also yields an empty dictionary.

    :param full_path: Full path to YAML file
    :return: Dictionary with data
    """
    try:
        with open(file=full_path, encoding='utf-8') as stream:
            loaded = yaml.safe_load(stream)
    except (FileNotFoundError, yaml.parser.ParserError) as err:
        makedirs(name=full_path.parent, exist_ok=True)
        LOG.warning(f'{type(err).__name__}: {full_path}.', exc_info=True)
        LOG.debug(f'{err}')
        return {}
    return loaded if isinstance(loaded, dict) else {}
79

80

81
def save_yaml(data: DcspyConfigYaml, full_path: Path) -> None:
    """
    Write a dictionary to a YAML file, replacing any previous content.

    :param data: Dictionary with data
    :param full_path: Full path to YAML file
    """
    with open(file=full_path, mode='w', encoding='utf-8') as stream:
        yaml.safe_dump(data, stream)
90

91

92
def check_ver_at_github(repo: str) -> Release:
    """
    Check a version of <organization>/<package> at GitHub.

    :param repo: Format '<organization or user>/<package>'
    :return: Release object with data
    :raises ValueError: When the latest release can not be fetched or parsed; the original error is chained as the cause
    """
    package = repo.split('/')[1]
    try:
        response = get(url=f'https://api.github.com/repos/{repo}/releases/latest', timeout=5)
        if not response.ok:
            # Caught by the handler below, so callers always see a single error format.
            raise ValueError(f'Try again later. Status={response.status_code}')
        rel = Release(**response.json())
        LOG.debug(f'Latest GitHub release: {rel}')
        return rel
    except Exception as exc:
        # Chain the cause so the traceback still shows what went wrong (network, JSON, validation...).
        raise ValueError(f'Unable to check {package} version online: {exc}') from exc
110

111

112
def get_version_string(repo: str, current_ver: str | version.Version, check: bool = True) -> str:
    """
    Build a human-readable version string, optionally compared against the latest GitHub release.

    :param repo: Format '<organization or user>/<package>'.
    :param current_ver: String or Version object.
    :param check: Version online.
    :return: Formatted version as a string.
    """
    if not check:
        return f'v{current_ver}'

    try:
        result = check_ver_at_github(repo=repo)
    except ValueError:
        return f'v{current_ver} (failed)'

    suffix = ''
    if result.is_latest(current_ver=current_ver):
        suffix = ' (latest)'
    elif result.version != version.parse('0.0.0'):
        # A newer release exists: show its number instead of the installed one.
        suffix = ' (update!)'
        current_ver = result.version
    return f'v{current_ver}{suffix}'
136

137

138
def download_file(url: str, save_path: Path, progress_fn: Callable[[int], None] | None = None) -> bool:
    """
    Download a file from URL and save to save_path.

    Progress is reported as an integer percentage each time it grows. It is reported only
    when the server sends a ``Content-Length`` header, as otherwise the total size is unknown.

    :param url: URL address
    :param save_path: full path to save
    :param progress_fn: a callable object to report download progress
    :return: True when the file was downloaded and saved, False otherwise
    """
    response = get(url=url, stream=True, timeout=5)
    if not response.ok:
        LOG.warning(f'Can not download from: {url}')
        return False

    file_size = int(response.headers.get('Content-Length', 0))
    LOG.debug(f'File size: {file_size / (1024 * 1024):.2f} MB' if file_size else 'File size: Unknown')
    LOG.debug(f'Download file from: {url}')
    with open(save_path, 'wb+') as dl_file:
        downloaded = 0
        progress = 0
        for chunk in response.iter_content(chunk_size=1024):
            dl_file.write(chunk)
            downloaded += len(chunk)
            if not (progress_fn and file_size):
                # Nothing to report to, or an unknown total size (would divide by zero).
                continue
            # Clamp at 100, in case more bytes arrive than the header announced.
            new_progress = min(int(downloaded / file_size * 100), 100)
            # A single chunk may advance progress by more than one percent (small files),
            # so report on any growth rather than on exact +1 steps.
            if new_progress > progress:
                progress = new_progress
                progress_fn(progress)
    LOG.debug(f'Saved as: {save_path}')
    return True
166

167

168
def proc_is_running(name: str) -> int:
    """
    Check if the process is running and return its PID.

    The first process whose name contains ``name`` wins.
    If the process name is not found, 0 (zero) is returned.

    :param name: Process name (or a part of it)
    :return: PID as int
    """
    for proc in process_iter(['pid', 'name']):
        # psutil stores None for attributes it could not read (e.g. access denied),
        # which would make the `in` test raise TypeError.
        proc_name = proc.info['name'] or ''
        if name in proc_name:
            return proc.info['pid']
    return 0
180

181

182
def check_dcs_ver(dcs_path: Path) -> str:
    """
    Read the DCS version from ``autoupdate.cfg`` in the installation directory.

    :param dcs_path: Path to DCS installation directory
    :return: DCS version as a string, 'Unknown' when it can not be determined
    """
    cfg_file = dcs_path / 'autoupdate.cfg'
    try:
        with open(file=cfg_file, encoding='utf-8') as stream:
            content = stream.read()
    except (FileNotFoundError, PermissionError) as err:
        LOG.debug(f'{type(err).__name__}: {err.filename}')
        return 'Unknown'

    found = search(r'"version":\s"([\d.]*)"', content)
    return str(found.group(1)) if found else 'Unknown'
199

200

201
def check_bios_ver(bios_path: Path | str) -> version.Version:
    """
    Read the DCS-BIOS release version from its ``CommonData.lua`` file.

    The current file location is tried first, then the legacy one.

    :param bios_path: Path to DCS-BIOS directory in the SavedGames folder
    :return: Version object, 0.0.0 when the version can not be determined
    """
    new_location = Path(bios_path) / 'lib' / 'modules' / 'common_modules' / 'CommonData.lua'
    old_location = Path(bios_path) / 'lib' / 'CommonData.lua'

    for lua_file in (new_location, old_location):
        if lua_file.is_file():
            with open(file=lua_file, encoding='utf-8') as cd_lua:
                lua_text = cd_lua.read()
            break
    else:
        lua_text = ''
        LOG.debug(f'No `CommonData.lua` while checking DCS-BIOS version at {new_location.parent} or {old_location.parent}')

    found = search(r'function getVersion\(\)\s*return\s*\"([\d.]*)\"', lua_text)
    if found:
        return version.parse(found.group(1))
    return version.parse('0.0.0')
225

226

227
def is_git_repo(dir_path: str) -> bool:
    """
    Tell whether a directory is a Git repository.

    :param dir_path: Path as string
    :return: True if dir is git repo
    """
    import git
    try:
        _ = git.Repo(dir_path).git_dir
    except (git.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        return False
    else:
        return True
240

241

242
def _get_sha_hex_str(bios_repo: git.Repo, git_ref: str) -> str:
    """
    Check out a Git reference and describe the resulting HEAD commit.

    The description starts with the active branch name; when the checkout fails or
    no branch name is available, the first 8 characters of the commit hash are used.

    :param bios_repo: A Git repository object.
    :param git_ref: A string representing the Git reference (e.g., commit, branch, tag).
    :return: A string representing the branch or commit hash, date, and author.
    """
    try:
        import git
    except ImportError:
        raise OSError('Git executable is not available!')

    stamp = '%d-%b-%Y %H:%M:%S'
    try:
        bios_repo.git.checkout(git_ref)
        label = bios_repo.active_branch.name
        head_commit = bios_repo.head.commit
        sha = f'{label} from: {head_commit.committed_datetime.strftime(stamp)} by: {head_commit.author}'
    except (git.exc.GitCommandError, TypeError):
        head_commit = bios_repo.head.commit
        sha = f'{head_commit.hexsha[0:8]} from: {head_commit.committed_datetime.strftime(stamp)} by: {head_commit.author}'
    LOG.debug(f'Checkout: {head_commit.hexsha} from: {head_commit.committed_datetime} | by: {head_commit.author}\n{head_commit.message}')  # type: ignore
    return sha
264

265

266
def check_github_repo(git_ref: str, repo_dir: Path, repo: str, update: bool = True, progress: git.RemoteProgress | None = None) -> str:
    """
    Prepare a local clone of a GitHub repository and check out the requested reference.

    The repository is cloned when missing, optionally pulled, and then switched to ``git_ref``.

    :param git_ref: Any Git reference as a string
    :param repo_dir: Local directory for a repository
    :param repo: GitHub repository user/name
    :param update: Perform update process
    :param progress: Progress callback
    :return: Description of the checked out commit (branch or short hash, date and author)
    """
    bios_repo = _checkout_repo(repo=repo, repo_dir=repo_dir, progress=progress)
    if update:
        pulled = bios_repo.remotes[0].pull(progress=progress)
        first = pulled[0]
        LOG.debug(f'Pulled: {first.name} as: {first.commit}')
    return _get_sha_hex_str(bios_repo, git_ref)
284

285

286
def _checkout_repo(repo: str, repo_dir: Path, progress: git.RemoteProgress | None = None) -> git.Repo:
    """
    Return a repository object checked out at main/master, cloning it from GitHub when needed.

    :param repo: Repository name
    :param repo_dir: Local repository directory
    :param progress: Progress callback
    :return: Repo object of the repository
    """
    import git

    makedirs(name=repo_dir, exist_ok=True)
    if not is_git_repo(str(repo_dir)):
        # Not a repository: wipe the directory and start from a fresh clone.
        rmtree(path=repo_dir, ignore_errors=True)
        return git.Repo.clone_from(url=f'https://github.com/{repo}.git', to_path=repo_dir, progress=progress)  # type: ignore[arg-type]

    bios_repo = git.Repo(repo_dir)
    known_refs = get_all_git_refs(repo_dir=repo_dir)
    default_branch = 'main' if 'main' in known_refs else 'master'
    bios_repo.git.checkout(default_branch)
    return bios_repo
307

308

309
def check_dcs_bios_entry(lua_dst_data: str, lua_dst_path: Path, temp_dir: Path) -> str:
    """
    Make sure the user's Export.lua loads DCS-BIOS.

    When the DCS-BIOS ``dofile`` line is not found in ``lua_dst_data``, the content of
    the Export.lua shipped in ``temp_dir`` is appended to the user's Export.lua.

    :param lua_dst_data: Content of Export.lua
    :param lua_dst_path: Export.lua path
    :param temp_dir: Directory with DCS-BIOS archive
    :return: Result of checks
    """
    lua = 'Export.lua'
    header = '\n\nExport.lua exists.'
    with open(file=temp_dir / lua, encoding='utf-8') as lua_src:
        bios_snippet = lua_src.read()

    if search(r'dofile\(lfs.writedir\(\)\s*\.\.\s*\[\[Scripts\\DCS-BIOS\\BIOS\.lua]]\)', lua_dst_data):
        return header + '\n\nDCS-BIOS entry detected.'

    with open(file=lua_dst_path / lua, mode='a+', encoding='utf-8') as export_file:
        export_file.write(f'\n{bios_snippet}')
    LOG.debug(f'Add DCS-BIOS to Export.lua: {bios_snippet}')
    return header + '\n\nDCS-BIOS entry added.\n\nYou verify installation at:\ngithub.com/DCS-Skunkworks/DCSFlightpanels/wiki/Installation'
331

332

333
def count_files(directory: Path, extension: str) -> int:
    """
    Count regular files with a given extension directly inside a directory.

    :param directory: As Path object
    :param extension: File extension (without the leading dot)
    :return: Number of files, -1 when the directory does not exist
    """
    wanted_suffix = f'.{extension}'
    try:
        json_files = []
        for entry in directory.iterdir():
            if entry.is_file() and entry.suffix == wanted_suffix:
                json_files.append(entry.name)
    except FileNotFoundError:
        LOG.debug(f'Wrong directory: {directory}')
        return -1
    LOG.debug(f'In: {directory} found {json_files} ')
    return len(json_files)
348

349

350
def is_git_exec_present() -> bool:
    """
    Tell whether Git can be used on this system.

    :return: True if git.exe is available
    """
    try:
        import git
    except ImportError as err:
        LOG.debug(type(err).__name__, exc_info=True)
        return False
    return bool(git.GIT_OK)
362

363

364
def is_git_object(repo_dir: Path, git_obj: str) -> bool:
    """
    Tell whether a name resolves to a commit in the repository.

    :param repo_dir: Directory with repository
    :param git_obj: Git reference to check
    :return: True if git_obj is git reference, False otherwise
    """
    import gitdb  # type: ignore[import-untyped]

    if not is_git_repo(str(repo_dir)):
        return False
    bios_repo = git.Repo(repo_dir)
    try:
        bios_repo.commit(git_obj)
    except (gitdb.exc.BadName, TypeError):
        return False
    return True
382

383

384
def get_all_git_refs(repo_dir: Path) -> list[str]:
    """
    List names of all branches followed by all tags of a repository.

    :param repo_dir: Directory with a repository
    :return: List of git references as strings, empty when the directory is not a repository
    """
    if not is_git_repo(str(repo_dir)):
        return []
    return [str(ref) for ref in chain(git.Repo(repo_dir).heads, git.Repo(repo_dir).tags)]
396

397

398
class CloneProgress(git.RemoteProgress):
    """Progress handler which forwards progress information emitted by git to two signals."""
    OP_CODES: ClassVar[list[str]] = ['BEGIN', 'CHECKING_OUT', 'COMPRESSING', 'COUNTING', 'END', 'FINDING_SOURCES', 'RECEIVING', 'RESOLVING', 'WRITING']
    # Reverse lookup: numeric operation code -> its name.
    OP_CODE_MAP: ClassVar[dict[int, str]] = {getattr(git.RemoteProgress, _op_code): _op_code for _op_code in OP_CODES}

    def __init__(self, progress, stage) -> None:
        """
        Store signals used for reporting.

        :param progress: Progress Qt6 signal
        :param stage: Report stage Qt6 signal
        """
        super().__init__()
        self.progress_signal = progress
        self.stage_signal = stage

    def get_curr_op(self, op_code: int) -> str:
        """
        Translate an OP code into a title-cased stage name.

        :param op_code: OP code
        :return: stage name, '?' for an unknown code
        """
        stage_name = self.OP_CODE_MAP.get(op_code & self.OP_MASK, '?')
        return stage_name.title()

    def update(self, op_code: int, cur_count, max_count=None, message: str = '') -> None:
        """
        Report the stage (when an operation begins) and the percentage done.

        :param op_code: Integer allowing to be compared against Operation IDs and stage IDs.
        :param cur_count: A count of current absolute items
        :param max_count: The maximum count of items we expect. It may be None in case there is no maximum number of items or if it is (yet) unknown.
        :param message: It contains the number of bytes transferred. It may be used for other purposes as well.
        """
        if op_code & git.RemoteProgress.BEGIN:
            self.stage_signal.emit(f'Git clone: {self.get_curr_op(op_code)}')

        if max_count:
            percentage = int(cur_count / max_count * 100)
        else:
            # Unknown (or zero) maximum: nothing meaningful to compute.
            percentage = 0
        self.progress_signal.emit(percentage)
438

439

440
def collect_debug_data() -> Path:
    """
    Pack troubleshooting data into a ZIP file in the temporary directory.

    The archive holds system information, the DCS log, application logs,
    configuration YAML files and screenshots. All files are stored flat (by name only).

    :return: Path object to ZIP file
    """
    config_file = Path(get_config_yaml_location() / CONFIG_YAML).resolve()
    conf_dict = load_yaml(config_file)
    sys_data = _get_sys_file(conf_dict)
    dcs_log = _get_dcs_log(conf_dict)

    timestamp = str(datetime.now()).replace(' ', '_').replace(':', '')
    zip_file = Path(gettempdir()) / f'dcspy_debug_{timestamp}.zip'
    with zipfile.ZipFile(file=zip_file, mode='w', compresslevel=9, compression=zipfile.ZIP_DEFLATED) as archive:
        for single_file in (sys_data, dcs_log):
            archive.write(single_file, arcname=single_file.name)
        for log_file in _get_log_files():
            archive.write(log_file, arcname=log_file.name)
        for yaml_file in _get_yaml_files(config_file):
            archive.write(yaml_file, arcname=yaml_file.name)
        for png in _get_png_files():
            archive.write(png, arcname=png.name)

    return zip_file
463

464

465
def _get_sys_file(conf_dict: dict[str, Any]) -> Path:
    """
    Dump system information into ``system_data.txt`` in the temporary directory.

    :param conf_dict: A dictionary containing configuration information.
    :return: A Path object representing the path to the system data file.
    """
    report = _fetch_system_info(conf_dict)
    target = Path(gettempdir()) / 'system_data.txt'
    with open(target, 'w+') as debug_file:
        debug_file.write(report)
    return target
477

478

479
def _fetch_system_info(conf_dict: dict[str, Any]) -> str:
    """
    Build a text report about the system, Python, DCS, DCS-BIOS, Git and the configuration.

    :param conf_dict: A dictionary containing configuration information.
    :return: System data as a string
    """
    # NOTE: names of these locals are part of the report (`{name=}` style fields), do not rename.
    name = uname()
    pyver = (python_version(), python_implementation())
    pyexec = sys.executable
    dcs = check_dcs_ver(dcs_path=Path(str(conf_dict['dcs'])))
    bios_ver = check_bios_ver(bios_path=str(conf_dict['dcsbios']))
    repo_dir = Path(str(conf_dict['dcsbios'])).parents[1] / 'dcs-bios'
    git_ver, head_commit = _fetch_git_data(repo_dir=repo_dir)

    sdk_files = []
    for dir_path, _, filenames in walk('C:\\Program Files\\Logitech Gaming Software\\SDK'):
        sdk_files.extend(str(Path(dir_path) / filename) for filename in filenames)
    lgs_dir = '\n'.join(sdk_files)
    return f'{__version__=}\n{name=}\n{pyver=}\n{pyexec=}\n{dcs=}\n{bios_ver=}\n{git_ver=}\n{head_commit=}\n{lgs_dir}\ncfg={pformat(conf_dict)}'
499

500

501
def _fetch_git_data(repo_dir: Path) -> tuple[Sequence[int], str]:
    """
    Fetch the Git version and SHA of HEAD commit.

    When Git can not be imported, or ``repo_dir`` is missing or is not a repository,
    the fallback ``((0, 0, 0, 0), 'N/A')`` is returned.

    :param repo_dir: Local directory for repository
    :return: Tuple of (a version) and SHA of HEAD commit
    """
    fallback: tuple[Sequence[int], str] = ((0, 0, 0, 0), 'N/A')
    # The import is handled on its own: an `except` clause naming `git.exc...` can not
    # be evaluated when the import itself failed (the local name `git` is unbound then).
    try:
        import git
    except ImportError:
        return fallback

    try:
        git_ver = git.cmd.Git().version_info
        head_commit = str(git.Repo(repo_dir).head.commit)
    except (git.exc.NoSuchPathError, git.exc.InvalidGitRepositoryError):
        return fallback
    return git_ver, head_commit
516

517

518
def _get_dcs_log(conf_dict: dict[str, Any]) -> Path:
    """
    Locate ``dcs.log`` relative to the configured DCS-BIOS directory.

    The log is expected in the ``Logs`` directory two levels above the DCS-BIOS directory.

    :param conf_dict: A dictionary containing configuration information.
    :return: A Path object representing the path to the dcs.log file, an empty Path when it does not exist.
    """
    base_dir = Path(conf_dict['dcsbios']).parents[1]
    candidate = base_dir / 'Logs' / 'dcs.log'
    if candidate.is_file():
        return candidate
    return Path()
527

528

529
def _get_log_files() -> Generator[Path]:
    """
    Find all ``dcspy.log*`` files in the temporary directory.

    :return: Generator of a path to log files
    """
    tmp_dir = Path(gettempdir())
    pattern = str(tmp_dir / 'dcspy.log*')
    return (tmp_dir / found for found in glob(pattern))
539

540

541
def _get_yaml_files(config_file: Path) -> Generator[Path]:
    """
    Find all YAML files in the directory of the config file, including subdirectories.

    :param config_file: Path to the config file
    :return: Generator of a path to YAML files
    """
    return (
        Path(folder, yaml_name)
        for folder, _, names in walk(config_file.parent)
        for yaml_name in names
        if yaml_name.endswith('yaml')
    )
554

555

556
def _get_png_files() -> Generator[Path]:
    """
    Find png screenshots of known airplanes anywhere under the temporary directory.

    A file qualifies when its name ends with 'png' and contains one of the airplane names.

    :return: Generator of a path to png files
    """
    aircrafts = ['FA18Chornet', 'Ka50', 'Ka503', 'Mi8MT', 'Mi24P', 'F16C50', 'F15ESE',
                 'AH64DBLKII', 'A10C', 'A10C2', 'F14A135GR', 'F14B', 'AV8BNA']
    return (
        Path(folder, png_name)
        for folder, _, names in walk(gettempdir())
        for png_name in names
        if png_name.endswith('png') and any(aircraft in png_name for aircraft in aircrafts)
    )
570

571

572
def get_config_yaml_location() -> Path:
    """
    Get the directory where YAML configuration files are kept.

    It is ``<LOCALAPPDATA>/dcspy`` when the LOCALAPPDATA environment variable is set (and not empty),
    otherwise the directory of the default YAML file.

    :rtype: Path object to directory
    """
    localappdata = environ.get('LOCALAPPDATA', None)
    if localappdata:
        return Path(localappdata) / 'dcspy'
    return DEFAULT_YAML_FILE.parent
581

582

583
def run_command(cmd: Sequence[str], cwd: Path | None = None) -> int:
    """
    Execute a command as a subprocess (without a shell) and wait for it.

    :param cmd: The command to be executed as a sequence of strings
    :param cwd: current working directory
    :return: The return code of command, -1 when the command exits with a non-zero code
    """
    try:
        finished = run(cmd, check=True, shell=False, cwd=cwd)
    except CalledProcessError as e:
        LOG.debug(f'Result: {e}')
        return -1
    return finished.returncode
597

598

599
def load_json(full_path: Path) -> dict[Any, Any]:
    """
    Parse a UTF-8 encoded JSON file.

    :param full_path: Full path
    :return: Python representation of JSON
    """
    with open(full_path, encoding='utf-8') as stream:
        return json.load(stream)
609

610

611
@lru_cache
def get_full_bios_for_plane(plane: str, bios_dir: Path) -> DcsBiosPlaneData:
    """
    Merge all DCS-BIOS JSON files which define a plane into one validated model.

    Files are taken from ``AircraftAliases.json``; keys from later files override earlier ones.

    :param plane: BIOS plane name
    :param bios_dir: path to DCS-BIOS directory
    :return: dict
    """
    json_dir = bios_dir / 'doc' / 'json'
    merged: dict[str, Any] = {}
    aliases = load_json(full_path=json_dir / 'AircraftAliases.json')
    for json_name in aliases[plane]:
        merged.update(load_json(full_path=json_dir / f'{json_name}.json'))

    return DcsBiosPlaneData.model_validate(merged)
627

628

629
@lru_cache
def get_inputs_for_plane(plane: str, bios_dir: Path) -> dict[str, dict[str, ControlKeyData]]:
    """
    Return inputs of a plane, as provided by its full BIOS model.

    :param plane: BIOS plane name
    :param bios_dir: path to DCS-BIOS
    :return: dict.
    """
    return get_full_bios_for_plane(plane=plane, bios_dir=bios_dir).get_inputs()
641

642

643
def get_list_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> list[str]:
    """
    Flatten sections and their controllers into one list.

    Each section contributes a header (section name wrapped in separators) followed by its controller names.

    :param inputs: Dictionary with ControlKeyData
    :return: List of string
    """
    flat: list[str] = []
    for section, controllers in inputs.items():
        flat.append(f'{CTRL_LIST_SEPARATOR} {section} {CTRL_LIST_SEPARATOR}')
        flat.extend(controllers)
    return flat
656

657

658
@lru_cache
def get_planes_list(bios_dir: Path) -> list[str]:
    """
    Get a list of all DCS-BIOS supported planes with clickable cockpit.

    Planes defined only by 'CommonData' (optionally with 'FC3') are left out.

    :param bios_dir: Path to DCS-BIOS
    :return: List of all supported planes
    """
    excluded = (['CommonData', 'FC3'], ['CommonData'])
    planes = []
    for name, yaml_data in get_plane_aliases(bios_dir=bios_dir, plane=None).items():
        if yaml_data in excluded:
            continue
        planes.append(name)
    return planes
668

669

670
@lru_cache
def get_plane_aliases(bios_dir: Path, plane: str | None = None) -> dict[str, list[str]]:
    """
    Read aircraft aliases, i.e. a mapping of a plane name to the files which define it.

    :param plane: BIOS plane name, when given only this plane is returned
    :param bios_dir: path to DCS-BIOS
    :return: Dictionary of plane names with a list of definition files
    """
    all_aliases = load_json(full_path=bios_dir / 'doc' / 'json' / 'AircraftAliases.json')
    if not plane:
        return all_aliases
    return {plane: all_aliases[plane]}
684

685

686
def get_depiction_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> dict[str, ControlDepiction]:
    """
    Map every controller name to its depiction, regardless of the section it belongs to.

    When a name occurs in several sections, the depiction from the last one is kept.

    :param inputs: Dictionary with ControlKeyData
    :return: A dictionary containing the depiction of controls.
    """
    return {
        ctrl_name: ctrl.depiction
        for controllers in inputs.values()
        for ctrl_name, ctrl in controllers.items()
    }
698

699

700
def substitute_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
    """
    Apply regular expression substitutions to a string, one after another.

    :param value: The input string to be processed
    :param symbol_replacement: A list of symbol patterns and their corresponding replacements.
    :return: The processed string with symbols replaced according to the provided symbol_replacement list.
    """
    processed = value
    for regex, new_text in symbol_replacement:
        processed = sub(regex, new_text, processed)
    return processed
711

712

713
def replace_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
    """
    Apply literal (non-regex) replacements to a string, one after another.

    :param value: The string in which symbols will be replaced.
    :param symbol_replacement: A sequence of sequences containing the original symbols and their replacement strings.
    :return: The string with symbols replaced.
    """
    processed = value
    for old_text, new_text in symbol_replacement:
        processed = processed.replace(old_text, new_text)
    return processed
724

725

726
def _try_key_instance(klass: type[Gkey] | type[LcdButton] | type[MouseButton], method: str, key_str: str) -> AnyButton | None:
    """
    Try to get a key instance from an attribute of a class.

    The attribute named ``method`` is looked up on ``klass`` and called with ``key_str``.
    When the call fails with a TypeError (e.g. the attribute is not callable), the attribute itself is returned.
    When the attribute is missing, or the call fails with a ValueError or AttributeError, None is returned.

    :param klass: The class type on which the method is to be invoked.
    :param method: The name of the method to call on the class.
    :param key_str: A string key to be passed as a parameter to the method, if supported.
    :return: An instance of `AnyButton` from the invoked method, if successful, otherwise None.
    """
    try:
        candidate = getattr(klass, method)
    except AttributeError:
        return None

    try:
        return candidate(key_str)
    except TypeError:
        return candidate
    except (ValueError, AttributeError):
        return None
746

747

748
def get_key_instance(key_str: str) -> AnyButton:
    """
    Resolve a key string into a Gkey, MouseButton or LcdButton instance.

    Classes are tried in this order: Gkey and MouseButton via their ``from_yaml``,
    then LcdButton via an attribute named like the key string. The first truthy result wins.

    :param key_str: A string representing the name or identifier of the key to be resolved into a key instance (e.g., Gkey, LcdButton, or MouseButton).
    :return: An instance of a class (AnyButton) that corresponds to the provided key string, if successfully resolved.
    :raises AttributeError: If the provided key string cannot be resolved into a valid key instance using the predefined classes and methods.
    """
    resolvers = ((Gkey, 'from_yaml'), (MouseButton, 'from_yaml'), (LcdButton, key_str))
    for klass, method in resolvers:
        if key_instance := _try_key_instance(klass=klass, method=method, key_str=key_str):
            return key_instance
    raise AttributeError(f'Could not resolve "{key_str}" to a Gkey/LcdButton/MouseButton instance')
764

765

766
class KeyRequest:
    """Associate LCD buttons, G-Keys and mouse buttons with their abstract request models."""

    def __init__(self, yaml_path: Path, get_bios_fn: Callable[[str], BiosValue]) -> None:
        """
        Build the button-to-request mapping from an airplane YAML file.

        Entries whose request is empty (falsy) are ignored.

        :param yaml_path: Path to the airplane YAML file.
        :param get_bios_fn: Function used to get a current BIOS value.
        """
        yaml_entries = load_yaml(full_path=yaml_path)
        self.buttons: dict[AnyButton, RequestModel] = {}
        for name, raw_request in yaml_entries.items():
            if not raw_request:
                continue
            button = get_key_instance(name)
            self.buttons[button] = RequestModel.from_request(key=button, request=raw_request, get_bios_fn=get_bios_fn)

    @property
    def cycle_button_ctrl_name(self) -> dict[str, int]:
        """Return control names of all cycle-button requests, each mapped to an initial value of zero."""
        cycle_names = (model.ctrl_name for model in self.buttons.values() if model.is_cycle)
        return dict.fromkeys(cycle_names, 0)

    def get_request(self, button: AnyButton) -> RequestModel:
        """
        Get the abstract representation of the request to be sent for the requested button.

        :param button: LcdButton, Gkey or MouseButton
        :return: The stored RequestModel, or an empty one when the button has no request assigned.
        """
        # The empty model is built up front, exactly as a default argument to dict.get() would be.
        fallback = RequestModel.make_empty(key=button)
        return self.buttons.get(button, fallback)

    def set_request(self, button: AnyButton, req: str) -> None:
        """
        Replace the raw request string stored for the specified button.

        :param button: LcdButton, Gkey or MouseButton
        :param req: The raw request to set.
        :raises KeyError: If the button has no request model stored.
        """
        model = self.buttons[button]
        model.raw_request = req
1✔
805

806

807
def generate_bios_jsons_with_lupa(dcs_save_games: Path, local_compile='./Scripts/DCS-BIOS/test/compile/LocalCompile.lua') -> None:
    r"""
    Regenerate DCS-BIOS JSON files.

    Using the Lupa library, first it will try to use LuaJIT 2.1 if not it will fall back to Lua 5.1.
    When neither backend can be imported, the function returns without doing anything.

    The working directory is temporarily switched to ``dcs_save_games`` while the script is read and executed,
    and is always restored afterwards, even when reading or executing the script fails.

    :param dcs_save_games: Full path to the Saved Games\DCS directory.
    :param local_compile: Path to the LocalCompile.lua file, relative to ``dcs_save_games``.
    """
    try:
        import lupa.luajit21 as lupa
    except ImportError:
        try:
            import lupa.lua51 as lupa  # type: ignore[no-redef]
        except ImportError:
            # No Lua backend is available, so there is nothing to run.
            return

    previous_dir = getcwd()
    try:
        chdir(dcs_save_games)
        LOG.debug(f"Changed to: {dcs_save_games}")
        lua = lupa.LuaRuntime()
        # Report the runtime that will actually run the script instead of creating a second one just for logging.
        LOG.debug(f"Using {lua.lua_implementation} (compiled with {lupa.LUA_VERSION})")
        with open(local_compile) as lua_file:
            lua_script = lua_file.read()
        lua.execute(lua_script)
    finally:
        chdir(previous_dir)
        LOG.debug(f"Change directory back to: {getcwd()}")
1✔
836

837

838
def rgba(c: Color, /, mode: LcdMode | int = LcdMode.TRUE_COLOR) -> tuple[int, ...] | int:
    """
    Convert a color to a tuple of channel values or to a single integer.

    The result depends on what is passed as a mode:
    * An integer is treated as the alpha channel and appended to the RGB channels of the color.
    * A LcdMode is resolved by PIL from the color name, using the mode's value
      (a tuple of RGBA channels for LcdMode.TRUE_COLOR, a single integer for LcdMode.BLACK_WHITE).

    :param c: Color to convert
    :param mode: Mode of the LCD or alpha channel as an integer
    :return: tuple with RGBA channels or single integer
    """
    if not isinstance(mode, int):
        return ImageColor.getcolor(color=c.name, mode=mode.value)
    channels = rgb(c)
    return (*channels, mode)
1✔
855

856

857
def rgb(c: Color, /) -> tuple[int, int, int]:
    """
    Split the integer value of a Color into its red, green and blue components.

    The value is read as a packed RGB number: bits 16-23 hold red, bits 8-15 hold green and
    bits 0-7 hold blue. Any bits above the lowest 24 are ignored.

    :param c: An instance of Color, whose value is a 24-bit RGB integer.
    :return: A tuple containing the red, green, and blue components.
    """
    channel_mask = 0xff
    return (
        (c.value >> 16) & channel_mask,
        (c.value >> 8) & channel_mask,
        c.value & channel_mask,
    )
1✔
872

873

874
def detect_system_color_mode() -> str:
    """
    Detect the color mode of the system.

    Reads the ``AppsUseLightTheme`` value from the current user's Windows registry: 0 is reported as 'Dark'
    and 1 as 'Light'.
    'Light' is returned as the default whenever the mode cannot be determined: the registry module is not
    available (non-Windows platform), the key or value cannot be read, or the value is neither 0 nor 1.

    :return: 'Dark' or 'Light' as string
    """
    try:
        from winreg import HKEY_CURRENT_USER, OpenKey, QueryValueEx  # type: ignore[attr-defined]
    except ImportError:
        # winreg exists only on Windows; fall back to the documented default elsewhere.
        return 'Light'

    try:
        # The context manager closes the registry handle as soon as the value has been read.
        with OpenKey(HKEY_CURRENT_USER, 'Software\\Microsoft\\Windows\\CurrentVersion\\Themes\\Personalize') as key:
            apps_use_light_theme = QueryValueEx(key, 'AppsUseLightTheme')[0]
    except (OSError, IndexError):
        return 'Light'
    # Unexpected registry values also fall back to the default instead of raising KeyError.
    return {0: 'Dark', 1: 'Light'}.get(apps_use_light_theme, 'Light')
1✔
891

892
def verify_hashes(file_path: Path, digest_file: Path) -> tuple[bool, dict[str, bool]]:
    """
    Check hashes for a file against a digests file.

    The digests file is read line by line:
    * a line starting with ``#HASH`` selects the algorithm (its second field) for the entries that follow,
    * every other non-blank line is an entry: the digest first, optionally followed by the file name.

    Entries that appear before any ``#HASH`` line, or after a ``#HASH`` line without an algorithm name,
    are skipped because there is no algorithm to check them with.

    The overall verdict is True only when at least one digest was checked for the file and all checked
    digests match. A file without any entry in the digests file is therefore not reported as verified.

    :param file_path: Path to the file
    :param digest_file: Path to the digests file
    :return: Overall verdict and detailed results (algorithm name -> whether the digest matched)
    """
    if not file_path.is_file() or not digest_file.is_file():
        return False, {}

    hashes: dict[str, dict[str, str]] = {}
    hash_type = ''  # no algorithm selected until the first '#HASH' header is seen
    with open(digest_file) as f_digests:
        for line in f_digests:
            fields = line.split()
            if not fields:
                continue
            if line.startswith('#HASH'):
                hash_type = fields[1] if len(fields) > 1 else ''
            elif hash_type:
                filename = fields[1] if len(fields) > 1 else ''
                hashes.setdefault(filename, {})[hash_type] = fields[0]
    LOG.debug(f'Supported algorithms are: {hashlib.algorithms_guaranteed}')
    results = _compute_hash_and_check_file(file_path=file_path, hashes=hashes)

    # all() of an empty mapping is True, so require at least one checked digest.
    return bool(results) and all(results.values()), results
1✔
918

919

920
def _compute_hash_and_check_file(file_path: Path, hashes: dict[str, dict[str, str]]) -> dict[str, bool]:
    """
    Compute and verify hashes for a file.

    Only the digests registered under the file's own name (``file_path.name``) are checked.
    The comparison ignores letter case of the expected digest, since hexadecimal digests are case-insensitive.
    An algorithm that hashlib does not support, or whose digest cannot be produced without extra arguments,
    is reported as not matching.

    :param file_path: Path for file to check hashes
    :param hashes: Dictionary mapping a file name to a dictionary of hash types and expected values
    :return: Dictionary of verification results (hash type -> whether the computed digest matches)
    """
    result: dict[str, bool] = {}
    for hash_type, hash_value in hashes.get(file_path.name, {}).items():
        try:
            with open(file_path, 'rb') as f_path:
                # Feature detection instead of comparing only the minor version number.
                if hasattr(hashlib, 'file_digest'):
                    computed_hash = hashlib.file_digest(f_path, hash_type).hexdigest()
                else:
                    digest = hashlib.new(hash_type)
                    # Read in chunks so large files are never loaded into memory at once.
                    while chunk := f_path.read(1024 * 1024):
                        digest.update(chunk)
                    computed_hash = digest.hexdigest()
        except (ValueError, TypeError):
            # ValueError: unsupported algorithm name; TypeError: hexdigest() needs a length (SHAKE family).
            result[hash_type] = False
            continue
        # TODO: investigate why sha1 and md5 digests differ on Linux
        result[hash_type] = computed_hash == hash_value.lower()
    return result
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc