• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5579941544198144

05 May 2025 07:37AM UTC coverage: 60.069% (-0.1%) from 60.194%
5579941544198144

Pull #9765

CirrusCI

f321x
plugins: add functionality to allow setting plugin pubkey from gui

Adds functionality that allows the user to store the plugin authorization pubkey without having to edit files/registry manually.
On Linux systems it spawns the commands in a subprocess with pkexec which will trigger a OS prompt to execute the commands as root.
The user sees the executed commands and can either authorize with the root password or decline.
On windows it uses the windows `ShellExecuteExW` api to edit the registry, this also triggers a OS dialog to accept or decline (UAC dialog).
There is also functionality to reset the key again, which works in the same way.
Pull Request #9765: plugins: add functionality to allow setting plugin pubkey from gui

8 of 88 new or added lines in 1 file covered. (9.09%)

207 existing lines in 1 file now uncovered.

21625 of 36000 relevant lines covered (60.07%)

3.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.72
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
5✔
72

73
    LOGGING_SHORTCUT = 'p'
5✔
74
    pkgpath = os.path.dirname(plugins.__file__)
5✔
75
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
76
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
77

78
    @profiler
5✔
79
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
80
        self.config = config
5✔
81
        self.cmd_only = cmd_only  # type: bool
5✔
82
        self.internal_plugin_metadata = {}
5✔
83
        self.external_plugin_metadata = {}
5✔
84
        if cmd_only:
5✔
85
            # only import the command modules of plugins
86
            Logger.__init__(self)
×
87
            self.find_plugins()
×
88
            self.load_plugins()
×
89
            return
×
90
        DaemonThread.__init__(self)
5✔
91
        self.device_manager = DeviceMgr(config)
5✔
92
        self.name = 'Plugins'  # set name of thread
5✔
93
        self.hw_wallets = {}
5✔
94
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
95
        self.gui_name = gui_name
5✔
96
        self.find_plugins()
5✔
97
        self.load_plugins()
5✔
98
        self.add_jobs(self.device_manager.thread_jobs())
5✔
99
        self.start()
5✔
100

101
    @property
5✔
102
    def descriptions(self):
5✔
103
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
106
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
107
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
108
        for loader, name, ispkg in iter_modules:
5✔
109
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
110
            #       once as data and once as code. To honor the "no duplicates" rule below,
111
            #       we exclude the ones packaged as *code*, here:
112
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
113
                continue
×
114
            module_path = os.path.join(pkg_path, name)
5✔
115
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
5✔
116
                continue
×
117
            try:
5✔
118
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
119
                    d = json.load(f)
5✔
120
            except FileNotFoundError:
×
121
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
122
                continue
×
123
            if 'fullname' not in d:
5✔
124
                continue
×
125
            d['path'] = module_path
5✔
126
            if not self.cmd_only:
5✔
127
                gui_good = self.gui_name in d.get('available_for', [])
5✔
128
                if not gui_good:
5✔
129
                    continue
5✔
130
                details = d.get('registers_wallet_type')
5✔
131
                if details:
5✔
132
                    self.register_wallet_type(name, gui_good, details)
5✔
133
                details = d.get('registers_keystore')
5✔
134
                if details:
5✔
135
                    self.register_keystore(name, gui_good, details)
5✔
136
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
137
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
138
                _logger.info(f"duplicate plugins? for {name=}")
×
139
                continue
×
140
            if not external:
5✔
141
                self.internal_plugin_metadata[name] = d
5✔
142
            else:
143
                self.external_plugin_metadata[name] = d
×
144

145
    @staticmethod
5✔
146
    def exec_module_from_spec(spec, path: str):
5✔
147
        if prev_fail := _exec_module_failure.get(path):
5✔
148
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
149
        try:
5✔
150
            module = importlib.util.module_from_spec(spec)
5✔
151
            # sys.modules needs to be modified for relative imports to work
152
            # see https://stackoverflow.com/a/50395128
153
            sys.modules[path] = module
5✔
154
            spec.loader.exec_module(module)
5✔
155
        except Exception as e:
×
156
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
157
            # so the import system knows it failed. If called again for the same plugin, we do not
158
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
159
            # might have defined commands).
160
            _exec_module_failure[path] = e
×
161
            if path in sys.modules:
×
162
                sys.modules.pop(path, None)
×
163
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
164
        return module
5✔
165

166
    def find_plugins(self):
5✔
167
        internal_plugins_path = (self.pkgpath, False)
5✔
168
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
169
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
170
            if pkg_path and os.path.exists(pkg_path):
5✔
171
                if not external:
5✔
172
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
173
                else:
174
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
175

176
    def load_plugins(self):
5✔
177
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
178
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
179
                try:
×
180
                    if self.cmd_only:  # only load init method to register commands
×
181
                        self.maybe_load_plugin_init_method(name)
×
182
                    else:
183
                        self.load_plugin_by_name(name)
×
184
                except BaseException as e:
×
185
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
5✔
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_keyfile_path(self) -> Tuple[str, str]:
5✔
191
        if sys.platform in ['windows', 'win32']:
×
192
            keyfile_path = self.keyfile_windows
×
193
            keyfile_help = _('This file can be edited with Regdit')
×
194
        elif 'ANDROID_DATA' in os.environ:
×
195
            raise Exception('platform not supported')
×
196
        else:
197
            # treat unknown platforms as linux-like
198
            keyfile_path = self.keyfile_linux
×
199
            keyfile_help = _('The file must have root permissions')
×
200
        return keyfile_path, keyfile_help
×
201

202
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
5✔
203
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
NEW
204
        try:
×
NEW
205
            if sys.platform in ['windows', 'win32']:
×
NEW
206
                self._write_key_to_regedit_windows(pubkey_hex)
×
NEW
207
            elif 'ANDROID_DATA' in os.environ:
×
NEW
208
                raise Exception('platform not supported')
×
209
            else:
NEW
UNCOV
210
                self._write_key_to_root_file_linux(pubkey_hex)
×
NEW
211
        except Exception:
×
NEW
212
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
NEW
213
            return False
×
NEW
214
        return True
×
215

216
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
5✔
217
        """
218
        Spawns a pkexec subprocess to write the key to a file with root permissions.
219
        This will open an OS dialog asking for the root password. Can only succeed if
220
        the system has polkit installed.
221
        """
NEW
222
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
NEW
223
        import subprocess
×
NEW
224
        dir_path: str = os.path.dirname(self.keyfile_linux)
×
225

226
        # creates the dir (dir_path), writes the key in file, and sets permissions to 644
NEW
227
        sh_command = (f"mkdir -p {dir_path} "  # create the /etc/electrum dir
×
228
                      f"&& tee {self.keyfile_linux} "  # write the key to the file
229
                      f"&& chmod 644 {self.keyfile_linux} "  # set read permissions for the file
230
                      f"&& chmod 755 {dir_path}")  # set read permissions for the dir
NEW
231
        process = subprocess.Popen(
×
232
            ['pkexec', 'sh', '-c', sh_command],
233
            stdin=subprocess.PIPE,
234
            stdout=subprocess.PIPE,
235
            stderr=subprocess.PIPE,
236
            text=True
237
        )
238

239
        # send the key to tee
NEW
240
        stdout, stderr = process.communicate(input=key_hex)
×
241

NEW
242
        if process.returncode != 0:
×
NEW
243
            raise Exception(f'error saving file ({process.returncode}): {stderr}')
×
244

NEW
245
        with open(self.keyfile_linux, 'r') as f:
×
NEW
246
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
NEW
247
        self.logger.debug(f'file saved successfully to {self.keyfile_linux}')
×
248

249
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
5✔
250
        """
251
        Writes the key to the Windows registry with windows UAC prompt.
252
        """
NEW
253
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE
×
254

NEW
255
        key_path = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
×
NEW
256
        value_type = 'REG_SZ'
×
NEW
257
        command = f'add "{key_path}" /ve /t {value_type} /d "{key_hex}" /f'
×
258

NEW
259
        self._run_win_regedit_as_admin(command)
×
260

261
        # check if the key was written correctly
NEW
262
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
NEW
263
            with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
×
NEW
264
                assert key_hex == QueryValue(key, 'PluginsKey'), "incorrect registry key value"
×
NEW
265
        self.logger.debug(f'key saved successfully to {key_path}')
×
266

267
    def try_auto_key_reset(self) -> bool:
5✔
NEW
268
        try:
×
NEW
269
            if sys.platform in ['windows', 'win32']:
×
NEW
270
                self._delete_plugin_key_from_windows_registry()
×
NEW
271
            elif 'ANDROID_DATA' in os.environ:
×
NEW
272
                raise Exception('platform not supported')
×
273
            else:
NEW
274
                self._delete_linux_plugin_keyfile()
×
NEW
275
        except Exception:
×
NEW
276
            self.logger.exception(f'auto-reset of plugin key failed')
×
NEW
277
            return False
×
NEW
278
        return True
×
279

280
    def _delete_linux_plugin_keyfile(self) -> None:
5✔
281
        """
282
        Deletes the root owned key file at self.keyfile_linux.
283
        """
NEW
284
        if not os.path.exists(self.keyfile_linux):
×
NEW
285
            self.logger.debug(f'file {self.keyfile_linux} does not exist')
×
NEW
286
            return
×
NEW
287
        if not self._has_root_permissions(self.keyfile_linux):
×
NEW
288
            os.unlink(self.keyfile_linux)
×
NEW
289
            return
×
290

291
        # use pkexec to delete the file as root user
NEW
292
        import subprocess
×
NEW
293
        process = subprocess.Popen(
×
294
                ['pkexec', 'rm', self.keyfile_linux],
295
                stdout=subprocess.PIPE,
296
                stderr=subprocess.PIPE,
297
                text=True
298
        )
NEW
299
        stdout, stderr = process.communicate()
×
NEW
300
        if process.returncode != 0:
×
NEW
301
            raise Exception(f'error removing file ({process.returncode}): {stderr}')
×
NEW
302
        assert not os.path.exists(self.keyfile_linux), f'file {self.keyfile_linux} still exists'
×
303

304
    def _delete_plugin_key_from_windows_registry(self) -> None:
5✔
305
        """
306
        Deletes the PluginsKey dir in the Windows registry.
307
        """
NEW
308
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE
×
309

NEW
310
        key_path = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
×
NEW
311
        command = f'delete "{key_path}" /f'
×
312

NEW
313
        self._run_win_regedit_as_admin(command)
×
314

NEW
315
        try:
×
316
            # do a sanity check to see if the key has been deleted
NEW
317
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
NEW
318
                with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
×
NEW
319
                    raise Exception(f'Key {key_path} still exists, deletion failed')
×
NEW
320
        except FileNotFoundError:
×
NEW
321
            pass
×
322

323
    @staticmethod
5✔
324
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
5✔
325
        """
326
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.
327
        """
328
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
329
        # for the result of the process (returns no process handle)
NEW
330
        from ctypes import byref, sizeof, windll, Structure, c_ulong
×
NEW
331
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR
×
332

333
        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
NEW
334
        class SHELLEXECUTEINFO(Structure):
×
NEW
335
            _fields_ = [
×
336
                ('cbSize', DWORD),
337
                ('fMask', c_ulong),
338
                ('hwnd', HWND),
339
                ('lpVerb', LPCWSTR),
340
                ('lpFile', LPCWSTR),
341
                ('lpParameters', LPCWSTR),
342
                ('lpDirectory', LPCWSTR),
343
                ('nShow', c_ulong),
344
                ('hInstApp', HINSTANCE),
345
                ('lpIDList', c_ulong),
346
                ('lpClass', LPCWSTR),
347
                ('hkeyClass', HKEY),
348
                ('dwHotKey', DWORD),
349
                ('hIcon', HANDLE),
350
                ('hProcess', HANDLE)
351
            ]
352

NEW
353
        info = SHELLEXECUTEINFO()
×
NEW
354
        info.cbSize = sizeof(SHELLEXECUTEINFO)
×
NEW
355
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
×
NEW
356
        info.hwnd = None
×
NEW
357
        info.lpVerb = 'runas'  # run as administrator
×
NEW
358
        info.lpFile = 'reg.exe'  # the executable to run
×
NEW
359
        info.lpParameters = reg_exe_command  # the registry edit command
×
NEW
360
        info.lpDirectory = None
×
NEW
361
        info.nShow = 1
×
362

363
        # Execute and wait
NEW
364
        if not windll.shell32.ShellExecuteExW(byref(info)):
×
NEW
365
            error = windll.kernel32.GetLastError()
×
NEW
366
            raise Exception(f'Error executing registry command: {error}')
×
367

368
        # block until the process is done or 5 sec timeout
NEW
369
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)
×
370

371
        # Close handle
NEW
372
        windll.kernel32.CloseHandle(info.hProcess)
×
373

374
    def create_new_key(self, password:str) -> str:
5✔
375
        salt = os.urandom(32)
×
376
        privkey = self.derive_privkey(password, salt)
×
377
        pubkey = privkey.get_public_key_bytes()
×
378
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
379
        return key.hex()
×
380

381
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
382
        """
383
        returns pubkey, salt
384
        returns None, None if the pubkey has not been set
385
        """
386
        if sys.platform in ['windows', 'win32']:
×
UNCOV
387
            import winreg
×
UNCOV
388
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
UNCOV
389
                try:
×
UNCOV
390
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
UNCOV
391
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
UNCOV
392
                except Exception as e:
×
393
                    self.logger.info(f'winreg error: {e}')
×
394
                    return None, None
×
395
        elif 'ANDROID_DATA' in os.environ:
×
396
            return None, None
×
397
        else:
398
            # treat unknown platforms as linux-like
399
            if not os.path.exists(self.keyfile_linux):
×
400
                return None, None
×
401
            if not self._has_root_permissions(self.keyfile_linux):
×
402
                return
×
403
            with open(self.keyfile_linux) as f:
×
UNCOV
404
                key_hex = f.read()
×
UNCOV
405
        key = bytes.fromhex(key_hex)
×
406
        version = key[0]
×
407
        if version != PLUGIN_PASSWORD_VERSION:
×
408
            self.logger.info(f'unknown plugin password version: {version}')
×
409
            return None, None
×
410
        # all good
411
        salt = key[1:1+32]
×
412
        pubkey = key[1+32:]
×
413
        return pubkey, salt
×
414

415
    def get_external_plugin_dir(self) -> str:
5✔
416
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
417
        if not os.path.exists(pkg_path):
5✔
418
            os.mkdir(pkg_path)
5✔
419
        return pkg_path
5✔
420

421
    async def download_external_plugin(self, url: str) -> str:
5✔
422
        filename = os.path.basename(urlparse(url).path)
×
423
        pkg_path = self.get_external_plugin_dir()
×
424
        path = os.path.join(pkg_path, filename)
×
UNCOV
425
        if os.path.exists(path):
×
UNCOV
426
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
×
UNCOV
427
        async with aiohttp.ClientSession() as session:
×
UNCOV
428
            async with session.get(url) as resp:
×
UNCOV
429
                if resp.status == 200:
×
UNCOV
430
                    with open(path, 'wb') as fd:
×
UNCOV
431
                        async for chunk in resp.content.iter_chunked(10):
×
UNCOV
432
                            fd.write(chunk)
×
433
        return path
×
434

435
    def read_manifest(self, path) -> dict:
5✔
436
        """ return json dict """
437
        with zipfile_lib.ZipFile(path) as file:
×
438
            for filename in file.namelist():
×
439
                if filename.endswith('manifest.json'):
×
440
                    break
×
441
            else:
442
                raise Exception('could not find manifest.json in zip archive')
×
443
            with file.open(filename, 'r') as f:
×
444
                manifest = json.load(f)
×
UNCOV
445
                manifest['path'] = path  # external, path of the zipfile
×
UNCOV
446
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
UNCOV
447
                manifest['is_zip'] = True
×
448
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
449
                return manifest
×
450

451
    def zip_plugin_path(self, name) -> str:
5✔
UNCOV
452
        path = self.get_metadata(name)['path']
×
453
        filename = os.path.basename(path)
×
454
        if name in self.internal_plugin_metadata:
×
455
            pkg_path = self.pkgpath
×
456
        else:
457
            pkg_path = self.get_external_plugin_dir()
×
458
        return os.path.join(pkg_path, filename)
×
459

460
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
461
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
462
        if pkg_path is None:
5✔
463
            return
×
464
        for filename in os.listdir(pkg_path):
5✔
465
            path = os.path.join(pkg_path, filename)
×
466
            if not filename.endswith('.zip'):
×
UNCOV
467
                continue
×
468
            try:
×
469
                d = self.read_manifest(path)
×
UNCOV
470
                name = d['name']
×
UNCOV
471
            except Exception:
×
UNCOV
472
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
UNCOV
473
                continue
×
474
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
UNCOV
475
                self.logger.info(f"duplicate plugins for {name=}")
×
476
                continue
×
477
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
478
                continue
×
479
            min_version = d.get('min_electrum_version')
×
480
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
481
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
482
                continue
×
483
            max_version = d.get('max_electrum_version')
×
484
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
485
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
486
                continue
×
487

488
            if not self.cmd_only:
×
489
                gui_good = self.gui_name in d.get('available_for', [])
×
490
                if not gui_good:
×
491
                    continue
×
492
                if 'fullname' not in d:
×
493
                    continue
×
494
                details = d.get('registers_keystore')
×
495
                if details:
×
496
                    self.register_keystore(name, gui_good, details)
×
497
            if external:
×
UNCOV
498
                self.external_plugin_metadata[name] = d
×
499
            else:
500
                self.internal_plugin_metadata[name] = d
×
501

502
    def get(self, name):
5✔
503
        return self.plugins.get(name)
×
504

505
    def count(self):
5✔
506
        return len(self.plugins)
×
507

508
    def load_plugin(self, name) -> 'BasePlugin':
5✔
509
        """Imports the code of the given plugin.
510
        note: can be called from any thread.
511
        """
512
        if self.get_metadata(name):
5✔
513
            return self.load_plugin_by_name(name)
5✔
514
        else:
UNCOV
515
            raise Exception(f"could not find plugin {name!r}")
×
516

517
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
518
        """Loads the __init__.py module of the plugin if it is not already loaded."""
519
        is_external = name in self.external_plugin_metadata
5✔
520
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
521
        if base_name not in sys.modules:
5✔
522
            metadata = self.get_metadata(name)
5✔
523
            is_zip = metadata.get('is_zip', False)
5✔
524
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
525
            if not is_zip:
5✔
526
                if is_external:
5✔
527
                    # this branch is deprecated: external plugins are always zip files
UNCOV
528
                    path = os.path.join(metadata['path'], '__init__.py')
×
UNCOV
529
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
530
                else:
531
                    init_spec = importlib.util.find_spec(base_name)
5✔
532
            else:
UNCOV
533
                zipfile = zipimport.zipimporter(metadata['path'])
×
UNCOV
534
                dirname = metadata['dirname']
×
UNCOV
535
                init_spec = zipfile.find_spec(dirname)
×
536

537
            self.exec_module_from_spec(init_spec, base_name)
5✔
538

539
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
540
        if name in self.plugins:
5✔
541
            return self.plugins[name]
5✔
542
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
543
        self.maybe_load_plugin_init_method(name)
5✔
544
        is_external = name in self.external_plugin_metadata
5✔
545
        if is_external and not self.is_authorized(name):
5✔
546
            self.logger.info(f'plugin not authorized {name}')
×
UNCOV
547
            return
×
548
        if not is_external:
5✔
549
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
550
        else:
UNCOV
551
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
552

553
        spec = importlib.util.find_spec(full_name)
5✔
554
        if spec is None:
5✔
UNCOV
555
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
556
        try:
5✔
557
            module = self.exec_module_from_spec(spec, full_name)
5✔
558
            plugin = module.Plugin(self, self.config, name)
5✔
UNCOV
559
        except Exception as e:
×
UNCOV
560
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
561
        self.add_jobs(plugin.thread_jobs())
5✔
562
        self.plugins[name] = plugin
5✔
563
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
564
        return plugin
5✔
565

566
    def close_plugin(self, plugin):
5✔
UNCOV
567
        self.remove_jobs(plugin.thread_jobs())
×
568

569
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
570
        from hashlib import pbkdf2_hmac
×
571
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
UNCOV
572
        return ECPrivkey(secret)
×
573

574
    def install_internal_plugin(self, name):
5✔
UNCOV
575
        self.config.set_key(f'plugins.{name}.enabled', [])
×
576

577
    def install_external_plugin(self, name, path, privkey, manifest):
5✔
578
        # uninstall old version first to get rid of old zip files when updating plugin
UNCOV
579
        self.uninstall(name)
×
UNCOV
580
        self.external_plugin_metadata[name] = manifest
×
581
        self.authorize_plugin(name, path, privkey)
×
582

583
    def uninstall(self, name: str):
5✔
UNCOV
584
        self.config.set_key(f'plugins.{name}', None)
×
UNCOV
585
        if name in self.external_plugin_metadata:
×
586
            zipfile = self.zip_plugin_path(name)
×
UNCOV
587
            os.unlink(zipfile)
×
UNCOV
588
            self.external_plugin_metadata.pop(name)
×
589

590
    def is_internal(self, name) -> bool:
5✔
591
        return name in self.internal_plugin_metadata
×
592

593
    def is_auto_loaded(self, name):
5✔
UNCOV
594
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
595
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
596

597
    def is_installed(self, name) -> bool:
5✔
598
        """an external plugin may be installed but not authorized """
599
        return (name in self.internal_plugin_metadata and self.config.get(f'plugins.{name}'))\
×
600
            or name in self.external_plugin_metadata
601

602
    def is_authorized(self, name) -> bool:
5✔
UNCOV
603
        if name in self.internal_plugin_metadata:
×
UNCOV
604
            return True
×
605
        if name not in self.external_plugin_metadata:
×
606
            return False
×
UNCOV
607
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
UNCOV
608
        if not pubkey_bytes:
×
UNCOV
609
            return False
×
610
        if not self.is_plugin_zip(name):
×
UNCOV
611
            return False
×
UNCOV
612
        filename = self.zip_plugin_path(name)
×
UNCOV
613
        plugin_hash = get_file_hash256(filename)
×
614
        sig = self.config.get(f'plugins.{name}.authorized')
×
615
        if not sig:
×
616
            return False
×
617
        pubkey = ECPubkey(pubkey_bytes)
×
618
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
619

620
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
621
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
622
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
623
        plugin_hash = get_file_hash256(filename)
×
624
        sig = privkey.ecdsa_sign(plugin_hash)
×
625
        value = sig.hex()
×
626
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
627

628
    def enable(self, name: str) -> 'BasePlugin':
5✔
629
        self.config.enable_plugin(name)
×
UNCOV
630
        p = self.get(name)
×
UNCOV
631
        if p:
×
632
            return p
×
633
        return self.load_plugin(name)
×
634

635
    def disable(self, name: str) -> None:
5✔
636
        self.config.disable_plugin(name)
×
637
        p = self.get(name)
×
UNCOV
638
        if not p:
×
UNCOV
639
            return
×
640
        self.plugins.pop(name)
×
641
        p.close()
×
642
        self.logger.info(f"closed {name}")
×
643

644
    @classmethod
5✔
645
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
UNCOV
646
        return key.startswith('plugins.')
×
647

648
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
649
        d = self.descriptions.get(name)
×
650
        if not d:
×
651
            return False
×
652
        deps = d.get('requires', [])
×
653
        for dep, s in deps:
×
UNCOV
654
            try:
×
UNCOV
655
                __import__(dep)
×
UNCOV
656
            except ImportError as e:
×
657
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
UNCOV
658
                return False
×
UNCOV
659
        requires = d.get('requires_wallet_type', [])
×
660
        return not requires or wallet.wallet_type in requires
×
661

662
    def get_hardware_support(self):
5✔
663
        out = []
×
664
        for name, (gui_good, details) in self.hw_wallets.items():
×
665
            if gui_good:
×
666
                try:
×
667
                    p = self.get_plugin(name)
×
668
                    if p.is_available():
×
669
                        out.append(HardwarePluginToScan(name=name,
×
670
                                                        description=details[2],
671
                                                        plugin=p,
672
                                                        exception=None))
UNCOV
673
                except Exception as e:
×
674
                    self.logger.exception(f"cannot load plugin for: {name}")
×
675
                    out.append(HardwarePluginToScan(name=name,
×
676
                                                    description=details[2],
677
                                                    plugin=None,
678
                                                    exception=e))
679
        return out
×
680

681
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
682
        from .wallet import register_wallet_type, register_constructor
5✔
683
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
684

685
        def loader():
5✔
686
            plugin = self.get_plugin(name)
5✔
687
            register_constructor(wallet_type, plugin.wallet_class)
5✔
688
        register_wallet_type(wallet_type)
5✔
689
        plugin_loaders[wallet_type] = loader
5✔
690

691
    def register_keystore(self, name, gui_good, details):
5✔
692
        from .keystore import register_keystore
5✔
693

694
        def dynamic_constructor(d):
5✔
695
            return self.get_plugin(name).keystore_class(d)
5✔
696
        if details[0] == 'hardware':
5✔
697
            self.hw_wallets[name] = (gui_good, details)
5✔
698
            self.logger.info(f"registering hardware {name}: {details}")
5✔
699
            register_keystore(details[1], dynamic_constructor)
5✔
700

701
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
702
        if name not in self.plugins:
5✔
703
            self.load_plugin(name)
5✔
704
        return self.plugins[name]
5✔
705

706
    def is_plugin_zip(self, name: str) -> bool:
5✔
707
        """Returns True if the plugin is a zip file"""
UNCOV
708
        if (metadata := self.get_metadata(name)) is None:
×
UNCOV
709
            return False
×
UNCOV
710
        return metadata.get('is_zip', False)
×
711

712
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
713
        """Returns the metadata of the plugin"""
714
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
715
        if not metadata:
5✔
UNCOV
716
            return None
×
717
        return metadata
5✔
718

719
    def run(self):
5✔
720
        while self.is_running():
5✔
721
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
722
            self.run_jobs()
5✔
723
        self.on_stop()
5✔
724

725
    def read_file(self, name: str, filename: str) -> bytes:
5✔
UNCOV
726
        if self.is_plugin_zip(name):
×
727
            plugin_filename = self.zip_plugin_path(name)
×
UNCOV
728
            metadata = self.external_plugin_metadata[name]
×
UNCOV
729
            dirname = metadata['dirname']
×
UNCOV
730
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
UNCOV
731
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
UNCOV
732
                    return myfile.read()
×
733
        else:
UNCOV
734
            assert name in self.internal_plugin_metadata
×
UNCOV
735
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
UNCOV
736
            with open(path, 'rb') as myfile:
×
737
                return myfile.read()
×
738

739
def get_file_hash256(path: str) -> bytes:
5✔
740
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
741
    with open(path, 'rb') as f:
×
742
        return sha256(f.read())
×
743

744

745
def hook(func):
5✔
746
    hook_names.add(func.__name__)
5✔
747
    return func
5✔
748

749

750
def run_hook(name, *args):
5✔
751
    results = []
5✔
752
    f_list = hooks.get(name, [])
5✔
753
    for p, f in f_list:
5✔
754
        if p.is_enabled():
5✔
755
            try:
5✔
756
                r = f(*args)
5✔
UNCOV
757
            except Exception:
×
UNCOV
758
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
UNCOV
759
                r = False
×
760
            if r:
5✔
UNCOV
761
                results.append(r)
×
762

763
    if results:
5✔
UNCOV
764
        assert len(results) == 1, results
×
UNCOV
765
        return results[0]
×
766

767

768
class BasePlugin(Logger):
5✔
769

770
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
771
        self.parent = parent  # type: Plugins  # The plugins object
5✔
772
        self.name = name
5✔
773
        self.config = config
5✔
774
        Logger.__init__(self)
5✔
775
        # add self to hooks
776
        for k in dir(self):
5✔
777
            if k in hook_names:
5✔
778
                l = hooks.get(k, [])
5✔
779
                l.append((self, getattr(self, k)))
5✔
780
                hooks[k] = l
5✔
781

782
    def __str__(self):
5✔
UNCOV
783
        return self.name
×
784

785
    def close(self):
5✔
786
        # remove self from hooks
UNCOV
787
        for attr_name in dir(self):
×
UNCOV
788
            if attr_name in hook_names:
×
789
                # found attribute in self that is also the name of a hook
UNCOV
790
                l = hooks.get(attr_name, [])
×
UNCOV
791
                try:
×
UNCOV
792
                    l.remove((self, getattr(self, attr_name)))
×
UNCOV
793
                except ValueError:
×
794
                    # maybe attr name just collided with hook name and was not hook
UNCOV
795
                    continue
×
UNCOV
796
                hooks[attr_name] = l
×
UNCOV
797
        self.parent.close_plugin(self)
×
798
        self.on_close()
×
799

800
    def on_close(self):
5✔
801
        pass
×
802

803
    def requires_settings(self) -> bool:
5✔
804
        return False
×
805

806
    def thread_jobs(self):
5✔
807
        return []
5✔
808

809
    def is_enabled(self):
5✔
UNCOV
810
        if not self.is_available():
×
UNCOV
811
            return False
×
812
        return self.config.is_plugin_enabled(self.name)
×
813

814
    def is_available(self):
5✔
815
        return True
×
816

817
    def can_user_disable(self):
5✔
UNCOV
818
        return True
×
819

820
    def settings_widget(self, window):
5✔
821
        raise NotImplementedError()
×
822

823
    def settings_dialog(self, window):
5✔
UNCOV
824
        raise NotImplementedError()
×
825

826
    def read_file(self, filename: str) -> bytes:
5✔
UNCOV
827
        return self.parent.read_file(self.name, filename)
×
828

829

830
class DeviceUnpairableError(UserFacingException): pass
5✔
831
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
832
class CannotAutoSelectDevice(Exception): pass
5✔
833

834

835
class Device(NamedTuple):
5✔
836
    path: Union[str, bytes]
5✔
837
    interface_number: int
5✔
838
    id_: str
5✔
839
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
840
    usage_page: int
5✔
841
    transport_ui_string: str
5✔
842

843

844
class DeviceInfo(NamedTuple):
5✔
845
    device: Device
5✔
846
    label: Optional[str] = None
5✔
847
    initialized: Optional[bool] = None
5✔
848
    exception: Optional[Exception] = None
5✔
849
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
850
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
851
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
852

853

854
class HardwarePluginToScan(NamedTuple):
5✔
855
    name: str
5✔
856
    description: str
5✔
857
    plugin: Optional['HW_PluginBase']
5✔
858
    exception: Optional[Exception]
5✔
859

860

861
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
862

863

864
# hidapi is not thread-safe
865
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
866
#     https://github.com/libusb/hidapi/issues/45
867
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
868
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
869
# It is not entirely clear to me, exactly what is safe and what isn't, when
870
# using multiple threads...
871
# Hence, we use a single thread for all device communications, including
872
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
873
# the following thread:
874
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
875
    max_workers=1,
876
    thread_name_prefix='hwd_comms_thread'
877
)
878

879
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
880
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
881
# To keep it simple, let's just import it now, as we are likely in the main thread here.
882
if threading.current_thread() is not threading.main_thread():
5✔
UNCOV
883
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
884
try:
5✔
885
    import hid
5✔
886
except ImportError:
5✔
887
    pass
5✔
888

889

890
T = TypeVar('T')
5✔
891

892

893
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
894
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
895
        return func()
×
896
    else:
UNCOV
897
        fut = _hwd_comms_executor.submit(func)
×
UNCOV
898
        return fut.result()
×
899
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
900

901

902
def runs_in_hwd_thread(func):
5✔
903
    @wraps(func)
5✔
904
    def wrapper(*args, **kwargs):
5✔
905
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
906
    return wrapper
5✔
907

908

909
def assert_runs_in_hwd_thread():
5✔
UNCOV
910
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
UNCOV
911
        raise Exception("must only be called from HWD communication thread")
×
912

913

914
class DeviceMgr(ThreadJob):
5✔
915
    """Manages hardware clients.  A client communicates over a hardware
916
    channel with the device.
917

918
    In addition to tracking device HID IDs, the device manager tracks
919
    hardware wallets and manages wallet pairing.  A HID ID may be
920
    paired with a wallet when it is confirmed that the hardware device
921
    matches the wallet, i.e. they have the same master public key.  A
922
    HID ID can be unpaired if e.g. it is wiped.
923

924
    Because of hotplugging, a wallet must request its client
925
    dynamically each time it is required, rather than caching it
926
    itself.
927

928
    The device manager is shared across plugins, so just one place
929
    does hardware scans when needed.  By tracking HID IDs, if a device
930
    is plugged into a different port the wallet is automatically
931
    re-paired.
932

933
    Wallets are informed on connect / disconnect events.  It must
934
    implement connected(), disconnected() callbacks.  Being connected
935
    implies a pairing.  Callbacks can happen in any thread context,
936
    and we do them without holding the lock.
937

938
    Confusingly, the HID ID (serial number) reported by the HID system
939
    doesn't match the device ID reported by the device itself.  We use
940
    the HID IDs.
941

942
    This plugin is thread-safe.  Currently only devices supported by
943
    hidapi are implemented."""
944

945
    def __init__(self, config: SimpleConfig):
5✔
946
        ThreadJob.__init__(self)
5✔
947
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
948
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
949
        # A client->id_ map. Needs self.lock.
950
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
951
        # What we recognise.  (vendor_id, product_id) -> Plugin
952
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
953
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
954
        # Custom enumerate functions for devices we don't know about.
955
        self._enumerate_func = set()  # Needs self.lock.
5✔
956

957
        self.lock = threading.RLock()
5✔
958

959
        self.config = config
5✔
960

961
    def thread_jobs(self):
5✔
962
        # Thread job to handle device timeouts
963
        return [self]
5✔
964

965
    def run(self):
5✔
966
        '''Handle device timeouts.  Runs in the context of the Plugins
967
        thread.'''
968
        with self.lock:
5✔
969
            clients = list(self.clients.keys())
5✔
970
        cutoff = time.time() - self.config.get_session_timeout()
5✔
971
        for client in clients:
5✔
UNCOV
972
            client.timeout(cutoff)
×
973

974
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
UNCOV
975
        for pair in device_pairs:
×
UNCOV
976
            self._recognised_hardware[pair] = plugin
×
977

978
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
UNCOV
979
        for vendor_id in vendor_ids:
×
UNCOV
980
            self._recognised_vendor[vendor_id] = plugin
×
981

982
    def register_enumerate_func(self, func):
5✔
983
        with self.lock:
×
UNCOV
984
            self._enumerate_func.add(func)
×
985

986
    @runs_in_hwd_thread
5✔
987
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
988
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
989
        # Get from cache first
990
        client = self._client_by_id(device.id_)
×
991
        if client:
×
UNCOV
992
            return client
×
UNCOV
993
        client = plugin.create_client(device, handler)
×
994
        if client:
×
995
            self.logger.info(f"Registering {client}")
×
UNCOV
996
            with self.lock:
×
UNCOV
997
                self.clients[client] = device.id_
×
UNCOV
998
        return client
×
999

1000
    def id_by_pairing_code(self, pairing_code):
5✔
1001
        with self.lock:
×
1002
            return self.pairing_code_to_id.get(pairing_code)
×
1003

1004
    def pairing_code_by_id(self, id_):
5✔
1005
        with self.lock:
×
1006
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
1007
                if id2 == id_:
×
1008
                    return pairing_code
×
1009
            return None
×
1010

1011
    def unpair_pairing_code(self, pairing_code):
5✔
1012
        with self.lock:
×
1013
            if pairing_code not in self.pairing_code_to_id:
×
UNCOV
1014
                return
×
UNCOV
1015
            _id = self.pairing_code_to_id.pop(pairing_code)
×
1016
        self._close_client(_id)
×
1017

1018
    def unpair_id(self, id_):
5✔
1019
        pairing_code = self.pairing_code_by_id(id_)
×
1020
        if pairing_code:
×
UNCOV
1021
            self.unpair_pairing_code(pairing_code)
×
1022
        else:
1023
            self._close_client(id_)
×
1024

1025
    def _close_client(self, id_):
5✔
1026
        with self.lock:
×
1027
            client = self._client_by_id(id_)
×
UNCOV
1028
            self.clients.pop(client, None)
×
UNCOV
1029
        if client:
×
1030
            client.close()
×
1031

1032
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
UNCOV
1033
        with self.lock:
×
1034
            for client, client_id in self.clients.items():
×
UNCOV
1035
                if client_id == id_:
×
UNCOV
1036
                    return client
×
1037
        return None
×
1038

1039
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
1040
        '''Returns a client for the device ID if one is registered.  If
1041
        a device is wiped or in bootloader mode pairing is impossible;
1042
        in such cases we communicate by device ID and not wallet.'''
UNCOV
1043
        if scan_now:
×
1044
            self.scan_devices()
×
1045
        return self._client_by_id(id_)
×
1046

1047
    @runs_in_hwd_thread
5✔
1048
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
1049
                            keystore: 'Hardware_KeyStore',
1050
                            force_pair: bool, *,
1051
                            devices: Sequence['Device'] = None,
1052
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
UNCOV
1053
        self.logger.info("getting client for keystore")
×
1054
        if handler is None:
×
1055
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
1056
        handler.update_status(False)
×
UNCOV
1057
        pcode = keystore.pairing_code()
×
UNCOV
1058
        client = None
×
1059
        # search existing clients first (fast-path)
UNCOV
1060
        if not devices:
×
UNCOV
1061
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
1062
        # search clients again, now allowing a (slow) scan
UNCOV
1063
        if client is None:
×
1064
            if devices is None:
×
1065
                devices = self.scan_devices()
×
1066
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
1067
        if client is None and force_pair:
×
1068
            try:
×
1069
                info = self.select_device(plugin, handler, keystore, devices,
×
1070
                                          allow_user_interaction=allow_user_interaction)
1071
            except CannotAutoSelectDevice:
×
1072
                pass
×
1073
            else:
1074
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
1075
        if client:
×
1076
            handler.update_status(True)
×
1077
            # note: if select_device was called, we might also update label etc here:
1078
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1079
        self.logger.info("end client for keystore")
×
1080
        return client
×
1081

1082
    def client_by_pairing_code(
5✔
1083
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
1084
        devices: Sequence['Device'],
1085
    ) -> Optional['HardwareClientBase']:
1086
        _id = self.id_by_pairing_code(pairing_code)
×
1087
        client = self._client_by_id(_id)
×
UNCOV
1088
        if client:
×
1089
            if type(client.plugin) != type(plugin):
×
1090
                return
×
1091
            # An unpaired client might have another wallet's handler
1092
            # from a prior scan.  Replace to fix dialog parenting.
UNCOV
1093
            client.handler = handler
×
UNCOV
1094
            return client
×
1095

UNCOV
1096
        for device in devices:
×
1097
            if device.id_ == _id:
×
1098
                return self.create_client(device, handler, plugin)
×
1099

1100
    def force_pair_keystore(
5✔
1101
        self,
1102
        *,
1103
        plugin: 'HW_PluginBase',
1104
        handler: 'HardwareHandlerBase',
1105
        info: 'DeviceInfo',
1106
        keystore: 'Hardware_KeyStore',
1107
    ) -> 'HardwareClientBase':
1108
        xpub = keystore.xpub
×
1109
        derivation = keystore.get_derivation_prefix()
×
UNCOV
1110
        assert derivation is not None
×
UNCOV
1111
        xtype = bip32.xpub_type(xpub)
×
UNCOV
1112
        client = self._client_by_id(info.device.id_)
×
UNCOV
1113
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
1114
            # See comment above for same code
UNCOV
1115
            client.handler = handler
×
1116
            # This will trigger a PIN/passphrase entry request
UNCOV
1117
            try:
×
UNCOV
1118
                client_xpub = client.get_xpub(derivation, xtype)
×
1119
            except (UserCancelled, RuntimeError):
×
1120
                # Bad / cancelled PIN / passphrase
1121
                client_xpub = None
×
1122
            if client_xpub == xpub:
×
1123
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1124
                with self.lock:
×
UNCOV
1125
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
1126
                return client
×
1127

1128
        # The user input has wrong PIN or passphrase, or cancelled input,
1129
        # or it is not pairable
1130
        raise DeviceUnpairableError(
×
1131
            _('Electrum cannot pair with your {}.\n\n'
1132
              'Before you request bitcoins to be sent to addresses in this '
1133
              'wallet, ensure you can pair with your device, or that you have '
1134
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
1135
              'receive will be unspendable.').format(plugin.device))
1136

1137
    def list_pairable_device_infos(
5✔
1138
        self,
1139
        *,
1140
        handler: Optional['HardwareHandlerBase'],
1141
        plugin: 'HW_PluginBase',
1142
        devices: Sequence['Device'] = None,
1143
        include_failing_clients: bool = False,
1144
    ) -> List['DeviceInfo']:
1145
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
1146
        Already paired devices are also included, as it is okay to reuse them.
1147
        """
UNCOV
1148
        if not plugin.libraries_available:
×
UNCOV
1149
            message = plugin.get_library_not_available_message()
×
UNCOV
1150
            raise HardwarePluginLibraryUnavailable(message)
×
UNCOV
1151
        if devices is None:
×
UNCOV
1152
            devices = self.scan_devices()
×
UNCOV
1153
        infos = []
×
UNCOV
1154
        for device in devices:
×
UNCOV
1155
            if not plugin.can_recognize_device(device):
×
UNCOV
1156
                continue
×
UNCOV
1157
            try:
×
UNCOV
1158
                client = self.create_client(device, handler, plugin)
×
1159
                if not client:
×
1160
                    continue
×
1161
                label = client.label()
×
1162
                is_initialized = client.is_initialized()
×
1163
                soft_device_id = client.get_soft_device_id()
×
1164
                model_name = client.device_model_name()
×
1165
            except Exception as e:
×
1166
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
1167
                if include_failing_clients:
×
1168
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1169
                continue
×
1170
            infos.append(DeviceInfo(device=device,
×
1171
                                    label=label,
1172
                                    initialized=is_initialized,
1173
                                    plugin_name=plugin.name,
1174
                                    soft_device_id=soft_device_id,
1175
                                    model_name=model_name))
1176

1177
        return infos
×
1178

1179
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
1180
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1181
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1182
        """Select the device to use for keystore."""
1183
        # ideally this should not be called from the GUI thread...
1184
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
UNCOV
1185
        while True:
×
UNCOV
1186
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
UNCOV
1187
            if infos:
×
1188
                break
×
UNCOV
1189
            if not allow_user_interaction:
×
UNCOV
1190
                raise CannotAutoSelectDevice()
×
UNCOV
1191
            msg = _('Please insert your {}').format(plugin.device)
×
UNCOV
1192
            msg += " ("
×
UNCOV
1193
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
UNCOV
1194
                msg += f"label: {keystore.label}, "
×
UNCOV
1195
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1196
            msg += ').\n\n{}\n\n{}'.format(
×
1197
                _('Verify the cable is connected and that '
1198
                  'no other application is using it.'),
1199
                _('Try to connect again?')
1200
            )
1201
            if not handler.yes_no_question(msg):
×
1202
                raise UserCancelled()
×
1203
            devices = None
×
1204

1205
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1206
        # method 1: select device by id
1207
        if keystore.soft_device_id:
×
UNCOV
1208
            for info in infos:
×
UNCOV
1209
                if info.soft_device_id == keystore.soft_device_id:
×
UNCOV
1210
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
UNCOV
1211
                    return info
×
1212
        # method 2: select device by label
1213
        #           but only if not a placeholder label and only if there is no collision
1214
        device_labels = [info.label for info in infos]
×
UNCOV
1215
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1216
                and device_labels.count(keystore.label) == 1):
UNCOV
1217
            for info in infos:
×
1218
                if info.label == keystore.label:
×
1219
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1220
                    return info
×
1221
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1222
        #           saved for keystore anyway, select it
UNCOV
1223
        if (len(infos) == 1
×
1224
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1225
                and keystore.soft_device_id is None):
1226
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
UNCOV
1227
            return infos[0]
×
1228

1229
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1230
        if not allow_user_interaction:
×
1231
            raise CannotAutoSelectDevice()
×
1232
        # ask user to select device manually
UNCOV
1233
        msg = (
×
1234
                _("Could not automatically pair with device for given keystore.") + "\n"
1235
                + f"(keystore label: {keystore.label!r}, "
1236
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1237
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1238
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
UNCOV
1239
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1240
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1241
                                init=(_("initialized") if info.initialized else _("wiped")),
1242
                                transport=info.device.transport_ui_string,
1243
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1244
                        for info in infos]
UNCOV
1245
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1246
                          f"num options: {len(infos)}. options: {infos}")
UNCOV
1247
        c = handler.query_choice(msg, descriptions)
×
1248
        if c is None:
×
1249
            raise UserCancelled()
×
1250
        info = infos[c]
×
UNCOV
1251
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1252
        # note: updated label/soft_device_id will be saved after pairing succeeds
UNCOV
1253
        return info
×
1254

1255
    @runs_in_hwd_thread
5✔
1256
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
UNCOV
1257
        try:
×
1258
            import hid  # noqa: F811
×
1259
        except ImportError:
×
1260
            return []
×
1261

1262
        devices = []
×
UNCOV
1263
        for d in hid.enumerate(0, 0):
×
1264
            vendor_id = d['vendor_id']
×
UNCOV
1265
            product_key = (vendor_id, d['product_id'])
×
UNCOV
1266
            plugin = None
×
UNCOV
1267
            if product_key in self._recognised_hardware:
×
1268
                plugin = self._recognised_hardware[product_key]
×
1269
            elif vendor_id in self._recognised_vendor:
×
1270
                plugin = self._recognised_vendor[vendor_id]
×
1271
            if plugin:
×
UNCOV
1272
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1273
                if device:
×
1274
                    devices.append(device)
×
1275
        return devices
×
1276

1277
    @runs_in_hwd_thread
5✔
1278
    @profiler
5✔
1279
    def scan_devices(self) -> Sequence['Device']:
5✔
1280
        self.logger.info("scanning devices...")
×
1281

1282
        # First see what's connected that we know about
1283
        devices = self._scan_devices_with_hid()
×
1284

1285
        # Let plugin handlers enumerate devices we don't know about
1286
        with self.lock:
×
UNCOV
1287
            enumerate_funcs = list(self._enumerate_func)
×
UNCOV
1288
        for f in enumerate_funcs:
×
UNCOV
1289
            try:
×
UNCOV
1290
                new_devices = f()
×
1291
            except BaseException as e:
×
UNCOV
1292
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1293
            else:
1294
                devices.extend(new_devices)
×
1295

1296
        # find out what was disconnected
1297
        client_ids = [dev.id_ for dev in devices]
×
1298
        disconnected_clients = []
×
1299
        with self.lock:
×
1300
            connected = {}
×
1301
            for client, id_ in self.clients.items():
×
1302
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1303
                    connected[client] = id_
×
1304
                else:
1305
                    disconnected_clients.append((client, id_))
×
UNCOV
1306
            self.clients = connected
×
1307

1308
        # Unpair disconnected devices
1309
        for client, id_ in disconnected_clients:
×
1310
            self.unpair_id(id_)
×
1311
            if client.handler:
×
1312
                client.handler.update_status(False)
×
1313

1314
        return devices
×
1315

1316
    @classmethod
5✔
1317
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
UNCOV
1318
        ret = {}
×
1319
        # add libusb
1320
        try:
×
1321
            import usb1
×
1322
        except Exception as e:
×
1323
            ret["libusb.version"] = None
×
1324
        else:
1325
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
UNCOV
1326
            try:
×
UNCOV
1327
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
UNCOV
1328
            except AttributeError:
×
1329
                ret["libusb.path"] = None
×
1330
        # add hidapi
1331
        try:
×
1332
            import hid  # noqa: F811
×
1333
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1334
        except Exception as e:
×
UNCOV
1335
            from importlib.metadata import version
×
1336
            try:
×
1337
                ret["hidapi.version"] = version("hidapi")
×
1338
            except ImportError:
×
1339
                ret["hidapi.version"] = None
×
1340
        return ret
×
1341

1342
    def trigger_pairings(
5✔
1343
            self,
1344
            keystores: Sequence['KeyStore'],
1345
            *,
1346
            allow_user_interaction: bool = True,
1347
            devices: Sequence['Device'] = None,
1348
    ) -> None:
1349
        """Given a list of keystores, try to pair each with a connected hardware device.
1350

1351
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1352
        try to pair each keystore individually. Consider the following scenario:
1353
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1354
        - assume none of the devices are paired yet
1355
        1. if we tried to individually pair keystores, we might try with ks1 first
1356
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1357
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1358
             which is confusing and error-prone. It's especially problematic if the hw device does
1359
             not support labels (such as Ledger), as then the user cannot easily distinguish
1360
             same-type devices. (see #4199)
1361
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1362
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1363
        """
UNCOV
1364
        from .keystore import Hardware_KeyStore
×
UNCOV
1365
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
UNCOV
1366
        if not keystores:
×
UNCOV
1367
            return
×
UNCOV
1368
        if devices is None:
×
UNCOV
1369
            devices = self.scan_devices()
×
1370
        # first pair with all devices that can be auto-selected
UNCOV
1371
        for ks in keystores:
×
UNCOV
1372
            try:
×
UNCOV
1373
                ks.get_client(
×
1374
                    force_pair=True,
1375
                    allow_user_interaction=False,
1376
                    devices=devices,
1377
                )
1378
            except UserCancelled:
×
1379
                pass
×
1380
        if allow_user_interaction:
×
1381
            # now do manual selections
1382
            for ks in keystores:
×
1383
                try:
×
1384
                    ks.get_client(
×
1385
                        force_pair=True,
1386
                        allow_user_interaction=True,
1387
                        devices=devices,
1388
                    )
1389
                except UserCancelled:
×
1390
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc