• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emcek / dcspy / 11728624103

07 Nov 2024 05:59PM UTC coverage: 95.066% (-0.9%) from 96.004%
11728624103

Pull #336

github

emcek
organize imports
Pull Request #336: Start DCSpy client as QThread

616 of 692 branches covered (89.02%)

Branch coverage included in aggregate %.

49 of 60 new or added lines in 4 files covered. (81.67%)

11 existing lines in 1 file now uncovered.

2101 of 2166 relevant lines covered (97.0%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.89
/src/dcspy/utils.py
1
from __future__ import annotations
1✔
2

3
import json
1✔
4
import sys
1✔
5
import zipfile
1✔
6
from collections.abc import Callable, Generator, Sequence
1✔
7
from datetime import datetime
1✔
8
from functools import lru_cache
1✔
9
from glob import glob
1✔
10
from itertools import chain
1✔
11
from logging import getLogger
1✔
12
from os import chdir, environ, getcwd, makedirs, walk
1✔
13
from pathlib import Path
1✔
14
from platform import python_implementation, python_version, uname
1✔
15
from pprint import pformat
1✔
16
from re import search, sub
1✔
17
from shutil import rmtree
1✔
18
from subprocess import CalledProcessError, run
1✔
19
from tempfile import gettempdir
1✔
20
from typing import Any, ClassVar
1✔
21

22
import yaml
1✔
23
from packaging import version
1✔
24
from psutil import process_iter
1✔
25
from PySide6.QtCore import QObject, Signal
1✔
26
from requests import get
1✔
27

28
from dcspy.models import (CTRL_LIST_SEPARATOR, AnyButton, BiosValue, ControlDepiction, ControlKeyData, DcsBiosPlaneData, DcspyConfigYaml, ReleaseInfo,
1✔
29
                          RequestModel, get_key_instance)
30

31
try:
1✔
32
    import git
1✔
33
except ImportError:
×
34
    pass
×
35

36
LOG = getLogger(__name__)
__version__ = '3.6.1'
CONFIG_YAML = 'config.yaml'
DEFAULT_YAML_FILE = Path(__file__).parent / 'resources' / CONFIG_YAML

# Load the packaged default configuration once at import time.
# The encoding is given explicitly so the result does not depend on the platform default encoding.
with open(DEFAULT_YAML_FILE, encoding='utf-8') as c_file:
    defaults_cfg: DcspyConfigYaml = yaml.load(c_file, Loader=yaml.SafeLoader)
# The DCS-BIOS location depends on the current user, so it is computed rather than stored in the file.
defaults_cfg['dcsbios'] = f'C:\\Users\\{environ.get("USERNAME", "UNKNOWN")}\\Saved Games\\DCS.openbeta\\Scripts\\DCS-BIOS'
44

45

46
def get_default_yaml(local_appdata: bool = False) -> Path:
    """
    Return a full path to the configuration file.

    Without ``local_appdata`` the packaged default file is returned.
    Otherwise, the file inside the directory reported by ``get_config_yaml_location()`` is returned;
    the directory is created when needed and the file is filled with defaults when it does not exist yet.

    :param local_appdata: If True, use the user configuration directory instead of the packaged file
    :return: Path to the YAML configuration file
    """
    if not local_appdata:
        return DEFAULT_YAML_FILE

    config_dir = get_config_yaml_location()
    makedirs(name=config_dir, exist_ok=True)
    user_cfg = Path(config_dir / CONFIG_YAML).resolve()
    if not user_cfg.exists():
        save_yaml(data=defaults_cfg, full_path=user_cfg)
    return user_cfg
61

62

63
def load_yaml(full_path: Path) -> dict[str, Any]:
    """
    Load YAML from a file into dictionary.

    A missing file or any YAML error (parser, scanner, constructor, ...) is logged and results in an empty dictionary.
    In that case the parent directory is created, so a later save can succeed.
    Content that is valid YAML but not a mapping also results in an empty dictionary.

    :param full_path: Full path to YAML file
    :return: Dictionary with data
    """
    try:
        with open(file=full_path, encoding='utf-8') as yaml_file:
            data = yaml.load(yaml_file, Loader=yaml.SafeLoader)
    except (FileNotFoundError, yaml.YAMLError) as err:
        # yaml.YAMLError is the base class of ParserError, ScannerError and the other PyYAML errors
        makedirs(name=full_path.parent, exist_ok=True)
        LOG.warning(f'{type(err).__name__}: {full_path}.', exc_info=True)
        LOG.debug(f'{err}')
        return {}
    return data if isinstance(data, dict) else {}
81

82

83
def save_yaml(data: dict[str, Any], full_path: Path) -> None:
    """
    Save dictionary as YAML file.

    The file is opened for writing with UTF-8 encoding and dumped with the safe dumper.

    :param data: Dictionary with data
    :param full_path: Full path to YAML file
    """
    with open(file=full_path, mode='w', encoding='utf-8') as target:
        yaml.dump(data, target, Dumper=yaml.SafeDumper)
92

93

94
def check_ver_at_github(repo: str, current_ver: str, extension: str, file_name: str = '') -> ReleaseInfo:
    """
    Check a version of <organization>/<package> at GitHub.

    Return ReleaseInfo with:
    - latest (bool) - if local version is latest
    - ver (version.Version) - the latest online version, 0.0.0 when it could not be determined
    - dl_url (str) - ready to download, empty when no asset matches
    - published (str) - format DD MMMM YYYY
    - release_type (str) - Regular or Pre-release
    - asset_file (str) - file name of asset

    Any error while contacting GitHub or processing the answer is logged and never raised.

    :param repo: Format '<organization or user>/<package>'
    :param current_ver: Current local version
    :param extension: File extension
    :param file_name: string in file name
    :return: ReleaseInfo with data
    """
    latest, online_version, asset_url, published, pre_release = False, '0.0.0', '', '', False
    package = repo.split('/')[1]
    try:
        response = get(url=f'https://api.github.com/repos/{repo}/releases/latest', timeout=5)
        if response.ok:
            dict_json = response.json()
            online_version = dict_json['tag_name']
            pre_release = dict_json['prerelease']
            published = datetime.strptime(dict_json['published_at'], '%Y-%m-%dT%H:%M:%S%z').strftime('%d %B %Y')
            # A release without a matching asset must not abort the check, so fall back to an empty URL
            asset_url = next(
                (asset['browser_download_url'] for asset in dict_json['assets']
                 if asset['name'].endswith(extension) and file_name in asset['name']),
                '',
            )
            LOG.debug(f'Latest GitHub version:{online_version} pre:{pre_release} date:{published} url:{asset_url}')
            latest = _compare_versions(package, current_ver, online_version)
        else:
            LOG.warning(f'Unable to check {package} version online. Try again later. Status={response.status_code}')
    except Exception as exc:
        LOG.warning(f'Unable to check {package} version online: {exc}')

    try:
        online_ver = version.parse(online_version)
    except version.InvalidVersion:
        # The tag is not a valid version string, report it the same way as a failed check
        LOG.warning(f'Unable to parse {package} online version: {online_version}')
        online_ver = version.parse('0.0.0')

    return ReleaseInfo(
        latest=latest,
        ver=online_ver,
        dl_url=asset_url,
        published=published,
        release_type='Pre-release' if pre_release else 'Regular',
        asset_file=asset_url.split('/')[-1],
    )
136

137

138
def _compare_versions(package: str, current_ver: str, remote_ver: str) -> bool:
    """
    Compare two versions of packages and return result.

    The outcome is logged at info level.

    :param package: Package name
    :param current_ver: Current/local version
    :param remote_ver: Remote/online version
    :return: True when the local version is the same or newer than the remote one, False otherwise
    """
    # Parse each version once; the comparison has only two outcomes, so no second condition is needed
    if version.parse(remote_ver) > version.parse(current_ver):
        LOG.info(f'There is new version of {package}: {remote_ver}')
        return False
    LOG.info(f'{package} is up-to-date version: {current_ver}')
    return True
154

155

156
def get_version_string(repo: str, current_ver: str, check: bool = True) -> str:
    """
    Generate formatted string with version number.

    When the online check is requested, the string gets one of the suffixes:
    - ' (latest)' - local version is up-to-date, local version is shown
    - ' (update!)' - newer version is available, online version is shown
    - ' (failed)' - online version could not be determined, local version is shown

    :param repo: Format '<organization or user>/<package>'.
    :param current_ver: Current local version.
    :param check: Version online.
    :return: Formatted version as string.
    """
    if not check:
        return f'v{current_ver}'

    result = check_ver_at_github(repo=repo, current_ver=current_ver, extension='')
    if result.latest:
        details = ' (latest)'
    elif str(result.ver) != '0.0.0':
        details = ' (update!)'
        current_ver = str(result.ver)
    else:
        # 0.0.0 is the placeholder used when the online version is unknown
        details = ' (failed)'
    return f'v{current_ver}{details}'
178

179

180
def download_file(url: str, save_path: Path) -> bool:
    """
    Download a file from URL and save to save_path.

    :param url: URL address
    :param save_path: full path to save
    :return: True when the file was downloaded and saved, False when the server did not answer with success
    """
    response = get(url=url, stream=True, timeout=5)
    try:
        if not response.ok:
            LOG.warning(f'Can not download from: {url}')
            return False
        LOG.debug(f'Download file from: {url}')
        with open(save_path, 'wb+') as dl_file:
            # 8 KiB chunks avoid the overhead of thousands of tiny writes
            for chunk in response.iter_content(chunk_size=8192):
                dl_file.write(chunk)
        LOG.debug(f'Saved as: {save_path}')
        return True
    finally:
        # A streamed response keeps its connection until it is consumed or closed explicitly
        response.close()
198

199

200
def proc_is_running(name: str) -> int:
    """
    Check if the process is running and return its PID.

    The first process whose name contains ``name`` is reported.
    If the process name is not found, 0 (zero) is returned.

    :param name: Process name
    :return: PID as int
    """
    for proc in process_iter(['pid', 'name']):
        # The name is None when psutil is not allowed to read it, which must not break the substring test
        proc_name = proc.info['name'] or ''  # type: ignore[attr-defined]
        if name in proc_name:
            return proc.info['pid']  # type: ignore[attr-defined]
    return 0
212

213

214
def check_dcs_ver(dcs_path: Path) -> tuple[str, str]:
    """
    Check DCS version and release type.

    Both values are read from ``autoupdate.cfg`` in the DCS directory.
    When the file is missing or not readable, both values are 'Unknown'.
    When the file has no branch entry, the type is 'stable'.

    :param dcs_path: Path to DCS installation directory
    :return: DCS type and version as strings
    """
    cfg_file = dcs_path / 'autoupdate.cfg'
    try:
        with open(file=cfg_file, encoding='utf-8') as cfg_handle:
            cfg_text = cfg_handle.read()
    except (FileNotFoundError, PermissionError) as err:
        LOG.debug(f'{type(err).__name__}: {err.filename}')
        return 'Unknown', 'Unknown'

    branch_match = search(r'"branch":\s"([\w.]*)"', cfg_text)
    version_match = search(r'"version":\s"([\d.]*)"', cfg_text)
    dcs_type = str(branch_match.group(1)) if branch_match else 'stable'
    dcs_version = str(version_match.group(1)) if version_match else 'Unknown'
    return dcs_type, dcs_version
234

235

236
def check_bios_ver(bios_path: Path | str) -> ReleaseInfo:
    """
    Check the DCS-BIOS release version.

    The version is read from ``CommonData.lua``, which is looked up first in ``lib/modules/common_modules``
    and then in ``lib``. When neither file exists or no version is found, version 0.0.0 is reported.

    :param bios_path: Path to DCS-BIOS directory in Saved Games folder
    :return: ReleaseInfo named tuple
    """
    bios_root = Path(bios_path)
    new_location = bios_root / 'lib' / 'modules' / 'common_modules' / 'CommonData.lua'
    old_location = bios_root / 'lib' / 'CommonData.lua'

    for candidate in (new_location, old_location):
        if candidate.is_file():
            with open(file=candidate, encoding='utf-8') as cd_lua:
                cd_lua_data = cd_lua.read()
            break
    else:
        cd_lua_data = ''
        LOG.debug(f'No `CommonData.lua` while checking DCS-BIOS version at {new_location.parent} or {old_location.parent}')

    bios_re = search(r'function getVersion\(\)\s*return\s*\"([\d.]*)\"', cd_lua_data)
    bios_ver = version.parse(bios_re.group(1)) if bios_re else version.parse('0.0.0')
    return ReleaseInfo(latest=False, ver=bios_ver, dl_url='', published='', release_type='', asset_file='')
261

262

263
def is_git_repo(dir_path: str) -> bool:
    """
    Check if dir_path is a Git repository.

    :param dir_path: Path as string
    :return: True if dir is git repo
    """
    import git
    try:
        _ = git.Repo(dir_path).git_dir
    except (git.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        return False
    else:
        return True
276

277

278
def _get_sha_hex_str(bios_repo: git.Repo, git_ref: str) -> str:
    """
    Return a string representing the commit hash, date, and author of the given Git reference in the provided repository.

    The reference is checked out first. When it is a branch, the result starts with the branch name.
    When checkout fails or HEAD is not on a branch, the result starts with the first 8 characters of the HEAD commit hash.

    :param bios_repo: A Git repository object.
    :param git_ref: A string representing the Git reference (e.g., commit, branch, tag).
    :return: A string representing the commit hash, date, and author.
    :raises OSError: When the git module cannot be imported
    """
    try:
        import git
    except ImportError as err:
        # Keep the original import failure attached as the cause
        raise OSError('Git executable is not available!') from err
    try:
        bios_repo.git.checkout(git_ref)
        label = bios_repo.active_branch.name
    except (git.exc.GitCommandError, TypeError):
        label = None
    head_commit = bios_repo.head.commit
    if label is None:
        label = head_commit.hexsha[0:8]
    sha = f'{label} from: {head_commit.committed_datetime.strftime("%d-%b-%Y %H:%M:%S")} by: {head_commit.author}'
    LOG.debug(f'Checkout: {head_commit.hexsha} from: {head_commit.committed_datetime} | by: {head_commit.author}\n{head_commit.message}')  # type: ignore
    return sha
300

301

302
def check_github_repo(git_ref: str, repo_dir: Path, repo: str, update: bool = True, progress: git.RemoteProgress | None = None) -> str:
    """
    Update git repository.

    The repository is checked out (or cloned when missing), optionally pulled from its first remote,
    and finally switched to ``git_ref``.

    :param git_ref: Any Git reference as string
    :param repo_dir: Local directory for a repository
    :param repo: GitHub repository user/name
    :param update: Perform update process
    :param progress: Progress callback
    :return: Description of the checked out commit as produced by ``_get_sha_hex_str()``
    """
    bios_repo = _checkout_repo(repo=repo, repo_dir=repo_dir, progress=progress)
    if update:
        pulled = bios_repo.remotes[0].pull(progress=progress)
        first_info = pulled[0]
        LOG.debug(f'Pulled: {first_info.name} as: {first_info.commit}')
    return _get_sha_hex_str(bios_repo, git_ref)
320

321

322
def _checkout_repo(repo: str, repo_dir: Path, checkout_ref: str = 'master', progress: git.RemoteProgress | None = None) -> git.Repo:
    """
    Checkout repository at the given reference or clone it when it does not exist in a system.

    A directory which is not a Git repository is removed before cloning.

    :param repo: Repository name
    :param repo_dir: Local repository directory
    :param checkout_ref: Checkout a git reference
    :param progress: Progress callback
    :return: Repo object of the repository
    """
    import git

    makedirs(name=repo_dir, exist_ok=True)
    if not is_git_repo(str(repo_dir)):
        rmtree(path=repo_dir, ignore_errors=True)
        return git.Repo.clone_from(url=f'https://github.com/{repo}.git', to_path=repo_dir, progress=progress)  # type: ignore

    existing_repo = git.Repo(repo_dir)
    existing_repo.git.checkout(checkout_ref)
    return existing_repo
342

343

344
def check_dcs_bios_entry(lua_dst_data: str, lua_dst_path: Path, temp_dir: Path) -> str:
    """
    Check DCS-BIOS entry in Export.lua file.

    When the entry loading ``BIOS.lua`` is not found in the content, the Export.lua from ``temp_dir``
    is appended to the Export.lua in ``lua_dst_path``.

    :param lua_dst_data: Content of Export.lua
    :param lua_dst_path: Export.lua path
    :param temp_dir: Directory with DCS-BIOS archive
    :return: Result of checks
    """
    lua = 'Export.lua'
    messages = ['\n\nExport.lua exists.']
    with open(file=temp_dir / lua, encoding='utf-8') as lua_src:
        lua_src_data = lua_src.read()

    if search(r'dofile\(lfs.writedir\(\)\s*\.\.\s*\[\[Scripts\\DCS-BIOS\\BIOS\.lua]]\)', lua_dst_data):
        messages.append('\n\nDCS-BIOS entry detected.')
        return ''.join(messages)

    with open(file=lua_dst_path / lua, mode='a+', encoding='utf-8') as exportlua_dst:
        exportlua_dst.write(f'\n{lua_src_data}')
    LOG.debug(f'Add DCS-BIOS to Export.lua: {lua_src_data}')
    messages.append('\n\nDCS-BIOS entry added.\n\nYou verify installation at:\ngithub.com/DCS-Skunkworks/DCSFlightpanels/wiki/Installation')
    return ''.join(messages)
366

367

368
def count_files(directory: Path, extension: str) -> int:
    """
    Count files with extension in directory.

    Only files placed directly in the directory are counted.

    :param directory: as Path object
    :param extension: file extension, without the leading dot
    :return: number of files, or -1 when the directory does not exist
    """
    wanted_suffix = f'.{extension}'
    try:
        matching = [entry.name for entry in directory.iterdir() if entry.is_file() and entry.suffix == wanted_suffix]
    except FileNotFoundError:
        LOG.debug(f'Wrong directory: {directory}')
        return -1
    LOG.debug(f'In: {directory} found {matching} ')
    return len(matching)
383

384

385
def is_git_exec_present() -> bool:
    """
    Check if git executable is present in a system.

    A failed import of the git module is logged and reported as a missing executable.

    :return: True if git.exe is available
    """
    try:
        import git
    except ImportError as err:
        LOG.debug(type(err).__name__, exc_info=True)
        return False
    else:
        return bool(git.GIT_OK)
397

398

399
def is_git_object(repo_dir: Path, git_obj: str) -> bool:
    """
    Check if git_obj is a valid Git reference.

    The repository is switched to the master branch before the reference is resolved.

    :param repo_dir: Directory with repository
    :param git_obj: Git reference to check
    :return: True if git_obj is git reference, False otherwise
    """
    import gitdb  # type: ignore[import-untyped]

    if not is_git_repo(str(repo_dir)):
        return False

    bios_repo = git.Repo(repo_dir)
    bios_repo.git.checkout('master')
    try:
        bios_repo.commit(git_obj)
    except gitdb.exc.BadName:
        return False
    return True
418

419

420
def get_all_git_refs(repo_dir: Path) -> list[str]:
    """
    Get a list of branches and tags for repo.

    Branches come first, followed by tags.

    :param repo_dir: Directory with a repository
    :return: List of git references as strings, empty when the directory is not a Git repository
    """
    if not is_git_repo(str(repo_dir)):
        return []
    # Open the repository once and reuse it for both heads and tags
    repo = git.Repo(repo_dir)
    return [str(ref) for ref in chain(repo.heads, repo.tags)]
432

433

434
class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.

    Supported signals are:
    * finished - no data
    * error - tuple with exctype, value, traceback.format_exc()
    * result - object/any type - data returned from processing
    * progress - int as indication of progress (CloneProgress emits a percentage between 0 and 100)
    * stage - string with current stage
    * count - tuple of int as count of events
    """

    finished = Signal()
    error = Signal(tuple)
    result = Signal(object)
    progress = Signal(int)
    stage = Signal(str)
    count = Signal(tuple)
453

454

455
class SignalHandler:
    """QtSignal handler for GUI notification."""

    # Unique marker meaning "emit() was called without a value"
    _NO_VALUE: ClassVar[object] = object()

    def __init__(self, signals_dict: dict[str, Callable], signals: QObject | None = None) -> None:
        """
        Use for passing signals function and emit to Qt GUI.

        :param signals_dict: The keys are the signal names, and the values are the corresponding handler functions.
        :param signals: QObject used for handling signals, when not provided a new WorkerSignals instance is created.
        """
        self._sig_handler = signals_dict
        # Create the default per instance, otherwise every handler would connect to one shared signals object
        self.signals = WorkerSignals() if signals is None else signals
        for signal, handler in signals_dict.items():
            getattr(self.signals, signal).connect(handler)

    def got_signals_for_interface(self) -> bool:
        """
        Check if there are progress or count signals for the interface.

        :return: True if there are signals for the interface, False otherwise.
        """
        return bool(self._sig_handler.get('progress') or self._sig_handler.get('count'))

    def emit(self, sig_name: str, **kwargs) -> None:
        """
        Emit the signal with the name and value.

        The signal is emitted without arguments when no ``value`` keyword is given.

        :param sig_name: The name of the signal to emit.
        :param kwargs: Optional ``value`` keyword with data for the signal.
        """
        value = kwargs.get('value', self._NO_VALUE)
        signal = getattr(self.signals, sig_name)
        # Identity check, so any value (including strings and objects with custom equality) can be emitted
        if value is self._NO_VALUE:
            signal.emit()
        else:
            signal.emit(value)

    def __str__(self) -> str:
        """
        Show signal names with names of their handlers.

        :return: String representation of a dictionary with signal name and handler name
        """
        # Not every callable has __name__ (e.g. functools.partial), so fall back to its repr
        signals = {signal: getattr(handler, '__name__', repr(handler)) for signal, handler in self._sig_handler.items()}
        return f'{signals}'
497

498

499
class CloneProgress(git.RemoteProgress):
    """Progress handler which forwards information emitted by git to Qt signals."""

    OP_CODES: ClassVar[list[str]] = ['BEGIN', 'CHECKING_OUT', 'COMPRESSING', 'COUNTING', 'END', 'FINDING_SOURCES', 'RECEIVING', 'RESOLVING', 'WRITING']
    OP_CODE_MAP: ClassVar[dict[int, str]] = {getattr(git.RemoteProgress, code_name): code_name for code_name in OP_CODES}

    def __init__(self, sig_handler: SignalHandler) -> None:
        """
        Initialize the progress handler.

        :param sig_handler: Qt signal handler for progress notification
        """
        super().__init__()
        self.sig_handler = sig_handler

    def get_curr_op(self, op_code: int) -> str:
        """
        Get stage name from OP code.

        :param op_code: OP code
        :return: stage name in title case, '?' for an unknown code
        """
        stage_name = self.OP_CODE_MAP.get(op_code & self.OP_MASK, '?')
        return stage_name.title()

    def update(self, op_code: int, cur_count, max_count=None, message: str = ''):
        """
        Call whenever the progress changes.

        The 'stage' signal is emitted when an operation begins, the 'progress' signal is emitted on every call.

        :param op_code: Integer allowing to be compared against Operation IDs and stage IDs.
        :param cur_count: A count of current absolute items
        :param max_count: The maximum count of items we expect. It may be None in case there is no maximum number of items or if it is (yet) unknown.
        :param message: It contains the number of bytes transferred. It may be used for other purposes as well.
        """
        if op_code & git.RemoteProgress.BEGIN:
            self.sig_handler.emit(sig_name='stage', value=f'Git clone: {self.get_curr_op(op_code)}')

        if max_count:
            percentage = int(cur_count / max_count * 100)
        else:
            percentage = 0
        self.sig_handler.emit(sig_name='progress', value=percentage)
537

538

539
def collect_debug_data() -> Path:
    """
    Collect and zip all data for troubleshooting.

    The archive is created in the temporary directory and contains: system information, dcs.log (when it exists),
    DCSpy log files, YAML configuration files and PNG screenshots.

    :return: Path object to ZIP file
    """
    config_file = Path(get_config_yaml_location() / CONFIG_YAML).resolve()
    conf_dict = load_yaml(config_file)
    sys_data = _get_sys_file(conf_dict)
    dcs_log = _get_dcs_log(conf_dict)

    zip_file = Path(gettempdir()) / f'dcspy_debug_{str(datetime.now()).replace(" ", "_").replace(":", "")}.zip'
    with zipfile.ZipFile(file=zip_file, mode='w', compresslevel=9, compression=zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(sys_data, arcname=sys_data.name)
        # _get_dcs_log() returns an empty Path when there is no log, which would add the current directory to the archive
        if dcs_log.is_file():
            zipf.write(dcs_log, arcname=dcs_log.name)
        for log_file in _get_log_files():
            zipf.write(log_file, arcname=log_file.name)
        for yaml_file in _get_yaml_files(config_file):
            zipf.write(yaml_file, arcname=yaml_file.name)
        for png in _get_png_files():
            zipf.write(png, arcname=png.name)

    return zip_file
562

563

564
def _get_sys_file(conf_dict: dict[str, Any]) -> Path:
    """
    Save system information to file and return its path.

    The file ``system_data.txt`` is written to the temporary directory and overwritten on each call.

    :param conf_dict: A dictionary containing configuration information.
    :return: A Path object representing the path to the system data file.
    """
    system_info = _fetch_system_info(conf_dict)
    sys_data = Path(gettempdir()) / 'system_data.txt'
    # Explicit encoding, as the data contains file system paths which may not fit the platform default encoding
    with open(sys_data, 'w+', encoding='utf-8') as debug_file:
        debug_file.write(system_info)
    return sys_data
576

577

578
def _fetch_system_info(conf_dict: dict[str, Any]) -> str:
    """
    Fetch system information.

    The result is a multi-line string with: DCSpy version, platform data, Python version and executable,
    DCS type and version, DCS-BIOS version, Git version and HEAD commit, a list of files found under
    the Logitech Gaming Software SDK directory and the pretty-printed configuration.

    :param conf_dict: A dictionary containing configuration information, the keys 'dcs' and 'dcsbios' are read.
    :return: System data as string
    """
    # NOTE: local names are part of the output, see `{name=}` specifiers in the returned f-string
    name = uname()
    pyver = (python_version(), python_implementation())
    pyexec = sys.executable
    dcs = check_dcs_ver(dcs_path=Path(str(conf_dict['dcs'])))
    bios_ver = check_bios_ver(bios_path=str(conf_dict['dcsbios'])).ver
    # The repository is looked up two levels above the DCS-BIOS directory
    repo_dir = Path(str(conf_dict['dcsbios'])).parents[1] / 'dcs-bios'
    git_ver, head_commit = _fetch_git_data(repo_dir=repo_dir)
    # One file per line; walk() yields nothing when the directory does not exist, so the string is empty then
    lgs_dir = '\n'.join([
        str(Path(dir_path) / filename)
        for dir_path, _, filenames in walk('C:\\Program Files\\Logitech Gaming Software\\SDK')
        for filename in filenames
    ])
    return f'{__version__=}\n{name=}\n{pyver=}\n{pyexec=}\n{dcs=}\n{bios_ver=}\n{git_ver=}\n{head_commit=}\n{lgs_dir}\ncfg={pformat(conf_dict)}'
598

599

600
def _fetch_git_data(repo_dir: Path) -> tuple[Sequence[int], str]:
    """
    Fetch Git version and SHA of HEAD commit.

    When the git module cannot be imported, or the directory is missing or is not a Git repository,
    the version (0, 0, 0, 0) and 'N/A' are returned.

    :param repo_dir: Local directory for repository
    :return: Tuple of (a version) and SHA of HEAD commit
    """
    # The import is guarded separately: when it fails, the local name `git` is unbound
    # and could not be used to build the tuple of exceptions below
    try:
        import git
    except ImportError:
        return (0, 0, 0, 0), 'N/A'

    try:
        git_ver = git.cmd.Git().version_info
        head_commit = str(git.Repo(repo_dir).head.commit)
    except (git.exc.NoSuchPathError, git.exc.InvalidGitRepositoryError):
        git_ver = (0, 0, 0, 0)
        head_commit = 'N/A'
    return git_ver, head_commit
615

616

617
def _get_dcs_log(conf_dict: dict[str, Any]) -> Path:
    """
    Get path to dcs.log file.

    The file is expected in the ``Logs`` directory, two levels above the configured DCS-BIOS directory.

    :param conf_dict: A dictionary containing configuration information.
    :return: A Path object representing the path to the dcs.log file, an empty Path when the file does not exist.
    """
    base_dir = Path(conf_dict['dcsbios']).parents[1]
    candidate = base_dir / 'Logs' / 'dcs.log'
    if candidate.is_file():
        return candidate
    return Path()
626

627

628
def _get_log_files() -> Generator[Path]:
    """
    Get a path to all log files.

    Files with names starting with ``dcspy.log`` are searched in the temporary directory.

    :return: Generator of a path to log files
    """
    tmp_dir = Path(gettempdir())
    matches = glob(str(tmp_dir / 'dcspy.log*'))
    return (tmp_dir / found for found in matches)
638

639

640
def _get_yaml_files(config_file: Path) -> Generator[Path]:
    """
    Get a path to all configuration YAML files.

    The directory of the config file is searched recursively for file names ending with ``yaml``.

    :param config_file: Path to the config file
    :return: Generator of a path to YAML files
    """
    search_root = config_file.parent
    return (
        Path(root, file_name)
        for root, _, file_names in walk(search_root)
        for file_name in file_names
        if file_name.endswith('yaml')
    )
653

654

655
def _get_png_files() -> Generator[Path]:
    """
    Get a path to png screenshots for all airplanes.

    The temporary directory is searched recursively for file names which end with ``png``
    and contain the name of one of the listed aircraft.

    :return: Generator of a path to png files
    """
    aircrafts = ['FA18Chornet', 'Ka50', 'Ka503', 'Mi8MT', 'Mi24P', 'F16C50', 'F15ESE',
                 'AH64DBLKII', 'A10C', 'A10C2', 'F14A135GR', 'F14B', 'AV8BNA']
    return (
        Path(dir_path) / filename
        for dir_path, _, filenames in walk(gettempdir())
        for filename in filenames
        if filename.endswith('png') and any(aircraft in filename for aircraft in aircrafts)
    )
669

670

671
def get_config_yaml_location() -> Path:
    """
    Get a location of YAML configuration files.

    It is the ``dcspy`` directory inside the directory given by the LOCALAPPDATA environment variable.
    When the variable is not set or empty, the directory of the packaged default file is used.

    :return: Path object to directory
    """
    localappdata = environ.get('LOCALAPPDATA', None)
    if localappdata:
        return Path(localappdata) / 'dcspy'
    return DEFAULT_YAML_FILE.parent
680

681

682
def run_pip_command(cmd: str) -> tuple[int, str, str]:
    """
    Execute pip command.

    Pip is executed as a module of the current Python interpreter. A failing command is logged and not raised.

    :param cmd: Command as string, arguments separated with whitespace
    :return: Tuple with return code, stderr and stdout
    """
    try:
        # split() without argument ignores repeated, leading and trailing whitespace, so no empty arguments reach pip
        result = run([sys.executable, '-m', 'pip', *cmd.split()], capture_output=True, check=True)
    except CalledProcessError as e:
        LOG.debug(f'Result: {e}')
        result = e
    # Output may contain bytes which are not valid UTF-8 (e.g. localized messages), so never fail on decoding
    return result.returncode, result.stderr.decode('utf-8', errors='replace'), result.stdout.decode('utf-8', errors='replace')
695

696

697
def run_command(cmd: Sequence[str], cwd: Path | None = None) -> int:
    """
    Run command as a subprocess, without a shell.

    :param cmd: The command to be executed as a sequence of strings
    :param cwd: current working directory
    :return: The return code of command, -1 when the command finished with non-zero code
    """
    try:
        completed = run(cmd, check=True, shell=False, cwd=cwd)
    except CalledProcessError as e:
        LOG.debug(f'Result: {e}')
        return -1
    else:
        return completed.returncode
711

712

713
def load_json(full_path: Path) -> Any:
    """
    Load JSON from a file into its Python representation.

    The file is read with UTF-8 encoding.

    :param full_path: Full path
    :return: Python representation of JSON
    """
    with open(full_path, encoding='utf-8') as json_file:
        return json.load(json_file)
723

724

725
@lru_cache
def get_full_bios_for_plane(plane: str, bios_dir: Path) -> DcsBiosPlaneData:
    """
    Collect full BIOS for plane with name.

    All JSON files listed for the plane in ``AircraftAliases.json`` are merged in the listed order,
    so keys from later files replace keys from earlier ones.

    :param plane: BIOS plane name
    :param bios_dir: path to DCS-BIOS directory
    :return: Validated BIOS data of the plane
    """
    json_dir = bios_dir / 'doc' / 'json'
    merged_json: dict[str, Any] = {}
    aircraft_aliases = load_json(full_path=json_dir / 'AircraftAliases.json')
    for json_file in aircraft_aliases[plane]:
        merged_json.update(load_json(full_path=json_dir / f'{json_file}.json'))

    return DcsBiosPlaneData.model_validate(merged_json)
741

742

743
@lru_cache
def get_inputs_for_plane(plane: str, bios_dir: Path) -> dict[str, dict[str, ControlKeyData]]:
    """
    Get dict with inputs for plane, as returned by ``get_inputs()`` of the plane BIOS data.

    :param plane: BIOS plane name
    :param bios_dir: path to DCS-BIOS
    :return: dict.
    """
    return get_full_bios_for_plane(plane=plane, bios_dir=bios_dir).get_inputs()
755

756

757
def get_list_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> list[str]:
    """
    Get a list of all controllers from dict with sections and inputs.

    Each section starts with a header line, built from the section name surrounded by separators,
    followed by the names of its controllers.

    :param inputs: Dictionary with ControlKeyData
    :return: List of string
    """
    ctrl_names: list[str] = []
    for section, controllers in inputs.items():
        ctrl_names.append(f'{CTRL_LIST_SEPARATOR} {section} {CTRL_LIST_SEPARATOR}')
        ctrl_names.extend(controllers)
    return ctrl_names
770

771

772
@lru_cache
def get_planes_list(bios_dir: Path) -> list[str]:
    """
    Get a list of all DCS-BIOS supported planes with clickable cockpit.

    Planes defined only by 'CommonData', or by 'CommonData' and 'FC3', are left out.

    :param bios_dir: Path to DCS-BIOS
    :return: List of all supported planes
    """
    excluded_definitions = (['CommonData', 'FC3'], ['CommonData'])
    planes = []
    for name, definition_files in get_plane_aliases(bios_dir=bios_dir, plane=None).items():
        if definition_files in excluded_definitions:
            continue
        planes.append(name)
    return planes
782

783

784
@lru_cache
def get_plane_aliases(bios_dir: Path, plane: str | None = None) -> dict[str, list[str]]:
    """
    Load AircraftAliases.json, optionally narrowed down to a single plane.

    :param bios_dir: path to DCS-BIOS
    :param plane: BIOS plane name, when None (or empty) aliases of all planes are returned
    :return: dict with plane name as a key and its alias list from AircraftAliases.json as a value
    """
    all_aliases = load_json(full_path=bios_dir / 'doc' / 'json' / 'AircraftAliases.json')
    if not plane:
        return all_aliases
    return {plane: all_aliases[plane]}
1✔
798

799

800
def get_depiction_of_ctrls(inputs: dict[str, dict[str, ControlKeyData]]) -> dict[str, ControlDepiction]:
    """
    Get the depiction of controls.

    Section names are ignored, only controllers are used. When the same controller name appears
    in more than one section, the depiction from the last section wins.

    :param inputs: Dictionary with ControlKeyData
    :return: A dictionary with controller name as a key and its depiction as a value.
    """
    # only the per-section controller dicts matter, so iterate values() instead of unpacking an unused section key
    return {ctrl_name: ctrl.depiction for controllers in inputs.values() for ctrl_name, ctrl in controllers.items()}
1✔
812

813

814
def substitute_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
    """
    Apply a series of regular expression substitutions to a string.

    Pairs are applied one after another, so every substitution works on the result of the previous one.

    :param value: The input string to be processed
    :param symbol_replacement: A sequence of pairs: regular expression pattern and its replacement.
    :return: The string after all substitutions were applied.
    """
    result = value
    for regex, new_text in symbol_replacement:
        result = sub(pattern=regex, repl=new_text, string=result)
    return result
1✔
825

826

827
def replace_symbols(value: str, symbol_replacement: Sequence[Sequence[str]]) -> str:
    """
    Apply a series of plain (non-regex) text replacements to a string.

    Pairs are applied one after another, so every replacement works on the result of the previous one.

    :param value: The string in which symbols will be replaced.
    :param symbol_replacement: A sequence of pairs: original symbol and its replacement string.
    :return: The string after all replacements were applied.
    """
    replaced = value
    for old_symbol, new_symbol in symbol_replacement:
        replaced = replaced.replace(old_symbol, new_symbol)
    return replaced
1✔
838

839

840
class KeyRequest:
    """Map LCD button or G-Key with an abstract request model."""

    def __init__(self, yaml_path: Path, get_bios_fn: Callable[[str], BiosValue]) -> None:
        """
        Build request models for buttons defined in the airplane YAML file.

        Entries of the YAML file with an empty request are skipped.

        :param yaml_path: Path to the airplane YAML file.
        :param get_bios_fn: Function used to get current BIOS value.
        """
        plane_yaml = load_yaml(full_path=yaml_path)
        self.buttons: dict[AnyButton, RequestModel] = {}
        for key_name, raw_request in plane_yaml.items():
            if not raw_request:
                continue
            button = get_key_instance(key_name)
            self.buttons[button] = RequestModel.from_request(key=button, request=raw_request, get_bios_fn=get_bios_fn)

    @property
    def cycle_button_ctrl_name(self) -> dict[str, int]:
        """Return a dictionary of control names of all cycle buttons, each with initial value of zero."""
        cycle_ctrl_names = (model.ctrl_name for model in self.buttons.values() if model.is_cycle)
        return dict.fromkeys(cycle_ctrl_names, 0)

    def get_request(self, button: AnyButton) -> RequestModel:
        """
        Get abstract representation of the request to be sent for the requested button.

        :param button: LcdButton, Gkey or MouseButton
        :return: RequestModel object, an empty one when the button has no request assigned
        """
        fallback = RequestModel.empty(key=button)
        return self.buttons.get(button, fallback)

    def set_request(self, button: AnyButton, req: str) -> None:
        """
        Update the internal string request for the specified button.

        :param button: LcdButton, Gkey or MouseButton
        :param req: The raw request to set.
        """
        model = self.buttons[button]
        model.raw_request = req
1✔
879

880

881
def generate_bios_jsons_with_lupa(dcs_save_games: Path, local_compile: str | Path = './Scripts/DCS-BIOS/test/compile/LocalCompile.lua') -> None:
    r"""
    Regenerate DCS-BIOS JSON files.

    Using the Lupa library, it first tries to use LuaJIT 2.1 and, when it is not available, falls back to Lua 5.1.
    When neither of them can be imported, a warning is logged and nothing is regenerated.

    The current working directory is changed to dcs_save_games for the time of execution
    and is always restored afterwards, even when reading or executing the script fails.

    :param dcs_save_games: Full path to Saved Games\DCS.openbeta directory.
    :param local_compile: Path to LocalCompile.lua file, relative to dcs_save_games.
    """
    try:
        import lupa.luajit21 as lupa  # type: ignore[import-untyped]
    except ImportError:
        try:
            import lupa.lua51 as lupa  # type: ignore[import-untyped]
        except ImportError:
            # lupa is optional, but say why nothing happens instead of returning silently
            LOG.warning('Lupa (LuaJIT 2.1 or Lua 5.1) is not available, DCS-BIOS JSON files are not regenerated')
            return

    previous_dir = getcwd()
    try:
        chdir(dcs_save_games)
        LOG.debug(f"Changed to: {dcs_save_games}")
        lua = lupa.LuaRuntime()
        # report the runtime that will execute the script, no need to create a second one just for logging
        LOG.debug(f"Using {lua.lua_implementation} (compiled with {lupa.LUA_VERSION})")
        # explicit encoding, so reading does not depend on the locale of the machine
        with open(local_compile, encoding='utf-8') as lua_file:
            lua_script = lua_file.read()
        lua.execute(lua_script)
    finally:
        chdir(previous_dir)
        LOG.debug(f"Change directory back to: {getcwd()}")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc