• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emcek / dcspy / 11488282252

23 Oct 2024 09:16PM UTC coverage: 94.334% (-0.8%) from 95.113%
11488282252

Pull #373

github

emcek
Merge remote-tracking branch 'origin/bios-symbolic-link' into bios-symbolic-link

# Conflicts:
#	CHANGELOG.md
Pull Request #373: Install DCS-BIOS live version as symbolic link to repository

332 of 378 branches covered (87.83%)

Branch coverage included in aggregate %.

10 of 28 new or added lines in 1 file covered. (35.71%)

2 existing lines in 1 file now uncovered.

2049 of 2146 relevant lines covered (95.48%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.99
/src/dcspy/utils.py
1
import json
1✔
2
import sys
1✔
3
import zipfile
1✔
4
from collections.abc import Generator, Sequence
1✔
5
from datetime import datetime
1✔
6
from functools import lru_cache
1✔
7
from glob import glob
1✔
8
from itertools import chain
1✔
9
from logging import getLogger
1✔
10
from os import chdir, environ, getcwd, makedirs, walk
1✔
11
from pathlib import Path
1✔
12
from platform import python_implementation, python_version, uname
1✔
13
from pprint import pformat
1✔
14
from re import search, sub
1✔
15
from shutil import rmtree
1✔
16
from subprocess import CalledProcessError, run
1✔
17
from tempfile import gettempdir
1✔
18
from typing import Any, Callable, ClassVar, Optional, Union
1✔
19

20
import yaml
1✔
21
from packaging import version
1✔
22
from psutil import process_iter
1✔
23
from requests import get
1✔
24

25
from dcspy.models import (CTRL_LIST_SEPARATOR, AnyButton, BiosValue, ControlDepiction, ControlKeyData, DcsBiosPlaneData, DcspyConfigYaml, ReleaseInfo,
1✔
26
                          RequestModel, get_key_instance)
27

28
try:
1✔
29
    import git
1✔
30
except ImportError:
×
31
    pass
×
32

33
LOG = getLogger(__name__)
1✔
34
__version__ = '3.5.2'
1✔
35
CONFIG_YAML = 'config.yaml'
1✔
36
DEFAULT_YAML_FILE = Path(__file__).parent / 'resources' / CONFIG_YAML
1✔
37

38
with open(DEFAULT_YAML_FILE) as c_file:
1✔
39
    defaults_cfg: DcspyConfigYaml = yaml.load(c_file, Loader=yaml.SafeLoader)
1✔
40
    defaults_cfg['dcsbios'] = f'C:\\Users\\{environ.get("USERNAME", "UNKNOWN")}\\Saved Games\\DCS.openbeta\\Scripts\\DCS-BIOS'
1✔
41

42

43
def get_default_yaml(local_appdata: bool = False) -> Path:
1✔
44
    """
45
    Return a full path to the default configuration file.
46

47
    :param local_appdata: If True value C:/Users/<user_name>/AppData/Local is used
48
    :return: Path like an object
49
    """
50
    cfg_ful_path = DEFAULT_YAML_FILE
1✔
51
    if local_appdata:
1!
52
        user_appdata = get_config_yaml_location()
1✔
53
        makedirs(name=user_appdata, exist_ok=True)
1✔
54
        cfg_ful_path = Path(user_appdata / CONFIG_YAML).resolve()
1✔
55
        if not cfg_ful_path.exists():
1!
56
            save_yaml(data=defaults_cfg, full_path=cfg_ful_path)
1✔
57
    return cfg_ful_path
1✔
58

59

60
def load_yaml(full_path: Path) -> dict[str, Any]:
1✔
61
    """
62
    Load YAML from a file into dictionary.
63

64
    :param full_path: Full path to YAML file
65
    :return: Dictionary with data
66
    """
67
    try:
1✔
68
        with open(file=full_path, encoding='utf-8') as yaml_file:
1✔
69
            data = yaml.load(yaml_file, Loader=yaml.SafeLoader)
1✔
70
            if not isinstance(data, dict):
1✔
71
                data = {}
1✔
72
    except (FileNotFoundError, yaml.parser.ParserError) as err:
1✔
73
        makedirs(name=full_path.parent, exist_ok=True)
1✔
74
        LOG.warning(f'{type(err).__name__}: {full_path}.', exc_info=True)
1✔
75
        LOG.debug(f'{err}')
1✔
76
        data = {}
1✔
77
    return data
1✔
78

79

80
def save_yaml(data: dict[str, Any], full_path: Path) -> None:
1✔
81
    """
82
    Save disc as YAML file.
83

84
    :param data: Dictionary with data
85
    :param full_path: Full a path to YAML file
86
    """
87
    with open(file=full_path, mode='w', encoding='utf-8') as yaml_file:
1✔
88
        yaml.dump(data, yaml_file, Dumper=yaml.SafeDumper)
1✔
89

90

91
def check_ver_at_github(repo: str, current_ver: str, extension: str) -> ReleaseInfo:
1✔
92
    """
93
    Check a version of <organization>/<package> at GitHub.
94

95
    Return tuple with:
96
    - result (bool) - if local version is latest
97
    - online version (version.Version) - the latest version
98
    - download url (str) - ready to download
99
    - published date (str) - format DD MMMM YYYY
100
    - release type (str) - Regular or Pre-release
101
    - asset file (str) - file name of asset
102

103
    :param repo: Format '<organization or user>/<package>'
104
    :param current_ver: Current local version
105
    :param extension: File extension
106
    :return: ReleaseInfo with data
107
    """
108
    latest, online_version, asset_url, published, pre_release = False, '0.0.0', '', '', False
1✔
109
    package = repo.split('/')[1]
1✔
110
    try:
1✔
111
        response = get(url=f'https://api.github.com/repos/{repo}/releases/latest', timeout=5)
1✔
112
        if response.ok:
1✔
113
            dict_json = response.json()
1✔
114
            online_version = dict_json['tag_name']
1✔
115
            pre_release = dict_json['prerelease']
1✔
116
            published = datetime.strptime(dict_json['published_at'], '%Y-%m-%dT%H:%M:%S%z').strftime('%d %B %Y')
1✔
117
            asset_url = next(asset['browser_download_url'] for asset in dict_json['assets'] if asset['name'].endswith(extension))
1✔
118
            LOG.debug(f'Latest GitHub version:{online_version} pre:{pre_release} date:{published} url:{asset_url}')
1✔
119
            latest = _compare_versions(package, current_ver, online_version)
1✔
120
        else:
121
            LOG.warning(f'Unable to check {package} version online. Try again later. Status={response.status_code}')
1✔
122
    except Exception as exc:
1✔
123
        LOG.warning(f'Unable to check {package} version online: {exc}')
1✔
124
    return ReleaseInfo(
1✔
125
        latest=latest,
126
        ver=version.parse(online_version),
127
        dl_url=asset_url,
128
        published=published,
129
        release_type='Pre-release' if pre_release else 'Regular',
130
        asset_file=asset_url.split('/')[-1],
131
    )
132

133

134
def _compare_versions(package: str, current_ver: str, remote_ver: str) -> bool:
1✔
135
    """
136
    Compare two versions of packages and return result.
137

138
    :param package: Package name
139
    :param current_ver: Current/local version
140
    :param remote_ver: Remote/online version
141
    :return:
142
    """
143
    latest = False
1✔
144
    if version.parse(remote_ver) > version.parse(current_ver):
1✔
145
        LOG.info(f'There is new version of {package}: {remote_ver}')
1✔
146
    elif version.parse(remote_ver) <= version.parse(current_ver):
1!
147
        LOG.info(f'{package} is up-to-date version: {current_ver}')
1✔
148
        latest = True
1✔
149
    return latest
1✔
150

151

152
def get_version_string(repo: str, current_ver: str, check: bool = True) -> str:
1✔
153
    """
154
    Generate formatted string with version number.
155

156
    :param repo: Format '<organization or user>/<package>'.
157
    :param current_ver: Current local version.
158
    :param check: Version online.
159
    :return: Formatted version as string.
160
    """
161
    ver_string = f'v{current_ver}'
1✔
162
    if check:
1✔
163
        result = check_ver_at_github(repo=repo, current_ver=current_ver, extension='')
1✔
164
        details = ''
1✔
165
        if result.latest:
1✔
166
            details = ' (latest)'
1✔
167
        elif str(result.ver) != '0.0.0':
1✔
168
            details = ' (update!)'
1✔
169
            current_ver = str(result.ver)
1✔
170
        elif str(result.ver) == '0.0.0':
1!
171
            details = ' (failed)'
1✔
172
        ver_string = f'v{current_ver}{details}'
1✔
173
    return ver_string
1✔
174

175

176
def download_file(url: str, save_path: Path) -> bool:
1✔
177
    """
178
    Download a file from URL and save to save_path.
179

180
    :param url: URL address
181
    :param save_path: full path to save
182
    """
183
    response = get(url=url, stream=True, timeout=5)
1✔
184
    if response.ok:
1✔
185
        LOG.debug(f'Download file from: {url}')
1✔
186
        with open(save_path, 'wb+') as dl_file:
1✔
187
            for chunk in response.iter_content(chunk_size=128):
1✔
188
                dl_file.write(chunk)
1✔
189
            LOG.debug(f'Saved as: {save_path}')
1✔
190
            return True
1✔
191
    else:
192
        LOG.warning(f'Can not download from: {url}')
1✔
193
        return False
1✔
194

195

196
def proc_is_running(name: str) -> int:
1✔
197
    """
198
    Check if the process is running and return its PID.
199

200
    If the process name is not found, 0 (zero) is returned.
201
    :param name: Process name
202
    :return: PID as int
203
    """
204
    for proc in process_iter(['pid', 'name']):
1✔
205
        if name in proc.info['name']:  # type: ignore[attr-defined]
1✔
206
            return proc.info['pid']  # type: ignore[attr-defined]
1✔
207
    return 0
1✔
208

209

210
def check_dcs_ver(dcs_path: Path) -> tuple[str, str]:
1✔
211
    """
212
    Check DCS version and release type.
213

214
    :param dcs_path: Path to DCS installation directory
215
    :return: DCS type and version as strings
216
    """
217
    result_type, result_ver = 'Unknown', 'Unknown'
1✔
218
    try:
1✔
219
        with open(file=dcs_path / 'autoupdate.cfg', encoding='utf-8') as autoupdate_cfg:
1✔
220
            autoupdate_data = autoupdate_cfg.read()
1✔
221
    except (FileNotFoundError, PermissionError) as err:
1✔
222
        LOG.debug(f'{type(err).__name__}: {err.filename}')
1✔
223
    else:
224
        result_type = 'stable'
1✔
225
        if dcs_type := search(r'"branch":\s"([\w.]*)"', autoupdate_data):
1✔
226
            result_type = str(dcs_type.group(1))
1✔
227
        if dcs_ver := search(r'"version":\s"([\d.]*)"', autoupdate_data):
1✔
228
            result_ver = str(dcs_ver.group(1))
1✔
229
    return result_type, result_ver
1✔
230

231

232
def check_bios_ver(bios_path: Union[Path, str]) -> ReleaseInfo:
1✔
233
    """
234
    Check the DSC-BIOS release version.
235

236
    :param bios_path: Path to DCS-BIOS directory in Saved Games folder
237
    :return: ReleaseInfo named tuple
238
    """
239
    result = ReleaseInfo(latest=False, ver=version.parse('0.0.0'), dl_url='', published='', release_type='', asset_file='')
1✔
240
    new_location = Path(bios_path) / 'lib' / 'modules' / 'common_modules' / 'CommonData.lua'
1✔
241
    old_location = Path(bios_path) / 'lib' / 'CommonData.lua'
1✔
242

243
    if new_location.is_file():
1✔
244
        with open(file=new_location, encoding='utf-8') as cd_lua:
1✔
245
            cd_lua_data = cd_lua.read()
1✔
246
    elif old_location.is_file():
1✔
247
        with open(file=old_location, encoding='utf-8') as cd_lua:
1✔
248
            cd_lua_data = cd_lua.read()
1✔
249
    else:
250
        cd_lua_data = ''
1✔
251
        LOG.debug(f'No `CommonData.lua` while checking DCS-BIOS version at {new_location.parent} or {old_location.parent}')
1✔
252

253
    if bios_re := search(r'function getVersion\(\)\s*return\s*\"([\d.]*)\"', cd_lua_data):
1✔
254
        bios = version.parse(bios_re.group(1))
1✔
255
        result = ReleaseInfo(latest=False, ver=bios, dl_url='', published='', release_type='', asset_file='')
1✔
256
    return result
1✔
257

258

259
def is_git_repo(dir_path: str) -> bool:
1✔
260
    """
261
    Check if dir_path ios Git repository.
262

263
    :param dir_path: Path as string
264
    :return: True if dir is git repo
265
    """
266
    import git
1✔
267
    try:
1✔
268
        _ = git.Repo(dir_path).git_dir
1✔
269
        return True
1✔
270
    except (git.InvalidGitRepositoryError, git.exc.NoSuchPathError):
1✔
271
        return False
1✔
272

273

274
def _get_sha_hex_str(bios_repo: 'git.Repo', git_ref: str) -> str:
1✔
275
    """
276
    Return a string representing the commit hash, date, and author of the given Git reference in the provided repository.
277

278
    :param bios_repo: A Git repository object.
279
    :param git_ref: A string representing the Git reference (e.g., commit, branch, tag).
280
    :return: A string representing the commit hash, date, and author.
281
    """
282
    try:
1✔
283
        import git
1✔
UNCOV
284
    except ImportError:
×
285
        raise OSError('Git executable is not available!')
×
286
    try:
1✔
287
        bios_repo.git.checkout(git_ref)
1✔
288
        branch = bios_repo.active_branch.name
1✔
289
        head_commit = bios_repo.head.commit
1✔
290
        sha = f'{branch}: {head_commit.committed_datetime.strftime("%d-%b-%Y %H:%M:%S")} by: {head_commit.author}'
1✔
291
    except (git.exc.GitCommandError, TypeError):
1✔
292
        head_commit = bios_repo.head.commit
1✔
293
        sha = f'{head_commit.hexsha[0:8]} from: {head_commit.committed_datetime.strftime("%d-%b-%Y %H:%M:%S")} by: {head_commit.author}'
1✔
294
    LOG.debug(f'Checkout: {head_commit.hexsha} from: {head_commit.committed_datetime} | by: {head_commit.author}\n{head_commit.message}')  # type: ignore
1✔
295
    return sha
1✔
296

297

298
def check_github_repo(git_ref: str, repo_dir: Path, repo: str, update: bool = True, progress: Optional[git.RemoteProgress] = None) -> str:
1✔
299
    """
300
    Update git repository.
301

302
    Return SHA of the latest commit.
303

304
    :param git_ref: Any Git reference as string
305
    :param repo_dir: Local directory for a repository
306
    :param repo: GitHub repository user/name
307
    :param update: Perform update process
308
    :param progress: Progress callback
309
    """
310
    bios_repo = _checkout_repo(repo=repo, repo_dir=repo_dir, progress=progress)
1✔
311
    if update:
1!
312
        f_info = bios_repo.remotes[0].pull(progress=progress)
1✔
313
        LOG.debug(f'Pulled: {f_info[0].name} as: {f_info[0].commit}')
1✔
314
    sha = _get_sha_hex_str(bios_repo, git_ref)
1✔
315
    return sha
1✔
316

317

318
def _checkout_repo(repo: str, repo_dir: Path, checkout_ref: str = 'master', progress: Optional[git.RemoteProgress] = None) -> 'git.Repo':
1✔
319
    """
320
    Checkout repository at master branch or clone it when not exists in a system.
321

322
    :param repo: Repository name
323
    :param repo_dir: Local repository directory
324
    :param checkout_ref: Checkout a git reference
325
    :param progress: Progress callback
326
    :return: Repo object of the repository
327
    """
328
    import git
1✔
329

330
    makedirs(name=repo_dir, exist_ok=True)
1✔
331
    if is_git_repo(str(repo_dir)):
1✔
332
        bios_repo = git.Repo(repo_dir)
1✔
333
        bios_repo.git.checkout(checkout_ref)
1✔
334
    else:
335
        rmtree(path=repo_dir, ignore_errors=True)
1✔
336
        bios_repo = git.Repo.clone_from(url=f'https://github.com/{repo}.git', to_path=repo_dir, progress=progress)  # type: ignore
1✔
337
    return bios_repo
1✔
338

339

340
def check_dcs_bios_entry(lua_dst_data: str, lua_dst_path: Path, temp_dir: Path) -> str:
1✔
341
    """
342
    Check DCS-BIOS entry in Export.lua file.
343

344
    :param lua_dst_data: Content of Export.lua
345
    :param lua_dst_path: Export.lua path
346
    :param temp_dir: Directory with DCS-BIOS archive
347
    :return: Result of checks
348
    """
349
    result = '\n\nExport.lua exists.'
1✔
350
    lua = 'Export.lua'
1✔
351
    with open(file=temp_dir / lua, encoding='utf-8') as lua_src:
1✔
352
        lua_src_data = lua_src.read()
1✔
353
    export_re = search(r'dofile\(lfs.writedir\(\)\s*\.\.\s*\[\[Scripts\\DCS-BIOS\\BIOS\.lua]]\)', lua_dst_data)
1✔
354
    if not export_re:
1✔
355
        with open(file=lua_dst_path / lua, mode='a+', encoding='utf-8') as exportlua_dst:
1✔
356
            exportlua_dst.write(f'\n{lua_src_data}')
1✔
357
        LOG.debug(f'Add DCS-BIOS to Export.lua: {lua_src_data}')
1✔
358
        result += '\n\nDCS-BIOS entry added.\n\nYou verify installation at:\ngithub.com/DCS-Skunkworks/DCSFlightpanels/wiki/Installation'
1✔
359
    else:
360
        result += '\n\nDCS-BIOS entry detected.'
1✔
361
    return result
1✔
362

363

364
def count_files(directory: Path, extension: str) -> int:
1✔
365
    """
366
    Count files with extension in directory.
367

368
    :param directory: as Path object
369
    :param extension: file extension
370
    :return: number of files
371
    """
372
    try:
1✔
373
        json_files = [f.name for f in directory.iterdir() if f.is_file() and f.suffix == f'.{extension}']
1✔
374
        LOG.debug(f'In: {directory} found {json_files} ')
1✔
375
        return len(json_files)
1✔
376
    except FileNotFoundError:
1✔
377
        LOG.debug(f'Wrong directory: {directory}')
1✔
378
        return -1
1✔
379

380

381
def is_git_exec_present() -> bool:
1✔
382
    """
383
    Check if git executable is present in a system.
384

385
    :return: True if git.exe is available
386
    """
387
    try:
1✔
388
        import git
1✔
389
        return bool(git.GIT_OK)
1✔
UNCOV
390
    except ImportError as err:
×
391
        LOG.debug(type(err).__name__, exc_info=True)
×
392
        return False
×
393

394

395
def is_git_object(repo_dir: Path, git_obj: str) -> bool:
1✔
396
    """
397
    Check if git_obj is a valid Git reference.
398

399
    :param repo_dir: Directory with repository
400
    :param git_obj: Git reference to check
401
    :return: True if git_obj is git reference, False otherwise
402
    """
403
    import gitdb  # type: ignore[import-untyped]
1✔
404
    result = False
1✔
405
    if is_git_repo(str(repo_dir)):
1✔
406
        bios_repo = git.Repo(repo_dir)
1✔
407
        bios_repo.git.checkout('master')
1✔
408
        try:
1✔
409
            bios_repo.commit(git_obj)
1✔
410
            result = True
1✔
411
        except gitdb.exc.BadName:
1✔
412
            pass
1✔
413
    return result
1✔
414

415

416
def get_all_git_refs(repo_dir: Path) -> list[str]:
1✔
417
    """
418
    Get a list of branches and tags for repo.
419

420
    :param repo_dir: Directory with a repository
421
    :return: List of git references as strings
422
    """
423
    refs = []
1✔
424
    if is_git_repo(str(repo_dir)):
1!
425
        for ref in chain(git.Repo(repo_dir).heads, git.Repo(repo_dir).tags):
1✔
426
            refs.append(str(ref))
1✔
427
    return refs
1✔
428

429

430
class CloneProgress(git.RemoteProgress):
1✔
431
    """Handler providing an interface to parse progress information emitted by git."""
432
    OP_CODES: ClassVar[list[str]] = ['BEGIN', 'CHECKING_OUT', 'COMPRESSING', 'COUNTING', 'END', 'FINDING_SOURCES', 'RECEIVING', 'RESOLVING', 'WRITING']
1✔
433
    OP_CODE_MAP: ClassVar[dict[int, str]] = {getattr(git.RemoteProgress, _op_code): _op_code for _op_code in OP_CODES}
1✔
434

435
    def __init__(self, progress, stage) -> None:
1✔
436
        """
437
        Initialize the progress handler.
438

439
        :param progress: Progress Qt6 signal
440
        :param stage: Report stage Qt6 signal
441
        """
442
        super().__init__()
1✔
443
        self.progress_signal = progress
1✔
444
        self.stage_signal = stage
1✔
445

446
    def get_curr_op(self, op_code: int) -> str:
1✔
447
        """
448
        Get stage name from OP code.
449

450
        :param op_code: OP code
451
        :return: stage name
452
        """
453
        op_code_masked = op_code & self.OP_MASK
1✔
454
        return self.OP_CODE_MAP.get(op_code_masked, '?').title()
1✔
455

456
    def update(self, op_code: int, cur_count, max_count=None, message: str = ''):
1✔
457
        """
458
        Call whenever the progress changes.
459

460
        :param op_code: Integer allowing to be compared against Operation IDs and stage IDs.
461
        :param cur_count: A count of current absolute items
462
        :param max_count: The maximum count of items we expect. It may be None in case there is no maximum number of items or if it is (yet) unknown.
463
        :param message: It contains the number of bytes transferred. It may be used for other purposes as well.
464
        """
465
        if op_code & git.RemoteProgress.BEGIN:
1!
466
            self.stage_signal.emit(f'Git clone: {self.get_curr_op(op_code)}')
1✔
467

468
        percentage = int(cur_count / max_count * 100) if max_count else 0
1✔
469
        self.progress_signal.emit(percentage)
1✔
470

471

472
def collect_debug_data() -> Path:
1✔
473
    """
474
    Collect and zip all data for troubleshooting.
475

476
    :return: Path object to ZIP file
477
    """
478
    config_file = Path(get_config_yaml_location() / CONFIG_YAML).resolve()
1✔
479
    conf_dict = load_yaml(config_file)
1✔
480
    sys_data = _get_sys_file(conf_dict)
1✔
481
    dcs_log = _get_dcs_log(conf_dict)
1✔
482

483
    zip_file = Path(gettempdir()) / f'dcspy_debug_{str(datetime.now()).replace(" ", "_").replace(":", "")}.zip'
1✔
484
    with zipfile.ZipFile(file=zip_file, mode='w', compresslevel=9, compression=zipfile.ZIP_DEFLATED) as zipf:
1✔
485
        zipf.write(sys_data, arcname=sys_data.name)
1✔
486
        zipf.write(dcs_log, arcname=dcs_log.name)
1✔
487
        for log_file in _get_log_files():
1✔
488
            zipf.write(log_file, arcname=log_file.name)
1✔
489
        for yaml_file in _get_yaml_files(config_file):
1✔
490
            zipf.write(yaml_file, arcname=yaml_file.name)
1✔
491
        for png in _get_png_files():
1✔
492
            zipf.write(png, arcname=png.name)
1✔
493

494
    return zip_file
1✔
495

496

497
def _get_sys_file(conf_dict: dict[str, Any]) -> Path:
1✔
498
    """
499
    Save system information to file and return its path.
500

501
    :param conf_dict: A dictionary containing configuration information.
502
    :return: A Path object representing the path to the system data file.
503
    """
504
    system_info = _fetch_system_info(conf_dict)
1✔
505
    sys_data = Path(gettempdir()) / 'system_data.txt'
1✔
506
    with open(sys_data, 'w+') as debug_file:
1✔
507
        debug_file.write(system_info)
1✔
508
    return sys_data
1✔
509

510

511
def _fetch_system_info(conf_dict: dict[str, Any]) -> str:
1✔
512
    """
513
    Fetch system information.
514

515
    :param conf_dict: A dictionary containing configuration information.
516
    :return: System data as string
517
    """
518
    name = uname()
1✔
519
    pyver = (python_version(), python_implementation())
1✔
520
    pyexec = sys.executable
1✔
521
    dcs = check_dcs_ver(dcs_path=Path(str(conf_dict['dcs'])))
1✔
522
    bios_ver = check_bios_ver(bios_path=str(conf_dict['dcsbios'])).ver
1✔
523
    repo_dir = Path(str(conf_dict['dcsbios'])).parents[1] / 'dcs-bios'
1✔
524
    git_ver, head_commit = _fetch_git_data(repo_dir=repo_dir)
1✔
525
    lgs_dir = '\n'.join([
1✔
526
        str(Path(dir_path) / filename)
527
        for dir_path, _, filenames in walk('C:\\Program Files\\Logitech Gaming Software\\SDK')
528
        for filename in filenames
529
    ])
530
    return f'{__version__=}\n{name=}\n{pyver=}\n{pyexec=}\n{dcs=}\n{bios_ver=}\n{git_ver=}\n{head_commit=}\n{lgs_dir}\ncfg={pformat(conf_dict)}'
1✔
531

532

533
def _fetch_git_data(repo_dir: Path) -> tuple[Sequence[int], str]:
1✔
534
    """
535
    Fetch Git version and SHA of HEAD commit.
536

537
    :param repo_dir: Local directory for repository
538
    :return: Tuple of (a version) and SHA of HEAD commit
539
    """
540
    try:
1✔
541
        import git
1✔
542
        git_ver = git.cmd.Git().version_info
1✔
543
        head_commit = str(git.Repo(repo_dir).head.commit)
1✔
544
    except (git.exc.NoSuchPathError, git.exc.InvalidGitRepositoryError, ImportError):
1✔
545
        git_ver = (0, 0, 0, 0)
1✔
546
        head_commit = 'N/A'
1✔
547
    return git_ver, head_commit
1✔
548

549

550
def _get_dcs_log(conf_dict: dict[str, Any]) -> Path:
1✔
551
    """
552
    Get path to dcs.log path.
553

554
    :param conf_dict: A dictionary containing configuration information.
555
    :return: A Path object representing the path to the dcs.log file.
556
    """
557
    dcs_log_file = Path(conf_dict['dcsbios']).parents[1] / 'Logs' / 'dcs.log'
1✔
558
    return dcs_log_file if dcs_log_file.is_file() else Path()
1✔
559

560

561
def _get_log_files() -> Generator[Path, None, None]:
1✔
562
    """
563
    Get a path to all logg files.
564

565
    :return: Generator of a path to log files
566
    """
567
    return (
1✔
568
        Path(gettempdir()) / logfile
569
        for logfile in glob(str(Path(gettempdir()) / 'dcspy.log*'))
570
    )
571

572

573
def _get_yaml_files(config_file: Path) -> Generator[Path, None, None]:
1✔
574
    """
575
    Get a path to all configuration YAML files.
576

577
    :param config_file: Path to the config file
578
    :return: Generator of a path to YAML files
579
    """
580
    return (
1✔
581
        Path(dirpath) / filename
582
        for dirpath, _, filenames in walk(config_file.parent)
583
        for filename in filenames
584
        if filename.endswith('yaml')
585
    )
586

587

588
def _get_png_files() -> Generator[Path, None, None]:
1✔
589
    """
590
    Get a path to png screenshots for all airplanes.
591

592
    :return: Generator of a path to png files
593
    """
594
    aircrafts = ['FA18Chornet', 'Ka50', 'Ka503', 'Mi8MT', 'Mi24P', 'F16C50', 'F15ESE',
1✔
595
                 'AH64DBLKII', 'A10C', 'A10C2', 'F14A135GR', 'F14B', 'AV8BNA']
596
    return (
1✔
597
        Path(dir_path) / filename
598
        for dir_path, _, filenames in walk(gettempdir())
599
        for filename in filenames
600
        if any(True for aircraft in aircrafts if aircraft in filename and filename.endswith('png'))
601
    )
602

603

604
def get_config_yaml_location() -> Path:
1✔
605
    """
606
    Get a location of YAML configuration files.
607

608
    :rtype: Path object to directory
609
    """
610
    localappdata = environ.get('LOCALAPPDATA', None)
1✔
611
    user_appdata = Path(localappdata) / 'dcspy' if localappdata else DEFAULT_YAML_FILE.parent
1✔
612
    return user_appdata
1✔
613

614

615
def run_pip_command(cmd: str) -> tuple[int, str, str]:
1✔
616
    """
617
    Execute pip command.
618

619
    :param cmd: Command as string
620
    :return: Tuple with return code, stderr and stdout
621
    """
622
    try:
1✔
623
        result = run([sys.executable, '-m', 'pip', *cmd.split(' ')], capture_output=True, check=True)
1✔
624
        return result.returncode, result.stderr.decode('utf-8'), result.stdout.decode('utf-8')
1✔
625
    except CalledProcessError as e:
1✔
626
        LOG.debug(f'Result: {e}')
1✔
627
        return e.returncode, e.stderr.decode('utf-8'), e.stdout.decode('utf-8')
1✔
628

629

630
def run_command(cmd: Sequence[str], cwd: Optional[Path] = None) -> int:
1✔
631
    """
632
    Run command in shell as a subprocess.
633

634
    :param cmd: The command to be executed as a sequence of strings
635
    :param cwd: current working directory
636
    :return: The return code of command
637
    """
638
    try:
1✔
639
        proc = run(cmd, check=True, shell=False, cwd=cwd)
1✔
640
        return proc.returncode
1✔
641
    except CalledProcessError as e:
1✔
642
        LOG.debug(f'Result: {e}')
1✔
643
        return -1
1✔
644

645

646
def load_json(full_path: Path) -> Any:
1✔
647
    """
648
    Load JSON from a file into dictionary.
649

650
    :param full_path: Full path
651
    :return: Python representation of JSON
652
    """
653
    with open(full_path, encoding='utf-8') as json_file:
1✔
654
        data = json_file.read()
1✔
655
    return json.loads(data)
1✔
656

657

658
@lru_cache
1✔
659
def get_full_bios_for_plane(plane: str, bios_dir: Path) -> DcsBiosPlaneData:
1✔
660
    """
661
    Collect full BIOS for plane with name.
662

663
    :param plane: BIOS plane name
664
    :param bios_dir: path to DCS-BIOS directory
665
    :return: dict
666
    """
667
    alias_path = bios_dir / 'doc' / 'json' / 'AircraftAliases.json'
1✔
668
    local_json: dict[str, Any] = {}
1✔
669
    aircraft_aliases = load_json(full_path=alias_path)
1✔
670
    for json_file in aircraft_aliases[plane]:
1✔
671
        local_json = {**local_json, **load_json(full_path=bios_dir / 'doc' / 'json' / f'{json_file}.json')}
1✔
672

673
    return DcsBiosPlaneData.model_validate(local_json)
1✔
674

675

676
@lru_cache
1✔
677
def get_inputs_for_plane(plane: str, bios_dir: Path) -> dict[str, dict[str, ControlKeyData]]:
1✔
678
    """
679
    Get dict with all not empty inputs for plane.
680

681
    :param plane: BIOS plane name
682
    :param bios_dir: path to DCS-BIOS
683
    :return: dict.
684
    """
685
    plane_bios = get_full_bios_for_plane(plane=plane, bios_dir=bios_dir)
1✔
686
    inputs = plane_bios.get_inputs()
1✔
687
    return inputs
1✔
688

689

690
def get_list_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> list[str]:
1✔
691
    """
692
    Get a list of all controllers from dict with sections and inputs.
693

694
    :param inputs: Dictionary with ControlKeyData
695
    :return: List of string
696
    """
697
    result_list = []
1✔
698
    for section, controllers in inputs.items():
1✔
699
        result_list.append(f'{CTRL_LIST_SEPARATOR} {section} {CTRL_LIST_SEPARATOR}')
1✔
700
        for ctrl_name in controllers:
1✔
701
            result_list.append(ctrl_name)
1✔
702
    return result_list
1✔
703

704

705
@lru_cache
1✔
706
def get_planes_list(bios_dir: Path) -> list[str]:
1✔
707
    """
708
    Get a list of all DCS-BIOS supported planes with clickable cockpit.
709

710
    :param bios_dir: Path to DCS-BIOS
711
    :return: List of all supported planes
712
    """
713
    aircraft_aliases = get_plane_aliases(bios_dir=bios_dir, plane=None)
1✔
714
    return [name for name, yaml_data in aircraft_aliases.items() if yaml_data not in (['CommonData', 'FC3'], ['CommonData'])]
1✔
715

716

717
@lru_cache
1✔
718
def get_plane_aliases(bios_dir: Path, plane: Optional[str] = None) -> dict[str, list[str]]:
1✔
719
    """
720
    Get a list of all YAML files for plane with name.
721

722
    :param plane: BIOS plane name
723
    :param bios_dir: path to DCS-BIOS
724
    :return: list of all YAML files for plane definition
725
    """
726
    alias_path = bios_dir / 'doc' / 'json' / 'AircraftAliases.json'
1✔
727
    aircraft_aliases = load_json(full_path=alias_path)
1✔
728
    if plane:
1✔
729
        aircraft_aliases = {plane: aircraft_aliases[plane]}
1✔
730
    return aircraft_aliases
1✔
731

732

733
def get_depiction_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> dict[str, ControlDepiction]:
1✔
734
    """
735
    Get the depiction of controls.
736

737
    :param inputs: Dictionary with ControlKeyData
738
    :return: A dictionary containing the depiction of controls.
739
    """
740
    result = {}
1✔
741
    for section, controllers in inputs.items():
1✔
742
        for ctrl_name, ctrl in controllers.items():
1✔
743
            result[ctrl_name] = ctrl.depiction
1✔
744
    return result
1✔
745

746

747
def substitute_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
1✔
748
    """
749
    Substitute symbols in a string with specified replacements.
750

751
    :param value: The input string to be processed
752
    :param symbol_replacement: A list of symbol patterns and their corresponding replacements.
753
    :return: The processed string with symbols replaced according to the provided symbol_replacement list.
754
    """
755
    for pattern, replacement in symbol_replacement:
1✔
756
        value = sub(pattern, replacement, value)
1✔
757
    return value
1✔
758

759

760
def replace_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
1✔
761
    """
762
    Replace symbols in a string with specified replacements.
763

764
    :param value: The string in which symbols will be replaced.
765
    :param symbol_replacement: A sequence of sequences containing the original symbols and their replacement strings.
766
    :return: The string with symbols replaced.
767
    """
768
    for original, replacement in symbol_replacement:
1✔
769
        value = value.replace(original, replacement)
1✔
770
    return value
1✔
771

772

773
class KeyRequest:
1✔
774
    """Map LCD button ot G-Key with an abstract request model."""
775

776
    def __init__(self, yaml_path: Path, get_bios_fn: Callable[[str], BiosValue]) -> None:
1✔
777
        """
778
        Load YAML with BIOS request for G-Keys and LCD buttons.
779

780
        :param yaml_path: Path to the airplane YAML file.
781
        :param get_bios_fn: Function used to get current BIOS value.
782
        """
783
        plane_yaml = load_yaml(full_path=yaml_path)
1✔
784
        self.buttons: dict[AnyButton, RequestModel] = {}
1✔
785
        for key_str, request in plane_yaml.items():
1✔
786
            if request:
1!
787
                key = get_key_instance(key_str)
1✔
788
                self.buttons[key] = RequestModel.from_request(key=key, request=request, get_bios_fn=get_bios_fn)
1✔
789

790
    @property
1✔
791
    def cycle_button_ctrl_name(self) -> dict[str, int]:
1✔
792
        """Return a dictionary with BIOS selectors to track changes of values for cycle button to get current values."""
793
        return {req_model.ctrl_name: int() for req_model in self.buttons.values() if req_model.is_cycle}
1✔
794

795
    def get_request(self, button: AnyButton) -> RequestModel:
1✔
796
        """
797
        Get abstract representation for request ti be sent gor requested button.
798

799
        :param button: LcdButton, Gkey or MouseButton
800
        :return: RequestModel object
801
        """
802
        return self.buttons.get(button, RequestModel.empty(key=button))
1✔
803

804
    def set_request(self, button: AnyButton, req: str) -> None:
1✔
805
        """
806
        Update the internal string request for the specified button.
807

808
        :param button: LcdButton, Gkey or MouseButton
809
        :param req: The raw request to set.
810
        """
811
        self.buttons[button].raw_request = req
1✔
812

813

814
def generate_bios_jsons_with_lupa(dcs_save_games: Path, local_compile='./Scripts/DCS-BIOS/test/compile/LocalCompile.lua') -> None:
1✔
815
    r"""
816
    Regenerate DCS-BIOS JSON files.
817

818
    Using the Lupa library, first it will tries use LuaJIT 2.1 if not it will fall back to Lua 5.1
819

820
    :param dcs_save_games: Full path to Saved Games\DCS.openbeta directory.
821
    :param local_compile: Relative path to LocalCompile.lua file.
822
    """
NEW
823
    try:
×
NEW
824
        import lupa.luajit21 as lupa  # type: ignore[import-untyped]
×
NEW
825
    except ImportError:
×
NEW
826
        try:
×
NEW
827
            import lupa.lua51 as lupa  # type: ignore[import-untyped]
×
NEW
828
        except ImportError:
×
NEW
829
            return
×
830

NEW
831
    previous_dir = getcwd()
×
NEW
832
    try:
×
NEW
833
        chdir(dcs_save_games)
×
NEW
834
        LOG.debug(f"Changed to: {dcs_save_games}")
×
NEW
835
        lua = (lupa.LuaRuntime())
×
NEW
836
        LOG.debug(f"Using {lupa.LuaRuntime().lua_implementation} (compiled with {lupa.LUA_VERSION})")
×
NEW
837
        with open(local_compile) as lua_file:
×
NEW
838
            lua_script = lua_file.read()
×
NEW
839
        lua.execute(lua_script)
×
840
    finally:
NEW
841
        chdir(previous_dir)
×
NEW
842
        LOG.debug(f"Change directory back to: {getcwd()}")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc