• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4874484173242368

08 May 2025 01:01PM UTC coverage: 59.579% (-0.1%) from 59.71%
4874484173242368

Pull #9765

CirrusCI

f321x
plugins: add functionality to allow setting plugin pubkey from gui

Adds functionality that allows the user to store the plugin authorization pubkey without having to edit files/registry manually.
On Linux systems it spawns the commands in a subprocess with pkexec which will trigger a OS prompt to execute the commands as root.
The user sees the executed commands and can either authorize with the root password or decline.
On windows it uses the windows `ShellExecuteExW` api to edit the registry, this also triggers a OS dialog to accept or decline (UAC dialog).
There is also functionality to reset the key again, which works in the same way.
Pull Request #9765: plugins: add functionality to allow setting plugin pubkey from gui

11 of 100 new or added lines in 1 file covered. (11.0%)

14 existing lines in 6 files now uncovered.

21529 of 36135 relevant lines covered (59.58%)

2.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.59
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
5✔
72

73
    LOGGING_SHORTCUT = 'p'
5✔
74
    pkgpath = os.path.dirname(plugins.__file__)
5✔
75
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
76
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
5✔
77

78
    @profiler
5✔
79
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
80
        self.config = config
5✔
81
        self.cmd_only = cmd_only  # type: bool
5✔
82
        self.internal_plugin_metadata = {}
5✔
83
        self.external_plugin_metadata = {}
5✔
84
        if cmd_only:
5✔
85
            # only import the command modules of plugins
86
            Logger.__init__(self)
×
87
            self.find_plugins()
×
88
            self.load_plugins()
×
89
            return
×
90
        DaemonThread.__init__(self)
5✔
91
        self.device_manager = DeviceMgr(config)
5✔
92
        self.name = 'Plugins'  # set name of thread
5✔
93
        self.hw_wallets = {}
5✔
94
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
95
        self.gui_name = gui_name
5✔
96
        self.find_plugins()
5✔
97
        self.load_plugins()
5✔
98
        self.add_jobs(self.device_manager.thread_jobs())
5✔
99
        self.start()
5✔
100

101
    @property
5✔
102
    def descriptions(self):
5✔
103
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
106
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
107
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
108
        for loader, name, ispkg in iter_modules:
5✔
109
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
110
            #       once as data and once as code. To honor the "no duplicates" rule below,
111
            #       we exclude the ones packaged as *code*, here:
112
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
113
                continue
×
114
            module_path = os.path.join(pkg_path, name)
5✔
115
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
5✔
116
                continue
×
117
            try:
5✔
118
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
119
                    d = json.load(f)
5✔
120
            except FileNotFoundError:
×
121
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
122
                continue
×
123
            if 'fullname' not in d:
5✔
124
                continue
×
125
            d['path'] = module_path
5✔
126
            if not self.cmd_only:
5✔
127
                gui_good = self.gui_name in d.get('available_for', [])
5✔
128
                if not gui_good:
5✔
129
                    continue
5✔
130
                details = d.get('registers_wallet_type')
5✔
131
                if details:
5✔
132
                    self.register_wallet_type(name, gui_good, details)
5✔
133
                details = d.get('registers_keystore')
5✔
134
                if details:
5✔
135
                    self.register_keystore(name, gui_good, details)
5✔
136
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
137
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
138
                _logger.info(f"duplicate plugins? for {name=}")
×
139
                continue
×
140
            if not external:
5✔
141
                self.internal_plugin_metadata[name] = d
5✔
142
            else:
143
                self.external_plugin_metadata[name] = d
×
144

145
    @staticmethod
5✔
146
    def exec_module_from_spec(spec, path: str):
5✔
147
        if prev_fail := _exec_module_failure.get(path):
5✔
148
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
149
        try:
5✔
150
            module = importlib.util.module_from_spec(spec)
5✔
151
            # sys.modules needs to be modified for relative imports to work
152
            # see https://stackoverflow.com/a/50395128
153
            sys.modules[path] = module
5✔
154
            spec.loader.exec_module(module)
5✔
155
        except Exception as e:
×
156
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
157
            # so the import system knows it failed. If called again for the same plugin, we do not
158
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
159
            # might have defined commands).
160
            _exec_module_failure[path] = e
×
161
            if path in sys.modules:
×
162
                sys.modules.pop(path, None)
×
163
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
164
        return module
5✔
165

166
    def find_plugins(self):
5✔
167
        internal_plugins_path = (self.pkgpath, False)
5✔
168
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
169
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
170
            if pkg_path and os.path.exists(pkg_path):
5✔
171
                if not external:
5✔
172
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
173
                else:
174
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
175

176
    def load_plugins(self):
5✔
177
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
178
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
179
                try:
×
180
                    if self.cmd_only:  # only load init method to register commands
×
181
                        self.maybe_load_plugin_init_method(name)
×
182
                    else:
183
                        self.load_plugin_by_name(name)
×
184
                except BaseException as e:
×
185
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
5✔
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
5✔
191
        if sys.platform in ['windows', 'win32']:
×
192
            keyfile_path = self.keyfile_windows
×
193
            keyfile_help = _('This file can be edited with Regdit')
×
194
        elif 'ANDROID_DATA' in os.environ:
×
195
            raise Exception('platform not supported')
×
196
        else:
197
            # treat unknown platforms as linux-like
198
            keyfile_path = self.keyfile_linux
×
NEW
199
            if not key_hex:
×
NEW
200
                keyfile_help = ""
×
201
            else:
NEW
202
                keyfile_help = "".join([_('The file must have root permissions'),
×
203
                                         ".\n\n",
204
                                         _("To set it you can also use the Auto-Setup or run "
205
                                           "the following terminal command"),
206
                                         ":\n\n",
207
                                         f"sudo sh -c '{self._linux_plugin_key_creation_command(key_hex)}'",
208
                                        ])
UNCOV
209
        return keyfile_path, keyfile_help
×
210

211
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
5✔
212
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
NEW
213
        try:
×
NEW
214
            if sys.platform in ['windows', 'win32']:
×
NEW
215
                self._write_key_to_regedit_windows(pubkey_hex)
×
NEW
216
            elif 'ANDROID_DATA' in os.environ:
×
NEW
217
                raise Exception('platform not supported')
×
218
            else:
NEW
219
                self._write_key_to_root_file_linux(pubkey_hex)
×
NEW
220
        except Exception:
×
NEW
221
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
NEW
222
            return False
×
NEW
223
        return True
×
224

225
    def _linux_plugin_key_creation_command(self, pubkey_hex: str) -> str:
5✔
226
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
NEW
227
        dir_path: str = os.path.dirname(self.keyfile_linux)
×
NEW
228
        sh_command = (f"mkdir -p {dir_path} "  # create the /etc/electrum dir
×
229
                     f"&& echo -n '{pubkey_hex}' > {self.keyfile_linux} "  # write the key to the file
230
                     f"&& chmod 644 {self.keyfile_linux} "  # set read permissions for the file
231
                     f"&& chmod 755 {dir_path}")  # set read permissions for the dir
NEW
232
        return sh_command
×
233

234
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
5✔
235
        """
236
        Spawns a pkexec subprocess to write the key to a file with root permissions.
237
        This will open an OS dialog asking for the root password. Can only succeed if
238
        the system has polkit installed.
239
        """
NEW
240
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
NEW
241
        import subprocess
×
242

NEW
243
        sh_command = self._linux_plugin_key_creation_command(key_hex)
×
NEW
244
        process = subprocess.Popen(
×
245
            ['pkexec', 'sh', '-c', sh_command],
246
            stdout=subprocess.PIPE,
247
            stderr=subprocess.PIPE,
248
            text=True
249
        )
NEW
250
        stdout, stderr = process.communicate()
×
251

NEW
252
        if process.returncode != 0:
×
NEW
253
            raise Exception(f'error saving file ({process.returncode}): {stderr}')
×
254

NEW
255
        with open(self.keyfile_linux, 'r') as f:
×
NEW
256
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
NEW
257
        self.logger.debug(f'file saved successfully to {self.keyfile_linux}')
×
258

259
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
5✔
260
        """
261
        Writes the key to the Windows registry with windows UAC prompt.
262
        """
NEW
263
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE
×
264

NEW
265
        value_type = 'REG_SZ'
×
NEW
266
        command = f'add "{self.keyfile_windows}" /ve /t {value_type} /d "{key_hex}" /f'
×
267

NEW
268
        self._run_win_regedit_as_admin(command)
×
269

270
        # check if the key was written correctly
NEW
271
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
NEW
272
            with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
×
NEW
273
                assert key_hex == QueryValue(key, 'PluginsKey'), "incorrect registry key value"
×
NEW
274
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
×
275

276
    def try_auto_key_reset(self) -> bool:
5✔
NEW
277
        try:
×
NEW
278
            if sys.platform in ['windows', 'win32']:
×
NEW
279
                self._delete_plugin_key_from_windows_registry()
×
NEW
280
            elif 'ANDROID_DATA' in os.environ:
×
NEW
281
                raise Exception('platform not supported')
×
282
            else:
NEW
283
                self._delete_linux_plugin_keyfile()
×
NEW
284
        except Exception:
×
NEW
285
            self.logger.exception(f'auto-reset of plugin key failed')
×
NEW
286
            return False
×
NEW
287
        return True
×
288

289
    def _delete_linux_plugin_keyfile(self) -> None:
5✔
290
        """
291
        Deletes the root owned key file at self.keyfile_linux.
292
        """
NEW
293
        if not os.path.exists(self.keyfile_linux):
×
NEW
294
            self.logger.debug(f'file {self.keyfile_linux} does not exist')
×
NEW
295
            return
×
NEW
296
        if not self._has_root_permissions(self.keyfile_linux):
×
NEW
297
            os.unlink(self.keyfile_linux)
×
NEW
298
            return
×
299

300
        # use pkexec to delete the file as root user
NEW
301
        import subprocess
×
NEW
302
        process = subprocess.Popen(
×
303
                ['pkexec', 'rm', self.keyfile_linux],
304
                stdout=subprocess.PIPE,
305
                stderr=subprocess.PIPE,
306
                text=True
307
        )
NEW
308
        stdout, stderr = process.communicate()
×
NEW
309
        if process.returncode != 0:
×
NEW
310
            raise Exception(f'error removing file ({process.returncode}): {stderr}')
×
NEW
311
        assert not os.path.exists(self.keyfile_linux), f'file {self.keyfile_linux} still exists'
×
312

313
    def _delete_plugin_key_from_windows_registry(self) -> None:
5✔
314
        """
315
        Deletes the PluginsKey dir in the Windows registry.
316
        """
NEW
317
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE
×
318

NEW
319
        command = f'delete "{self.keyfile_windows}" /f'
×
NEW
320
        self._run_win_regedit_as_admin(command)
×
321

NEW
322
        try:
×
323
            # do a sanity check to see if the key has been deleted
NEW
324
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
NEW
325
                with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
×
NEW
326
                    raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
×
NEW
327
        except FileNotFoundError:
×
NEW
328
            pass
×
329

330
    @staticmethod
5✔
331
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
5✔
332
        """
333
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.
334
        """
335
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
336
        # for the result of the process (returns no process handle)
NEW
337
        from ctypes import byref, sizeof, windll, Structure, c_ulong
×
NEW
338
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR
×
339

340
        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
NEW
341
        class SHELLEXECUTEINFO(Structure):
×
NEW
342
            _fields_ = [
×
343
                ('cbSize', DWORD),
344
                ('fMask', c_ulong),
345
                ('hwnd', HWND),
346
                ('lpVerb', LPCWSTR),
347
                ('lpFile', LPCWSTR),
348
                ('lpParameters', LPCWSTR),
349
                ('lpDirectory', LPCWSTR),
350
                ('nShow', c_ulong),
351
                ('hInstApp', HINSTANCE),
352
                ('lpIDList', c_ulong),
353
                ('lpClass', LPCWSTR),
354
                ('hkeyClass', HKEY),
355
                ('dwHotKey', DWORD),
356
                ('hIcon', HANDLE),
357
                ('hProcess', HANDLE)
358
            ]
359

NEW
360
        info = SHELLEXECUTEINFO()
×
NEW
361
        info.cbSize = sizeof(SHELLEXECUTEINFO)
×
NEW
362
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
×
NEW
363
        info.hwnd = None
×
NEW
364
        info.lpVerb = 'runas'  # run as administrator
×
NEW
365
        info.lpFile = 'reg.exe'  # the executable to run
×
NEW
366
        info.lpParameters = reg_exe_command  # the registry edit command
×
NEW
367
        info.lpDirectory = None
×
NEW
368
        info.nShow = 1
×
369

370
        # Execute and wait
NEW
371
        if not windll.shell32.ShellExecuteExW(byref(info)):
×
NEW
372
            error = windll.kernel32.GetLastError()
×
NEW
373
            raise Exception(f'Error executing registry command: {error}')
×
374

375
        # block until the process is done or 5 sec timeout
NEW
376
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)
×
377

378
        # Close handle
NEW
379
        windll.kernel32.CloseHandle(info.hProcess)
×
380

381
    def create_new_key(self, password:str) -> str:
5✔
382
        salt = os.urandom(32)
×
383
        privkey = self.derive_privkey(password, salt)
×
384
        pubkey = privkey.get_public_key_bytes()
×
385
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
386
        return key.hex()
×
387

388
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
389
        """
390
        returns pubkey, salt
391
        returns None, None if the pubkey has not been set
392
        """
393
        if sys.platform in ['windows', 'win32']:
×
394
            import winreg
×
395
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
396
                try:
×
397
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
398
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
399
                except Exception as e:
×
400
                    self.logger.info(f'winreg error: {e}')
×
401
                    return None, None
×
402
        elif 'ANDROID_DATA' in os.environ:
×
403
            return None, None
×
404
        else:
405
            # treat unknown platforms as linux-like
406
            if not os.path.exists(self.keyfile_linux):
×
407
                return None, None
×
408
            if not self._has_root_permissions(self.keyfile_linux):
×
409
                return
×
410
            with open(self.keyfile_linux) as f:
×
411
                key_hex = f.read()
×
NEW
412
        try:
×
NEW
413
            key = bytes.fromhex(key_hex)
×
NEW
414
            version = key[0]
×
NEW
415
        except Exception:
×
NEW
416
            self.logger.exception(f'{key_hex=} invalid')
×
NEW
417
            return None, None
×
418
        if version != PLUGIN_PASSWORD_VERSION:
×
419
            self.logger.info(f'unknown plugin password version: {version}')
×
420
            return None, None
×
421
        # all good
422
        salt = key[1:1+32]
×
423
        pubkey = key[1+32:]
×
424
        return pubkey, salt
×
425

426
    def get_external_plugin_dir(self) -> str:
5✔
427
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
428
        if not os.path.exists(pkg_path):
5✔
429
            os.mkdir(pkg_path)
5✔
430
        return pkg_path
5✔
431

432
    async def download_external_plugin(self, url: str) -> str:
5✔
433
        filename = os.path.basename(urlparse(url).path)
×
434
        pkg_path = self.get_external_plugin_dir()
×
435
        path = os.path.join(pkg_path, filename)
×
436
        if os.path.exists(path):
×
437
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
×
438
        async with aiohttp.ClientSession() as session:
×
439
            async with session.get(url) as resp:
×
440
                if resp.status == 200:
×
441
                    with open(path, 'wb') as fd:
×
442
                        async for chunk in resp.content.iter_chunked(10):
×
443
                            fd.write(chunk)
×
444
        return path
×
445

446
    def read_manifest(self, path) -> dict:
5✔
447
        """ return json dict """
448
        with zipfile_lib.ZipFile(path) as file:
×
449
            for filename in file.namelist():
×
450
                if filename.endswith('manifest.json'):
×
451
                    break
×
452
            else:
453
                raise Exception('could not find manifest.json in zip archive')
×
454
            with file.open(filename, 'r') as f:
×
455
                manifest = json.load(f)
×
456
                manifest['path'] = path  # external, path of the zipfile
×
457
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
458
                manifest['is_zip'] = True
×
459
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
460
                return manifest
×
461

462
    def zip_plugin_path(self, name) -> str:
5✔
463
        path = self.get_metadata(name)['path']
×
464
        filename = os.path.basename(path)
×
465
        if name in self.internal_plugin_metadata:
×
466
            pkg_path = self.pkgpath
×
467
        else:
468
            pkg_path = self.get_external_plugin_dir()
×
469
        return os.path.join(pkg_path, filename)
×
470

471
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
472
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
473
        if pkg_path is None:
5✔
474
            return
×
475
        for filename in os.listdir(pkg_path):
5✔
476
            path = os.path.join(pkg_path, filename)
×
477
            if not filename.endswith('.zip'):
×
478
                continue
×
479
            try:
×
480
                d = self.read_manifest(path)
×
481
                name = d['name']
×
482
            except Exception:
×
483
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
484
                continue
×
485
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
486
                self.logger.info(f"duplicate plugins for {name=}")
×
487
                continue
×
488
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
489
                continue
×
490
            min_version = d.get('min_electrum_version')
×
491
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
492
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
493
                continue
×
494
            max_version = d.get('max_electrum_version')
×
495
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
496
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
497
                continue
×
498

499
            if not self.cmd_only:
×
500
                gui_good = self.gui_name in d.get('available_for', [])
×
501
                if not gui_good:
×
502
                    continue
×
503
                if 'fullname' not in d:
×
504
                    continue
×
505
                details = d.get('registers_keystore')
×
506
                if details:
×
507
                    self.register_keystore(name, gui_good, details)
×
508
            if external:
×
509
                self.external_plugin_metadata[name] = d
×
510
            else:
511
                self.internal_plugin_metadata[name] = d
×
512

513
    def get(self, name):
5✔
514
        return self.plugins.get(name)
×
515

516
    def count(self):
5✔
517
        return len(self.plugins)
×
518

519
    def load_plugin(self, name) -> 'BasePlugin':
5✔
520
        """Imports the code of the given plugin.
521
        note: can be called from any thread.
522
        """
523
        if self.get_metadata(name):
5✔
524
            return self.load_plugin_by_name(name)
5✔
525
        else:
526
            raise Exception(f"could not find plugin {name!r}")
×
527

528
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
529
        """Loads the __init__.py module of the plugin if it is not already loaded."""
530
        is_external = name in self.external_plugin_metadata
5✔
531
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
532
        if base_name not in sys.modules:
5✔
533
            metadata = self.get_metadata(name)
5✔
534
            is_zip = metadata.get('is_zip', False)
5✔
535
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
536
            if not is_zip:
5✔
537
                if is_external:
5✔
538
                    # this branch is deprecated: external plugins are always zip files
539
                    path = os.path.join(metadata['path'], '__init__.py')
×
540
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
541
                else:
542
                    init_spec = importlib.util.find_spec(base_name)
5✔
543
            else:
544
                zipfile = zipimport.zipimporter(metadata['path'])
×
545
                dirname = metadata['dirname']
×
546
                init_spec = zipfile.find_spec(dirname)
×
547

548
            self.exec_module_from_spec(init_spec, base_name)
5✔
549

550
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
551
        if name in self.plugins:
5✔
552
            return self.plugins[name]
5✔
553
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
554
        self.maybe_load_plugin_init_method(name)
5✔
555
        is_external = name in self.external_plugin_metadata
5✔
556
        if is_external and not self.is_authorized(name):
5✔
557
            self.logger.info(f'plugin not authorized {name}')
×
558
            return
×
559
        if not is_external:
5✔
560
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
561
        else:
562
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
563

564
        spec = importlib.util.find_spec(full_name)
5✔
565
        if spec is None:
5✔
566
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
567
        try:
5✔
568
            module = self.exec_module_from_spec(spec, full_name)
5✔
569
            plugin = module.Plugin(self, self.config, name)
5✔
570
        except Exception as e:
×
571
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
572
        self.add_jobs(plugin.thread_jobs())
5✔
573
        self.plugins[name] = plugin
5✔
574
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
575
        return plugin
5✔
576

577
    def close_plugin(self, plugin):
5✔
578
        self.remove_jobs(plugin.thread_jobs())
×
579

580
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
581
        from hashlib import pbkdf2_hmac
×
582
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
583
        return ECPrivkey(secret)
×
584

585
    def install_internal_plugin(self, name):
5✔
586
        self.config.set_key(f'plugins.{name}.enabled', [])
×
587

588
    def install_external_plugin(self, name, path, privkey, manifest):
5✔
589
        # uninstall old version first to get rid of old zip files when updating plugin
590
        self.uninstall(name)
×
591
        self.external_plugin_metadata[name] = manifest
×
592
        self.authorize_plugin(name, path, privkey)
×
593

594
    def uninstall(self, name: str):
5✔
595
        self.config.set_key(f'plugins.{name}', None)
×
596
        if name in self.external_plugin_metadata:
×
597
            zipfile = self.zip_plugin_path(name)
×
598
            os.unlink(zipfile)
×
599
            self.external_plugin_metadata.pop(name)
×
600

601
    def is_internal(self, name) -> bool:
5✔
602
        return name in self.internal_plugin_metadata
×
603

604
    def is_auto_loaded(self, name):
5✔
605
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
606
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
607

608
    def is_installed(self, name) -> bool:
5✔
609
        """an external plugin may be installed but not authorized """
610
        return (name in self.internal_plugin_metadata and self.config.get(f'plugins.{name}'))\
×
611
            or name in self.external_plugin_metadata
612

613
    def is_authorized(self, name) -> bool:
5✔
614
        if name in self.internal_plugin_metadata:
×
615
            return True
×
616
        if name not in self.external_plugin_metadata:
×
617
            return False
×
618
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
619
        if not pubkey_bytes:
×
620
            return False
×
621
        if not self.is_plugin_zip(name):
×
622
            return False
×
623
        filename = self.zip_plugin_path(name)
×
624
        plugin_hash = get_file_hash256(filename)
×
625
        sig = self.config.get(f'plugins.{name}.authorized')
×
626
        if not sig:
×
627
            return False
×
628
        pubkey = ECPubkey(pubkey_bytes)
×
629
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
630

631
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
632
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
633
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
634
        plugin_hash = get_file_hash256(filename)
×
635
        sig = privkey.ecdsa_sign(plugin_hash)
×
636
        value = sig.hex()
×
637
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
638

639
    def enable(self, name: str) -> 'BasePlugin':
5✔
640
        self.config.enable_plugin(name)
×
641
        p = self.get(name)
×
642
        if p:
×
643
            return p
×
644
        return self.load_plugin(name)
×
645

646
    def disable(self, name: str) -> None:
5✔
647
        self.config.disable_plugin(name)
×
648
        p = self.get(name)
×
649
        if not p:
×
650
            return
×
651
        self.plugins.pop(name)
×
652
        p.close()
×
653
        self.logger.info(f"closed {name}")
×
654

655
    @classmethod
5✔
656
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
657
        return key.startswith('plugins.')
×
658

659
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
660
        d = self.descriptions.get(name)
×
661
        if not d:
×
662
            return False
×
663
        deps = d.get('requires', [])
×
664
        for dep, s in deps:
×
665
            try:
×
666
                __import__(dep)
×
667
            except ImportError as e:
×
668
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
669
                return False
×
670
        requires = d.get('requires_wallet_type', [])
×
671
        return not requires or wallet.wallet_type in requires
×
672

673
    def get_hardware_support(self):
5✔
674
        out = []
×
675
        for name, (gui_good, details) in self.hw_wallets.items():
×
676
            if gui_good:
×
677
                try:
×
678
                    p = self.get_plugin(name)
×
679
                    if p.is_available():
×
680
                        out.append(HardwarePluginToScan(name=name,
×
681
                                                        description=details[2],
682
                                                        plugin=p,
683
                                                        exception=None))
684
                except Exception as e:
×
685
                    self.logger.exception(f"cannot load plugin for: {name}")
×
686
                    out.append(HardwarePluginToScan(name=name,
×
687
                                                    description=details[2],
688
                                                    plugin=None,
689
                                                    exception=e))
690
        return out
×
691

692
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
693
        from .wallet import register_wallet_type, register_constructor
5✔
694
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
695

696
        def loader():
5✔
697
            plugin = self.get_plugin(name)
5✔
698
            register_constructor(wallet_type, plugin.wallet_class)
5✔
699
        register_wallet_type(wallet_type)
5✔
700
        plugin_loaders[wallet_type] = loader
5✔
701

702
    def register_keystore(self, name, gui_good, details):
5✔
703
        from .keystore import register_keystore
5✔
704

705
        def dynamic_constructor(d):
5✔
706
            return self.get_plugin(name).keystore_class(d)
5✔
707
        if details[0] == 'hardware':
5✔
708
            self.hw_wallets[name] = (gui_good, details)
5✔
709
            self.logger.info(f"registering hardware {name}: {details}")
5✔
710
            register_keystore(details[1], dynamic_constructor)
5✔
711

712
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
713
        if name not in self.plugins:
5✔
714
            self.load_plugin(name)
5✔
715
        return self.plugins[name]
5✔
716

717
    def is_plugin_zip(self, name: str) -> bool:
5✔
718
        """Returns True if the plugin is a zip file"""
719
        if (metadata := self.get_metadata(name)) is None:
×
720
            return False
×
721
        return metadata.get('is_zip', False)
×
722

723
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
724
        """Returns the metadata of the plugin"""
725
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
726
        if not metadata:
5✔
727
            return None
×
728
        return metadata
5✔
729

730
    def run(self):
5✔
731
        while self.is_running():
5✔
732
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
733
            self.run_jobs()
5✔
734
        self.on_stop()
5✔
735

736
    def read_file(self, name: str, filename: str) -> bytes:
5✔
737
        if self.is_plugin_zip(name):
×
738
            plugin_filename = self.zip_plugin_path(name)
×
739
            metadata = self.external_plugin_metadata[name]
×
740
            dirname = metadata['dirname']
×
741
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
742
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
743
                    return myfile.read()
×
744
        else:
745
            assert name in self.internal_plugin_metadata
×
746
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
747
            with open(path, 'rb') as myfile:
×
748
                return myfile.read()
×
749

750
def get_file_hash256(path: str) -> bytes:
5✔
751
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
752
    with open(path, 'rb') as f:
×
753
        return sha256(f.read())
×
754

755

756
def hook(func):
5✔
757
    hook_names.add(func.__name__)
5✔
758
    return func
5✔
759

760

761
def run_hook(name, *args):
5✔
762
    results = []
5✔
763
    f_list = hooks.get(name, [])
5✔
764
    for p, f in f_list:
5✔
765
        if p.is_enabled():
5✔
766
            try:
5✔
767
                r = f(*args)
5✔
768
            except Exception:
×
769
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
770
                r = False
×
771
            if r:
5✔
772
                results.append(r)
×
773

774
    if results:
5✔
775
        assert len(results) == 1, results
×
776
        return results[0]
×
777

778

779
class BasePlugin(Logger):
5✔
780

781
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
782
        self.parent = parent  # type: Plugins  # The plugins object
5✔
783
        self.name = name
5✔
784
        self.config = config
5✔
785
        Logger.__init__(self)
5✔
786
        # add self to hooks
787
        for k in dir(self):
5✔
788
            if k in hook_names:
5✔
789
                l = hooks.get(k, [])
5✔
790
                l.append((self, getattr(self, k)))
5✔
791
                hooks[k] = l
5✔
792

793
    def __str__(self):
5✔
794
        return self.name
×
795

796
    def close(self):
5✔
797
        # remove self from hooks
798
        for attr_name in dir(self):
×
799
            if attr_name in hook_names:
×
800
                # found attribute in self that is also the name of a hook
801
                l = hooks.get(attr_name, [])
×
802
                try:
×
803
                    l.remove((self, getattr(self, attr_name)))
×
804
                except ValueError:
×
805
                    # maybe attr name just collided with hook name and was not hook
806
                    continue
×
807
                hooks[attr_name] = l
×
808
        self.parent.close_plugin(self)
×
809
        self.on_close()
×
810

811
    def on_close(self):
5✔
812
        pass
×
813

814
    def requires_settings(self) -> bool:
5✔
815
        return False
×
816

817
    def thread_jobs(self):
5✔
818
        return []
5✔
819

820
    def is_enabled(self):
5✔
821
        if not self.is_available():
×
822
            return False
×
823
        return self.config.is_plugin_enabled(self.name)
×
824

825
    def is_available(self):
5✔
826
        return True
×
827

828
    def can_user_disable(self):
5✔
829
        return True
×
830

831
    def settings_widget(self, window):
5✔
832
        raise NotImplementedError()
×
833

834
    def settings_dialog(self, window):
5✔
835
        raise NotImplementedError()
×
836

837
    def read_file(self, filename: str) -> bytes:
5✔
838
        return self.parent.read_file(self.name, filename)
×
839

840
    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
5✔
841
        """Returns a dict which is persisted in the per-wallet database."""
842
        plugin_storage = wallet.db.get_plugin_storage()
×
843
        return plugin_storage.setdefault(self.name, {})
×
844

845
class DeviceUnpairableError(UserFacingException): pass
5✔
846
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
847
class CannotAutoSelectDevice(Exception): pass
5✔
848

849

850
class Device(NamedTuple):
5✔
851
    path: Union[str, bytes]
5✔
852
    interface_number: int
5✔
853
    id_: str
5✔
854
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
855
    usage_page: int
5✔
856
    transport_ui_string: str
5✔
857

858

859
class DeviceInfo(NamedTuple):
5✔
860
    device: Device
5✔
861
    label: Optional[str] = None
5✔
862
    initialized: Optional[bool] = None
5✔
863
    exception: Optional[Exception] = None
5✔
864
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
865
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
866
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
867

868
    def label_for_device_select(self) -> str:
5✔
869
        return (
×
870
            "{label} ({maybe_model}{init}, {transport})"
871
            .format(
872
                label=self.label or _("An unnamed {}").format(self.plugin_name),
873
                init=(_("initialized") if self.initialized else _("wiped")),
874
                transport=self.device.transport_ui_string,
875
                maybe_model=f"{self.model_name}, " if self.model_name else ""
876
            )
877
        )
878

879

880
class HardwarePluginToScan(NamedTuple):
5✔
881
    name: str
5✔
882
    description: str
5✔
883
    plugin: Optional['HW_PluginBase']
5✔
884
    exception: Optional[Exception]
5✔
885

886

887
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
888

889

890
# hidapi is not thread-safe
891
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
892
#     https://github.com/libusb/hidapi/issues/45
893
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
894
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
895
# It is not entirely clear to me, exactly what is safe and what isn't, when
896
# using multiple threads...
897
# Hence, we use a single thread for all device communications, including
898
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
899
# the following thread:
900
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
901
    max_workers=1,
902
    thread_name_prefix='hwd_comms_thread'
903
)
904

905
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
906
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
907
# To keep it simple, let's just import it now, as we are likely in the main thread here.
908
if threading.current_thread() is not threading.main_thread():
5✔
909
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
910
try:
5✔
911
    import hid
5✔
912
except ImportError:
5✔
913
    pass
5✔
914

915

916
T = TypeVar('T')
5✔
917

918

919
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
920
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
921
        return func()
×
922
    else:
923
        fut = _hwd_comms_executor.submit(func)
×
924
        return fut.result()
×
925
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
926

927

928
def runs_in_hwd_thread(func):
5✔
929
    @wraps(func)
5✔
930
    def wrapper(*args, **kwargs):
5✔
931
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
932
    return wrapper
5✔
933

934

935
def assert_runs_in_hwd_thread():
5✔
936
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
937
        raise Exception("must only be called from HWD communication thread")
×
938

939

940
class DeviceMgr(ThreadJob):
5✔
941
    """Manages hardware clients.  A client communicates over a hardware
942
    channel with the device.
943

944
    In addition to tracking device HID IDs, the device manager tracks
945
    hardware wallets and manages wallet pairing.  A HID ID may be
946
    paired with a wallet when it is confirmed that the hardware device
947
    matches the wallet, i.e. they have the same master public key.  A
948
    HID ID can be unpaired if e.g. it is wiped.
949

950
    Because of hotplugging, a wallet must request its client
951
    dynamically each time it is required, rather than caching it
952
    itself.
953

954
    The device manager is shared across plugins, so just one place
955
    does hardware scans when needed.  By tracking HID IDs, if a device
956
    is plugged into a different port the wallet is automatically
957
    re-paired.
958

959
    Wallets are informed on connect / disconnect events.  It must
960
    implement connected(), disconnected() callbacks.  Being connected
961
    implies a pairing.  Callbacks can happen in any thread context,
962
    and we do them without holding the lock.
963

964
    Confusingly, the HID ID (serial number) reported by the HID system
965
    doesn't match the device ID reported by the device itself.  We use
966
    the HID IDs.
967

968
    This plugin is thread-safe.  Currently only devices supported by
969
    hidapi are implemented."""
970

971
    def __init__(self, config: SimpleConfig):
5✔
972
        ThreadJob.__init__(self)
5✔
973
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
974
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
975
        # A client->id_ map. Needs self.lock.
976
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
977
        # What we recognise.  (vendor_id, product_id) -> Plugin
978
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
979
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
980
        # Custom enumerate functions for devices we don't know about.
981
        self._enumerate_func = set()  # Needs self.lock.
5✔
982

983
        self.lock = threading.RLock()
5✔
984

985
        self.config = config
5✔
986

987
    def thread_jobs(self):
5✔
988
        # Thread job to handle device timeouts
989
        return [self]
5✔
990

991
    def run(self):
5✔
992
        '''Handle device timeouts.  Runs in the context of the Plugins
993
        thread.'''
994
        with self.lock:
5✔
995
            clients = list(self.clients.keys())
5✔
996
        cutoff = time.time() - self.config.get_session_timeout()
5✔
997
        for client in clients:
5✔
998
            client.timeout(cutoff)
×
999

1000
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
1001
        for pair in device_pairs:
×
1002
            self._recognised_hardware[pair] = plugin
×
1003

1004
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
1005
        for vendor_id in vendor_ids:
×
1006
            self._recognised_vendor[vendor_id] = plugin
×
1007

1008
    def register_enumerate_func(self, func):
5✔
1009
        with self.lock:
×
1010
            self._enumerate_func.add(func)
×
1011

1012
    @runs_in_hwd_thread
5✔
1013
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
1014
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
1015
        # Get from cache first
1016
        client = self._client_by_id(device.id_)
×
1017
        if client:
×
1018
            return client
×
1019
        client = plugin.create_client(device, handler)
×
1020
        if client:
×
1021
            self.logger.info(f"Registering {client}")
×
1022
            with self.lock:
×
1023
                self.clients[client] = device.id_
×
1024
        return client
×
1025

1026
    def id_by_pairing_code(self, pairing_code):
5✔
1027
        with self.lock:
×
1028
            return self.pairing_code_to_id.get(pairing_code)
×
1029

1030
    def pairing_code_by_id(self, id_):
5✔
1031
        with self.lock:
×
1032
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
1033
                if id2 == id_:
×
1034
                    return pairing_code
×
1035
            return None
×
1036

1037
    def unpair_pairing_code(self, pairing_code):
5✔
1038
        with self.lock:
×
1039
            if pairing_code not in self.pairing_code_to_id:
×
1040
                return
×
1041
            _id = self.pairing_code_to_id.pop(pairing_code)
×
1042
        self._close_client(_id)
×
1043

1044
    def unpair_id(self, id_):
5✔
1045
        pairing_code = self.pairing_code_by_id(id_)
×
1046
        if pairing_code:
×
1047
            self.unpair_pairing_code(pairing_code)
×
1048
        else:
1049
            self._close_client(id_)
×
1050

1051
    def _close_client(self, id_):
5✔
1052
        with self.lock:
×
1053
            client = self._client_by_id(id_)
×
1054
            self.clients.pop(client, None)
×
1055
        if client:
×
1056
            client.close()
×
1057

1058
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
1059
        with self.lock:
×
1060
            for client, client_id in self.clients.items():
×
1061
                if client_id == id_:
×
1062
                    return client
×
1063
        return None
×
1064

1065
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
1066
        '''Returns a client for the device ID if one is registered.  If
1067
        a device is wiped or in bootloader mode pairing is impossible;
1068
        in such cases we communicate by device ID and not wallet.'''
1069
        if scan_now:
×
1070
            self.scan_devices()
×
1071
        return self._client_by_id(id_)
×
1072

1073
    @runs_in_hwd_thread
5✔
1074
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
1075
                            keystore: 'Hardware_KeyStore',
1076
                            force_pair: bool, *,
1077
                            devices: Sequence['Device'] = None,
1078
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
1079
        self.logger.info("getting client for keystore")
×
1080
        if handler is None:
×
1081
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
1082
        handler.update_status(False)
×
1083
        pcode = keystore.pairing_code()
×
1084
        client = None
×
1085
        # search existing clients first (fast-path)
1086
        if not devices:
×
1087
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
1088
        # search clients again, now allowing a (slow) scan
1089
        if client is None:
×
1090
            if devices is None:
×
1091
                devices = self.scan_devices()
×
1092
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
1093
        if client is None and force_pair:
×
1094
            try:
×
1095
                info = self.select_device(plugin, handler, keystore, devices,
×
1096
                                          allow_user_interaction=allow_user_interaction)
1097
            except CannotAutoSelectDevice:
×
1098
                pass
×
1099
            else:
1100
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
1101
        if client:
×
1102
            handler.update_status(True)
×
1103
            # note: if select_device was called, we might also update label etc here:
1104
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1105
        self.logger.info("end client for keystore")
×
1106
        return client
×
1107

1108
    def client_by_pairing_code(
5✔
1109
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
1110
        devices: Sequence['Device'],
1111
    ) -> Optional['HardwareClientBase']:
1112
        _id = self.id_by_pairing_code(pairing_code)
×
1113
        client = self._client_by_id(_id)
×
1114
        if client:
×
1115
            if type(client.plugin) != type(plugin):
×
1116
                return
×
1117
            # An unpaired client might have another wallet's handler
1118
            # from a prior scan.  Replace to fix dialog parenting.
1119
            client.handler = handler
×
1120
            return client
×
1121

1122
        for device in devices:
×
1123
            if device.id_ == _id:
×
1124
                return self.create_client(device, handler, plugin)
×
1125

1126
    def force_pair_keystore(
5✔
1127
        self,
1128
        *,
1129
        plugin: 'HW_PluginBase',
1130
        handler: 'HardwareHandlerBase',
1131
        info: 'DeviceInfo',
1132
        keystore: 'Hardware_KeyStore',
1133
    ) -> 'HardwareClientBase':
1134
        xpub = keystore.xpub
×
1135
        derivation = keystore.get_derivation_prefix()
×
1136
        assert derivation is not None
×
1137
        xtype = bip32.xpub_type(xpub)
×
1138
        client = self._client_by_id(info.device.id_)
×
1139
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
1140
            # See comment above for same code
1141
            client.handler = handler
×
1142
            # This will trigger a PIN/passphrase entry request
1143
            try:
×
1144
                client_xpub = client.get_xpub(derivation, xtype)
×
1145
            except (UserCancelled, RuntimeError):
×
1146
                # Bad / cancelled PIN / passphrase
1147
                client_xpub = None
×
1148
            if client_xpub == xpub:
×
1149
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1150
                with self.lock:
×
1151
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
1152
                return client
×
1153

1154
        # The user input has wrong PIN or passphrase, or cancelled input,
1155
        # or it is not pairable
1156
        raise DeviceUnpairableError(
×
1157
            _('Electrum cannot pair with your {}.\n\n'
1158
              'Before you request bitcoins to be sent to addresses in this '
1159
              'wallet, ensure you can pair with your device, or that you have '
1160
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
1161
              'receive will be unspendable.').format(plugin.device))
1162

1163
    def list_pairable_device_infos(
5✔
1164
        self,
1165
        *,
1166
        handler: Optional['HardwareHandlerBase'],
1167
        plugin: 'HW_PluginBase',
1168
        devices: Sequence['Device'] = None,
1169
        include_failing_clients: bool = False,
1170
    ) -> List['DeviceInfo']:
1171
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
1172
        Already paired devices are also included, as it is okay to reuse them.
1173
        """
1174
        if not plugin.libraries_available:
×
1175
            message = plugin.get_library_not_available_message()
×
1176
            raise HardwarePluginLibraryUnavailable(message)
×
1177
        if devices is None:
×
1178
            devices = self.scan_devices()
×
1179
        infos = []
×
1180
        for device in devices:
×
1181
            if not plugin.can_recognize_device(device):
×
1182
                continue
×
1183
            try:
×
1184
                client = self.create_client(device, handler, plugin)
×
1185
                if not client:
×
1186
                    continue
×
1187
                label = client.label()
×
1188
                is_initialized = client.is_initialized()
×
1189
                soft_device_id = client.get_soft_device_id()
×
1190
                model_name = client.device_model_name()
×
1191
            except Exception as e:
×
1192
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
1193
                if include_failing_clients:
×
1194
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1195
                continue
×
1196
            infos.append(DeviceInfo(device=device,
×
1197
                                    label=label,
1198
                                    initialized=is_initialized,
1199
                                    plugin_name=plugin.name,
1200
                                    soft_device_id=soft_device_id,
1201
                                    model_name=model_name))
1202

1203
        return infos
×
1204

1205
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
1206
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1207
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1208
        """Select the device to use for keystore."""
1209
        # ideally this should not be called from the GUI thread...
1210
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
1211
        while True:
×
1212
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1213
            if infos:
×
1214
                break
×
1215
            if not allow_user_interaction:
×
1216
                raise CannotAutoSelectDevice()
×
1217
            msg = _('Please insert your {}').format(plugin.device)
×
1218
            msg += " ("
×
1219
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1220
                msg += f"label: {keystore.label}, "
×
1221
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1222
            msg += ').\n\n{}\n\n{}'.format(
×
1223
                _('Verify the cable is connected and that '
1224
                  'no other application is using it.'),
1225
                _('Try to connect again?')
1226
            )
1227
            if not handler.yes_no_question(msg):
×
1228
                raise UserCancelled()
×
1229
            devices = None
×
1230

1231
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1232
        # method 1: select device by id
1233
        if keystore.soft_device_id:
×
1234
            for info in infos:
×
1235
                if info.soft_device_id == keystore.soft_device_id:
×
1236
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1237
                    return info
×
1238
        # method 2: select device by label
1239
        #           but only if not a placeholder label and only if there is no collision
1240
        device_labels = [info.label for info in infos]
×
1241
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1242
                and device_labels.count(keystore.label) == 1):
1243
            for info in infos:
×
1244
                if info.label == keystore.label:
×
1245
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1246
                    return info
×
1247
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1248
        #           saved for keystore anyway, select it
1249
        if (len(infos) == 1
×
1250
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1251
                and keystore.soft_device_id is None):
1252
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1253
            return infos[0]
×
1254

1255
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1256
        if not allow_user_interaction:
×
1257
            raise CannotAutoSelectDevice()
×
1258
        # ask user to select device manually
1259
        msg = (
×
1260
                _("Could not automatically pair with device for given keystore.") + "\n"
1261
                + f"(keystore label: {keystore.label!r}, "
1262
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1263
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1264
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1265
        choices = [ChoiceItem(key=idx, label=info.label_for_device_select())
×
1266
                   for (idx, info) in enumerate(infos)]
1267
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1268
                          f"num options: {len(infos)}. options: {infos}")
1269
        c = handler.query_choice(msg, choices)
×
1270
        if c is None:
×
1271
            raise UserCancelled()
×
1272
        info = infos[c]
×
1273
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1274
        # note: updated label/soft_device_id will be saved after pairing succeeds
1275
        return info
×
1276

1277
    @runs_in_hwd_thread
5✔
1278
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1279
        try:
×
1280
            import hid  # noqa: F811
×
1281
        except ImportError:
×
1282
            return []
×
1283

1284
        devices = []
×
1285
        for d in hid.enumerate(0, 0):
×
1286
            vendor_id = d['vendor_id']
×
1287
            product_key = (vendor_id, d['product_id'])
×
1288
            plugin = None
×
1289
            if product_key in self._recognised_hardware:
×
1290
                plugin = self._recognised_hardware[product_key]
×
1291
            elif vendor_id in self._recognised_vendor:
×
1292
                plugin = self._recognised_vendor[vendor_id]
×
1293
            if plugin:
×
1294
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1295
                if device:
×
1296
                    devices.append(device)
×
1297
        return devices
×
1298

1299
    @runs_in_hwd_thread
5✔
1300
    @profiler
5✔
1301
    def scan_devices(self) -> Sequence['Device']:
5✔
1302
        self.logger.info("scanning devices...")
×
1303

1304
        # First see what's connected that we know about
1305
        devices = self._scan_devices_with_hid()
×
1306

1307
        # Let plugin handlers enumerate devices we don't know about
1308
        with self.lock:
×
1309
            enumerate_funcs = list(self._enumerate_func)
×
1310
        for f in enumerate_funcs:
×
1311
            try:
×
1312
                new_devices = f()
×
1313
            except BaseException as e:
×
1314
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1315
            else:
1316
                devices.extend(new_devices)
×
1317

1318
        # find out what was disconnected
1319
        client_ids = [dev.id_ for dev in devices]
×
1320
        disconnected_clients = []
×
1321
        with self.lock:
×
1322
            connected = {}
×
1323
            for client, id_ in self.clients.items():
×
1324
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1325
                    connected[client] = id_
×
1326
                else:
1327
                    disconnected_clients.append((client, id_))
×
1328
            self.clients = connected
×
1329

1330
        # Unpair disconnected devices
1331
        for client, id_ in disconnected_clients:
×
1332
            self.unpair_id(id_)
×
1333
            if client.handler:
×
1334
                client.handler.update_status(False)
×
1335

1336
        return devices
×
1337

1338
    @classmethod
5✔
1339
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1340
        ret = {}
×
1341
        # add libusb
1342
        try:
×
1343
            import usb1
×
1344
        except Exception as e:
×
1345
            ret["libusb.version"] = None
×
1346
        else:
1347
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1348
            try:
×
1349
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1350
            except AttributeError:
×
1351
                ret["libusb.path"] = None
×
1352
        # add hidapi
1353
        try:
×
1354
            import hid  # noqa: F811
×
1355
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1356
        except Exception as e:
×
1357
            from importlib.metadata import version
×
1358
            try:
×
1359
                ret["hidapi.version"] = version("hidapi")
×
1360
            except ImportError:
×
1361
                ret["hidapi.version"] = None
×
1362
        return ret
×
1363

1364
    def trigger_pairings(
5✔
1365
            self,
1366
            keystores: Sequence['KeyStore'],
1367
            *,
1368
            allow_user_interaction: bool = True,
1369
            devices: Sequence['Device'] = None,
1370
    ) -> None:
1371
        """Given a list of keystores, try to pair each with a connected hardware device.
1372

1373
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1374
        try to pair each keystore individually. Consider the following scenario:
1375
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1376
        - assume none of the devices are paired yet
1377
        1. if we tried to individually pair keystores, we might try with ks1 first
1378
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1379
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1380
             which is confusing and error-prone. It's especially problematic if the hw device does
1381
             not support labels (such as Ledger), as then the user cannot easily distinguish
1382
             same-type devices. (see #4199)
1383
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1384
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1385
        """
1386
        from .keystore import Hardware_KeyStore
×
1387
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1388
        if not keystores:
×
1389
            return
×
1390
        if devices is None:
×
1391
            devices = self.scan_devices()
×
1392
        # first pair with all devices that can be auto-selected
1393
        for ks in keystores:
×
1394
            try:
×
1395
                ks.get_client(
×
1396
                    force_pair=True,
1397
                    allow_user_interaction=False,
1398
                    devices=devices,
1399
                )
1400
            except UserCancelled:
×
1401
                pass
×
1402
        if allow_user_interaction:
×
1403
            # now do manual selections
1404
            for ks in keystores:
×
1405
                try:
×
1406
                    ks.get_client(
×
1407
                        force_pair=True,
1408
                        allow_user_interaction=True,
1409
                        devices=devices,
1410
                    )
1411
                except UserCancelled:
×
1412
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc