• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5835051142742016

28 Apr 2025 02:57PM UTC coverage: 60.168% (-0.1%) from 60.293%
5835051142742016

Pull #9765

CirrusCI

f321x
plugins: add functionality to allow setting plugin pubkey from gui

Adds functionality that allows the user to store the plugin authorization pubkey without having to edit files/registry manually.
On Linux systems it spawns the commands in a subprocess with pkexec which will trigger a OS prompt to execute the commands as root.
The user sees the executed commands and can either authorize with the root password or decline.
On windows it uses the windows `ShellExecuteExW` api to edit the registry, this also triggers a OS dialog to accept or decline (UAC dialog).
There is also functionality to reset the key again, which works in the same way.
Pull Request #9765: plugins: add functionality to allow setting plugin pubkey from gui

8 of 88 new or added lines in 1 file covered. (9.09%)

237 existing lines in 1 file now uncovered.

21622 of 35936 relevant lines covered (60.17%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.84
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
5✔
72

73
    LOGGING_SHORTCUT = 'p'
5✔
74
    pkgpath = os.path.dirname(plugins.__file__)
5✔
75
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
76
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
77

78
    @profiler
5✔
79
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
80
        self.config = config
5✔
81
        self.cmd_only = cmd_only  # type: bool
5✔
82
        self.internal_plugin_metadata = {}
5✔
83
        self.external_plugin_metadata = {}
5✔
84
        if cmd_only:
5✔
85
            # only import the command modules of plugins
UNCOV
86
            Logger.__init__(self)
×
87
            self.find_plugins()
×
88
            self.load_plugins()
×
89
            return
×
90
        DaemonThread.__init__(self)
5✔
91
        self.device_manager = DeviceMgr(config)
5✔
92
        self.name = 'Plugins'  # set name of thread
5✔
93
        self.hw_wallets = {}
5✔
94
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
95
        self.gui_name = gui_name
5✔
96
        self.find_plugins()
5✔
97
        self.load_plugins()
5✔
98
        self.add_jobs(self.device_manager.thread_jobs())
5✔
99
        self.start()
5✔
100

101
    @property
5✔
102
    def descriptions(self):
5✔
UNCOV
103
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
106
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
107
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
108
        for loader, name, ispkg in iter_modules:
5✔
109
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
110
            #       once as data and once as code. To honor the "no duplicates" rule below,
111
            #       we exclude the ones packaged as *code*, here:
112
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
UNCOV
113
                continue
×
114
            module_path = os.path.join(pkg_path, name)
5✔
115
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
5✔
UNCOV
116
                continue
×
117
            try:
5✔
118
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
119
                    d = json.load(f)
5✔
UNCOV
120
            except FileNotFoundError:
×
121
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
122
                continue
×
123
            if 'fullname' not in d:
5✔
UNCOV
124
                continue
×
125
            d['path'] = module_path
5✔
126
            if not self.cmd_only:
5✔
127
                gui_good = self.gui_name in d.get('available_for', [])
5✔
128
                if not gui_good:
5✔
129
                    continue
5✔
130
                details = d.get('registers_wallet_type')
5✔
131
                if details:
5✔
132
                    self.register_wallet_type(name, gui_good, details)
5✔
133
                details = d.get('registers_keystore')
5✔
134
                if details:
5✔
135
                    self.register_keystore(name, gui_good, details)
5✔
136
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
UNCOV
137
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
138
                _logger.info(f"duplicate plugins? for {name=}")
×
139
                continue
×
140
            if not external:
5✔
141
                self.internal_plugin_metadata[name] = d
5✔
142
            else:
UNCOV
143
                self.external_plugin_metadata[name] = d
×
144

145
    @staticmethod
5✔
146
    def exec_module_from_spec(spec, path: str):
5✔
147
        if prev_fail := _exec_module_failure.get(path):
5✔
UNCOV
148
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
149
        try:
5✔
150
            module = importlib.util.module_from_spec(spec)
5✔
151
            # sys.modules needs to be modified for relative imports to work
152
            # see https://stackoverflow.com/a/50395128
153
            sys.modules[path] = module
5✔
154
            spec.loader.exec_module(module)
5✔
UNCOV
155
        except Exception as e:
×
156
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
157
            # so the import system knows it failed. If called again for the same plugin, we do not
158
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
159
            # might have defined commands).
UNCOV
160
            _exec_module_failure[path] = e
×
161
            if path in sys.modules:
×
162
                sys.modules.pop(path, None)
×
163
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
164
        return module
5✔
165

166
    def find_plugins(self):
5✔
167
        internal_plugins_path = (self.pkgpath, False)
5✔
168
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
169
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
170
            if pkg_path and os.path.exists(pkg_path):
5✔
171
                if not external:
5✔
172
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
173
                else:
174
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
175

176
    def load_plugins(self):
5✔
177
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
178
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
UNCOV
179
                try:
×
180
                    if self.cmd_only:  # only load init method to register commands
×
181
                        self.maybe_load_plugin_init_method(name)
×
182
                    else:
UNCOV
183
                        self.load_plugin_by_name(name)
×
184
                except BaseException as e:
×
185
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
5✔
UNCOV
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_keyfile_path(self) -> Tuple[str, str]:
5✔
191
        if sys.platform in ['windows', 'win32']:
×
192
            keyfile_path = self.keyfile_windows
×
193
            keyfile_help = _('This file can be edited with Regdit')
×
194
        elif 'ANDROID_DATA' in os.environ:
×
195
            raise Exception('platform not supported')
×
196
        else:
197
            # treat unknown platforms as linux-like
198
            keyfile_path = self.keyfile_linux
×
199
            keyfile_help = _('The file must have root permissions')
×
200
        return keyfile_path, keyfile_help
×
201

202
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
5✔
203
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
NEW
204
        try:
×
NEW
205
            if sys.platform in ['windows', 'win32']:
×
NEW
206
                self._write_key_to_regedit_windows(pubkey_hex)
×
NEW
207
            elif 'ANDROID_DATA' in os.environ:
×
NEW
UNCOV
208
                raise Exception('platform not supported')
×
209
            else:
NEW
210
                self._write_key_to_root_file_linux(pubkey_hex)
×
NEW
211
        except Exception:
×
NEW
212
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
NEW
213
            return False
×
NEW
214
        return True
×
215

216
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
5✔
217
        """
218
        Spawns a pkexec subprocess to write the key to a file with root permissions.
219
        This will open an OS dialog asking for the root password. Can only succeed if
220
        the system has polkit installed.
221
        """
NEW
222
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
NEW
223
        import subprocess
×
NEW
224
        dir_path: str = os.path.dirname(self.keyfile_linux)
×
225

226
        # creates the dir (dir_path), writes the key in file, and sets permissions to 644
NEW
227
        sh_command = (f"mkdir -p {dir_path} "  # create the /etc/electrum dir
×
228
                      f"&& tee {self.keyfile_linux} "  # write the key to the file
229
                      f"&& chmod 644 {self.keyfile_linux} "  # set read permissions for the file
230
                      f"&& chmod 755 {dir_path}")  # set read permissions for the dir
NEW
231
        process = subprocess.Popen(
×
232
            ['pkexec', 'sh', '-c', sh_command],
233
            stdin=subprocess.PIPE,
234
            stdout=subprocess.PIPE,
235
            stderr=subprocess.PIPE,
236
            text=True
237
        )
238

239
        # send the key to tee
NEW
240
        stdout, stderr = process.communicate(input=key_hex)
×
241

NEW
242
        if process.returncode != 0:
×
NEW
243
            raise Exception(f'error saving file ({process.returncode}): {stderr}')
×
244

NEW
245
        with open(self.keyfile_linux, 'r') as f:
×
NEW
246
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
NEW
247
        self.logger.debug(f'file saved successfully to {self.keyfile_linux}')
×
248

249
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
5✔
250
        """
251
        Writes the key to the Windows registry with windows UAC prompt.
252
        """
NEW
253
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE
×
254

NEW
255
        key_path = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
×
NEW
256
        value_type = 'REG_SZ'
×
NEW
257
        command = f'add "{key_path}" /ve /t {value_type} /d "{key_hex}" /f'
×
258

NEW
259
        self._run_win_regedit_as_admin(command)
×
260

261
        # check if the key was written correctly
NEW
262
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
NEW
263
            with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
×
NEW
264
                assert key_hex == QueryValue(key, 'PluginsKey'), "incorrect registry key value"
×
NEW
265
        self.logger.debug(f'key saved successfully to {key_path}')
×
266

267
    def try_auto_key_reset(self) -> bool:
5✔
NEW
268
        try:
×
NEW
269
            if sys.platform in ['windows', 'win32']:
×
NEW
270
                self._delete_plugin_key_from_windows_registry()
×
NEW
271
            elif 'ANDROID_DATA' in os.environ:
×
NEW
272
                raise Exception('platform not supported')
×
273
            else:
NEW
274
                self._delete_linux_plugin_keyfile()
×
NEW
275
        except Exception:
×
NEW
276
            self.logger.exception(f'auto-reset of plugin key failed')
×
NEW
277
            return False
×
NEW
278
        return True
×
279

280
    def _delete_linux_plugin_keyfile(self) -> None:
5✔
281
        """
282
        Deletes the root owned key file at self.keyfile_linux.
283
        """
NEW
284
        if not os.path.exists(self.keyfile_linux):
×
NEW
285
            self.logger.debug(f'file {self.keyfile_linux} does not exist')
×
NEW
286
            return
×
NEW
287
        if not self._has_root_permissions(self.keyfile_linux):
×
NEW
288
            os.unlink(self.keyfile_linux)
×
NEW
289
            return
×
290

291
        # use pkexec to delete the file as root user
NEW
292
        import subprocess
×
NEW
293
        process = subprocess.Popen(
×
294
                ['pkexec', 'rm', self.keyfile_linux],
295
                stdout=subprocess.PIPE,
296
                stderr=subprocess.PIPE,
297
                text=True
298
        )
NEW
299
        stdout, stderr = process.communicate()
×
NEW
300
        if process.returncode != 0:
×
NEW
301
            raise Exception(f'error removing file ({process.returncode}): {stderr}')
×
NEW
302
        assert not os.path.exists(self.keyfile_linux), f'file {self.keyfile_linux} still exists'
×
303

304
    def _delete_plugin_key_from_windows_registry(self) -> None:
5✔
305
        """
306
        Deletes the PluginsKey dir in the Windows registry.
307
        """
NEW
308
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE
×
309

NEW
310
        key_path = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
×
NEW
311
        command = f'delete "{key_path}" /f'
×
312

NEW
313
        self._run_win_regedit_as_admin(command)
×
314

NEW
315
        try:
×
316
            # do a sanity check to see if the key has been deleted
NEW
317
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
NEW
318
                with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
×
NEW
319
                    raise Exception(f'Key {key_path} still exists, deletion failed')
×
NEW
320
        except FileNotFoundError:
×
NEW
321
            pass
×
322

323
    @staticmethod
5✔
324
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
5✔
325
        """
326
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.
327
        """
328
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
329
        # for the result of the process (returns no process handle)
NEW
330
        from ctypes import byref, sizeof, windll, Structure, c_ulong
×
NEW
331
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR
×
332

333
        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
NEW
334
        class SHELLEXECUTEINFO(Structure):
×
NEW
335
            _fields_ = [
×
336
                ('cbSize', DWORD),
337
                ('fMask', c_ulong),
338
                ('hwnd', HWND),
339
                ('lpVerb', LPCWSTR),
340
                ('lpFile', LPCWSTR),
341
                ('lpParameters', LPCWSTR),
342
                ('lpDirectory', LPCWSTR),
343
                ('nShow', c_ulong),
344
                ('hInstApp', HINSTANCE),
345
                ('lpIDList', c_ulong),
346
                ('lpClass', LPCWSTR),
347
                ('hkeyClass', HKEY),
348
                ('dwHotKey', DWORD),
349
                ('hIcon', HANDLE),
350
                ('hProcess', HANDLE)
351
            ]
352

NEW
353
        info = SHELLEXECUTEINFO()
×
NEW
354
        info.cbSize = sizeof(SHELLEXECUTEINFO)
×
NEW
355
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
×
NEW
356
        info.hwnd = None
×
NEW
357
        info.lpVerb = 'runas'  # run as administrator
×
NEW
358
        info.lpFile = 'reg.exe'  # the executable to run
×
NEW
359
        info.lpParameters = reg_exe_command  # the registry edit command
×
NEW
360
        info.lpDirectory = None
×
NEW
361
        info.nShow = 1
×
362

363
        # Execute and wait
NEW
364
        if not windll.shell32.ShellExecuteExW(byref(info)):
×
NEW
365
            error = windll.kernel32.GetLastError()
×
NEW
366
            raise Exception(f'Error executing registry command: {error}')
×
367

368
        # block until the process is done or 5 sec timeout
NEW
369
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)
×
370

371
        # Close handle
NEW
372
        windll.kernel32.CloseHandle(info.hProcess)
×
373

374
    def create_new_key(self, password:str) -> str:
5✔
375
        salt = os.urandom(32)
×
376
        privkey = self.derive_privkey(password, salt)
×
377
        pubkey = privkey.get_public_key_bytes()
×
378
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
379
        return key.hex()
×
380

381
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
382
        """
383
        returns pubkey, salt
384
        returns None, None if the pubkey has not been set
385
        """
386
        if sys.platform in ['windows', 'win32']:
×
387
            import winreg
×
388
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
389
                try:
×
390
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
391
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
392
                except Exception as e:
×
393
                    self.logger.info(f'winreg error: {e}')
×
394
                    return None, None
×
395
        elif 'ANDROID_DATA' in os.environ:
×
396
            return None, None
×
397
        else:
398
            # treat unknown platforms as linux-like
399
            if not os.path.exists(self.keyfile_linux):
×
400
                return None, None
×
401
            if not self._has_root_permissions(self.keyfile_linux):
×
402
                return
×
403
            with open(self.keyfile_linux) as f:
×
404
                key_hex = f.read()
×
405
        key = bytes.fromhex(key_hex)
×
406
        version = key[0]
×
407
        if version != PLUGIN_PASSWORD_VERSION:
×
408
            self.logger.info(f'unknown plugin password version: {version}')
×
409
            return None, None
×
410
        # all good
411
        salt = key[1:1+32]
×
412
        pubkey = key[1+32:]
×
413
        return pubkey, salt
×
414

415
    def get_external_plugin_dir(self) -> str:
5✔
416
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
417
        if not os.path.exists(pkg_path):
5✔
418
            os.mkdir(pkg_path)
5✔
419
        return pkg_path
5✔
420

421
    async def download_external_plugin(self, url):
5✔
422
        filename = os.path.basename(urlparse(url).path)
×
423
        pkg_path = self.get_external_plugin_dir()
×
424
        path = os.path.join(pkg_path, filename)
×
425
        async with aiohttp.ClientSession() as session:
×
426
            async with session.get(url) as resp:
×
427
                if resp.status == 200:
×
428
                    with open(path, 'wb') as fd:
×
429
                        async for chunk in resp.content.iter_chunked(10):
×
430
                            fd.write(chunk)
×
431
        return path
×
432

433
    def read_manifest(self, path) -> dict:
5✔
434
        """ return json dict """
435
        with zipfile_lib.ZipFile(path) as file:
×
436
            for filename in file.namelist():
×
437
                if filename.endswith('manifest.json'):
×
438
                    break
×
439
            else:
440
                raise Exception('could not find manifest.json in zip archive')
×
441
            with file.open(filename, 'r') as f:
×
442
                manifest = json.load(f)
×
443
                manifest['path'] = path  # external, path of the zipfile
×
444
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
445
                manifest['is_zip'] = True
×
446
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
UNCOV
447
                return manifest
×
448

449
    def zip_plugin_path(self, name) -> str:
5✔
UNCOV
450
        path = self.get_metadata(name)['path']
×
UNCOV
451
        filename = os.path.basename(path)
×
UNCOV
452
        if name in self.internal_plugin_metadata:
×
453
            pkg_path = self.pkgpath
×
454
        else:
455
            pkg_path = self.get_external_plugin_dir()
×
456
        return os.path.join(pkg_path, filename)
×
457

458
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
459
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
460
        if pkg_path is None:
5✔
461
            return
×
462
        for filename in os.listdir(pkg_path):
5✔
463
            path = os.path.join(pkg_path, filename)
×
UNCOV
464
            if not filename.endswith('.zip'):
×
UNCOV
465
                continue
×
466
            try:
×
467
                d = self.read_manifest(path)
×
468
                name = d['name']
×
469
            except Exception:
×
470
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
471
                continue
×
472
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
473
                self.logger.info(f"duplicate plugins for {name=}")
×
474
                continue
×
475
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
476
                continue
×
477
            min_version = d.get('min_electrum_version')
×
478
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
479
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
480
                continue
×
UNCOV
481
            max_version = d.get('max_electrum_version')
×
482
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
483
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
484
                continue
×
485

UNCOV
486
            if not self.cmd_only:
×
UNCOV
487
                gui_good = self.gui_name in d.get('available_for', [])
×
UNCOV
488
                if not gui_good:
×
UNCOV
489
                    continue
×
UNCOV
490
                if 'fullname' not in d:
×
UNCOV
491
                    continue
×
UNCOV
492
                details = d.get('registers_keystore')
×
493
                if details:
×
494
                    self.register_keystore(name, gui_good, details)
×
495
            if external:
×
496
                self.external_plugin_metadata[name] = d
×
497
            else:
498
                self.internal_plugin_metadata[name] = d
×
499

500
    def get(self, name):
5✔
501
        return self.plugins.get(name)
×
502

503
    def count(self):
5✔
UNCOV
504
        return len(self.plugins)
×
505

506
    def load_plugin(self, name) -> 'BasePlugin':
5✔
507
        """Imports the code of the given plugin.
508
        note: can be called from any thread.
509
        """
510
        if self.get_metadata(name):
5✔
511
            return self.load_plugin_by_name(name)
5✔
512
        else:
513
            raise Exception(f"could not find plugin {name!r}")
×
514

515
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
516
        """Loads the __init__.py module of the plugin if it is not already loaded."""
517
        is_external = name in self.external_plugin_metadata
5✔
518
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
519
        if base_name not in sys.modules:
5✔
520
            metadata = self.get_metadata(name)
5✔
521
            is_zip = metadata.get('is_zip', False)
5✔
522
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
523
            if not is_zip:
5✔
524
                if is_external:
5✔
525
                    # this branch is deprecated: external plugins are always zip files
526
                    path = os.path.join(metadata['path'], '__init__.py')
×
527
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
528
                else:
529
                    init_spec = importlib.util.find_spec(base_name)
5✔
530
            else:
UNCOV
531
                zipfile = zipimport.zipimporter(metadata['path'])
×
532
                dirname = metadata['dirname']
×
UNCOV
533
                init_spec = zipfile.find_spec(dirname)
×
534

535
            self.exec_module_from_spec(init_spec, base_name)
5✔
536

537
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
538
        if name in self.plugins:
5✔
539
            return self.plugins[name]
5✔
540
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
541
        self.maybe_load_plugin_init_method(name)
5✔
542
        is_external = name in self.external_plugin_metadata
5✔
543
        if is_external and not self.is_authorized(name):
5✔
544
            self.logger.info(f'plugin not authorized {name}')
×
545
            return
×
546
        if not is_external:
5✔
547
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
548
        else:
549
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
550

551
        spec = importlib.util.find_spec(full_name)
5✔
552
        if spec is None:
5✔
553
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
554
        try:
5✔
555
            module = self.exec_module_from_spec(spec, full_name)
5✔
556
            plugin = module.Plugin(self, self.config, name)
5✔
557
        except Exception as e:
×
558
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
559
        self.add_jobs(plugin.thread_jobs())
5✔
560
        self.plugins[name] = plugin
5✔
561
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
562
        return plugin
5✔
563

564
    def close_plugin(self, plugin):
5✔
565
        self.remove_jobs(plugin.thread_jobs())
×
566

567
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
UNCOV
568
        from hashlib import pbkdf2_hmac
×
569
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
UNCOV
570
        return ECPrivkey(secret)
×
571

572
    def install_internal_plugin(self, name):
5✔
UNCOV
573
        self.config.set_key(f'plugins.{name}.enabled', [])
×
574

575
    def install_external_plugin(self, name, path, privkey, manifest):
5✔
UNCOV
576
        self.external_plugin_metadata[name] = manifest
×
UNCOV
577
        self.authorize_plugin(name, path, privkey)
×
578

579
    def uninstall(self, name: str):
5✔
UNCOV
580
        self.config.set_key(f'plugins.{name}', None)
×
UNCOV
581
        if name in self.external_plugin_metadata:
×
UNCOV
582
            zipfile = self.zip_plugin_path(name)
×
UNCOV
583
            os.unlink(zipfile)
×
584
            self.external_plugin_metadata.pop(name)
×
585

586
    def is_internal(self, name) -> bool:
5✔
UNCOV
587
        return name in self.internal_plugin_metadata
×
588

589
    def is_auto_loaded(self, name):
5✔
UNCOV
590
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
UNCOV
591
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
592

593
    def is_installed(self, name) -> bool:
5✔
594
        """an external plugin may be installed but not authorized """
UNCOV
595
        return (name in self.internal_plugin_metadata and self.config.get(f'plugins.{name}'))\
×
596
            or name in self.external_plugin_metadata
597

598
    def is_authorized(self, name) -> bool:
5✔
UNCOV
599
        if name in self.internal_plugin_metadata:
×
UNCOV
600
            return True
×
UNCOV
601
        if name not in self.external_plugin_metadata:
×
602
            return False
×
603
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
604
        if not pubkey_bytes:
×
UNCOV
605
            return False
×
UNCOV
606
        if not self.is_plugin_zip(name):
×
UNCOV
607
            return False
×
UNCOV
608
        filename = self.zip_plugin_path(name)
×
UNCOV
609
        plugin_hash = get_file_hash256(filename)
×
UNCOV
610
        sig = self.config.get(f'plugins.{name}.authorized')
×
UNCOV
611
        if not sig:
×
UNCOV
612
            return False
×
UNCOV
613
        pubkey = ECPubkey(pubkey_bytes)
×
UNCOV
614
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
615

616
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
UNCOV
617
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
UNCOV
618
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
UNCOV
619
        plugin_hash = get_file_hash256(filename)
×
620
        sig = privkey.ecdsa_sign(plugin_hash)
×
UNCOV
621
        value = sig.hex()
×
UNCOV
622
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
623

624
    def enable(self, name: str) -> 'BasePlugin':
5✔
UNCOV
625
        self.config.enable_plugin(name)
×
UNCOV
626
        p = self.get(name)
×
UNCOV
627
        if p:
×
628
            return p
×
629
        return self.load_plugin(name)
×
630

631
    def disable(self, name: str) -> None:
5✔
UNCOV
632
        self.config.disable_plugin(name)
×
UNCOV
633
        p = self.get(name)
×
UNCOV
634
        if not p:
×
UNCOV
635
            return
×
636
        self.plugins.pop(name)
×
UNCOV
637
        p.close()
×
UNCOV
638
        self.logger.info(f"closed {name}")
×
639

640
    @classmethod
5✔
641
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
UNCOV
642
        return key.startswith('plugins.')
×
643

644
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
UNCOV
645
        d = self.descriptions.get(name)
×
UNCOV
646
        if not d:
×
647
            return False
×
648
        deps = d.get('requires', [])
×
UNCOV
649
        for dep, s in deps:
×
UNCOV
650
            try:
×
651
                __import__(dep)
×
652
            except ImportError as e:
×
653
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
654
                return False
×
655
        requires = d.get('requires_wallet_type', [])
×
656
        return not requires or wallet.wallet_type in requires
×
657

658
    def get_hardware_support(self):
5✔
659
        out = []
×
UNCOV
660
        for name, (gui_good, details) in self.hw_wallets.items():
×
UNCOV
661
            if gui_good:
×
662
                try:
×
663
                    p = self.get_plugin(name)
×
UNCOV
664
                    if p.is_available():
×
UNCOV
665
                        out.append(HardwarePluginToScan(name=name,
×
666
                                                        description=details[2],
667
                                                        plugin=p,
668
                                                        exception=None))
UNCOV
669
                except Exception as e:
×
UNCOV
670
                    self.logger.exception(f"cannot load plugin for: {name}")
×
671
                    out.append(HardwarePluginToScan(name=name,
×
672
                                                    description=details[2],
673
                                                    plugin=None,
674
                                                    exception=e))
675
        return out
×
676

677
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
678
        from .wallet import register_wallet_type, register_constructor
5✔
679
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
680

681
        def loader():
5✔
682
            plugin = self.get_plugin(name)
5✔
683
            register_constructor(wallet_type, plugin.wallet_class)
5✔
684
        register_wallet_type(wallet_type)
5✔
685
        plugin_loaders[wallet_type] = loader
5✔
686

687
    def register_keystore(self, name, gui_good, details):
5✔
688
        from .keystore import register_keystore
5✔
689

690
        def dynamic_constructor(d):
5✔
691
            return self.get_plugin(name).keystore_class(d)
5✔
692
        if details[0] == 'hardware':
5✔
693
            self.hw_wallets[name] = (gui_good, details)
5✔
694
            self.logger.info(f"registering hardware {name}: {details}")
5✔
695
            register_keystore(details[1], dynamic_constructor)
5✔
696

697
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
698
        if name not in self.plugins:
5✔
699
            self.load_plugin(name)
5✔
700
        return self.plugins[name]
5✔
701

702
    def is_plugin_zip(self, name: str) -> bool:
5✔
703
        """Returns True if the plugin is a zip file"""
704
        if (metadata := self.get_metadata(name)) is None:
×
705
            return False
×
706
        return metadata.get('is_zip', False)
×
707

708
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
709
        """Returns the metadata of the plugin"""
710
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
711
        if not metadata:
5✔
UNCOV
712
            return None
×
713
        return metadata
5✔
714

715
    def run(self):
5✔
716
        while self.is_running():
5✔
717
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
718
            self.run_jobs()
5✔
719
        self.on_stop()
5✔
720

721
    def read_file(self, name: str, filename: str) -> bytes:
5✔
722
        if self.is_plugin_zip(name):
×
723
            plugin_filename = self.zip_plugin_path(name)
×
724
            metadata = self.external_plugin_metadata[name]
×
725
            dirname = metadata['dirname']
×
726
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
727
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
728
                    return myfile.read()
×
729
        else:
UNCOV
730
            assert name in self.internal_plugin_metadata
×
731
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
732
            with open(path, 'rb') as myfile:
×
733
                return myfile.read()
×
734

735
def get_file_hash256(path: str) -> bytes:
5✔
736
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
737
    with open(path, 'rb') as f:
×
UNCOV
738
        return sha256(f.read())
×
739

740

741
def hook(func):
5✔
742
    hook_names.add(func.__name__)
5✔
743
    return func
5✔
744

745

746
def run_hook(name, *args):
5✔
747
    results = []
5✔
748
    f_list = hooks.get(name, [])
5✔
749
    for p, f in f_list:
5✔
750
        if p.is_enabled():
5✔
751
            try:
5✔
752
                r = f(*args)
5✔
UNCOV
753
            except Exception:
×
UNCOV
754
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
UNCOV
755
                r = False
×
756
            if r:
5✔
UNCOV
757
                results.append(r)
×
758

759
    if results:
5✔
UNCOV
760
        assert len(results) == 1, results
×
UNCOV
761
        return results[0]
×
762

763

764
class BasePlugin(Logger):
5✔
765

766
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
767
        self.parent = parent  # type: Plugins  # The plugins object
5✔
768
        self.name = name
5✔
769
        self.config = config
5✔
770
        Logger.__init__(self)
5✔
771
        # add self to hooks
772
        for k in dir(self):
5✔
773
            if k in hook_names:
5✔
774
                l = hooks.get(k, [])
5✔
775
                l.append((self, getattr(self, k)))
5✔
776
                hooks[k] = l
5✔
777

778
    def __str__(self):
5✔
UNCOV
779
        return self.name
×
780

781
    def close(self):
5✔
782
        # remove self from hooks
UNCOV
783
        for attr_name in dir(self):
×
784
            if attr_name in hook_names:
×
785
                # found attribute in self that is also the name of a hook
UNCOV
786
                l = hooks.get(attr_name, [])
×
UNCOV
787
                try:
×
UNCOV
788
                    l.remove((self, getattr(self, attr_name)))
×
UNCOV
789
                except ValueError:
×
790
                    # maybe attr name just collided with hook name and was not hook
UNCOV
791
                    continue
×
UNCOV
792
                hooks[attr_name] = l
×
UNCOV
793
        self.parent.close_plugin(self)
×
794
        self.on_close()
×
795

796
    def on_close(self):
5✔
797
        pass
×
798

799
    def requires_settings(self) -> bool:
5✔
800
        return False
×
801

802
    def thread_jobs(self):
5✔
803
        return []
5✔
804

805
    def is_enabled(self):
5✔
UNCOV
806
        if not self.is_available():
×
UNCOV
807
            return False
×
UNCOV
808
        return self.config.is_plugin_enabled(self.name)
×
809

810
    def is_available(self):
5✔
UNCOV
811
        return True
×
812

813
    def can_user_disable(self):
5✔
UNCOV
814
        return True
×
815

816
    def settings_widget(self, window):
5✔
UNCOV
817
        raise NotImplementedError()
×
818

819
    def settings_dialog(self, window):
5✔
UNCOV
820
        raise NotImplementedError()
×
821

822
    def read_file(self, filename: str) -> bytes:
5✔
UNCOV
823
        return self.parent.read_file(self.name, filename)
×
824

825

826
class DeviceUnpairableError(UserFacingException): pass
5✔
827
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
828
class CannotAutoSelectDevice(Exception): pass
5✔
829

830

831
class Device(NamedTuple):
5✔
832
    path: Union[str, bytes]
5✔
833
    interface_number: int
5✔
834
    id_: str
5✔
835
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
836
    usage_page: int
5✔
837
    transport_ui_string: str
5✔
838

839

840
class DeviceInfo(NamedTuple):
5✔
841
    device: Device
5✔
842
    label: Optional[str] = None
5✔
843
    initialized: Optional[bool] = None
5✔
844
    exception: Optional[Exception] = None
5✔
845
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
846
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
847
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
848

849

850
class HardwarePluginToScan(NamedTuple):
5✔
851
    name: str
5✔
852
    description: str
5✔
853
    plugin: Optional['HW_PluginBase']
5✔
854
    exception: Optional[Exception]
5✔
855

856

857
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
858

859

860
# hidapi is not thread-safe
861
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
862
#     https://github.com/libusb/hidapi/issues/45
863
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
864
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
865
# It is not entirely clear to me, exactly what is safe and what isn't, when
866
# using multiple threads...
867
# Hence, we use a single thread for all device communications, including
868
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
869
# the following thread:
870
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
871
    max_workers=1,
872
    thread_name_prefix='hwd_comms_thread'
873
)
874

875
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
876
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
877
# To keep it simple, let's just import it now, as we are likely in the main thread here.
878
if threading.current_thread() is not threading.main_thread():
5✔
879
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
880
try:
5✔
881
    import hid
5✔
882
except ImportError:
5✔
883
    pass
5✔
884

885

886
T = TypeVar('T')
5✔
887

888

889
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
UNCOV
890
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
891
        return func()
×
892
    else:
UNCOV
893
        fut = _hwd_comms_executor.submit(func)
×
UNCOV
894
        return fut.result()
×
895
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
896

897

898
def runs_in_hwd_thread(func):
5✔
899
    @wraps(func)
5✔
900
    def wrapper(*args, **kwargs):
5✔
UNCOV
901
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
902
    return wrapper
5✔
903

904

905
def assert_runs_in_hwd_thread():
5✔
UNCOV
906
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
907
        raise Exception("must only be called from HWD communication thread")
×
908

909

910
class DeviceMgr(ThreadJob):
5✔
911
    """Manages hardware clients.  A client communicates over a hardware
912
    channel with the device.
913

914
    In addition to tracking device HID IDs, the device manager tracks
915
    hardware wallets and manages wallet pairing.  A HID ID may be
916
    paired with a wallet when it is confirmed that the hardware device
917
    matches the wallet, i.e. they have the same master public key.  A
918
    HID ID can be unpaired if e.g. it is wiped.
919

920
    Because of hotplugging, a wallet must request its client
921
    dynamically each time it is required, rather than caching it
922
    itself.
923

924
    The device manager is shared across plugins, so just one place
925
    does hardware scans when needed.  By tracking HID IDs, if a device
926
    is plugged into a different port the wallet is automatically
927
    re-paired.
928

929
    Wallets are informed on connect / disconnect events.  It must
930
    implement connected(), disconnected() callbacks.  Being connected
931
    implies a pairing.  Callbacks can happen in any thread context,
932
    and we do them without holding the lock.
933

934
    Confusingly, the HID ID (serial number) reported by the HID system
935
    doesn't match the device ID reported by the device itself.  We use
936
    the HID IDs.
937

938
    This plugin is thread-safe.  Currently only devices supported by
939
    hidapi are implemented."""
940

941
    def __init__(self, config: SimpleConfig):
5✔
942
        ThreadJob.__init__(self)
5✔
943
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
944
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
945
        # A client->id_ map. Needs self.lock.
946
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
947
        # What we recognise.  (vendor_id, product_id) -> Plugin
948
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
949
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
950
        # Custom enumerate functions for devices we don't know about.
951
        self._enumerate_func = set()  # Needs self.lock.
5✔
952

953
        self.lock = threading.RLock()
5✔
954

955
        self.config = config
5✔
956

957
    def thread_jobs(self):
5✔
958
        # Thread job to handle device timeouts
959
        return [self]
5✔
960

961
    def run(self):
5✔
962
        '''Handle device timeouts.  Runs in the context of the Plugins
963
        thread.'''
964
        with self.lock:
5✔
965
            clients = list(self.clients.keys())
5✔
966
        cutoff = time.time() - self.config.get_session_timeout()
5✔
967
        for client in clients:
5✔
UNCOV
968
            client.timeout(cutoff)
×
969

970
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
UNCOV
971
        for pair in device_pairs:
×
UNCOV
972
            self._recognised_hardware[pair] = plugin
×
973

974
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
UNCOV
975
        for vendor_id in vendor_ids:
×
UNCOV
976
            self._recognised_vendor[vendor_id] = plugin
×
977

978
    def register_enumerate_func(self, func):
5✔
979
        with self.lock:
×
UNCOV
980
            self._enumerate_func.add(func)
×
981

982
    @runs_in_hwd_thread
5✔
983
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
984
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
985
        # Get from cache first
UNCOV
986
        client = self._client_by_id(device.id_)
×
UNCOV
987
        if client:
×
UNCOV
988
            return client
×
UNCOV
989
        client = plugin.create_client(device, handler)
×
UNCOV
990
        if client:
×
UNCOV
991
            self.logger.info(f"Registering {client}")
×
UNCOV
992
            with self.lock:
×
UNCOV
993
                self.clients[client] = device.id_
×
UNCOV
994
        return client
×
995

996
    def id_by_pairing_code(self, pairing_code):
5✔
UNCOV
997
        with self.lock:
×
UNCOV
998
            return self.pairing_code_to_id.get(pairing_code)
×
999

1000
    def pairing_code_by_id(self, id_):
5✔
UNCOV
1001
        with self.lock:
×
UNCOV
1002
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
UNCOV
1003
                if id2 == id_:
×
UNCOV
1004
                    return pairing_code
×
UNCOV
1005
            return None
×
1006

1007
    def unpair_pairing_code(self, pairing_code):
5✔
UNCOV
1008
        with self.lock:
×
UNCOV
1009
            if pairing_code not in self.pairing_code_to_id:
×
UNCOV
1010
                return
×
UNCOV
1011
            _id = self.pairing_code_to_id.pop(pairing_code)
×
UNCOV
1012
        self._close_client(_id)
×
1013

1014
    def unpair_id(self, id_):
5✔
UNCOV
1015
        pairing_code = self.pairing_code_by_id(id_)
×
UNCOV
1016
        if pairing_code:
×
UNCOV
1017
            self.unpair_pairing_code(pairing_code)
×
1018
        else:
UNCOV
1019
            self._close_client(id_)
×
1020

1021
    def _close_client(self, id_):
5✔
UNCOV
1022
        with self.lock:
×
UNCOV
1023
            client = self._client_by_id(id_)
×
UNCOV
1024
            self.clients.pop(client, None)
×
UNCOV
1025
        if client:
×
UNCOV
1026
            client.close()
×
1027

1028
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
UNCOV
1029
        with self.lock:
×
UNCOV
1030
            for client, client_id in self.clients.items():
×
UNCOV
1031
                if client_id == id_:
×
UNCOV
1032
                    return client
×
UNCOV
1033
        return None
×
1034

1035
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
1036
        '''Returns a client for the device ID if one is registered.  If
1037
        a device is wiped or in bootloader mode pairing is impossible;
1038
        in such cases we communicate by device ID and not wallet.'''
UNCOV
1039
        if scan_now:
×
1040
            self.scan_devices()
×
UNCOV
1041
        return self._client_by_id(id_)
×
1042

1043
    @runs_in_hwd_thread
5✔
1044
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
1045
                            keystore: 'Hardware_KeyStore',
1046
                            force_pair: bool, *,
1047
                            devices: Sequence['Device'] = None,
1048
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
UNCOV
1049
        self.logger.info("getting client for keystore")
×
UNCOV
1050
        if handler is None:
×
1051
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
1052
        handler.update_status(False)
×
UNCOV
1053
        pcode = keystore.pairing_code()
×
UNCOV
1054
        client = None
×
1055
        # search existing clients first (fast-path)
UNCOV
1056
        if not devices:
×
UNCOV
1057
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
1058
        # search clients again, now allowing a (slow) scan
1059
        if client is None:
×
1060
            if devices is None:
×
1061
                devices = self.scan_devices()
×
1062
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
1063
        if client is None and force_pair:
×
1064
            try:
×
1065
                info = self.select_device(plugin, handler, keystore, devices,
×
1066
                                          allow_user_interaction=allow_user_interaction)
UNCOV
1067
            except CannotAutoSelectDevice:
×
UNCOV
1068
                pass
×
1069
            else:
1070
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
UNCOV
1071
        if client:
×
UNCOV
1072
            handler.update_status(True)
×
1073
            # note: if select_device was called, we might also update label etc here:
1074
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1075
        self.logger.info("end client for keystore")
×
1076
        return client
×
1077

1078
    def client_by_pairing_code(
5✔
1079
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
1080
        devices: Sequence['Device'],
1081
    ) -> Optional['HardwareClientBase']:
1082
        _id = self.id_by_pairing_code(pairing_code)
×
1083
        client = self._client_by_id(_id)
×
1084
        if client:
×
UNCOV
1085
            if type(client.plugin) != type(plugin):
×
UNCOV
1086
                return
×
1087
            # An unpaired client might have another wallet's handler
1088
            # from a prior scan.  Replace to fix dialog parenting.
1089
            client.handler = handler
×
UNCOV
1090
            return client
×
1091

UNCOV
1092
        for device in devices:
×
UNCOV
1093
            if device.id_ == _id:
×
1094
                return self.create_client(device, handler, plugin)
×
1095

1096
    def force_pair_keystore(
5✔
1097
        self,
1098
        *,
1099
        plugin: 'HW_PluginBase',
1100
        handler: 'HardwareHandlerBase',
1101
        info: 'DeviceInfo',
1102
        keystore: 'Hardware_KeyStore',
1103
    ) -> 'HardwareClientBase':
1104
        xpub = keystore.xpub
×
1105
        derivation = keystore.get_derivation_prefix()
×
UNCOV
1106
        assert derivation is not None
×
UNCOV
1107
        xtype = bip32.xpub_type(xpub)
×
UNCOV
1108
        client = self._client_by_id(info.device.id_)
×
UNCOV
1109
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
1110
            # See comment above for same code
1111
            client.handler = handler
×
1112
            # This will trigger a PIN/passphrase entry request
1113
            try:
×
UNCOV
1114
                client_xpub = client.get_xpub(derivation, xtype)
×
UNCOV
1115
            except (UserCancelled, RuntimeError):
×
1116
                # Bad / cancelled PIN / passphrase
UNCOV
1117
                client_xpub = None
×
UNCOV
1118
            if client_xpub == xpub:
×
UNCOV
1119
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
UNCOV
1120
                with self.lock:
×
1121
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
1122
                return client
×
1123

1124
        # The user input has wrong PIN or passphrase, or cancelled input,
1125
        # or it is not pairable
1126
        raise DeviceUnpairableError(
×
1127
            _('Electrum cannot pair with your {}.\n\n'
1128
              'Before you request bitcoins to be sent to addresses in this '
1129
              'wallet, ensure you can pair with your device, or that you have '
1130
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
1131
              'receive will be unspendable.').format(plugin.device))
1132

1133
    def list_pairable_device_infos(
5✔
1134
        self,
1135
        *,
1136
        handler: Optional['HardwareHandlerBase'],
1137
        plugin: 'HW_PluginBase',
1138
        devices: Sequence['Device'] = None,
1139
        include_failing_clients: bool = False,
1140
    ) -> List['DeviceInfo']:
1141
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
1142
        Already paired devices are also included, as it is okay to reuse them.
1143
        """
1144
        if not plugin.libraries_available:
×
UNCOV
1145
            message = plugin.get_library_not_available_message()
×
1146
            raise HardwarePluginLibraryUnavailable(message)
×
1147
        if devices is None:
×
1148
            devices = self.scan_devices()
×
UNCOV
1149
        infos = []
×
UNCOV
1150
        for device in devices:
×
UNCOV
1151
            if not plugin.can_recognize_device(device):
×
UNCOV
1152
                continue
×
UNCOV
1153
            try:
×
1154
                client = self.create_client(device, handler, plugin)
×
1155
                if not client:
×
1156
                    continue
×
1157
                label = client.label()
×
1158
                is_initialized = client.is_initialized()
×
UNCOV
1159
                soft_device_id = client.get_soft_device_id()
×
UNCOV
1160
                model_name = client.device_model_name()
×
1161
            except Exception as e:
×
1162
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
UNCOV
1163
                if include_failing_clients:
×
1164
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1165
                continue
×
1166
            infos.append(DeviceInfo(device=device,
×
1167
                                    label=label,
1168
                                    initialized=is_initialized,
1169
                                    plugin_name=plugin.name,
1170
                                    soft_device_id=soft_device_id,
1171
                                    model_name=model_name))
1172

UNCOV
1173
        return infos
×
1174

1175
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
1176
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1177
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1178
        """Select the device to use for keystore."""
1179
        # ideally this should not be called from the GUI thread...
1180
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
1181
        while True:
×
UNCOV
1182
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1183
            if infos:
×
UNCOV
1184
                break
×
1185
            if not allow_user_interaction:
×
1186
                raise CannotAutoSelectDevice()
×
1187
            msg = _('Please insert your {}').format(plugin.device)
×
UNCOV
1188
            msg += " ("
×
1189
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1190
                msg += f"label: {keystore.label}, "
×
1191
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1192
            msg += ').\n\n{}\n\n{}'.format(
×
1193
                _('Verify the cable is connected and that '
1194
                  'no other application is using it.'),
1195
                _('Try to connect again?')
1196
            )
UNCOV
1197
            if not handler.yes_no_question(msg):
×
1198
                raise UserCancelled()
×
UNCOV
1199
            devices = None
×
1200

1201
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1202
        # method 1: select device by id
UNCOV
1203
        if keystore.soft_device_id:
×
UNCOV
1204
            for info in infos:
×
UNCOV
1205
                if info.soft_device_id == keystore.soft_device_id:
×
UNCOV
1206
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
UNCOV
1207
                    return info
×
1208
        # method 2: select device by label
1209
        #           but only if not a placeholder label and only if there is no collision
UNCOV
1210
        device_labels = [info.label for info in infos]
×
UNCOV
1211
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1212
                and device_labels.count(keystore.label) == 1):
UNCOV
1213
            for info in infos:
×
UNCOV
1214
                if info.label == keystore.label:
×
UNCOV
1215
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1216
                    return info
×
1217
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1218
        #           saved for keystore anyway, select it
1219
        if (len(infos) == 1
×
1220
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1221
                and keystore.soft_device_id is None):
1222
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1223
            return infos[0]
×
1224

1225
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1226
        if not allow_user_interaction:
×
1227
            raise CannotAutoSelectDevice()
×
1228
        # ask user to select device manually
1229
        msg = (
×
1230
                _("Could not automatically pair with device for given keystore.") + "\n"
1231
                + f"(keystore label: {keystore.label!r}, "
1232
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1233
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1234
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1235
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1236
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1237
                                init=(_("initialized") if info.initialized else _("wiped")),
1238
                                transport=info.device.transport_ui_string,
1239
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1240
                        for info in infos]
UNCOV
1241
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1242
                          f"num options: {len(infos)}. options: {infos}")
UNCOV
1243
        c = handler.query_choice(msg, descriptions)
×
UNCOV
1244
        if c is None:
×
1245
            raise UserCancelled()
×
UNCOV
1246
        info = infos[c]
×
UNCOV
1247
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1248
        # note: updated label/soft_device_id will be saved after pairing succeeds
UNCOV
1249
        return info
×
1250

1251
    @runs_in_hwd_thread
5✔
1252
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1253
        try:
×
1254
            import hid  # noqa: F811
×
1255
        except ImportError:
×
1256
            return []
×
1257

1258
        devices = []
×
1259
        for d in hid.enumerate(0, 0):
×
1260
            vendor_id = d['vendor_id']
×
1261
            product_key = (vendor_id, d['product_id'])
×
1262
            plugin = None
×
1263
            if product_key in self._recognised_hardware:
×
1264
                plugin = self._recognised_hardware[product_key]
×
UNCOV
1265
            elif vendor_id in self._recognised_vendor:
×
UNCOV
1266
                plugin = self._recognised_vendor[vendor_id]
×
UNCOV
1267
            if plugin:
×
UNCOV
1268
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1269
                if device:
×
1270
                    devices.append(device)
×
1271
        return devices
×
1272

1273
    @runs_in_hwd_thread
5✔
1274
    @profiler
5✔
1275
    def scan_devices(self) -> Sequence['Device']:
5✔
1276
        self.logger.info("scanning devices...")
×
1277

1278
        # First see what's connected that we know about
1279
        devices = self._scan_devices_with_hid()
×
1280

1281
        # Let plugin handlers enumerate devices we don't know about
1282
        with self.lock:
×
1283
            enumerate_funcs = list(self._enumerate_func)
×
UNCOV
1284
        for f in enumerate_funcs:
×
1285
            try:
×
1286
                new_devices = f()
×
1287
            except BaseException as e:
×
1288
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1289
            else:
UNCOV
1290
                devices.extend(new_devices)
×
1291

1292
        # find out what was disconnected
UNCOV
1293
        client_ids = [dev.id_ for dev in devices]
×
1294
        disconnected_clients = []
×
1295
        with self.lock:
×
UNCOV
1296
            connected = {}
×
1297
            for client, id_ in self.clients.items():
×
1298
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1299
                    connected[client] = id_
×
1300
                else:
1301
                    disconnected_clients.append((client, id_))
×
UNCOV
1302
            self.clients = connected
×
1303

1304
        # Unpair disconnected devices
1305
        for client, id_ in disconnected_clients:
×
1306
            self.unpair_id(id_)
×
1307
            if client.handler:
×
UNCOV
1308
                client.handler.update_status(False)
×
1309

UNCOV
1310
        return devices
×
1311

1312
    @classmethod
5✔
1313
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
UNCOV
1314
        ret = {}
×
1315
        # add libusb
1316
        try:
×
1317
            import usb1
×
1318
        except Exception as e:
×
1319
            ret["libusb.version"] = None
×
1320
        else:
1321
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
UNCOV
1322
            try:
×
UNCOV
1323
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
UNCOV
1324
            except AttributeError:
×
1325
                ret["libusb.path"] = None
×
1326
        # add hidapi
1327
        try:
×
1328
            import hid  # noqa: F811
×
UNCOV
1329
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1330
        except Exception as e:
×
1331
            from importlib.metadata import version
×
1332
            try:
×
1333
                ret["hidapi.version"] = version("hidapi")
×
1334
            except ImportError:
×
1335
                ret["hidapi.version"] = None
×
1336
        return ret
×
1337

1338
    def trigger_pairings(
5✔
1339
            self,
1340
            keystores: Sequence['KeyStore'],
1341
            *,
1342
            allow_user_interaction: bool = True,
1343
            devices: Sequence['Device'] = None,
1344
    ) -> None:
1345
        """Given a list of keystores, try to pair each with a connected hardware device.
1346

1347
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1348
        try to pair each keystore individually. Consider the following scenario:
1349
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1350
        - assume none of the devices are paired yet
1351
        1. if we tried to individually pair keystores, we might try with ks1 first
1352
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1353
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1354
             which is confusing and error-prone. It's especially problematic if the hw device does
1355
             not support labels (such as Ledger), as then the user cannot easily distinguish
1356
             same-type devices. (see #4199)
1357
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1358
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1359
        """
1360
        from .keystore import Hardware_KeyStore
×
UNCOV
1361
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1362
        if not keystores:
×
UNCOV
1363
            return
×
UNCOV
1364
        if devices is None:
×
1365
            devices = self.scan_devices()
×
1366
        # first pair with all devices that can be auto-selected
1367
        for ks in keystores:
×
1368
            try:
×
1369
                ks.get_client(
×
1370
                    force_pair=True,
1371
                    allow_user_interaction=False,
1372
                    devices=devices,
1373
                )
1374
            except UserCancelled:
×
UNCOV
1375
                pass
×
UNCOV
1376
        if allow_user_interaction:
×
1377
            # now do manual selections
1378
            for ks in keystores:
×
1379
                try:
×
1380
                    ks.get_client(
×
1381
                        force_pair=True,
1382
                        allow_user_interaction=True,
1383
                        devices=devices,
1384
                    )
UNCOV
1385
                except UserCancelled:
×
1386
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc