• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emcek / dcspy / 17130084222

21 Aug 2025 02:33PM UTC coverage: 97.685%. Remained the same
17130084222

Pull #509

github

emcek
remove 2022Server tests resources
Pull Request #509: Add 2025server CI

376 of 382 branches covered (98.43%)

Branch coverage included in aggregate %.

4604 of 4716 relevant lines covered (97.63%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.42
/src/dcspy/utils.py
1
from __future__ import annotations
1✔
2

3
import json
1✔
4
import sys
1✔
5
import zipfile
1✔
6
from collections.abc import Callable, Generator, Sequence
1✔
7
from datetime import datetime
1✔
8
from functools import lru_cache
1✔
9
from glob import glob
1✔
10
from itertools import chain
1✔
11
from logging import getLogger
1✔
12
from os import chdir, environ, getcwd, makedirs, walk
1✔
13
from pathlib import Path
1✔
14
from platform import python_implementation, python_version, uname
1✔
15
from pprint import pformat
1✔
16
from re import search, sub
1✔
17
from shutil import rmtree
1✔
18
from subprocess import CalledProcessError, run
1✔
19
from tempfile import gettempdir
1✔
20
from typing import Any, ClassVar
1✔
21

22
import yaml
1✔
23
from packaging import version
1✔
24
from PIL import ImageColor
1✔
25
from psutil import process_iter
1✔
26
from requests import get
1✔
27

28
from dcspy.models import (CONFIG_YAML, CTRL_LIST_SEPARATOR, DEFAULT_YAML_FILE, AnyButton, BiosValue, Color, ControlDepiction, ControlKeyData, DcsBiosPlaneData,
1✔
29
                          DcspyConfigYaml, Gkey, LcdButton, LcdMode, MouseButton, Release, RequestModel, __version__)
30

31
try:
1✔
32
    import git
1✔
33
except ImportError:
×
34
    pass
×
35

36
LOG = getLogger(__name__)
1✔
37

38
# Load the packaged default configuration once, at import time.
# The encoding is explicit so the result does not depend on the locale of the machine.
with open(DEFAULT_YAML_FILE, encoding='utf-8') as c_file:
    defaults_cfg: DcspyConfigYaml = yaml.load(c_file, Loader=yaml.SafeLoader)
    # The DCS-BIOS location contains the name of the current user, so it is filled in at runtime.
    defaults_cfg['dcsbios'] = f'C:\\Users\\{environ.get("USERNAME", "UNKNOWN")}\\Saved Games\\DCS\\Scripts\\DCS-BIOS'
41

42

43
def get_default_yaml(local_appdata: bool = False) -> Path:
    """
    Return a full path to the default configuration file.

    When the per-user location is requested, its directory is created and the
    configuration file is written from the defaults if it does not exist yet.

    :param local_appdata: If True, the directory returned by get_config_yaml_location() is used
    :return: Path to the configuration file
    """
    if not local_appdata:
        return DEFAULT_YAML_FILE

    config_dir = get_config_yaml_location()
    makedirs(name=config_dir, exist_ok=True)
    user_cfg = Path(config_dir / CONFIG_YAML).resolve()
    if not user_cfg.exists():
        save_yaml(data=defaults_cfg, full_path=user_cfg)
    return user_cfg
58

59

60
def load_yaml(full_path: Path) -> DcspyConfigYaml:
    """
    Load YAML from a file into a dictionary.

    A missing file or a file with invalid YAML is not fatal: the problem is logged,
    the parent directory is created and an empty dictionary is returned.
    An empty dictionary is also returned when the top-level YAML node is not a mapping.

    :param full_path: Full path to YAML file
    :return: Dictionary with data, empty when nothing usable could be loaded
    """
    try:
        with open(file=full_path, encoding='utf-8') as yaml_file:
            data = yaml.load(yaml_file, Loader=yaml.SafeLoader)
    except (FileNotFoundError, yaml.YAMLError) as err:
        # yaml.YAMLError is the common base of ParserError, ScannerError and the other
        # errors PyYAML raises for malformed documents, so none of them escapes.
        makedirs(name=full_path.parent, exist_ok=True)
        LOG.warning(f'{type(err).__name__}: {full_path}.', exc_info=True)
        LOG.debug(f'{err}')
        return {}
    return data if isinstance(data, dict) else {}
78

79

80
def save_yaml(data: DcspyConfigYaml, full_path: Path) -> None:
    """
    Save a dictionary as a YAML file.

    :param data: Dictionary with data
    :param full_path: Full path to the YAML file, it is overwritten when it exists
    """
    with open(file=full_path, mode='w', encoding='utf-8') as target:
        yaml.safe_dump(data, target)
89

90

91
def check_ver_at_github(repo: str) -> Release:
    """
    Check a version of <organization>/<package> at GitHub.

    :param repo: Format '<organization or user>/<package>'
    :return: Release object with data
    :raises ValueError: When the latest release can not be fetched or parsed, the original error is chained as the cause
    """
    package = repo.split('/')[1]
    try:
        response = get(url=f'https://api.github.com/repos/{repo}/releases/latest', timeout=5)
        if not response.ok:
            # Caught by the handler below, so callers always get the same message prefix.
            raise ValueError(f'Try again later. Status={response.status_code}')
        rel = Release(**response.json())
        LOG.debug(f'Latest GitHub release: {rel}')
        return rel
    except Exception as exc:
        raise ValueError(f'Unable to check {package} version online: {exc}') from exc
109

110

111
def get_version_string(repo: str, current_ver: str | version.Version, check: bool = True) -> str:
    """
    Generate formatted string with version number.

    Without an online check the result is just 'v<current_ver>'. With the check a suffix
    tells the outcome: ' (failed)', ' (latest)' or ' (update!)', the last one is shown
    together with the version found online instead of the current one.

    :param repo: Format '<organization or user>/<package>'.
    :param current_ver: String or Version object.
    :param check: Check the version online.
    :return: Formatted version as a string.
    """
    if not check:
        return f'v{current_ver}'

    try:
        latest = check_ver_at_github(repo=repo)
    except ValueError:
        return f'v{current_ver} (failed)'

    if latest.is_latest(current_ver=current_ver):
        return f'v{current_ver} (latest)'
    if latest.version != version.parse('0.0.0'):
        return f'v{latest.version} (update!)'
    return f'v{current_ver}'
135

136

137
def download_file(url: str, save_path: Path, progress_fn: Callable[[int], None] | None = None) -> bool:
    """
    Download a file from URL and save to save_path.

    Progress is reported as a whole percentage, only when it grows and only when
    the server announced the size of the file (Content-Length header).

    :param url: URL address
    :param save_path: full path to save
    :param progress_fn: a callable object to report download progress
    :return: True when the file was saved, False when the server response was not OK
    """
    response = get(url=url, stream=True, timeout=5)
    if not response.ok:
        LOG.warning(f'Can not download from: {url}')
        return False

    file_size = int(response.headers.get('Content-Length', 0))
    LOG.debug(f'File size: {file_size / (1024 * 1024):.2f} MB' if file_size else 'File size: Unknown')
    LOG.debug(f'Download file from: {url}')
    downloaded = 0
    progress = 0
    with open(save_path, 'wb+') as dl_file:
        for chunk in response.iter_content(chunk_size=1024):
            dl_file.write(chunk)
            downloaded += len(chunk)
            if not (progress_fn and file_size):
                # Unknown size: a percentage can not be computed (and would divide by zero).
                continue
            new_progress = min(int(downloaded / file_size * 100), 100)
            # For small files one chunk can be worth more than 1%, so compare with '>' instead of '== progress + 1'.
            if new_progress > progress:
                progress = new_progress
                progress_fn(progress)
    LOG.debug(f'Saved as: {save_path}')
    return True
165

166

167
def proc_is_running(name: str) -> int:
    """
    Check if the process is running and return its PID.

    The first process whose name contains ``name`` is reported.
    If the process name is not found, 0 (zero) is returned.

    :param name: Process name
    :return: PID as int
    """
    for proc in process_iter(['pid', 'name']):
        # psutil stores None for attributes it is not allowed to read, guard the substring test against it.
        if name in (proc.info['name'] or ''):
            return proc.info['pid']
    return 0
179

180

181
def check_dcs_ver(dcs_path: Path) -> str:
    """
    Read the DCS version from autoupdate.cfg in the installation directory.

    :param dcs_path: Path to DCS installation directory
    :return: DCS version as a string, 'Unknown' when the file can not be read or has no version entry
    """
    cfg_file = dcs_path / 'autoupdate.cfg'
    try:
        with open(file=cfg_file, encoding='utf-8') as handle:
            content = handle.read()
    except (FileNotFoundError, PermissionError) as err:
        LOG.debug(f'{type(err).__name__}: {err.filename}')
        return 'Unknown'

    found = search(r'"version":\s"([\d.]*)"', content)
    return str(found.group(1)) if found else 'Unknown'
198

199

200
def check_bios_ver(bios_path: Path | str) -> version.Version:
    """
    Check the DCS-BIOS release version.

    The version is read from `CommonData.lua`, looked up first under
    lib/modules/common_modules and then directly under lib.

    :param bios_path: Path to DCS-BIOS directory in the SavedGames folder
    :return: Version object, 0.0.0 when the version can not be determined
    """
    bios_root = Path(bios_path)
    new_location = bios_root / 'lib' / 'modules' / 'common_modules' / 'CommonData.lua'
    old_location = bios_root / 'lib' / 'CommonData.lua'

    lua_file = next((candidate for candidate in (new_location, old_location) if candidate.is_file()), None)
    if lua_file is None:
        cd_lua_data = ''
        LOG.debug(f'No `CommonData.lua` while checking DCS-BIOS version at {new_location.parent} or {old_location.parent}')
    else:
        with open(file=lua_file, encoding='utf-8') as cd_lua:
            cd_lua_data = cd_lua.read()

    found = search(r'function getVersion\(\)\s*return\s*\"([\d.]*)\"', cd_lua_data)
    if found:
        return version.parse(found.group(1))
    return version.parse('0.0.0')
224

225

226
def is_git_repo(dir_path: str) -> bool:
    """
    Check if dir_path is a Git repository.

    :param dir_path: Path as string
    :return: True if dir is git repo, False when the path is missing or is not a repository
    """
    import git
    try:
        _ = git.Repo(dir_path).git_dir
    except (git.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        return False
    else:
        return True
239

240

241
def _get_sha_hex_str(bios_repo: git.Repo, git_ref: str) -> str:
    """
    Check out a Git reference and describe the resulting HEAD commit.

    The description starts with the active branch name. When the checkout fails or
    there is no active branch to name, the first 8 characters of the commit hash are used instead.

    :param bios_repo: A Git repository object.
    :param git_ref: A string representing the Git reference (e.g., commit, branch, tag).
    :return: A string with branch name or short hash, commit date and author.
    :raises OSError: When the git package can not be imported.
    """
    try:
        import git
    except ImportError:
        raise OSError('Git executable is not available!')

    def _describe(label: str, commit) -> str:
        """Format the label with the date and author of the commit."""
        return f'{label} from: {commit.committed_datetime.strftime("%d-%b-%Y %H:%M:%S")} by: {commit.author}'

    try:
        bios_repo.git.checkout(git_ref)
        label = bios_repo.active_branch.name
        head_commit = bios_repo.head.commit
        sha = _describe(label, head_commit)
    except (git.exc.GitCommandError, TypeError):
        head_commit = bios_repo.head.commit
        sha = _describe(head_commit.hexsha[0:8], head_commit)
    LOG.debug(f'Checkout: {head_commit.hexsha} from: {head_commit.committed_datetime} | by: {head_commit.author}\n{head_commit.message}')  # type: ignore
    return sha
263

264

265
def check_github_repo(git_ref: str, repo_dir: Path, repo: str, update: bool = True, progress: git.RemoteProgress | None = None) -> str:
    """
    Prepare a local copy of a GitHub repository and check out the requested reference.

    The repository is cloned or switched to its main/master branch first,
    optionally pulled from the first remote, and then git_ref is checked out.

    :param git_ref: Any Git reference as a string
    :param repo_dir: Local directory for a repository
    :param repo: GitHub repository user/name
    :param update: Perform update process
    :param progress: Progress callback
    :return: Description of the checked-out commit (branch or short hash, date and author)
    """
    bios_repo = _checkout_repo(repo=repo, repo_dir=repo_dir, progress=progress)
    if update:
        pulled = bios_repo.remotes[0].pull(progress=progress)
        LOG.debug(f'Pulled: {pulled[0].name} as: {pulled[0].commit}')
    return _get_sha_hex_str(bios_repo, git_ref)
283

284

285
def _checkout_repo(repo: str, repo_dir: Path, progress: git.RemoteProgress | None = None) -> git.Repo:
    """
    Checkout repository at a main/master branch or clone it when it does not exist locally.

    A directory which is not a Git repository is removed before cloning.

    :param repo: Repository name
    :param repo_dir: Local repository directory
    :param progress: Progress callback
    :return: Repo object of the repository
    """
    import git

    makedirs(name=repo_dir, exist_ok=True)
    if not is_git_repo(str(repo_dir)):
        rmtree(path=repo_dir, ignore_errors=True)
        return git.Repo.clone_from(url=f'https://github.com/{repo}.git', to_path=repo_dir, progress=progress)  # type: ignore[arg-type]

    local_repo = git.Repo(repo_dir)
    known_refs = get_all_git_refs(repo_dir=repo_dir)
    default_branch = 'main' if 'main' in known_refs else 'master'
    local_repo.git.checkout(default_branch)
    return local_repo
306

307

308
def check_dcs_bios_entry(lua_dst_data: str, lua_dst_path: Path, temp_dir: Path) -> str:
    """
    Check DCS-BIOS entry in Export.lua file.

    When the `dofile` line loading BIOS.lua is not found in lua_dst_data, the content
    of Export.lua from temp_dir is appended to Export.lua in lua_dst_path.

    :param lua_dst_data: Content of Export.lua
    :param lua_dst_path: Directory with the Export.lua to be checked
    :param temp_dir: Directory with DCS-BIOS archive
    :return: Result of checks
    """
    lua = 'Export.lua'
    header = '\n\nExport.lua exists.'
    with open(file=temp_dir / lua, encoding='utf-8') as lua_src:
        lua_src_data = lua_src.read()

    if search(r'dofile\(lfs.writedir\(\)\s*\.\.\s*\[\[Scripts\\DCS-BIOS\\BIOS\.lua]]\)', lua_dst_data):
        return header + '\n\nDCS-BIOS entry detected.'

    with open(file=lua_dst_path / lua, mode='a+', encoding='utf-8') as exportlua_dst:
        exportlua_dst.write(f'\n{lua_src_data}')
    LOG.debug(f'Add DCS-BIOS to Export.lua: {lua_src_data}')
    return header + '\n\nDCS-BIOS entry added.\n\nYou verify installation at:\ngithub.com/DCS-Skunkworks/DCSFlightpanels/wiki/Installation'
330

331

332
def count_files(directory: Path, extension: str) -> int:
    """
    Count regular files with the given extension placed directly in a directory.

    :param directory: As Path object
    :param extension: File extension, without the leading dot
    :return: Number of files, -1 when the directory does not exist
    """
    wanted_suffix = f'.{extension}'
    try:
        matching = [entry.name for entry in directory.iterdir() if entry.is_file() and entry.suffix == wanted_suffix]
    except FileNotFoundError:
        LOG.debug(f'Wrong directory: {directory}')
        return -1
    LOG.debug(f'In: {directory} found {matching} ')
    return len(matching)
347

348

349
def is_git_exec_present() -> bool:
    """
    Check if the git executable is present in a system.

    :return: Truth value of git.GIT_OK, False when the git package can not be imported
    """
    try:
        import git
    except ImportError as err:
        LOG.debug(type(err).__name__, exc_info=True)
        return False
    return bool(git.GIT_OK)
361

362

363
def is_git_object(repo_dir: Path, git_obj: str) -> bool:
    """
    Check if git_obj is a valid Git reference.

    :param repo_dir: Directory with repository
    :param git_obj: Git reference to check
    :return: True if git_obj resolves to a commit in the repository, False otherwise
    """
    import gitdb  # type: ignore[import-untyped]

    if not is_git_repo(str(repo_dir)):
        return False

    bios_repo = git.Repo(repo_dir)
    try:
        bios_repo.commit(git_obj)
    except (gitdb.exc.BadName, TypeError):
        return False
    return True
381

382

383
def get_all_git_refs(repo_dir: Path) -> list[str]:
    """
    Get a list of branches and tags for repo.

    :param repo_dir: Directory with a repository
    :return: List of git references as strings (branches first, then tags), empty when repo_dir is not a repository
    """
    if not is_git_repo(str(repo_dir)):
        return []
    # Open the repository once and reuse it for both heads and tags.
    bios_repo = git.Repo(repo_dir)
    return [str(ref) for ref in chain(bios_repo.heads, bios_repo.tags)]
395

396

397
class CloneProgress(git.RemoteProgress):
    """Forward progress information emitted by git to a pair of signals."""

    OP_CODES: ClassVar[list[str]] = ['BEGIN', 'CHECKING_OUT', 'COMPRESSING', 'COUNTING', 'END', 'FINDING_SOURCES', 'RECEIVING', 'RESOLVING', 'WRITING']
    # Reverse lookup: numeric value of the RemoteProgress constant -> its name.
    OP_CODE_MAP: ClassVar[dict[int, str]] = {getattr(git.RemoteProgress, _name): _name for _name in OP_CODES}

    def __init__(self, progress, stage) -> None:
        """
        Store the signals used for reporting.

        :param progress: Progress Qt6 signal
        :param stage: Report stage Qt6 signal
        """
        super().__init__()
        self.progress_signal = progress
        self.stage_signal = stage

    def get_curr_op(self, op_code: int) -> str:
        """
        Translate an OP code into a stage name.

        :param op_code: OP code
        :return: Title-cased stage name, '?' when the code is not known
        """
        stage_name = self.OP_CODE_MAP.get(op_code & self.OP_MASK, '?')
        return stage_name.title()

    def update(self, op_code: int, cur_count, max_count=None, message: str = '') -> None:
        """
        Call whenever the progress changes.

        The stage name is emitted when an operation begins, the percentage is emitted on every call.

        :param op_code: Integer allowing to be compared against Operation IDs and stage IDs.
        :param cur_count: A count of current absolute items
        :param max_count: The maximum count of items we expect. It may be None in case there is no maximum number of items or if it is (yet) unknown.
        :param message: It contains the number of bytes transferred. It may be used for other purposes as well.
        """
        if op_code & git.RemoteProgress.BEGIN:
            self.stage_signal.emit(f'Git clone: {self.get_curr_op(op_code)}')

        if max_count:
            percentage = int(cur_count / max_count * 100)
        else:
            # Unknown or zero maximum, there is nothing to relate the count to.
            percentage = 0
        self.progress_signal.emit(percentage)
437

438

439
def collect_debug_data() -> Path:
    """
    Collect and zip all data for troubleshooting.

    The archive is created in the temporary directory and contains: system information,
    dcs.log (when it exists), dcspy logs, YAML configuration files and PNG screenshots.

    :return: Path object to ZIP file
    """
    config_file = Path(get_config_yaml_location() / CONFIG_YAML).resolve()
    conf_dict = load_yaml(config_file)
    sys_data = _get_sys_file(conf_dict)
    dcs_log = _get_dcs_log(conf_dict)

    zip_file = Path(gettempdir()) / f'dcspy_debug_{str(datetime.now()).replace(" ", "_").replace(":", "")}.zip'
    with zipfile.ZipFile(file=zip_file, mode='w', compresslevel=9, compression=zipfile.ZIP_DEFLATED) as archive:
        archive.write(sys_data, arcname=sys_data.name)
        # _get_dcs_log() returns an empty Path() when dcs.log is missing, writing it
        # would add the current working directory to the archive as a bogus entry.
        if dcs_log.is_file():
            archive.write(dcs_log, arcname=dcs_log.name)
        for log_file in _get_log_files():
            archive.write(log_file, arcname=log_file.name)
        for yaml_file in _get_yaml_files(config_file):
            archive.write(yaml_file, arcname=yaml_file.name)
        for png in _get_png_files():
            archive.write(png, arcname=png.name)

    return zip_file
462

463

464
def _get_sys_file(conf_dict: dict[str, Any]) -> Path:
    """
    Save system information to a file and return its path.

    The file is written as UTF-8, so paths or configuration values with characters
    outside the locale encoding do not break the report.

    :param conf_dict: A dictionary containing configuration information.
    :return: A Path object representing the path to the system data file.
    """
    system_info = _fetch_system_info(conf_dict)
    sys_data = Path(gettempdir()) / 'system_data.txt'
    with open(sys_data, 'w+', encoding='utf-8') as debug_file:
        debug_file.write(system_info)
    return sys_data
476

477

478
def _fetch_system_info(conf_dict: dict[str, Any]) -> str:
    """
    Build a text report about the system, Python, DCS, DCS-BIOS, Git and the configuration.

    The report also lists every file found under the Logitech Gaming Software SDK directory.

    :param conf_dict: A dictionary containing configuration information, 'dcs' and 'dcsbios' keys are required.
    :return: System data as a string
    """
    # NOTE: local names are part of the output, the `{name=}` f-string specifiers print them.
    name = uname()
    pyver = (python_version(), python_implementation())
    pyexec = sys.executable
    dcs = check_dcs_ver(dcs_path=Path(str(conf_dict['dcs'])))
    bios_location = str(conf_dict['dcsbios'])
    bios_ver = check_bios_ver(bios_path=bios_location)
    git_ver, head_commit = _fetch_git_data(repo_dir=Path(bios_location).parents[1] / 'dcs-bios')

    sdk_files = []
    for dir_path, _, filenames in walk('C:\\Program Files\\Logitech Gaming Software\\SDK'):
        sdk_files.extend(str(Path(dir_path) / filename) for filename in filenames)
    lgs_dir = '\n'.join(sdk_files)

    return f'{__version__=}\n{name=}\n{pyver=}\n{pyexec=}\n{dcs=}\n{bios_ver=}\n{git_ver=}\n{head_commit=}\n{lgs_dir}\ncfg={pformat(conf_dict)}'
498

499

500
def _fetch_git_data(repo_dir: Path) -> tuple[Sequence[int], str]:
    """
    Fetch the Git version and SHA of HEAD commit.

    :param repo_dir: Local directory for repository
    :return: Tuple of (a version) and SHA of HEAD commit, ((0, 0, 0, 0), 'N/A') when Git or the repository is not available
    """
    fallback = ((0, 0, 0, 0), 'N/A')
    try:
        import git
    except ImportError:
        # Handled on its own: when the import fails the local name `git` is unbound,
        # so it must not be referenced in the same `except` clause as the git errors.
        return fallback

    try:
        git_ver = git.cmd.Git().version_info
        head_commit = str(git.Repo(repo_dir).head.commit)
    except (git.exc.NoSuchPathError, git.exc.InvalidGitRepositoryError):
        return fallback
    return git_ver, head_commit
515

516

517
def _get_dcs_log(conf_dict: dict[str, Any]) -> Path:
    """
    Get a path to the dcs.log file.

    The file is expected in the Logs directory, two levels above the configured DCS-BIOS directory.

    :param conf_dict: A dictionary containing configuration information.
    :return: A Path object representing the path to the dcs.log file, an empty Path() when the file does not exist.
    """
    base_dir = Path(conf_dict['dcsbios']).parents[1]
    candidate = base_dir / 'Logs' / 'dcs.log'
    if candidate.is_file():
        return candidate
    return Path()
526

527

528
def _get_log_files() -> Generator[Path]:
    """
    Get paths to all dcspy log files in the temporary directory.

    :return: Generator of paths to files matching 'dcspy.log*'
    """
    temp_dir = Path(gettempdir())
    matches = glob(str(temp_dir / 'dcspy.log*'))
    return (temp_dir / logfile for logfile in matches)
538

539

540
def _get_yaml_files(config_file: Path) -> Generator[Path]:
    """
    Get paths to all YAML files stored next to the configuration file, including subdirectories.

    :param config_file: Path to the config file
    :return: Generator of paths to files with names ending with 'yaml'
    """
    tree = walk(config_file.parent)
    return (
        Path(folder) / entry
        for folder, _, entries in tree
        for entry in entries
        if entry.endswith('yaml')
    )
553

554

555
def _get_png_files() -> Generator[Path]:
    """
    Get paths to png screenshots for all airplanes.

    The temporary directory is searched recursively for files whose name
    ends with 'png' and contains one of the known airplane names.

    :return: Generator of paths to png files
    """
    aircrafts = ['FA18Chornet', 'Ka50', 'Ka503', 'Mi8MT', 'Mi24P', 'F16C50', 'F15ESE',
                 'AH64DBLKII', 'A10C', 'A10C2', 'F14A135GR', 'F14B', 'AV8BNA']

    def _is_screenshot(filename: str) -> bool:
        """Tell whether the file name looks like a screenshot of a known airplane."""
        return filename.endswith('png') and any(aircraft in filename for aircraft in aircrafts)

    return (
        Path(dir_path) / filename
        for dir_path, _, filenames in walk(gettempdir())
        for filename in filenames
        if _is_screenshot(filename)
    )
569

570

571
def get_config_yaml_location() -> Path:
    """
    Get a location of YAML configuration files.

    :return: The 'dcspy' directory under LOCALAPPDATA when that variable is set and not empty, otherwise the directory of the default YAML file
    """
    localappdata = environ.get('LOCALAPPDATA', None)
    if localappdata:
        return Path(localappdata) / 'dcspy'
    return DEFAULT_YAML_FILE.parent
580

581

582
def run_command(cmd: Sequence[str], cwd: Path | None = None) -> int:
    """
    Run a command as a subprocess, without a shell.

    :param cmd: The command to be executed as a sequence of strings
    :param cwd: current working directory
    :return: The return code of command, -1 when the command exits with a non-zero code
    """
    try:
        completed = run(cmd, check=True, shell=False, cwd=cwd)
    except CalledProcessError as err:
        LOG.debug(f'Result: {err}')
        return -1
    return completed.returncode
596

597

598
def load_json(full_path: Path) -> dict[Any, Any]:
    """
    Read a UTF-8 encoded JSON file and parse it.

    :param full_path: Full path
    :return: Python representation of JSON
    """
    with open(full_path, encoding='utf-8') as json_file:
        return json.load(json_file)
608

609

610
@lru_cache
def get_full_bios_for_plane(plane: str, bios_dir: Path) -> DcsBiosPlaneData:
    """
    Collect full BIOS for plane with name.

    All JSON files listed for the plane in AircraftAliases.json are merged into
    one dictionary, entries from later files override the earlier ones.

    :param plane: BIOS plane name
    :param bios_dir: path to DCS-BIOS directory
    :return: Validated model built from the merged data
    """
    json_dir = bios_dir / 'doc' / 'json'
    aircraft_aliases = load_json(full_path=json_dir / 'AircraftAliases.json')
    merged: dict[str, Any] = {}
    for module_name in aircraft_aliases[plane]:
        merged.update(load_json(full_path=json_dir / f'{module_name}.json'))

    return DcsBiosPlaneData.model_validate(merged)
626

627

628
@lru_cache
def get_inputs_for_plane(plane: str, bios_dir: Path) -> dict[str, dict[str, ControlKeyData]]:
    """
    Get the inputs reported by the full BIOS data of a plane.

    :param plane: BIOS plane name
    :param bios_dir: path to DCS-BIOS
    :return: Result of get_inputs() called on the plane data.
    """
    return get_full_bios_for_plane(plane=plane, bios_dir=bios_dir).get_inputs()
640

641

642
def get_list_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> list[str]:
    """
    Flatten sections and their controllers into a single list.

    Every section contributes a header (its name surrounded by separators) followed by the names of its controllers.

    :param inputs: Dictionary with ControlKeyData
    :return: List of string
    """
    flat: list[str] = []
    for section, controllers in inputs.items():
        flat.append(f'{CTRL_LIST_SEPARATOR} {section} {CTRL_LIST_SEPARATOR}')
        flat.extend(controllers)
    return flat
655

656

657
@lru_cache
def get_planes_list(bios_dir: Path) -> list[str]:
    """
    Get a list of all DCS-BIOS supported planes with clickable cockpit.

    Planes defined only by 'CommonData' or by 'CommonData' and 'FC3' are left out.

    :param bios_dir: Path to DCS-BIOS
    :return: List of all supported planes
    """
    excluded = (['CommonData', 'FC3'], ['CommonData'])
    aircraft_aliases = get_plane_aliases(bios_dir=bios_dir, plane=None)
    return [name for name, modules in aircraft_aliases.items() if modules not in excluded]
667

668

669
@lru_cache
def get_plane_aliases(bios_dir: Path, plane: str | None = None) -> dict[str, list[str]]:
    """
    Get aliases from AircraftAliases.json, for all planes or for a single one.

    :param plane: BIOS plane name, all planes are returned when it is empty
    :param bios_dir: path to DCS-BIOS
    :return: Dictionary with plane name(s) as keys and lists of names from the aliases file as values
    """
    all_aliases = load_json(full_path=bios_dir / 'doc' / 'json' / 'AircraftAliases.json')
    if not plane:
        return all_aliases
    return {plane: all_aliases[plane]}
683

684

685
def get_depiction_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> dict[str, ControlDepiction]:
    """
    Map every control name to its depiction, regardless of the section it belongs to.

    When the same control name appears in more than one section, the last one wins.

    :param inputs: Dictionary with ControlKeyData
    :return: A dictionary containing the depiction of controls.
    """
    return {
        ctrl_name: ctrl.depiction
        for controllers in inputs.values()
        for ctrl_name, ctrl in controllers.items()
    }
697

698

699
def substitute_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
    """
    Apply regular expression substitutions to a string, one after another.

    :param value: The input string to be processed
    :param symbol_replacement: A list of (pattern, replacement) pairs, applied in the given order.
    :return: The processed string.
    """
    result = value
    for pair in symbol_replacement:
        pattern, replacement = pair
        result = sub(pattern, replacement, result)
    return result
710

711

712
def replace_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
    """
    Apply plain-text replacements to a string, one after another.

    :param value: The string in which symbols will be replaced.
    :param symbol_replacement: A sequence of (original, replacement) pairs, applied in the given order.
    :return: The string with all symbols replaced.
    """
    result = value
    for pair in symbol_replacement:
        original, replacement = pair
        result = result.replace(original, replacement)
    return result
723

724

725
def _try_key_instance(klass: type[Gkey] | type[LcdButton] | type[MouseButton], method: str, key_str: str) -> AnyButton | None:
    """
    Try to build a key instance from an attribute of a class.

    The attribute named `method` is looked up on the class and called with `key_str`.
    When the call fails with a TypeError (e.g. the attribute is not callable), the attribute itself is returned.
    When the attribute is missing or the call fails with a ValueError or AttributeError, None is returned.

    :param klass: The class type on which the method is to be invoked.
    :param method: The name of the attribute to look up on the class.
    :param key_str: A string key to be passed as a parameter to the method, if supported.
    :return: An instance of `AnyButton` if successful, otherwise None.
    """
    try:
        member = getattr(klass, method)
    except AttributeError:
        return None

    try:
        return member(key_str)
    except TypeError:
        return member
    except (ValueError, AttributeError):
        return None
745

746

747
def get_key_instance(key_str: str) -> AnyButton:
    """
    Resolve the provided key string into a key instance.

    The candidates are tried in a fixed order: Gkey.from_yaml, MouseButton.from_yaml and finally
    the attribute of LcdButton named like the key string. The first truthy result is returned.

    :param key_str: A string representing the name or identifier of the key (e.g., Gkey, LcdButton, or MouseButton).
    :return: An instance of a class (AnyButton) that corresponds to the provided key string.
    :raises AttributeError: If the provided key string cannot be resolved by any of the candidates.
    """
    candidates = ((Gkey, 'from_yaml'), (MouseButton, 'from_yaml'), (LcdButton, key_str))
    for klass, method in candidates:
        resolved = _try_key_instance(klass=klass, method=method, key_str=key_str)
        if resolved:
            return resolved
    raise AttributeError(f'Could not resolve "{key_str}" to a Gkey/LcdButton/MouseButton instance')
763

764

765
class KeyRequest:
    """Map LCD button or G-Key with an abstract request model."""

    def __init__(self, yaml_path: Path, get_bios_fn: Callable[[str], BiosValue]) -> None:
        """
        Build the button-to-request mapping from an airplane YAML file.

        Every YAML entry with a non-empty request is resolved to a key instance and stored
        together with the request model created for it. Entries with an empty request are skipped.

        :param yaml_path: Path to the airplane YAML file.
        :param get_bios_fn: Function used to get a current BIOS value.
        """
        plane_requests = load_yaml(full_path=yaml_path)
        self.buttons: dict[AnyButton, RequestModel] = {}
        for key_name, raw_request in plane_requests.items():
            if not raw_request:
                continue
            button = get_key_instance(key_name)
            self.buttons[button] = RequestModel.from_request(key=button, request=raw_request, get_bios_fn=get_bios_fn)

    @property
    def cycle_button_ctrl_name(self) -> dict[str, int]:
        """Return control names of all cycle-button requests, each mapped to an initial value of zero."""
        cycle_names = (model.ctrl_name for model in self.buttons.values() if model.is_cycle)
        return dict.fromkeys(cycle_names, 0)

    def get_request(self, button: AnyButton) -> RequestModel:
        """
        Get abstract representation for request to be sent for requested button.

        :param button: LcdButton, Gkey or MouseButton
        :return: RequestModel stored for the button, or an empty one when the button is not mapped
        """
        # The fallback is built up front, exactly as a default argument of dict.get() would be.
        fallback = RequestModel.make_empty(key=button)
        return self.buttons.get(button, fallback)

    def set_request(self, button: AnyButton, req: str) -> None:
        """
        Replace the raw request string stored for the given button.

        The button has to be present in the mapping already.

        :param button: LcdButton, Gkey or MouseButton
        :param req: The raw request to set.
        """
        request_model = self.buttons[button]
        request_model.raw_request = req
804

805

806
def generate_bios_jsons_with_lupa(dcs_save_games: Path, local_compile: str = './Scripts/DCS-BIOS/test/compile/LocalCompile.lua') -> None:
    r"""
    Regenerate DCS-BIOS JSON files.

    Using the Lupa library, first it will try to use LuaJIT 2.1 if not it will fall back to Lua 5.1.
    When neither runtime can be imported, a warning is logged and the function returns without doing anything.

    The working directory is switched to ``dcs_save_games`` while the script runs (``local_compile`` is opened
    relative to it) and is always restored afterwards, even when reading or executing the script fails.

    :param dcs_save_games: Full path to the Saved Games\DCS directory.
    :param local_compile: Relative path to the LocalCompile.lua file.
    """
    try:
        import lupa.luajit21 as lupa
    except ImportError:
        try:
            import lupa.lua51 as lupa  # type: ignore[no-redef]
        except ImportError:
            # Best effort: without any Lua runtime there is nothing to regenerate, but leave a trace in the log.
            LOG.warning('Lupa (LuaJIT 2.1 or Lua 5.1) is not available, DCS-BIOS JSON files were not regenerated')
            return

    previous_dir = getcwd()
    try:
        chdir(dcs_save_games)
        LOG.debug(f"Changed to: {dcs_save_games}")
        lua = lupa.LuaRuntime()
        # Report the runtime that actually executes the script instead of instantiating a second one for logging.
        LOG.debug(f"Using {lua.lua_implementation} (compiled with {lupa.LUA_VERSION})")
        with open(local_compile) as lua_file:
            lua_script = lua_file.read()
        lua.execute(lua_script)
    finally:
        chdir(previous_dir)
        LOG.debug(f"Change directory back to: {getcwd()}")
835

836

837
def rgba(c: Color, /, mode: LcdMode | int = LcdMode.TRUE_COLOR) -> tuple[int, ...] | int:
    """
    Convert a color to a tuple of integers or a single integer.

    The result depends on what is passed as a mode:
    * An integer is treated as the alpha channel and appended to the RGB channels of the color.
    * LcdMode.TRUE_COLOR gives a tuple of RGBA channels.
    * LcdMode.BLACK_WHITE gives a single integer.

    For a LcdMode the conversion is delegated to ``ImageColor.getcolor`` using the color name and the mode value.

    :param c: Color name to convert
    :param mode: Mode of the LCD or alpha channel as an integer
    :return: tuple with RGBA channels or single integer
    """
    if not isinstance(mode, int):
        return ImageColor.getcolor(color=c.name, mode=mode.value)
    red, green, blue = rgb(c)
    return red, green, blue, mode
854

855

856
def rgb(c: Color, /) -> tuple[int, int, int]:
    """
    Split the integer value of a Color into its red, green and blue channels.

    The value is read as a 24-bit RGB number: red occupies bits 16-23, green bits 8-15
    and blue bits 0-7. Each channel is masked to a single byte.

    :param c: An instance of Color, whose value is a 24-bit RGB integer.
    :return: A tuple containing the red, green, and blue components.
    """
    packed = c.value
    # Walk the channels from the most significant byte (red) down to the least significant one (blue).
    red, green, blue = ((packed >> shift) & 0xff for shift in (16, 8, 0))
    return red, green, blue
871

872

873
def detect_system_color_mode() -> str:
    """
    Detect the color mode of the system.

    The ``AppsUseLightTheme`` registry value is 0 if Windows is in Dark Mode and 1 if Windows is in Light Mode.
    In case of any error (registry module not available, key or value cannot be read, unexpected value),
    it will return 'Light' as default.

    :return: 'Dark' or 'Light' as string
    """
    try:
        from winreg import HKEY_CURRENT_USER, OpenKey, QueryValueEx  # type: ignore[attr-defined]
    except ImportError:
        # No Windows registry on this platform, fall back to the documented default.
        return 'Light'

    try:
        # The context manager closes the registry handle as soon as the value has been read.
        with OpenKey(HKEY_CURRENT_USER, 'Software\\Microsoft\\Windows\\CurrentVersion\\Themes\\Personalize') as key:
            apps_use_light_theme = QueryValueEx(key, 'AppsUseLightTheme')[0]
    except (OSError, IndexError):
        return 'Light'
    # Anything other than the two known values is treated as an error and falls back to the default.
    return {0: 'Dark', 1: 'Light'}.get(apps_use_light_theme, 'Light')
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc