• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5560612039688192

22 Jan 2026 09:10AM UTC coverage: 62.625% (+0.002%) from 62.623%
5560612039688192

Pull #10247

CirrusCI

f321x
qt: TxEditor: restructure submarine payment messages

Show:
error
amount details
\n
proposed action

instead of:
error
proposed action
amount details
Pull Request #10247: qt: improve send_change_to_lightning feedback

1 of 5 new or added lines in 1 file covered. (20.0%)

479 existing lines in 3 files now uncovered.

23912 of 38183 relevant lines covered (62.62%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.89
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
1✔
27
import os
1✔
28
import pkgutil
1✔
29
import importlib.util
1✔
30
import time
1✔
31
import threading
1✔
32
import sys
1✔
33
import zipfile as zipfile_lib
1✔
34
from urllib.parse import urlparse
1✔
35

36
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
1✔
37
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
38
import concurrent
1✔
39
import zipimport
1✔
40
from functools import wraps, partial
1✔
41
from itertools import chain
1✔
42

43
from electrum_ecc import ECPrivkey, ECPubkey
1✔
44

45
from ._vendor.distutils.version import StrictVersion
1✔
46
from .version import ELECTRUM_VERSION
1✔
47
from .i18n import _
1✔
48
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem,
1✔
49
                   make_dir, make_aiohttp_session)
50
from . import bip32
1✔
51
from . import plugins
1✔
52
from .simple_config import SimpleConfig
1✔
53
from .logging import get_logger, Logger
1✔
54
from .crypto import sha256
1✔
55
from .network import Network
1✔
56

57
if TYPE_CHECKING:
58
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
59
    from .keystore import Hardware_KeyStore, KeyStore
60
    from .wallet import Abstract_Wallet
61

62

63
_logger = get_logger(__name__)
1✔
64
plugin_loaders = {}
1✔
65
hook_names = set()
1✔
66
hooks = {}
1✔
67
_exec_module_failure = {}  # type: Dict[str, Exception]
1✔
68

69
PLUGIN_PASSWORD_VERSION = 1
1✔
70

71

72
class Plugins(DaemonThread):
1✔
73

74
    pkgpath = os.path.dirname(plugins.__file__)
1✔
75
    # TODO: use XDG Base Directory Specification instead of hardcoding /etc
76
    keyfile_posix = '/etc/electrum/plugins_key'
1✔
77
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
1✔
78

79
    @profiler
1✔
80
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
1✔
81
        self.config = config
1✔
82
        self.cmd_only = cmd_only  # type: bool
1✔
83
        self.internal_plugin_metadata = {}
1✔
84
        self.external_plugin_metadata = {}
1✔
85
        if cmd_only:
1✔
86
            # only import the command modules of plugins
UNCOV
87
            Logger.__init__(self)
×
88
            self.find_plugins()
×
89
            self.load_plugins()
×
90
            return
×
91
        DaemonThread.__init__(self)
1✔
92
        self.device_manager = DeviceMgr(config)
1✔
93
        self.name = 'Plugins'  # set name of thread
1✔
94
        self._hw_wallets = {}
1✔
95
        self.plugins = {}  # type: Dict[str, BasePlugin]
1✔
96
        self.gui_name = gui_name
1✔
97
        self.find_plugins()
1✔
98
        self.load_plugins()
1✔
99
        self.add_jobs(self.device_manager.thread_jobs())
1✔
100
        self.start()
1✔
101

102
    @property
1✔
103
    def descriptions(self):
1✔
UNCOV
104
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
105

106
    def find_directory_plugins(self, pkg_path: str, external: bool):
1✔
107
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
108
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
1✔
109
        for loader, name, ispkg in iter_modules:
1✔
110
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
111
            #       once as data and once as code. To honor the "no duplicates" rule below,
112
            #       we exclude the ones packaged as *code*, here:
113
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
1✔
UNCOV
114
                continue
×
115
            module_path = os.path.join(pkg_path, name)
1✔
116
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
1✔
UNCOV
117
                continue
×
118
            try:
1✔
119
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
1✔
120
                    d = json.load(f)
1✔
UNCOV
121
            except FileNotFoundError:
×
122
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
123
                continue
×
124
            if 'fullname' not in d:
1✔
UNCOV
125
                continue
×
126
            d['path'] = module_path
1✔
127
            if not self.cmd_only:
1✔
128
                gui_good = self.gui_name in d.get('available_for', [])
1✔
129
                if not gui_good:
1✔
130
                    continue
1✔
131
                details = d.get('registers_wallet_type')
1✔
132
                if details:
1✔
133
                    self.register_wallet_type(name, gui_good, details)
1✔
134
                details = d.get('registers_keystore')
1✔
135
                if details:
1✔
136
                    self.register_keystore(name, details)
1✔
137
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
1✔
UNCOV
138
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
139
                _logger.info(f"duplicate plugins? for {name=}")
×
140
                continue
×
141
            if not external:
1✔
142
                self.internal_plugin_metadata[name] = d
1✔
143
            else:
UNCOV
144
                self.external_plugin_metadata[name] = d
×
145

146
    @staticmethod
1✔
147
    def exec_module_from_spec(spec, path: str):
1✔
148
        if prev_fail := _exec_module_failure.get(path):
1✔
UNCOV
149
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
150
        try:
1✔
151
            module = importlib.util.module_from_spec(spec)
1✔
152
            # sys.modules needs to be modified for relative imports to work
153
            # see https://stackoverflow.com/a/50395128
154
            sys.modules[path] = module
1✔
155
            spec.loader.exec_module(module)
1✔
UNCOV
156
        except Exception as e:
×
157
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
158
            # so the import system knows it failed. If called again for the same plugin, we do not
159
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
160
            # might have defined commands).
UNCOV
161
            _exec_module_failure[path] = e
×
162
            if path in sys.modules:
×
163
                sys.modules.pop(path, None)
×
164
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
165
        return module
1✔
166

167
    def find_plugins(self):
1✔
168
        internal_plugins_path = (self.pkgpath, False)
1✔
169
        external_plugins_path = (self.get_external_plugin_dir(), True)
1✔
170
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
1✔
171
            if pkg_path and os.path.exists(pkg_path):
1✔
172
                if not external:
1✔
173
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
1✔
174
                else:
175
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
1✔
176

177
    def load_plugins(self):
1✔
178
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
1✔
179
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
1✔
UNCOV
180
                try:
×
181
                    if self.cmd_only:  # only load init method to register commands
×
182
                        self.maybe_load_plugin_init_method(name)
×
183
                    else:
UNCOV
184
                        self.load_plugin_by_name(name)
×
185
                except BaseException as e:
×
186
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
187

188
    def _has_root_permissions(self, path):
1✔
UNCOV
189
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
190

191
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
1✔
UNCOV
192
        if sys.platform in ['windows', 'win32']:
×
193
            keyfile_path = self.keyfile_windows
×
194
            keyfile_help = _('This file can be edited with Regedit')
×
195
        elif 'ANDROID_DATA' in os.environ:
×
196
            raise Exception('platform not supported')
×
197
        else:
198
            # treat unknown platforms and macOS as linux-like
UNCOV
199
            keyfile_path = self.keyfile_posix
×
200
            keyfile_help = "" if not key_hex else "".join([
×
201
                                         _('The file must have root permissions'),
202
                                         ".\n\n",
203
                                         _("To set it you can also use the Auto-Setup or run "
204
                                           "the following terminal command"),
205
                                         ":\n\n",
206
                                         f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\"",
207
            ])
UNCOV
208
        return keyfile_path, keyfile_help
×
209

210
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
1✔
211
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
UNCOV
212
        try:
×
213
            if sys.platform in ['windows', 'win32']:
×
214
                self._write_key_to_regedit_windows(pubkey_hex)
×
215
            elif 'ANDROID_DATA' in os.environ:
×
216
                raise Exception('platform not supported')
×
217
            elif sys.platform.startswith('darwin'):  # macOS
×
218
                self._write_key_to_root_file_macos(pubkey_hex)
×
219
            else:
UNCOV
220
                self._write_key_to_root_file_linux(pubkey_hex)
×
221
        except Exception:
×
222
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
223
            return False
×
224
        return True
×
225

226
    def try_auto_key_reset(self) -> bool:
1✔
UNCOV
227
        try:
×
228
            if sys.platform in ['windows', 'win32']:
×
229
                self._delete_plugin_key_from_windows_registry()
×
230
            elif 'ANDROID_DATA' in os.environ:
×
231
                raise Exception('platform not supported')
×
232
            elif sys.platform.startswith('darwin'):  # macOS
×
233
                self._delete_macos_plugin_keyfile()
×
234
            else:
UNCOV
235
                self._delete_linux_plugin_keyfile()
×
236
        except Exception:
×
237
            self.logger.exception(f'auto-reset of plugin key failed')
×
238
            return False
×
239
        return True
×
240

241
    def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
1✔
242
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
UNCOV
243
        dir_path: str = os.path.dirname(self.keyfile_posix)
×
244
        sh_command = (
×
245
                     f"mkdir -p {dir_path} "  # create the /etc/electrum dir
246
                     f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} "  # write the key to the file
247
                     f"&& chmod 644 {self.keyfile_posix} "  # set read permissions for the file
248
                     f"&& chmod 755 {dir_path}"  # set read permissions for the dir
249
        )
UNCOV
250
        return sh_command
×
251

252
    @staticmethod
1✔
253
    def _get_macos_osascript_command(commands: List[str]) -> List[str]:
1✔
254
        """
255
        Inspired by
256
        https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
257
        Wraps the given commands in a macOS osascript command to prompt for root permissions.
258
        """
UNCOV
259
        from shlex import quote
×
260

UNCOV
261
        def quote_shell(args):
×
262
            return " ".join(quote(arg) for arg in args)
×
263

UNCOV
264
        def quote_applescript(string):
×
265
            charmap = {
×
266
                "\n": "\\n",
267
                "\r": "\\r",
268
                "\t": "\\t",
269
                "\"": "\\\"",
270
                "\\": "\\\\",
271
            }
UNCOV
272
            return '"%s"' % "".join(charmap.get(char, char) for char in string)
×
273

UNCOV
274
        commands = [
×
275
            "osascript",
276
            "-e",
277
            "do shell script %s "
278
            "with administrator privileges "
279
            "without altering line endings"
280
            % quote_applescript(quote_shell(commands))
281
        ]
UNCOV
282
        return commands
×
283

284
    @staticmethod
1✔
285
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
1✔
286
        """
287
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.
288
        """
289
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
290
        # for the result of the process (returns no process handle)
UNCOV
291
        from ctypes import byref, sizeof, windll, Structure, c_ulong
×
292
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR
×
293

294
        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
UNCOV
295
        class SHELLEXECUTEINFO(Structure):
×
296
            _fields_ = [
×
297
                ('cbSize', DWORD),
298
                ('fMask', c_ulong),
299
                ('hwnd', HWND),
300
                ('lpVerb', LPCWSTR),
301
                ('lpFile', LPCWSTR),
302
                ('lpParameters', LPCWSTR),
303
                ('lpDirectory', LPCWSTR),
304
                ('nShow', c_ulong),
305
                ('hInstApp', HINSTANCE),
306
                ('lpIDList', c_ulong),
307
                ('lpClass', LPCWSTR),
308
                ('hkeyClass', HKEY),
309
                ('dwHotKey', DWORD),
310
                ('hIcon', HANDLE),
311
                ('hProcess', HANDLE)
312
            ]
313

UNCOV
314
        info = SHELLEXECUTEINFO()
×
315
        info.cbSize = sizeof(SHELLEXECUTEINFO)
×
316
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
×
317
        info.hwnd = None
×
318
        info.lpVerb = 'runas'  # run as administrator
×
319
        info.lpFile = 'reg.exe'  # the executable to run
×
320
        info.lpParameters = reg_exe_command  # the registry edit command
×
321
        info.lpDirectory = None
×
322
        info.nShow = 1
×
323

324
        # Execute and wait
UNCOV
325
        if not windll.shell32.ShellExecuteExW(byref(info)):
×
326
            error = windll.kernel32.GetLastError()
×
327
            raise Exception(f'Error executing registry command: {error}')
×
328

329
        # block until the process is done or 5 sec timeout
UNCOV
330
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)
×
331

332
        # Close handle
UNCOV
333
        windll.kernel32.CloseHandle(info.hProcess)
×
334

335
    @staticmethod
1✔
336
    def _execute_commands_in_subprocess(commands: List[str]) -> None:
1✔
337
        """
338
        Executes the given commands in a subprocess and asserts that it was successful.
339
        """
UNCOV
340
        import subprocess
×
341
        with subprocess.Popen(
×
342
            commands,
343
            stdout=subprocess.PIPE,
344
            stderr=subprocess.PIPE,
345
            text=True,
346
        ) as process:
UNCOV
347
            stdout, stderr = process.communicate()
×
348
            if process.returncode != 0:
×
349
                raise Exception(f'error executing command ({process.returncode}): {stderr}')
×
350

351
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
1✔
352
        """
353
        Spawns a pkexec subprocess to write the key to a file with root permissions.
354
        This will open an OS dialog asking for the root password. Can only succeed if
355
        the system has polkit installed.
356
        """
UNCOV
357
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
358

UNCOV
359
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
360
        commands = ['pkexec', 'sh', '-c', sh_command]
×
361
        self._execute_commands_in_subprocess(commands)
×
362

363
        # check if the key was written correctly
UNCOV
364
        with open(self.keyfile_posix, 'r') as f:
×
365
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
366
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
367

368
    def _delete_linux_plugin_keyfile(self) -> None:
1✔
369
        """
370
        Deletes the root owned key file at self.keyfile_posix.
371
        """
UNCOV
372
        if not os.path.exists(self.keyfile_posix):
×
373
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
374
            return
×
375
        if not self._has_root_permissions(self.keyfile_posix):
×
376
            os.unlink(self.keyfile_posix)
×
377
            return
×
378

379
        # use pkexec to delete the file as root user
UNCOV
380
        commands = ['pkexec', 'rm', self.keyfile_posix]
×
381
        self._execute_commands_in_subprocess(commands)
×
382
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
383

384
    def _write_key_to_root_file_macos(self, key_hex: str) -> None:
1✔
UNCOV
385
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
386

UNCOV
387
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
388
        macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])
×
389

UNCOV
390
        self._execute_commands_in_subprocess(macos_commands)
×
391
        with open(self.keyfile_posix, 'r') as f:
×
392
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
393
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
394

395
    def _delete_macos_plugin_keyfile(self) -> None:
1✔
UNCOV
396
        if not os.path.exists(self.keyfile_posix):
×
397
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
398
            return
×
399
        if not self._has_root_permissions(self.keyfile_posix):
×
400
            os.unlink(self.keyfile_posix)
×
401
            return
×
402
        # use osascript to delete the file as root user
UNCOV
403
        macos_commands = self._get_macos_osascript_command(["rm", self.keyfile_posix])
×
404
        self._execute_commands_in_subprocess(macos_commands)
×
405
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
406

407
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
1✔
408
        """
409
        Writes the key to the Windows registry with windows UAC prompt.
410
        """
UNCOV
411
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE
×
412

UNCOV
413
        value_type = 'REG_SZ'
×
414
        command = f'add "{self.keyfile_windows}" /ve /t {value_type} /d "{key_hex}" /f'
×
415

UNCOV
416
        self._run_win_regedit_as_admin(command)
×
417

418
        # check if the key was written correctly
UNCOV
419
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
420
            with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
×
421
                assert key_hex == QueryValue(key, 'PluginsKey'), "incorrect registry key value"
×
422
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
×
423

424
    def _delete_plugin_key_from_windows_registry(self) -> None:
1✔
425
        """
426
        Deletes the PluginsKey dir in the Windows registry.
427
        """
UNCOV
428
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE
×
429

UNCOV
430
        command = f'delete "{self.keyfile_windows}" /f'
×
431
        self._run_win_regedit_as_admin(command)
×
432

UNCOV
433
        try:
×
434
            # do a sanity check to see if the key has been deleted
UNCOV
435
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
436
                with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
×
437
                    raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
×
438
        except FileNotFoundError:
×
439
            pass
×
440

441
    def create_new_key(self, password:str) -> str:
1✔
UNCOV
442
        salt = os.urandom(32)
×
443
        privkey = self.derive_privkey(password, salt)
×
444
        pubkey = privkey.get_public_key_bytes()
×
445
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
446
        return key.hex()
×
447

448
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], Optional[bytes]]:
1✔
449
        """
450
        returns pubkey, salt
451
        returns None, None if the pubkey has not been set
452
        """
UNCOV
453
        if sys.platform in ['windows', 'win32']:
×
454
            import winreg
×
455
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
456
                try:
×
457
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
458
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
459
                except Exception as e:
×
460
                    self.logger.info(f'winreg error: {e}')
×
461
                    return None, None
×
462
        elif 'ANDROID_DATA' in os.environ:
×
463
            return None, None
×
464
        else:
465
            # treat unknown platforms as linux-like
UNCOV
466
            if not os.path.exists(self.keyfile_posix):
×
467
                return None, None
×
468
            if not self._has_root_permissions(self.keyfile_posix):
×
469
                return None, None
×
470
            with open(self.keyfile_posix) as f:
×
471
                key_hex = f.read()
×
472
        try:
×
473
            key = bytes.fromhex(key_hex)
×
474
            version = key[0]
×
475
        except Exception:
×
476
            self.logger.exception(f'{key_hex=} invalid')
×
477
            return None, None
×
478
        if version != PLUGIN_PASSWORD_VERSION:
×
479
            self.logger.info(f'unknown plugin password version: {version}')
×
480
            return None, None
×
481
        # all good
UNCOV
482
        salt = key[1:1+32]
×
483
        pubkey = key[1+32:]
×
484
        return pubkey, salt
×
485

486
    def get_external_plugin_dir(self) -> str:
1✔
487
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
1✔
488
        make_dir(pkg_path)
1✔
489
        return pkg_path
1✔
490

491
    async def download_external_plugin(self, url: str) -> str:
1✔
UNCOV
492
        filename = os.path.basename(urlparse(url).path)
×
493
        pkg_path = self.get_external_plugin_dir()
×
494
        path = os.path.join(pkg_path, filename)
×
495
        if os.path.exists(path):
×
496
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
×
497
        network = Network.get_instance()
×
498
        proxy = network.proxy if network else None
×
499
        async with make_aiohttp_session(proxy=proxy) as session:
×
500
            async with session.get(url) as resp:
×
501
                if resp.status == 200:
×
502
                    with open(path, 'wb') as fd:
×
503
                        async for chunk in resp.content.iter_chunked(10):
×
504
                            fd.write(chunk)
×
505
        return path
×
506

507
    def read_manifest(self, path) -> dict:
1✔
508
        """ return json dict """
UNCOV
509
        with zipfile_lib.ZipFile(path) as file:
×
510
            for filename in file.namelist():
×
511
                if filename.endswith('manifest.json'):
×
512
                    break
×
513
            else:
UNCOV
514
                raise Exception('could not find manifest.json in zip archive')
×
515
            with file.open(filename, 'r') as f:
×
516
                manifest = json.load(f)
×
517
                manifest['path'] = path  # external, path of the zipfile
×
518
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
519
                manifest['is_zip'] = True
×
520
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
521
                return manifest
×
522

523
    def zip_plugin_path(self, name) -> str:
1✔
UNCOV
524
        path = self.get_metadata(name)['path']
×
525
        filename = os.path.basename(path)
×
526
        if name in self.internal_plugin_metadata:
×
527
            pkg_path = self.pkgpath
×
528
        else:
UNCOV
529
            pkg_path = self.get_external_plugin_dir()
×
530
        return os.path.join(pkg_path, filename)
×
531

532
    def find_zip_plugins(self, pkg_path: str, external: bool):
1✔
533
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
534
        if pkg_path is None:
1✔
UNCOV
535
            return
×
536
        for filename in os.listdir(pkg_path):
1✔
UNCOV
537
            path = os.path.join(pkg_path, filename)
×
538
            if not filename.endswith('.zip'):
×
539
                continue
×
540
            try:
×
541
                d = self.read_manifest(path)
×
542
                name = d['name']
×
543
            except Exception:
×
544
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
545
                continue
×
546
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
547
                self.logger.info(f"duplicate plugins for {name=}")
×
548
                continue
×
549
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
550
                continue
×
551
            min_version = d.get('min_electrum_version')
×
552
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
553
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
554
                continue
×
555
            max_version = d.get('max_electrum_version')
×
556
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
557
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
558
                continue
×
559

UNCOV
560
            if not self.cmd_only:
×
561
                gui_good = self.gui_name in d.get('available_for', [])
×
562
                if not gui_good:
×
563
                    continue
×
564
                if 'fullname' not in d:
×
565
                    continue
×
566
                details = d.get('registers_keystore')
×
567
                if details:
×
568
                    self.register_keystore(name, details)
×
569
            if external:
×
570
                self.external_plugin_metadata[name] = d
×
571
            else:
UNCOV
572
                self.internal_plugin_metadata[name] = d
×
573

574
    def get(self, name):
1✔
UNCOV
575
        return self.plugins.get(name)
×
576

577
    def count(self):
1✔
UNCOV
578
        return len(self.plugins)
×
579

580
    def load_plugin(self, name) -> 'BasePlugin':
1✔
581
        """Imports the code of the given plugin.
582
        note: can be called from any thread.
583
        """
584
        if self.get_metadata(name):
1✔
585
            return self.load_plugin_by_name(name)
1✔
586
        else:
UNCOV
587
            raise Exception(f"could not find plugin {name!r}")
×
588

589
    def maybe_load_plugin_init_method(self, name: str) -> None:
1✔
590
        """Loads the __init__.py module of the plugin if it is not already loaded."""
591
        base_name = ('electrum_external_plugins.' if self.is_external(name) else 'electrum.plugins.') + name
1✔
592
        if base_name not in sys.modules:
1✔
593
            metadata = self.get_metadata(name)
1✔
594
            is_zip = metadata.get('is_zip', False)
1✔
595
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
596
            if not is_zip:
1✔
597
                if self.is_external(name):
1✔
598
                    # this branch is deprecated: external plugins are always zip files
UNCOV
599
                    path = os.path.join(metadata['path'], '__init__.py')
×
600
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
601
                else:
602
                    init_spec = importlib.util.find_spec(base_name)
1✔
603
            else:
UNCOV
604
                zipfile = zipimport.zipimporter(metadata['path'])
×
605
                dirname = metadata['dirname']
×
606
                init_spec = zipfile.find_spec(dirname)
×
607

608
            self.exec_module_from_spec(init_spec, base_name)
1✔
609

610
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
1✔
611
        if name in self.plugins:
1✔
612
            return self.plugins[name]
1✔
613
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
614
        self.maybe_load_plugin_init_method(name)
1✔
615
        is_external = self.is_external(name)
1✔
616
        if is_external and not self.is_authorized(name):
1✔
UNCOV
617
            self.logger.info(f'plugin not authorized {name}')
×
618
            return
×
619
        if not is_external:
1✔
620
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
1✔
621
        else:
UNCOV
622
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
623

624
        spec = importlib.util.find_spec(full_name)
1✔
625
        if spec is None:
1✔
UNCOV
626
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
627
        try:
1✔
628
            module = self.exec_module_from_spec(spec, full_name)
1✔
629
            plugin = module.Plugin(self, self.config, name)
1✔
UNCOV
630
        except Exception as e:
×
631
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
632
        self.add_jobs(plugin.thread_jobs())
1✔
633
        self.plugins[name] = plugin
1✔
634
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
1✔
635
        return plugin
1✔
636

637
    def close_plugin(self, plugin):
1✔
UNCOV
638
        self.remove_jobs(plugin.thread_jobs())
×
639

640
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
1✔
UNCOV
641
        from hashlib import pbkdf2_hmac
×
642
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
643
        return ECPrivkey(secret)
×
644

645
    def uninstall(self, name: str):
1✔
UNCOV
646
        if self.config.get(f'plugins.{name}'):
×
647
            self.config.set_key(f'plugins.{name}', None)
×
648
        if name in self.external_plugin_metadata:
×
649
            zipfile = self.zip_plugin_path(name)
×
650
            os.unlink(zipfile)
×
651
            self.external_plugin_metadata.pop(name)
×
652

653
    def is_internal(self, name) -> bool:
1✔
UNCOV
654
        return name in self.internal_plugin_metadata
×
655

656
    def is_external(self, name) -> bool:
1✔
657
        return name in self.external_plugin_metadata
1✔
658

659
    def is_auto_loaded(self, name):
1✔
UNCOV
660
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
661
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
662

663
    def is_installed(self, name) -> bool:
1✔
664
        """an external plugin may be installed but not authorized """
UNCOV
665
        return (name in self.internal_plugin_metadata or name in self.external_plugin_metadata)
×
666

667
    def is_authorized(self, name) -> bool:
1✔
UNCOV
668
        if name in self.internal_plugin_metadata:
×
669
            return True
×
670
        if name not in self.external_plugin_metadata:
×
671
            return False
×
672
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
673
        if not pubkey_bytes:
×
674
            return False
×
675
        if not self.is_plugin_zip(name):
×
676
            return False
×
677
        filename = self.zip_plugin_path(name)
×
678
        plugin_hash = get_file_hash256(filename)
×
679
        sig = self.config.get(f'plugins.{name}.authorized')
×
680
        if not sig:
×
681
            return False
×
682
        pubkey = ECPubkey(pubkey_bytes)
×
683
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
684

685
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
1✔
UNCOV
686
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
687
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
688
        plugin_hash = get_file_hash256(filename)
×
689
        sig = privkey.ecdsa_sign(plugin_hash)
×
690
        value = sig.hex()
×
691
        self.config.set_key(f'plugins.{name}.authorized', value)
×
692
        self.config.set_key(f'plugins.{name}.enabled', True)
×
693

694
    def enable(self, name: str) -> 'BasePlugin':
1✔
UNCOV
695
        self.config.enable_plugin(name)
×
696
        p = self.get(name)
×
697
        if p:
×
698
            return p
×
699
        return self.load_plugin(name)
×
700

701
    def disable(self, name: str) -> None:
1✔
UNCOV
702
        self.config.disable_plugin(name)
×
703
        p = self.get(name)
×
704
        if not p:
×
705
            return
×
706
        self.plugins.pop(name)
×
707
        p.close()
×
708
        self.logger.info(f"closed {name}")
×
709

710
    @classmethod
1✔
711
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
1✔
UNCOV
712
        return key.startswith('plugins.')
×
713

714
    def is_available(self, name: str) -> bool:
1✔
UNCOV
715
        d = self.descriptions.get(name)
×
716
        if not d:
×
717
            return False
×
718
        deps = d.get('requires', [])
×
719
        for dep, s in deps:
×
720
            try:
×
721
                __import__(dep)
×
722
            except ImportError as e:
×
723
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
724
                return False
×
725
        return True
×
726

727
    def get_hardware_support(self):
1✔
UNCOV
728
        out = []
×
729
        for name, details in self._hw_wallets.items():
×
730
            try:
×
731
                p = self.get_plugin(name)
×
732
                if p.is_available():
×
733
                    out.append(HardwarePluginToScan(
×
734
                        name=name,
735
                        description=details[2],
736
                        plugin=p,
737
                        exception=None))
UNCOV
738
            except Exception as e:
×
739
                self.logger.exception(f"cannot load plugin for: {name}")
×
740
                out.append(HardwarePluginToScan(
×
741
                    name=name,
742
                    description=details[2],
743
                    plugin=None,
744
                    exception=e))
UNCOV
745
        return out
×
746

747
    def register_wallet_type(self, name, gui_good, wallet_type):
1✔
748
        from .wallet import register_wallet_type, register_constructor
1✔
749
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
1✔
750

751
        def loader():
1✔
752
            plugin = self.get_plugin(name)
1✔
753
            register_constructor(wallet_type, plugin.wallet_class)
1✔
754
        register_wallet_type(wallet_type)
1✔
755
        plugin_loaders[wallet_type] = loader
1✔
756

757
    def register_keystore(self, name, details):
1✔
758
        from .keystore import register_keystore
1✔
759

760
        def dynamic_constructor(d):
1✔
761
            return self.get_plugin(name).keystore_class(d)
1✔
762
        if details[0] == 'hardware':
1✔
763
            self._hw_wallets[name] = details
1✔
764
            self.logger.info(f"registering hardware {name}: {details}")
1✔
765
            register_keystore(details[1], dynamic_constructor)
1✔
766

767
    def get_plugin(self, name: str) -> 'BasePlugin':
1✔
768
        if name not in self.plugins:
1✔
769
            self.load_plugin(name)
1✔
770
        return self.plugins[name]
1✔
771

772
    def is_plugin_zip(self, name: str) -> bool:
1✔
773
        """Returns True if the plugin is a zip file"""
UNCOV
774
        if (metadata := self.get_metadata(name)) is None:
×
775
            return False
×
776
        return metadata.get('is_zip', False)
×
777

778
    def get_metadata(self, name: str) -> Optional[dict]:
1✔
779
        """Returns the metadata of the plugin"""
780
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
1✔
781
        if not metadata:
1✔
UNCOV
782
            return None
×
783
        return metadata
1✔
784

785
    def run(self):
1✔
786
        while self.is_running():
1✔
787
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
1✔
788
            self.run_jobs()
1✔
789
        self.on_stop()
1✔
790

791
    def read_file(self, name: str, filename: str) -> bytes:
1✔
UNCOV
792
        if self.is_plugin_zip(name):
×
793
            plugin_filename = self.zip_plugin_path(name)
×
794
            metadata = self.external_plugin_metadata[name]
×
795
            dirname = metadata['dirname']
×
796
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
797
                with myzip.open("/".join([dirname,filename])) as myfile:
×
798
                    return myfile.read()
×
799
        elif name in self.internal_plugin_metadata:
×
800
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
801
            with open(path, 'rb') as myfile:
×
802
                return myfile.read()
×
803
        else:
804
            # no icon
UNCOV
805
            return None
×
806

807

808
def get_file_hash256(path: str) -> bytes:
1✔
809
    """Get the sha256 hash of a file, similar to `sha256sum`."""
UNCOV
810
    with open(path, 'rb') as f:
×
811
        return sha256(f.read())
×
812

813

814
def hook(func):
1✔
815
    hook_names.add(func.__name__)
1✔
816
    return func
1✔
817

818

819
def run_hook(name, *args):
1✔
820
    results = []
1✔
821
    f_list = hooks.get(name, [])
1✔
822
    for p, f in f_list:
1✔
823
        if p.is_enabled():
1✔
824
            try:
1✔
825
                r = f(*args)
1✔
UNCOV
826
            except Exception:
×
827
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
828
                r = False
×
829
            if r:
1✔
UNCOV
830
                results.append(r)
×
831

832
    if results:
1✔
UNCOV
833
        assert len(results) == 1, results
×
834
        return results[0]
×
835

836

837
class BasePlugin(Logger):
1✔
838

839
    def __init__(self, parent, config: 'SimpleConfig', name):
1✔
840
        self.parent = parent  # type: Plugins  # The plugins object
1✔
841
        self.name = name
1✔
842
        self.config = config
1✔
843
        Logger.__init__(self)
1✔
844
        # add self to hooks
845
        for k in dir(self):
1✔
846
            if k in hook_names:
1✔
847
                l = hooks.get(k, [])
1✔
848
                l.append((self, getattr(self, k)))
1✔
849
                hooks[k] = l
1✔
850

851
    def __str__(self):
1✔
UNCOV
852
        return self.name
×
853

854
    def close(self):
1✔
855
        # remove self from hooks
UNCOV
856
        for attr_name in dir(self):
×
857
            if attr_name in hook_names:
×
858
                # found attribute in self that is also the name of a hook
UNCOV
859
                l = hooks.get(attr_name, [])
×
860
                try:
×
861
                    l.remove((self, getattr(self, attr_name)))
×
862
                except ValueError:
×
863
                    # maybe attr name just collided with hook name and was not hook
UNCOV
864
                    continue
×
865
                hooks[attr_name] = l
×
866
        self.parent.close_plugin(self)
×
867
        self.on_close()
×
868

869
    def on_close(self):
1✔
UNCOV
870
        pass
×
871

872
    def requires_settings(self) -> bool:
1✔
UNCOV
873
        return False
×
874

875
    def thread_jobs(self):
1✔
876
        return []
1✔
877

878
    def is_enabled(self):
1✔
UNCOV
879
        if not self.is_available():
×
880
            return False
×
881
        if not self.parent.is_authorized(self.name):
×
882
            return False
×
883
        return self.config.is_plugin_enabled(self.name)
×
884

885
    def is_available(self):
1✔
UNCOV
886
        return True
×
887

888
    def can_user_disable(self):
1✔
UNCOV
889
        return True
×
890

891
    def settings_widget(self, window):
1✔
UNCOV
892
        raise NotImplementedError()
×
893

894
    def settings_dialog(self, window):
1✔
UNCOV
895
        raise NotImplementedError()
×
896

897
    def read_file(self, filename: str) -> bytes:
1✔
UNCOV
898
        return self.parent.read_file(self.name, filename)
×
899

900
    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
1✔
901
        """Returns a dict which is persisted in the per-wallet database."""
UNCOV
902
        plugin_storage = wallet.db.get_plugin_storage()
×
903
        return plugin_storage.setdefault(self.name, {})
×
904

905

906
class DeviceUnpairableError(UserFacingException): pass
1✔
907
class HardwarePluginLibraryUnavailable(Exception): pass
1✔
908
class CannotAutoSelectDevice(Exception): pass
1✔
909

910

911
class Device(NamedTuple):
1✔
912
    path: Union[str, bytes]
1✔
913
    interface_number: int
1✔
914
    id_: str
1✔
915
    product_key: Any   # when using hid, often Tuple[int, int]
1✔
916
    usage_page: int
1✔
917
    transport_ui_string: str
1✔
918

919

920
class DeviceInfo(NamedTuple):
1✔
921
    device: Device
1✔
922
    label: Optional[str] = None
1✔
923
    initialized: Optional[bool] = None
1✔
924
    exception: Optional[Exception] = None
1✔
925
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
1✔
926
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
1✔
927
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
1✔
928

929
    def label_for_device_select(self) -> str:
1✔
UNCOV
930
        return (
×
931
            "{label} ({maybe_model}{init}, {transport})"
932
            .format(
933
                label=self.label or _("An unnamed {}").format(self.plugin_name),
934
                init=(_("initialized") if self.initialized else _("wiped")),
935
                transport=self.device.transport_ui_string,
936
                maybe_model=f"{self.model_name}, " if self.model_name else ""
937
            )
938
        )
939

940

941
class HardwarePluginToScan(NamedTuple):
1✔
942
    name: str
1✔
943
    description: str
1✔
944
    plugin: Optional['HW_PluginBase']
1✔
945
    exception: Optional[Exception]
1✔
946

947

948
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
1✔
949

950

951
# hidapi is not thread-safe
952
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
953
#     https://github.com/libusb/hidapi/issues/45
954
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
955
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
956
# It is not entirely clear to me, exactly what is safe and what isn't, when
957
# using multiple threads...
958
# Hence, we use a single thread for all device communications, including
959
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
960
# the following thread:
961
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
1✔
962
    max_workers=1,
963
    thread_name_prefix='hwd_comms_thread'
964
)
965

966
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
967
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
968
# To keep it simple, let's just import it now, as we are likely in the main thread here.
969
if threading.current_thread() is not threading.main_thread():
1✔
UNCOV
970
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
971
try:
1✔
972
    import hid
1✔
973
except ImportError:
1✔
974
    pass
1✔
975

976

977
T = TypeVar('T')
1✔
978

979

980
def run_in_hwd_thread(func: Callable[[], T]) -> T:
1✔
UNCOV
981
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
982
        return func()
×
983
    else:
UNCOV
984
        fut = _hwd_comms_executor.submit(func)
×
985
        return fut.result()
×
986
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
987

988

989
def runs_in_hwd_thread(func):
1✔
990
    @wraps(func)
1✔
991
    def wrapper(*args, **kwargs):
1✔
UNCOV
992
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
993
    return wrapper
1✔
994

995

996
def assert_runs_in_hwd_thread():
1✔
UNCOV
997
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
998
        raise Exception("must only be called from HWD communication thread")
×
999

1000

1001
class DeviceMgr(ThreadJob):
1✔
1002
    """Manages hardware clients.  A client communicates over a hardware
1003
    channel with the device.
1004

1005
    In addition to tracking device HID IDs, the device manager tracks
1006
    hardware wallets and manages wallet pairing.  A HID ID may be
1007
    paired with a wallet when it is confirmed that the hardware device
1008
    matches the wallet, i.e. they have the same master public key.  A
1009
    HID ID can be unpaired if e.g. it is wiped.
1010

1011
    Because of hotplugging, a wallet must request its client
1012
    dynamically each time it is required, rather than caching it
1013
    itself.
1014

1015
    The device manager is shared across plugins, so just one place
1016
    does hardware scans when needed.  By tracking HID IDs, if a device
1017
    is plugged into a different port the wallet is automatically
1018
    re-paired.
1019

1020
    Wallets are informed on connect / disconnect events.  It must
1021
    implement connected(), disconnected() callbacks.  Being connected
1022
    implies a pairing.  Callbacks can happen in any thread context,
1023
    and we do them without holding the lock.
1024

1025
    Confusingly, the HID ID (serial number) reported by the HID system
1026
    doesn't match the device ID reported by the device itself.  We use
1027
    the HID IDs.
1028

1029
    This plugin is thread-safe.  Currently only devices supported by
1030
    hidapi are implemented."""
1031

1032
    def __init__(self, config: SimpleConfig):
1✔
1033
        ThreadJob.__init__(self)
1✔
1034
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
1035
        self.pairing_code_to_id = {}  # type: Dict[str, str]
1✔
1036
        # A client->id_ map. Needs self.lock.
1037
        self.clients = {}  # type: Dict[HardwareClientBase, str]
1✔
1038
        # What we recognise.  (vendor_id, product_id) -> Plugin
1039
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
1✔
1040
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
1✔
1041
        # Custom enumerate functions for devices we don't know about.
1042
        self._enumerate_func = set()  # Needs self.lock.
1✔
1043

1044
        self.lock = threading.RLock()
1✔
1045

1046
        self.config = config
1✔
1047

1048
    def thread_jobs(self):
1✔
1049
        # Thread job to handle device timeouts
1050
        return [self]
1✔
1051

1052
    def run(self):
1✔
1053
        """Handle device timeouts.  Runs in the context of the Plugins
1054
        thread."""
1055
        with self.lock:
1✔
1056
            clients = list(self.clients.keys())
1✔
1057
        cutoff = time.time() - self.config.get_session_timeout()
1✔
1058
        for client in clients:
1✔
UNCOV
1059
            client.timeout(cutoff)
×
1060

1061
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
1✔
1062
        for pair in device_pairs:
×
1063
            self._recognised_hardware[pair] = plugin
×
1064

1065
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
1✔
1066
        for vendor_id in vendor_ids:
×
1067
            self._recognised_vendor[vendor_id] = plugin
×
1068

1069
    def register_enumerate_func(self, func):
1✔
1070
        with self.lock:
×
1071
            self._enumerate_func.add(func)
×
1072

1073
    @runs_in_hwd_thread
1✔
1074
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
1✔
1075
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
1076
        # Get from cache first
UNCOV
1077
        client = self._client_by_id(device.id_)
×
1078
        if client:
×
1079
            return client
×
UNCOV
1080
        client = plugin.create_client(device, handler)
×
UNCOV
1081
        if client:
×
UNCOV
1082
            self.logger.info(f"Registering {client}")
×
UNCOV
1083
            with self.lock:
×
UNCOV
1084
                self.clients[client] = device.id_
×
1085
        return client
×
1086

1087
    def id_by_pairing_code(self, pairing_code):
1✔
1088
        with self.lock:
×
1089
            return self.pairing_code_to_id.get(pairing_code)
×
1090

1091
    def pairing_code_by_id(self, id_):
1✔
1092
        with self.lock:
×
1093
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
UNCOV
1094
                if id2 == id_:
×
UNCOV
1095
                    return pairing_code
×
1096
            return None
×
1097

1098
    def unpair_pairing_code(self, pairing_code):
1✔
UNCOV
1099
        with self.lock:
×
1100
            if pairing_code not in self.pairing_code_to_id:
×
1101
                return
×
1102
            _id = self.pairing_code_to_id.pop(pairing_code)
×
1103
        self._close_client(_id)
×
1104

1105
    def unpair_id(self, id_):
1✔
UNCOV
1106
        pairing_code = self.pairing_code_by_id(id_)
×
1107
        if pairing_code:
×
1108
            self.unpair_pairing_code(pairing_code)
×
1109
        else:
1110
            self._close_client(id_)
×
1111

1112
    def _close_client(self, id_):
1✔
UNCOV
1113
        with self.lock:
×
1114
            client = self._client_by_id(id_)
×
1115
            self.clients.pop(client, None)
×
1116
        if client:
×
UNCOV
1117
            client.close()
×
1118

1119
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
1✔
UNCOV
1120
        with self.lock:
×
1121
            for client, client_id in self.clients.items():
×
1122
                if client_id == id_:
×
1123
                    return client
×
1124
        return None
×
1125

1126
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
1✔
1127
        """Returns a client for the device ID if one is registered.  If
1128
        a device is wiped or in bootloader mode pairing is impossible;
1129
        in such cases we communicate by device ID and not wallet."""
1130
        if scan_now:
×
1131
            self.scan_devices()
×
1132
        return self._client_by_id(id_)
×
1133

1134
    @runs_in_hwd_thread
1✔
1135
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
1✔
1136
                            keystore: 'Hardware_KeyStore',
1137
                            force_pair: bool, *,
1138
                            devices: Sequence['Device'] = None,
1139
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
1140
        self.logger.info("getting client for keystore")
×
1141
        if handler is None:
×
1142
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
UNCOV
1143
        handler.update_status(False)
×
UNCOV
1144
        pcode = keystore.pairing_code()
×
UNCOV
1145
        client = None
×
1146
        # search existing clients first (fast-path)
UNCOV
1147
        if not devices:
×
UNCOV
1148
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
1149
        # search clients again, now allowing a (slow) scan
1150
        if client is None:
×
1151
            if devices is None:
×
1152
                devices = self.scan_devices()
×
1153
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
1154
        if client is None and force_pair:
×
1155
            try:
×
UNCOV
1156
                info = self.select_device(plugin, handler, keystore, devices,
×
1157
                                          allow_user_interaction=allow_user_interaction)
1158
            except CannotAutoSelectDevice:
×
UNCOV
1159
                pass
×
1160
            else:
1161
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
1162
        if client:
×
1163
            handler.update_status(True)
×
1164
            # note: if select_device was called, we might also update label etc here:
1165
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1166
        self.logger.info("end client for keystore")
×
UNCOV
1167
        return client
×
1168

1169
    def client_by_pairing_code(
1✔
1170
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
1171
        devices: Sequence['Device'],
1172
    ) -> Optional['HardwareClientBase']:
1173
        _id = self.id_by_pairing_code(pairing_code)
×
UNCOV
1174
        client = self._client_by_id(_id)
×
1175
        if client:
×
1176
            if type(client.plugin) != type(plugin):
×
1177
                return
×
1178
            # An unpaired client might have another wallet's handler
1179
            # from a prior scan.  Replace to fix dialog parenting.
UNCOV
1180
            client.handler = handler
×
UNCOV
1181
            return client
×
1182

1183
        for device in devices:
×
1184
            if device.id_ == _id:
×
1185
                return self.create_client(device, handler, plugin)
×
1186

1187
    def force_pair_keystore(
1✔
1188
        self,
1189
        *,
1190
        plugin: 'HW_PluginBase',
1191
        handler: 'HardwareHandlerBase',
1192
        info: 'DeviceInfo',
1193
        keystore: 'Hardware_KeyStore',
1194
    ) -> 'HardwareClientBase':
1195
        xpub = keystore.xpub
×
UNCOV
1196
        derivation = keystore.get_derivation_prefix()
×
UNCOV
1197
        assert derivation is not None
×
UNCOV
1198
        xtype = bip32.xpub_type(xpub)
×
UNCOV
1199
        client = self._client_by_id(info.device.id_)
×
UNCOV
1200
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
1201
            # See comment above for same code
UNCOV
1202
            client.handler = handler
×
1203
            # This will trigger a PIN/passphrase entry request
UNCOV
1204
            try:
×
1205
                client_xpub = client.get_xpub(derivation, xtype)
×
1206
            except (UserCancelled, RuntimeError):
×
1207
                # Bad / cancelled PIN / passphrase
1208
                client_xpub = None
×
1209
            if client_xpub == xpub:
×
1210
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
UNCOV
1211
                with self.lock:
×
1212
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
UNCOV
1213
                return client
×
1214

1215
        # The user input has wrong PIN or passphrase, or cancelled input,
1216
        # or it is not pairable
UNCOV
1217
        raise DeviceUnpairableError(
×
1218
            _('Electrum cannot pair with your {}.\n\n'
1219
              'Before you request bitcoins to be sent to addresses in this '
1220
              'wallet, ensure you can pair with your device, or that you have '
1221
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
1222
              'receive will be unspendable.').format(plugin.device))
1223

1224
    def list_pairable_device_infos(
1✔
1225
        self,
1226
        *,
1227
        handler: Optional['HardwareHandlerBase'],
1228
        plugin: 'HW_PluginBase',
1229
        devices: Sequence['Device'] = None,
1230
        include_failing_clients: bool = False,
1231
    ) -> List['DeviceInfo']:
1232
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
1233
        Already paired devices are also included, as it is okay to reuse them.
1234
        """
UNCOV
1235
        if not plugin.libraries_available:
×
UNCOV
1236
            message = plugin.get_library_not_available_message()
×
UNCOV
1237
            raise HardwarePluginLibraryUnavailable(message)
×
UNCOV
1238
        if devices is None:
×
UNCOV
1239
            devices = self.scan_devices()
×
UNCOV
1240
        infos = []
×
UNCOV
1241
        for device in devices:
×
UNCOV
1242
            if not plugin.can_recognize_device(device):
×
UNCOV
1243
                continue
×
UNCOV
1244
            try:
×
1245
                client = self.create_client(device, handler, plugin)
×
1246
                if not client:
×
1247
                    continue
×
1248
                label = client.label()
×
1249
                is_initialized = client.is_initialized()
×
1250
                soft_device_id = client.get_soft_device_id()
×
1251
                model_name = client.device_model_name()
×
1252
            except Exception as e:
×
1253
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
1254
                if include_failing_clients:
×
1255
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1256
                continue
×
1257
            infos.append(DeviceInfo(device=device,
×
1258
                                    label=label,
1259
                                    initialized=is_initialized,
1260
                                    plugin_name=plugin.name,
1261
                                    soft_device_id=soft_device_id,
1262
                                    model_name=model_name))
1263

1264
        return infos
×
1265

1266
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
1✔
1267
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1268
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1269
        """Select the device to use for keystore."""
1270
        # ideally this should not be called from the GUI thread...
1271
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
UNCOV
1272
        while True:
×
UNCOV
1273
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1274
            if infos:
×
UNCOV
1275
                break
×
UNCOV
1276
            if not allow_user_interaction:
×
UNCOV
1277
                raise CannotAutoSelectDevice()
×
UNCOV
1278
            msg = _('Please insert your {}').format(plugin.device)
×
UNCOV
1279
            msg += " ("
×
UNCOV
1280
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
UNCOV
1281
                msg += f"label: {keystore.label}, "
×
1282
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1283
            msg += ').\n\n{}\n\n{}'.format(
×
1284
                _('Verify the cable is connected and that '
1285
                  'no other application is using it.'),
1286
                _('Try to connect again?')
1287
            )
1288
            if not handler.yes_no_question(msg):
×
1289
                raise UserCancelled()
×
1290
            devices = None
×
1291

1292
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1293
        # method 1: select device by id
UNCOV
1294
        if keystore.soft_device_id:
×
UNCOV
1295
            for info in infos:
×
UNCOV
1296
                if info.soft_device_id == keystore.soft_device_id:
×
UNCOV
1297
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1298
                    return info
×
1299
        # method 2: select device by label
1300
        #           but only if not a placeholder label and only if there is no collision
UNCOV
1301
        device_labels = [info.label for info in infos]
×
UNCOV
1302
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1303
                and device_labels.count(keystore.label) == 1):
1304
            for info in infos:
×
1305
                if info.label == keystore.label:
×
1306
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1307
                    return info
×
1308
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1309
        #           saved for keystore anyway, select it
UNCOV
1310
        if (len(infos) == 1
×
1311
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1312
                and keystore.soft_device_id is None):
UNCOV
1313
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1314
            return infos[0]
×
1315

1316
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1317
        if not allow_user_interaction:
×
UNCOV
1318
            raise CannotAutoSelectDevice()
×
1319
        # ask user to select device manually
1320
        msg = (
×
1321
                _("Could not automatically pair with device for given keystore.") + "\n"
1322
                + f"(keystore label: {keystore.label!r}, "
1323
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1324
        msg += _("Please select which {} device to use:").format(plugin.device)
×
UNCOV
1325
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1326
        choices = [ChoiceItem(key=idx, label=info.label_for_device_select())
×
1327
                   for (idx, info) in enumerate(infos)]
1328
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1329
                          f"num options: {len(infos)}. options: {infos}")
1330
        c = handler.query_choice(msg, choices)
×
UNCOV
1331
        if c is None:
×
UNCOV
1332
            raise UserCancelled()
×
UNCOV
1333
        info = infos[c]
×
1334
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1335
        # note: updated label/soft_device_id will be saved after pairing succeeds
1336
        return info
×
1337

1338
    @runs_in_hwd_thread
1✔
1339
    def _scan_devices_with_hid(self) -> List['Device']:
1✔
1340
        try:
×
1341
            import hid  # noqa: F811
×
1342
        except ImportError:
×
1343
            return []
×
1344

UNCOV
1345
        devices = []
×
1346
        for d in hid.enumerate(0, 0):
×
UNCOV
1347
            vendor_id = d['vendor_id']
×
UNCOV
1348
            product_key = (vendor_id, d['product_id'])
×
UNCOV
1349
            plugin = None
×
1350
            if product_key in self._recognised_hardware:
×
1351
                plugin = self._recognised_hardware[product_key]
×
1352
            elif vendor_id in self._recognised_vendor:
×
1353
                plugin = self._recognised_vendor[vendor_id]
×
UNCOV
1354
            if plugin:
×
1355
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1356
                if device:
×
1357
                    devices.append(device)
×
1358
        return devices
×
1359

1360
    @runs_in_hwd_thread
1✔
1361
    @profiler
1✔
1362
    def scan_devices(self) -> Sequence['Device']:
1✔
1363
        self.logger.info("scanning devices...")
×
1364

1365
        # First see what's connected that we know about
1366
        devices = self._scan_devices_with_hid()
×
1367

1368
        # Let plugin handlers enumerate devices we don't know about
UNCOV
1369
        with self.lock:
×
UNCOV
1370
            enumerate_funcs = list(self._enumerate_func)
×
UNCOV
1371
        for f in enumerate_funcs:
×
UNCOV
1372
            try:
×
1373
                new_devices = f()
×
UNCOV
1374
            except BaseException as e:
×
UNCOV
1375
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1376
            else:
UNCOV
1377
                devices.extend(new_devices)
×
1378

1379
        # find out what was disconnected
1380
        client_ids = [dev.id_ for dev in devices]
×
1381
        disconnected_clients = []
×
1382
        with self.lock:
×
1383
            connected = {}
×
1384
            for client, id_ in self.clients.items():
×
1385
                if id_ in client_ids and client.has_usable_connection_with_device():
×
UNCOV
1386
                    connected[client] = id_
×
1387
                else:
UNCOV
1388
                    disconnected_clients.append((client, id_))
×
UNCOV
1389
            self.clients = connected
×
1390

1391
        # Unpair disconnected devices
1392
        for client, id_ in disconnected_clients:
×
1393
            self.unpair_id(id_)
×
1394
            if client.handler:
×
1395
                client.handler.update_status(False)
×
1396

UNCOV
1397
        return devices
×
1398

1399
    @classmethod
1✔
1400
    def version_info(cls) -> Mapping[str, Optional[str]]:
1✔
UNCOV
1401
        ret = {}
×
1402
        # add libusb
1403
        try:
×
1404
            import usb1
×
1405
        except Exception as e:
×
UNCOV
1406
            ret["libusb.version"] = None
×
1407
        else:
UNCOV
1408
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
UNCOV
1409
            try:
×
UNCOV
1410
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1411
            except AttributeError:
×
UNCOV
1412
                ret["libusb.path"] = None
×
1413
        # add hidapi
1414
        try:
×
1415
            import hid  # noqa: F811
×
1416
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
UNCOV
1417
        except Exception as e:
×
1418
            from importlib.metadata import version
×
1419
            try:
×
1420
                ret["hidapi.version"] = version("hidapi")
×
1421
            except ImportError:
×
1422
                ret["hidapi.version"] = None
×
UNCOV
1423
        return ret
×
1424

1425
    def trigger_pairings(
1✔
1426
            self,
1427
            keystores: Sequence['KeyStore'],
1428
            *,
1429
            allow_user_interaction: bool = True,
1430
            devices: Sequence['Device'] = None,
1431
    ) -> None:
1432
        """Given a list of keystores, try to pair each with a connected hardware device.
1433

1434
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1435
        try to pair each keystore individually. Consider the following scenario:
1436
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1437
        - assume none of the devices are paired yet
1438
        1. if we tried to individually pair keystores, we might try with ks1 first
1439
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1440
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1441
             which is confusing and error-prone. It's especially problematic if the hw device does
1442
             not support labels (such as Ledger), as then the user cannot easily distinguish
1443
             same-type devices. (see #4199)
1444
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1445
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1446
        """
UNCOV
1447
        from .keystore import Hardware_KeyStore
×
UNCOV
1448
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
UNCOV
1449
        if not keystores:
×
UNCOV
1450
            return
×
UNCOV
1451
        if devices is None:
×
UNCOV
1452
            devices = self.scan_devices()
×
1453
        # first pair with all devices that can be auto-selected
UNCOV
1454
        for ks in keystores:
×
UNCOV
1455
            try:
×
UNCOV
1456
                ks.get_client(
×
1457
                    force_pair=True,
1458
                    allow_user_interaction=False,
1459
                    devices=devices,
1460
                )
1461
            except UserCancelled:
×
1462
                pass
×
UNCOV
1463
        if allow_user_interaction:
×
1464
            # now do manual selections
1465
            for ks in keystores:
×
1466
                try:
×
UNCOV
1467
                    ks.get_client(
×
1468
                        force_pair=True,
1469
                        allow_user_interaction=True,
1470
                        devices=devices,
1471
                    )
1472
                except UserCancelled:
×
1473
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc