• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6116353632894976

18 May 2025 10:08AM UTC coverage: 59.563% (-0.2%) from 59.759%
6116353632894976

push

CirrusCI

web-flow
Merge pull request #9765 from f321x/plugin_pubk_user_prompt

plugins: add functionality to allow setting plugin pubkey from gui

18 of 138 new or added lines in 1 file covered. (13.04%)

3 existing lines in 3 files now uncovered.

21581 of 36232 relevant lines covered (59.56%)

2.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.06
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
5✔
72

73
    LOGGING_SHORTCUT = 'p'
5✔
74
    pkgpath = os.path.dirname(plugins.__file__)
5✔
75
    # TODO: use XDG Base Directory Specification instead of hardcoding /etc
76
    keyfile_posix = '/etc/electrum/plugins_key'
5✔
77
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
5✔
78

79
    @profiler
    def __init__(self, config: SimpleConfig, gui_name: Optional[str] = None, cmd_only: bool = False):
        """Discover the available plugins and load the enabled ones.

        :param config: configuration consulted for the ``plugins.<name>.enabled`` flags.
        :param gui_name: name of the running GUI; plugins are filtered by it via the
            ``available_for`` entry of their manifest. Not stored when ``cmd_only`` is set.
        :param cmd_only: if True, only the init modules of enabled plugins are imported;
            no device manager is created and no thread is started.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        if cmd_only:
            # only import the command modules of plugins
            # NOTE: in this mode DaemonThread.__init__ is skipped, and neither
            # self.plugins, self.hw_wallets nor self.gui_name are set.
            Logger.__init__(self)
            self.find_plugins()
            self.load_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        self.hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        # the thread is started from within the constructor
        self.start()
101

102
    @property
5✔
103
    def descriptions(self):
5✔
104
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
105

106
    def find_directory_plugins(self, pkg_path: str, external: bool):
        """Find plugins in directory form under ``pkg_path`` and populate the metadata dicts.

        Each sub-module of ``pkg_path`` that ships a ``manifest.json`` containing a
        ``fullname`` entry is recorded in ``internal_plugin_metadata`` or
        ``external_plugin_metadata`` (depending on ``external``), keyed by module name.
        Unless running in ``cmd_only`` mode, plugins not available for the current GUI
        are skipped, and wallet types / keystores declared in the manifest are registered.

        :param pkg_path: directory that is scanned for plugin packages.
        :param external: whether the plugins found are recorded as external ones.
        """
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            module_path = os.path.join(pkg_path, name)
            # in cmd_only mode only plugins explicitly enabled (exactly True) are considered
            if self.cmd_only and self.config.get(f'plugins.{name}.enabled') is not True:
                continue
            try:
                # JSON is UTF-8; do not depend on the locale's default encoding
                with open(os.path.join(module_path, 'manifest.json'), 'r', encoding='utf-8') as f:
                    d = json.load(f)
            except FileNotFoundError:
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
                continue
            if 'fullname' not in d:
                continue
            d['path'] = module_path
            if not self.cmd_only:
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                details = d.get('registers_wallet_type')
                if details:
                    self.register_wallet_type(name, gui_good, details)
                details = d.get('registers_keystore')
                if details:
                    self.register_keystore(name, gui_good, details)
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                _logger.info(f"duplicate plugins? for {name=}")
                continue
            if not external:
                self.internal_plugin_metadata[name] = d
            else:
                self.external_plugin_metadata[name] = d
145

146
    @staticmethod
    def exec_module_from_spec(spec, path: str):
        """Create and execute the module described by ``spec``, registered as ``path``.

        Returns the executed module. Raises if execution fails, or if an earlier
        call for the same ``path`` already failed (no retry is attempted).
        """
        earlier_error = _exec_module_failure.get(path)
        if earlier_error:
            raise Exception(f"exec_module already failed once before, with: {earlier_error!r}")
        try:
            module = importlib.util.module_from_spec(spec)
            # relative imports inside the module only work once it is present in
            # sys.modules, see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as exc:
            # Not all side-effects can be undone, but the module is at least removed
            # from sys.modules so the import system knows it failed. The failure is
            # remembered so that a later call for the same plugin does not retry, as
            # that could interact with side-effects that were not undone (e.g. the
            # plugin might have defined commands).
            _exec_module_failure[path] = exc
            sys.modules.pop(path, None)
            raise Exception(f"Error pre-loading {path}: {exc!r}") from exc
        return module
166

167
    def find_plugins(self):
        """Scan the internal plugin package (directories) and the external plugin dir (zips)."""
        search_locations = (
            (self.pkgpath, False, self.find_directory_plugins),
            (self.get_external_plugin_dir(), True, self.find_zip_plugins),
        )
        for location, is_external, finder in search_locations:
            if not location or not os.path.exists(location):
                continue
            finder(pkg_path=location, external=is_external)
176

177
    def load_plugins(self):
        """Load every enabled plugin that does not require a specific wallet type.

        A plugin that fails to load is logged and skipped.
        """
        all_metadata = chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items())
        for name, metadata in all_metadata:
            if metadata.get('requires_wallet_type'):
                continue
            if not self.config.get(f'plugins.{name}.enabled'):
                continue
            try:
                if not self.cmd_only:
                    self.load_plugin_by_name(name)
                else:
                    # only load init method to register commands
                    self.maybe_load_plugin_init_method(name)
            except BaseException as e:
                self.logger.exception(f"cannot initialize plugin {name}: {e}")
187

188
    def _has_root_permissions(self, path):
5✔
189
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
190

191
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
        """Return the location of the plugin key and a help text for the user.

        On POSIX-like platforms the help text is empty unless ``key_hex`` is given,
        in which case it contains a terminal command that stores the key.
        Raises on Android.
        """
        if sys.platform in ['windows', 'win32']:
            return self.keyfile_windows, _('This file can be edited with Regdit')
        if 'ANDROID_DATA' in os.environ:
            raise Exception('platform not supported')
        # treat unknown platforms and macOS as linux-like
        if not key_hex:
            return self.keyfile_posix, ""
        help_text = (
            _('The file must have root permissions')
            + ".\n\n"
            + _("To set it you can also use the Auto-Setup or run "
                "the following terminal command")
            + ":\n\n"
            + f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\""
        )
        return self.keyfile_posix, help_text
209

210
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
        """Store the plugin pubkey as root/admin user; can be called from the GUI.

        Returns True on success, False if anything failed (the error is logged).
        """
        try:
            if sys.platform in ['windows', 'win32']:
                store_key = self._write_key_to_regedit_windows
            elif 'ANDROID_DATA' in os.environ:
                raise Exception('platform not supported')
            elif sys.platform.startswith('darwin'):  # macOS
                store_key = self._write_key_to_root_file_macos
            else:
                store_key = self._write_key_to_root_file_linux
            store_key(pubkey_hex)
        except Exception:
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
            return False
        return True
225

226
    def try_auto_key_reset(self) -> bool:
        """Remove the stored plugin pubkey using the platform-specific helper.

        Returns True on success, False if anything failed (the error is logged).
        """
        try:
            if sys.platform in ['windows', 'win32']:
                remove_key = self._delete_plugin_key_from_windows_registry
            elif 'ANDROID_DATA' in os.environ:
                raise Exception('platform not supported')
            elif sys.platform.startswith('darwin'):  # macOS
                remove_key = self._delete_macos_plugin_keyfile
            else:
                remove_key = self._delete_linux_plugin_keyfile
            remove_key()
        except Exception:
            self.logger.exception('auto-reset of plugin key failed')
            return False
        return True
240

241
    def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
5✔
242
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
NEW
243
        dir_path: str = os.path.dirname(self.keyfile_posix)
×
NEW
244
        sh_command = (
×
245
                     f"mkdir -p {dir_path} "  # create the /etc/electrum dir
246
                     f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} "  # write the key to the file
247
                     f"&& chmod 644 {self.keyfile_posix} "  # set read permissions for the file
248
                     f"&& chmod 755 {dir_path}"  # set read permissions for the dir
249
        )
NEW
250
        return sh_command
×
251

252
    @staticmethod
5✔
253
    def _get_macos_osascript_command(commands: List[str]) -> List[str]:
5✔
254
        """
255
        Inspired by
256
        https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
257
        Wraps the given commands in a macOS osascript command to prompt for root permissions.
258
        """
NEW
259
        from shlex import quote
×
260

NEW
261
        def quote_shell(args):
×
NEW
262
            return " ".join(quote(arg) for arg in args)
×
263

NEW
264
        def quote_applescript(string):
×
NEW
265
            charmap = {
×
266
                "\n": "\\n",
267
                "\r": "\\r",
268
                "\t": "\\t",
269
                "\"": "\\\"",
270
                "\\": "\\\\",
271
            }
NEW
272
            return '"%s"' % "".join(charmap.get(char, char) for char in string)
×
273

NEW
274
        commands = [
×
275
            "osascript",
276
            "-e",
277
            "do shell script %s "
278
            "with administrator privileges "
279
            "without altering line endings"
280
            % quote_applescript(quote_shell(commands))
281
        ]
NEW
282
        return commands
×
283

284
    @staticmethod
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
        """
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.

        :param reg_exe_command: the argument string passed to reg.exe.
        :raises Exception: if ShellExecuteExW reports failure. The exit status of
            reg.exe itself is not inspected here.
        """
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
        # for the result of the process (returns no process handle)
        from ctypes import byref, sizeof, windll, Structure, c_ulong
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR

        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
        class SHELLEXECUTEINFO(Structure):
            _fields_ = [
                ('cbSize', DWORD),
                ('fMask', c_ulong),
                ('hwnd', HWND),
                ('lpVerb', LPCWSTR),
                ('lpFile', LPCWSTR),
                ('lpParameters', LPCWSTR),
                ('lpDirectory', LPCWSTR),
                ('nShow', c_ulong),
                ('hInstApp', HINSTANCE),
                ('lpIDList', c_ulong),
                ('lpClass', LPCWSTR),
                ('hkeyClass', HKEY),
                ('dwHotKey', DWORD),
                ('hIcon', HANDLE),
                ('hProcess', HANDLE)
            ]

        info = SHELLEXECUTEINFO()
        info.cbSize = sizeof(SHELLEXECUTEINFO)
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
        info.hwnd = None
        info.lpVerb = 'runas'  # run as administrator
        info.lpFile = 'reg.exe'  # the executable to run
        info.lpParameters = reg_exe_command  # the registry edit command
        info.lpDirectory = None
        info.nShow = 1

        # Execute and wait
        if not windll.shell32.ShellExecuteExW(byref(info)):
            error = windll.kernel32.GetLastError()
            raise Exception(f'Error executing registry command: {error}')

        # block until the process is done or the timeout of 0x1338 (= 4920) ms expires
        # NOTE(review): the return value is ignored, so a timeout is not detected here
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)

        # Close handle
        windll.kernel32.CloseHandle(info.hProcess)
334

335
    @staticmethod
5✔
336
    def _execute_commands_in_subprocess(commands: List[str]) -> None:
5✔
337
        """
338
        Executes the given commands in a subprocess and asserts that it was successful.
339
        """
NEW
340
        import subprocess
×
NEW
341
        process = subprocess.Popen(
×
342
            commands,
343
            stdout=subprocess.PIPE,
344
            stderr=subprocess.PIPE,
345
            text=True
346
        )
NEW
347
        stdout, stderr = process.communicate()
×
NEW
348
        if process.returncode != 0:
×
NEW
349
            raise Exception(f'error executing command ({process.returncode}): {stderr}')
×
350

351
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
        """
        Spawns a pkexec subprocess to write the key to a file with root permissions.
        This will open an OS dialog asking for the root password. Can only succeed if
        the system has polkit installed.

        :raises Exception: if /etc is missing, the subprocess fails, or the file
            content read back afterwards differs from ``key_hex``.
        """
        # explicit checks instead of `assert`, which is stripped when running with -O
        if not os.path.exists("/etc"):
            raise Exception("System does not have /etc directory")

        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
        commands = ['pkexec', 'sh', '-c', sh_command]
        self._execute_commands_in_subprocess(commands)

        # check if the key was written correctly
        # (read once: a second read() on the same handle would return '')
        with open(self.keyfile_posix, 'r') as f:
            written = f.read()
        if written != key_hex:
            raise Exception(f'file content mismatch: {written} != {key_hex}')
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
367

368
    def _delete_linux_plugin_keyfile(self) -> None:
        """
        Remove the key file at self.keyfile_posix, going through pkexec when the
        file is owned by root and not writable by the current user.
        """
        keyfile = self.keyfile_posix
        if not os.path.exists(keyfile):
            self.logger.debug(f'file {keyfile} does not exist')
            return
        if self._has_root_permissions(keyfile):
            # use pkexec to delete the file as root user
            self._execute_commands_in_subprocess(['pkexec', 'rm', keyfile])
            assert not os.path.exists(keyfile), f'file {keyfile} still exists'
        else:
            os.unlink(keyfile)
383

384
    def _write_key_to_root_file_macos(self, key_hex: str) -> None:
        """
        Writes the key to the root-owned key file through an osascript prompt
        for administrator privileges.

        :raises Exception: if /etc is missing, the subprocess fails, or the file
            content read back afterwards differs from ``key_hex``.
        """
        # explicit checks instead of `assert`, which is stripped when running with -O
        if not os.path.exists("/etc"):
            raise Exception("System does not have /etc directory")

        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
        macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])

        self._execute_commands_in_subprocess(macos_commands)
        # check if the key was written correctly
        # (read once: a second read() on the same handle would return '')
        with open(self.keyfile_posix, 'r') as f:
            written = f.read()
        if written != key_hex:
            raise Exception(f'file content mismatch: {written} != {key_hex}')
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
394

395
    def _delete_macos_plugin_keyfile(self) -> None:
        """
        Remove the key file at self.keyfile_posix, going through osascript when the
        file is owned by root and not writable by the current user.
        """
        keyfile = self.keyfile_posix
        if not os.path.exists(keyfile):
            self.logger.debug(f'file {keyfile} does not exist')
            return
        if self._has_root_permissions(keyfile):
            # use osascript to delete the file as root user
            elevated_rm = self._get_macos_osascript_command(["rm", keyfile])
            self._execute_commands_in_subprocess(elevated_rm)
            assert not os.path.exists(keyfile), f'file {keyfile} still exists'
        else:
            os.unlink(keyfile)
406

407
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
        """
        Store the key in the Windows registry via an elevated reg.exe call (UAC prompt),
        then read it back to verify it.
        """
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE

        reg_arguments = f'add "{self.keyfile_windows}" /ve /t REG_SZ /d "{key_hex}" /f'
        self._run_win_regedit_as_admin(reg_arguments)

        # check if the key was written correctly
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey, \
                OpenKey(hkey, r'SOFTWARE\Electrum') as key:
            stored_value = QueryValue(key, 'PluginsKey')
            assert key_hex == stored_value, "incorrect registry key value"
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
423

424
    def _delete_plugin_key_from_windows_registry(self) -> None:
        """
        Delete the PluginsKey registry key via an elevated reg.exe call and
        verify that it is gone afterwards.
        """
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE

        self._run_win_regedit_as_admin(f'delete "{self.keyfile_windows}" /f')

        try:
            # sanity check: opening the key must fail now that it has been deleted
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey, \
                    OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
                raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
        except FileNotFoundError:
            pass
440

441
    def create_new_key(self, password:str) -> str:
        """Derive a key from ``password`` and a fresh random salt.

        Returns the hex encoding of: version byte, 32-byte salt, public key bytes.
        """
        salt = os.urandom(32)
        pubkey = self.derive_privkey(password, salt).get_public_key_bytes()
        version_byte = bytes([PLUGIN_PASSWORD_VERSION])
        return b"".join((version_byte, salt, pubkey)).hex()
447

448
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], Optional[bytes]]:
        """
        returns pubkey, salt
        returns None, None if the pubkey has not been set, can not be read,
        is not protected by root permissions, or has an unknown version
        """
        if sys.platform in ['windows', 'win32']:
            import winreg
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
                try:
                    # same path as used when the key is written
                    with winreg.OpenKey(hkey, r"SOFTWARE\Electrum") as key:
                        key_hex = winreg.QueryValue(key, "PluginsKey")
                except Exception as e:
                    self.logger.info(f'winreg error: {e}')
                    return None, None
        elif 'ANDROID_DATA' in os.environ:
            return None, None
        else:
            # treat unknown platforms as linux-like
            if not os.path.exists(self.keyfile_posix):
                return None, None
            if not self._has_root_permissions(self.keyfile_posix):
                # a key file the user can modify must not be trusted.
                # Always return a 2-tuple, as callers unpack the result.
                self.logger.info(f'file {self.keyfile_posix} does not have root permissions, ignoring it')
                return None, None
            with open(self.keyfile_posix) as f:
                key_hex = f.read()
        try:
            key = bytes.fromhex(key_hex)
            version = key[0]
        except Exception:
            self.logger.exception(f'{key_hex=} invalid')
            return None, None
        if version != PLUGIN_PASSWORD_VERSION:
            self.logger.info(f'unknown plugin password version: {version}')
            return None, None
        # all good
        # layout: 1 version byte, 32 bytes of salt, then the pubkey
        salt = key[1:1+32]
        pubkey = key[1+32:]
        return pubkey, salt
485

486
    def get_external_plugin_dir(self) -> str:
5✔
487
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
488
        if not os.path.exists(pkg_path):
5✔
489
            os.mkdir(pkg_path)
5✔
490
        return pkg_path
5✔
491

492
    async def download_external_plugin(self, url: str) -> str:
        """Download the plugin at ``url`` into the external plugin directory.

        :param url: location of the plugin; the last path component is used as file name.
        :return: path of the downloaded file.
        :raises FileExistsError: if a file with that name is already present.
        :raises Exception: if the server does not answer with HTTP status 200.
        """
        filename = os.path.basename(urlparse(url).path)
        pkg_path = self.get_external_plugin_dir()
        path = os.path.join(pkg_path, filename)
        if os.path.exists(path):
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    # do not return a path to a file that was never written
                    raise Exception(f"could not download plugin from {url}: HTTP status {resp.status}")
                try:
                    with open(path, 'wb') as fd:
                        async for chunk in resp.content.iter_chunked(64 * 1024):
                            fd.write(chunk)
                except BaseException:
                    # includes cancellation: do not leave a truncated file behind, as
                    # it would block a retry with FileExistsError
                    if os.path.exists(path):
                        os.unlink(path)
                    raise
        return path
505

506
    def read_manifest(self, path) -> dict:
        """Return the manifest.json of the zip archive at ``path`` as a dict.

        The dict is extended with the archive path, the directory of the manifest
        inside the archive, an ``is_zip`` flag and the sha256 hash of the archive.
        Raises if the archive contains no manifest.json.
        """
        with zipfile_lib.ZipFile(path) as archive:
            manifest_name = next(
                (entry for entry in archive.namelist() if entry.endswith('manifest.json')),
                None,
            )
            if manifest_name is None:
                raise Exception('could not find manifest.json in zip archive')
            with archive.open(manifest_name, 'r') as f:
                manifest = json.load(f)
                manifest.update({
                    'path': path,  # external, path of the zipfile
                    'dirname': os.path.dirname(manifest_name),  # internal
                    'is_zip': True,
                    'zip_hash_sha256': get_file_hash256(path).hex(),
                })
                return manifest
521

522
    def zip_plugin_path(self, name) -> str:
        """Return the path of the plugin's file, located in the internal or external plugin dir."""
        zip_name = os.path.basename(self.get_metadata(name)['path'])
        if name in self.internal_plugin_metadata:
            base_dir = self.pkgpath
        else:
            base_dir = self.get_external_plugin_dir()
        return os.path.join(base_dir, zip_name)
530

531
    def find_zip_plugins(self, pkg_path: str, external: bool):
        """Find plugins in zip form in the given pkg_path and populate the metadata dicts.

        Archives are skipped if their manifest can not be read, the plugin name is
        already known, the running version is outside the manifest's
        ``min_electrum_version``/``max_electrum_version`` range, or (unless in
        ``cmd_only`` mode) the plugin is not available for the current GUI.

        :param pkg_path: directory that is scanned for ``*.zip`` files; None is a no-op.
        :param external: whether the plugins found are recorded as external ones.
        """
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            path = os.path.join(pkg_path, filename)
            if not filename.endswith('.zip'):
                continue
            try:
                d = self.read_manifest(path)
                name = d['name']
            except Exception:
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
                continue
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                self.logger.info(f"duplicate plugins for {name=}")
                continue
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
                continue
            # note: no exc_info in the two messages below, there is no active exception here
            min_version = d.get('min_electrum_version')
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
                self.logger.info(f"version mismatch for zip plugin {filename}")
                continue
            max_version = d.get('max_electrum_version')
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
                self.logger.info(f"version mismatch for zip plugin {filename}")
                continue

            if not self.cmd_only:
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                if 'fullname' not in d:
                    continue
                details = d.get('registers_keystore')
                if details:
                    self.register_keystore(name, gui_good, details)
            if external:
                self.external_plugin_metadata[name] = d
            else:
                self.internal_plugin_metadata[name] = d
572

573
    def get(self, name):
5✔
574
        return self.plugins.get(name)
×
575

576
    def count(self):
5✔
577
        return len(self.plugins)
×
578

579
    def load_plugin(self, name) -> 'BasePlugin':
        """Import the code of the plugin ``name`` and return the plugin.

        Raises if no metadata is known for ``name``.
        note: can be called from any thread.
        """
        if not self.get_metadata(name):
            raise Exception(f"could not find plugin {name!r}")
        return self.load_plugin_by_name(name)
587

588
    def maybe_load_plugin_init_method(self, name: str) -> None:
        """Execute the plugin's __init__.py module, unless it is already in sys.modules."""
        is_external = name in self.external_plugin_metadata
        package_prefix = 'electrum_external_plugins.' if is_external else 'electrum.plugins.'
        base_name = package_prefix + name
        if base_name in sys.modules:
            return
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
        metadata = self.get_metadata(name)
        if metadata.get('is_zip', False):
            importer = zipimport.zipimporter(metadata['path'])
            init_spec = importer.find_spec(metadata['dirname'])
        elif is_external:
            # this branch is deprecated: external plugins are always zip files
            init_path = os.path.join(metadata['path'], '__init__.py')
            init_spec = importlib.util.spec_from_file_location(base_name, init_path)
        else:
            init_spec = importlib.util.find_spec(base_name)

        self.exec_module_from_spec(init_spec, base_name)
609

610
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
        """Load the GUI-specific module of plugin ``name`` and instantiate its ``Plugin`` class.

        An already loaded plugin is returned from ``self.plugins``. Otherwise the
        plugin's init module is loaded first, the plugin is instantiated, its thread
        jobs are added and it is stored in ``self.plugins``.

        NOTE: despite the annotation, None is returned for an external plugin that
        is not authorized.

        :raises RuntimeError: if no module exists for the current GUI.
        :raises Exception: if executing the module or creating the plugin fails.
        """
        if name in self.plugins:
            return self.plugins[name]
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
        self.maybe_load_plugin_init_method(name)
        is_external = name in self.external_plugin_metadata
        # the authorization check only applies to external plugins
        if is_external and not self.is_authorized(name):
            self.logger.info(f'plugin not authorized {name}')
            return
        if not is_external:
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
        else:
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'

        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        try:
            module = self.exec_module_from_spec(spec, full_name)
            plugin = module.Plugin(self, self.config, name)
        except Exception as e:
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
        return plugin
636

637
    def close_plugin(self, plugin):
5✔
638
        self.remove_jobs(plugin.thread_jobs())
×
639

640
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
641
        from hashlib import pbkdf2_hmac
×
642
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
643
        return ECPrivkey(secret)
×
644

645
    def install_internal_plugin(self, name):
5✔
646
        self.config.set_key(f'plugins.{name}.enabled', [])
×
647

648
    def install_external_plugin(self, name, path, privkey, manifest):
5✔
649
        # uninstall old version first to get rid of old zip files when updating plugin
650
        self.uninstall(name)
×
651
        self.external_plugin_metadata[name] = manifest
×
652
        self.authorize_plugin(name, path, privkey)
×
653

654
    def uninstall(self, name: str):
        """Remove plugin `name`: clear its config entry and, for external plugins, its zip file.

        The 'plugins.<name>' config key is reset to None if it currently holds
        a truthy value. For a plugin listed in `external_plugin_metadata`, the
        zip file is deleted and the metadata entry dropped.

        A zip file that is already missing from disk is tolerated (and logged),
        so the stale metadata is still cleaned up and the plugin can be
        reinstalled over it.
        """
        if self.config.get(f'plugins.{name}'):
            self.config.set_key(f'plugins.{name}', None)
        if name in self.external_plugin_metadata:
            zipfile = self.zip_plugin_path(name)
            try:
                os.unlink(zipfile)
            except FileNotFoundError:
                self.logger.warning(f'zip file of plugin {name} was already removed: {zipfile}')
            self.external_plugin_metadata.pop(name)
×
661

662
    def is_internal(self, name) -> bool:
5✔
663
        return name in self.internal_plugin_metadata
×
664

665
    def is_auto_loaded(self, name):
        """Return a truthy value if the metadata of plugin `name` sets
        'registers_keystore' or 'registers_wallet_type'.

        External metadata takes precedence over internal metadata. The result
        is not a strict bool: it is the first truthy metadata value, or the
        (falsy) lookup result when no metadata is found.
        """
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
        if not metadata:
            return metadata
        return metadata.get('registers_keystore') or metadata.get('registers_wallet_type')
×
668

669
    def is_installed(self, name) -> bool:
        """Tell whether plugin `name` is installed.

        An external plugin is installed as soon as its metadata is known; note
        that an external plugin may be installed but not authorized. An internal
        plugin is installed when it has a truthy 'plugins.<name>' config entry.

        Always returns a real bool (the raw config value is not leaked).
        """
        if name in self.external_plugin_metadata:
            return True
        return name in self.internal_plugin_metadata and bool(self.config.get(f'plugins.{name}'))
673

674
    def is_authorized(self, name) -> bool:
        """Tell whether plugin `name` may be loaded.

        Internal plugins are always authorized. An external plugin is
        authorized only if a plugin pubkey is configured, the plugin is a zip
        file, and the hex signature stored under 'plugins.<name>.authorized'
        verifies against the sha256 of the zip file with that pubkey.

        A missing or malformed (non-hex) stored signature yields False instead
        of raising, and the zip file is only hashed once a signature is present.
        """
        if name in self.internal_plugin_metadata:
            return True
        if name not in self.external_plugin_metadata:
            return False
        pubkey_bytes, _salt = self.get_pubkey_bytes()
        if not pubkey_bytes:
            return False
        if not self.is_plugin_zip(name):
            return False
        sig = self.config.get(f'plugins.{name}.authorized')
        if not sig:
            return False
        try:
            sig_bytes = bytes.fromhex(sig)
        except (TypeError, ValueError):
            # corrupted config value: treat as "not authorized" rather than crashing the caller
            self.logger.warning(f'plugin {name} has a malformed authorization signature')
            return False
        plugin_hash = get_file_hash256(self.zip_plugin_path(name))
        return ECPubkey(pubkey_bytes).ecdsa_verify(sig_bytes, plugin_hash)
×
691

692
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
693
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
694
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
695
        plugin_hash = get_file_hash256(filename)
×
696
        sig = privkey.ecdsa_sign(plugin_hash)
×
697
        value = sig.hex()
×
698
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
699

700
    def enable(self, name: str) -> 'BasePlugin':
        """Mark plugin `name` as enabled in the config and return its instance.

        The instance returned by `self.get(name)` is reused when truthy;
        otherwise the result of `self.load_plugin(name)` is returned.
        """
        self.config.enable_plugin(name)
        return self.get(name) or self.load_plugin(name)
×
706

707
    def disable(self, name: str) -> None:
        """Mark plugin `name` as disabled in the config and close it if it is loaded.

        A loaded plugin is removed from `self.plugins` before its `close()` is called.
        """
        self.config.disable_plugin(name)
        plugin = self.get(name)
        if plugin:
            self.plugins.pop(name)
            plugin.close()
            self.logger.info(f"closed {name}")
×
715

716
    @classmethod
5✔
717
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
718
        return key.startswith('plugins.')
×
719

720
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
        """Tell whether plugin `name` can be used with `wallet`.

        False if the plugin has no description, or if any module listed in its
        'requires' entries cannot be imported (logged as a warning). Otherwise
        True when 'requires_wallet_type' is empty or contains the wallet's type.
        """
        description = self.descriptions.get(name)
        if not description:
            return False
        # each 'requires' entry is a pair; only the first item (module name) is used here
        for module_name, _unused in description.get('requires', []):
            try:
                __import__(module_name)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        wallet_types = description.get('requires_wallet_type', [])
        if not wallet_types:
            return True
        return wallet.wallet_type in wallet_types
×
733

734
    def get_hardware_support(self):
        """Return a list of HardwarePluginToScan, one per usable registered hardware wallet.

        Entries of `self.hw_wallets` whose `gui_good` flag is falsy are skipped.
        A plugin that loads but reports itself unavailable is skipped as well.
        A plugin that fails to load is still listed, with `plugin=None` and the
        exception attached.
        """
        scanned = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if not gui_good:
                continue
            try:
                plugin = self.get_plugin(name)
                if not plugin.is_available():
                    continue
                entry = HardwarePluginToScan(name=name,
                                             description=details[2],
                                             plugin=plugin,
                                             exception=None)
            except Exception as e:
                self.logger.exception(f"cannot load plugin for: {name}")
                entry = HardwarePluginToScan(name=name,
                                             description=details[2],
                                             plugin=None,
                                             exception=e)
            scanned.append(entry)
        return scanned
×
752

753
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
754
        from .wallet import register_wallet_type, register_constructor
5✔
755
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
756

757
        def loader():
5✔
758
            plugin = self.get_plugin(name)
5✔
759
            register_constructor(wallet_type, plugin.wallet_class)
5✔
760
        register_wallet_type(wallet_type)
5✔
761
        plugin_loaders[wallet_type] = loader
5✔
762

763
    def register_keystore(self, name, gui_good, details):
5✔
764
        from .keystore import register_keystore
5✔
765

766
        def dynamic_constructor(d):
5✔
767
            return self.get_plugin(name).keystore_class(d)
5✔
768
        if details[0] == 'hardware':
5✔
769
            self.hw_wallets[name] = (gui_good, details)
5✔
770
            self.logger.info(f"registering hardware {name}: {details}")
5✔
771
            register_keystore(details[1], dynamic_constructor)
5✔
772

773
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
774
        if name not in self.plugins:
5✔
775
            self.load_plugin(name)
5✔
776
        return self.plugins[name]
5✔
777

778
    def is_plugin_zip(self, name: str) -> bool:
        """Return the 'is_zip' flag from the plugin's metadata (False if unknown or unset)."""
        metadata = self.get_metadata(name)
        return False if metadata is None else metadata.get('is_zip', False)
×
783

784
    def get_metadata(self, name: str) -> Optional[dict]:
        """Return the metadata of plugin `name`, preferring internal over external.

        Returns None when neither registry holds a non-empty entry.
        """
        return (
            self.internal_plugin_metadata.get(name)
            or self.external_plugin_metadata.get(name)
            or None
        )
5✔
790

791
    def run(self):
5✔
792
        while self.is_running():
5✔
793
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
794
            self.run_jobs()
5✔
795
        self.on_stop()
5✔
796

797
    def read_file(self, name: str, filename: str) -> bytes:
        """Return the raw bytes of `filename` shipped with plugin `name`.

        For zip plugins the file is read from inside the archive, below the
        plugin's 'dirname'. For other plugins `name` must be an internal plugin
        and the file is read from the 'plugins/<name>/' directory next to this
        module.
        """
        if self.is_plugin_zip(name):
            import posixpath
            plugin_filename = self.zip_plugin_path(name)
            dirname = self.external_plugin_metadata[name]['dirname']
            # Zip member names always use '/' as separator, regardless of the
            # host OS; os.path.join would produce '\\' on Windows and the
            # member lookup would fail.
            member = posixpath.join(dirname, filename)
            if os.sep != '/':
                member = member.replace(os.sep, '/')
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
                with myzip.open(member) as myfile:
                    return myfile.read()
        else:
            assert name in self.internal_plugin_metadata
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
×
810

811
def get_file_hash256(path: str) -> bytes:
    """Return `sha256()` of the full contents of the file at `path`, similar to `sha256sum`."""
    with open(path, 'rb') as fd:
        content = fd.read()
    return sha256(content)
×
815

816

817
def hook(func):
5✔
818
    hook_names.add(func.__name__)
5✔
819
    return func
5✔
820

821

822
def run_hook(name, *args):
    """Call every registered implementation of hook `name` on enabled plugins.

    An implementation that raises is logged and otherwise ignored. Returns the
    single truthy result, or None if there is none. At most one implementation
    may return a truthy value (enforced with an assert).
    """
    results = []
    for plugin, func in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            outcome = func(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            continue
        if outcome:
            results.append(outcome)

    if not results:
        return None
    assert len(results) == 1, results
    return results[0]
×
838

839

840
class BasePlugin(Logger):
    """Base class for plugins.

    On construction, every attribute whose name is a known hook name is
    registered in the module-level `hooks` registry; `close()` undoes that.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        Logger.__init__(self)
        # add self to hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            implementation = getattr(self, attr_name)
            hooks.setdefault(attr_name, []).append((self, implementation))

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, tell the parent to close it, then call `on_close()`."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            # found attribute in self that is also the name of a hook
            entry = (self, getattr(self, attr_name))
            registered = hooks.get(attr_name, [])
            try:
                registered.remove(entry)
            except ValueError:
                # maybe attr name just collided with hook name and was not hook
                continue
            hooks[attr_name] = registered
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of `close()`; does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        """False by default."""
        return False

    def thread_jobs(self):
        """Jobs of this plugin; an empty list by default."""
        return []

    def is_enabled(self):
        """True if the plugin is available and enabled in the config."""
        if self.is_available():
            return self.config.is_plugin_enabled(self.name)
        return False

    def is_available(self):
        """True by default."""
        return True

    def can_user_disable(self):
        """True by default."""
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Read `filename` from this plugin's files via the parent plugins object."""
        return self.parent.read_file(self.name, filename)

    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
        """Returns a dict which is persisted in the per-wallet database."""
        return wallet.db.get_plugin_storage().setdefault(self.name, {})
×
905

906
class DeviceUnpairableError(UserFacingException): pass
5✔
907
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
908
class CannotAutoSelectDevice(Exception): pass
5✔
909

910

911
class Device(NamedTuple):
5✔
912
    path: Union[str, bytes]
5✔
913
    interface_number: int
5✔
914
    id_: str
5✔
915
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
916
    usage_page: int
5✔
917
    transport_ui_string: str
5✔
918

919

920
class DeviceInfo(NamedTuple):
5✔
921
    device: Device
5✔
922
    label: Optional[str] = None
5✔
923
    initialized: Optional[bool] = None
5✔
924
    exception: Optional[Exception] = None
5✔
925
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
926
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
927
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
928

929
    def label_for_device_select(self) -> str:
5✔
930
        return (
×
931
            "{label} ({maybe_model}{init}, {transport})"
932
            .format(
933
                label=self.label or _("An unnamed {}").format(self.plugin_name),
934
                init=(_("initialized") if self.initialized else _("wiped")),
935
                transport=self.device.transport_ui_string,
936
                maybe_model=f"{self.model_name}, " if self.model_name else ""
937
            )
938
        )
939

940

941
class HardwarePluginToScan(NamedTuple):
5✔
942
    name: str
5✔
943
    description: str
5✔
944
    plugin: Optional['HW_PluginBase']
5✔
945
    exception: Optional[Exception]
5✔
946

947

948
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
949

950

951
# hidapi is not thread-safe
952
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
953
#     https://github.com/libusb/hidapi/issues/45
954
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
955
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
956
# It is not entirely clear to me, exactly what is safe and what isn't, when
957
# using multiple threads...
958
# Hence, we use a single thread for all device communications, including
959
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
960
# the following thread:
961
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
962
    max_workers=1,
963
    thread_name_prefix='hwd_comms_thread'
964
)
965

966
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
967
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
968
# To keep it simple, let's just import it now, as we are likely in the main thread here.
969
if threading.current_thread() is not threading.main_thread():
5✔
970
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
971
try:
5✔
972
    import hid
5✔
973
except ImportError:
5✔
974
    pass
5✔
975

976

977
T = TypeVar('T')
5✔
978

979

980
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-device communication thread and return its result.

    If the current thread's name already starts with "hwd_comms_thread",
    `func` is called directly; otherwise it is submitted to
    `_hwd_comms_executor` and this call blocks until the result is available.
    """
    on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if not on_hwd_thread:
        return _hwd_comms_executor.submit(func).result()
    return func()
987

988

989
def runs_in_hwd_thread(func):
    """Decorator: route every call of `func` through `run_in_hwd_thread`."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
994

995

996
def assert_runs_in_hwd_thread():
    """Raise unless the current thread's name starts with "hwd_comms_thread"."""
    thread_name = threading.current_thread().name
    if thread_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
999

1000

1001
class DeviceMgr(ThreadJob):
5✔
1002
    """Manages hardware clients.  A client communicates over a hardware
1003
    channel with the device.
1004

1005
    In addition to tracking device HID IDs, the device manager tracks
1006
    hardware wallets and manages wallet pairing.  A HID ID may be
1007
    paired with a wallet when it is confirmed that the hardware device
1008
    matches the wallet, i.e. they have the same master public key.  A
1009
    HID ID can be unpaired if e.g. it is wiped.
1010

1011
    Because of hotplugging, a wallet must request its client
1012
    dynamically each time it is required, rather than caching it
1013
    itself.
1014

1015
    The device manager is shared across plugins, so just one place
1016
    does hardware scans when needed.  By tracking HID IDs, if a device
1017
    is plugged into a different port the wallet is automatically
1018
    re-paired.
1019

1020
    Wallets are informed on connect / disconnect events.  It must
1021
    implement connected(), disconnected() callbacks.  Being connected
1022
    implies a pairing.  Callbacks can happen in any thread context,
1023
    and we do them without holding the lock.
1024

1025
    Confusingly, the HID ID (serial number) reported by the HID system
1026
    doesn't match the device ID reported by the device itself.  We use
1027
    the HID IDs.
1028

1029
    This plugin is thread-safe.  Currently only devices supported by
1030
    hidapi are implemented."""
1031

1032
    def __init__(self, config: SimpleConfig):
5✔
1033
        ThreadJob.__init__(self)
5✔
1034
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
1035
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
1036
        # A client->id_ map. Needs self.lock.
1037
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
1038
        # What we recognise.  (vendor_id, product_id) -> Plugin
1039
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
1040
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
1041
        # Custom enumerate functions for devices we don't know about.
1042
        self._enumerate_func = set()  # Needs self.lock.
5✔
1043

1044
        self.lock = threading.RLock()
5✔
1045

1046
        self.config = config
5✔
1047

1048
    def thread_jobs(self):
5✔
1049
        # Thread job to handle device timeouts
1050
        return [self]
5✔
1051

1052
    def run(self):
        '''Handle device timeouts.  Runs in the context of the Plugins
        thread.'''
        # snapshot under the lock, then call out to clients without holding it
        with self.lock:
            snapshot = list(self.clients)
        cutoff = time.time() - self.config.get_session_timeout()
        for client in snapshot:
            client.timeout(cutoff)
×
1060

1061
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every key in `device_pairs` to `plugin` in `_recognised_hardware`."""
        self._recognised_hardware.update(dict.fromkeys(device_pairs, plugin))
×
1064

1065
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every id in `vendor_ids` to `plugin` in `_recognised_vendor`."""
        self._recognised_vendor.update((vendor_id, plugin) for vendor_id in vendor_ids)
×
1068

1069
    def register_enumerate_func(self, func):
5✔
1070
        with self.lock:
×
1071
            self._enumerate_func.add(func)
×
1072

1073
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for `device`, creating and registering one if needed.

        A client already registered under `device.id_` is reused. Otherwise
        `plugin.create_client` is asked for one; a truthy result is stored in
        `self.clients` (under the lock) before being returned, a falsy result
        is returned unchanged.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        fresh = plugin.create_client(device, handler)
        if not fresh:
            return fresh
        self.logger.info(f"Registering {fresh}")
        with self.lock:
            self.clients[fresh] = device.id_
        return fresh
×
1086

1087
    def id_by_pairing_code(self, pairing_code):
5✔
1088
        with self.lock:
×
1089
            return self.pairing_code_to_id.get(pairing_code)
×
1090

1091
    def pairing_code_by_id(self, id_):
        """Return the first pairing code mapped to device id `id_`, or None."""
        with self.lock:
            matches = (code for code, paired_id in self.pairing_code_to_id.items()
                       if paired_id == id_)
            return next(matches, None)
×
1097

1098
    def unpair_pairing_code(self, pairing_code):
        """Forget the pairing for `pairing_code` and close the client of the device it pointed to.

        Does nothing if the pairing code is not known.
        """
        with self.lock:
            try:
                device_id = self.pairing_code_to_id.pop(pairing_code)
            except KeyError:
                return
        # the client is closed after the lock block above has been left
        self._close_client(device_id)
×
1104

1105
    def unpair_id(self, id_):
        """Unpair device `id_` via its pairing code if it has one; otherwise just close its client."""
        code = self.pairing_code_by_id(id_)
        if not code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(code)
×
1111

1112
    def _close_client(self, id_):
5✔
1113
        with self.lock:
×
1114
            client = self._client_by_id(id_)
×
1115
            self.clients.pop(client, None)
×
1116
        if client:
×
1117
            client.close()
×
1118

1119
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
1120
        with self.lock:
×
1121
            for client, client_id in self.clients.items():
×
1122
                if client_id == id_:
×
1123
                    return client
×
1124
        return None
×
1125

1126
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
1127
        '''Returns a client for the device ID if one is registered.  If
1128
        a device is wiped or in bootloader mode pairing is impossible;
1129
        in such cases we communicate by device ID and not wallet.'''
1130
        if scan_now:
×
1131
            self.scan_devices()
×
1132
        return self._client_by_id(id_)
×
1133

1134
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Find (or pair) the client that belongs to `keystore`.

        Lookup happens in stages: first among existing clients only (skipped if
        `devices` were passed in), then again with a device list (scanning if
        none was given), and finally - only if `force_pair` - by selecting a
        device and force-pairing with it. `handler.update_status` is called
        with False up front and with True once a client was found.

        Raises Exception if `handler` is None. Returns None if no client was found.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()
        lookup = partial(self.client_by_pairing_code, plugin=plugin, pairing_code=pcode, handler=handler)
        # search existing clients first (fast-path)
        client = None if devices else lookup(devices=[])
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = lookup(devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # no device could be picked without asking the user: give up quietly
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
1168

1169
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client paired under `pairing_code`, creating it from `devices` if needed.

        An existing client is only returned if its plugin has the same type as
        `plugin`; its handler is then replaced by `handler`. Without an
        existing client, the first device in `devices` whose id matches the
        pairing is used to create one. Returns None if nothing matches.
        """
        device_id = self.id_by_pairing_code(pairing_code)
        client = self._client_by_id(device_id)
        if not client:
            matching = next((device for device in devices if device.id_ == device_id), None)
            if matching is None:
                return None
            return self.create_client(matching, handler, plugin)
        if type(client.plugin) != type(plugin):
            return None
        # An unpaired client might have another wallet's handler
        # from a prior scan.  Replace to fix dialog parenting.
        client.handler = handler
        return client
×
1186

1187
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair `keystore` with the device described by `info` and return its client.

        The pairing is recorded only if the xpub the device reports for the
        keystore's derivation prefix equals the keystore's xpub.

        Raises DeviceUnpairableError if there is no pairable client of the
        right plugin type for the device, if the xpub request was cancelled or
        failed with RuntimeError, or if the xpubs differ.
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)
        device_id = info.device.id_
        client = self._client_by_id(device_id)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                device_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                device_xpub = None
            if device_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = device_id
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
1223

1224
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        Devices are scanned for if `devices` is None. A device whose client
        cannot be created or queried is logged and skipped, unless
        `include_failing_clients` is set, in which case it is listed with the
        exception attached.

        Raises HardwarePluginLibraryUnavailable if the plugin's libraries are missing.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                details = dict(
                    label=client.label(),
                    initialized=client.is_initialized(),
                    soft_device_id=client.get_soft_device_id(),
                    model_name=client.device_model_name(),
                )
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device, plugin_name=plugin.name, **details))

        return infos
×
1265

1266
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked whether to retry
        (devices are rescanned on each retry). Then a device is auto-selected
        by soft_device_id, by a unique non-placeholder label, or because it is
        the only device and the keystore has no identifying info. Failing that,
        the user is asked to choose.

        Raises CannotAutoSelectDevice when a question would be needed but
        `allow_user_interaction` is False, and UserCancelled when the user
        declines to retry or cancels the choice.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            parts = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                parts.append(f"label: {keystore.label}, ")
            parts.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            parts.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            ))
            if not handler.yes_no_question("".join(parts)):
                raise UserCancelled()
            # force a fresh scan on the next iteration
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for info in infos:
                if info.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return info
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        same_label = [info for info in infos if info.label == keystore.label]
        if keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS and len(same_label) == 1:
            self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
            return same_label[0]
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        keystore_is_anonymous = (keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                                 and keystore.soft_device_id is None)
        if len(infos) == 1 and keystore_is_anonymous:
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        choices = [ChoiceItem(key=idx, label=candidate.label_for_device_select())
                   for (idx, candidate) in enumerate(infos)]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        chosen_idx = handler.query_choice(msg, choices)
        if chosen_idx is None:
            raise UserCancelled()
        info = infos[chosen_idx]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
1337

1338
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those a registered plugin recognises.

        Returns an empty list when the `hid` module cannot be imported.
        For each enumerated HID entry, a plugin is looked up first by the exact
        (vendor_id, product_id) pair and, failing that, by vendor_id alone. The
        plugin is then asked to build a device object from the entry; falsy
        results are dropped.
        """
        try:
            import hid  # noqa: F811
        except ImportError:
            return []

        found = []
        for hid_entry in hid.enumerate(0, 0):
            vid = hid_entry['vendor_id']
            product_key = (vid, hid_entry['product_id'])
            # an exact product match takes precedence over a vendor-wide match
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vid in self._recognised_vendor:
                plugin = self._recognised_vendor[vid]
            else:
                continue
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(hid_entry, product_key=product_key)
            if device:
                found.append(device)
        return found
×
1359

1360
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected hardware devices and unpair clients whose device is gone.

        The result combines the HID enumeration with whatever the registered
        custom enumeration functions report. A custom enumeration function that
        raises is logged and skipped; the scan continues with the others.

        After scanning, every client in self.clients whose device id is missing
        from the scan result, or that reports no usable connection with its
        device, is removed from self.clients and unpaired, and its handler (if
        any) is notified via update_status(False).

        Returns the devices found by this scan.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about.
        # Snapshot the callbacks under the lock, but call them after releasing it.
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            except Exception as e:
                # Best-effort: one broken enumerator must not abort the scan.
                # Exception (not BaseException) is caught so that KeyboardInterrupt
                # and SystemExit are not silently swallowed here.
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
            else:
                devices.extend(new_devices)

        # find out what was disconnected
        client_ids = [dev.id_ for dev in devices]
        disconnected_clients = []
        with self.lock:
            connected = {}
            for client, id_ in self.clients.items():
                if id_ in client_ids and client.has_usable_connection_with_device():
                    connected[client] = id_
                else:
                    disconnected_clients.append((client, id_))
            self.clients = connected

        # Unpair disconnected devices (done after self.lock has been released)
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
1398

1399
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Report the versions of the optional USB/HID libraries.

        Keys of the returned mapping:
        - "libusb.version": dotted version from usb1, or None if usb1 cannot be imported
        - "libusb.path": only present when usb1 was imported; the library name/path
          exposed by usb1, or None if that attribute is missing
        - "hidapi.version": hid.__version__ if available, else the installed
          "hidapi" distribution version, else None
        """
        info = {}
        # add libusb
        try:
            import usb1
        except Exception:
            info["libusb.version"] = None
        else:
            info["libusb.version"] = ".".join(str(part) for part in usb1.getVersion()[:4])
            try:
                info["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                info["libusb.path"] = None
        # add hidapi
        try:
            import hid  # noqa: F811
            hidapi_version = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            # either the import failed or this hid build has no __version__:
            # fall back to the installed distribution metadata
            from importlib.metadata import version as dist_version
            try:
                hidapi_version = dist_version("hidapi")
            except ImportError:
                hidapi_version = None
        info["hidapi.version"] = hidapi_version
        return info
×
1424

1425
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Try to pair each of the given keystores with a connected hardware device.

        Only hardware keystores are considered; if there are none, nothing happens.
        If `devices` is None, a fresh device scan is performed.

        Pairing happens in up to two passes over all hardware keystores:
        1. without user interaction, so that everything that can be auto-selected
           gets paired first;
        2. only if `allow_user_interaction` is set: once more, now allowing the
           user to be prompted for a manual selection.
        A UserCancelled raised for one keystore is ignored and the next keystore is tried.

        Why batch this instead of pairing keystores one by one? Consider a 2-of-3
        multisig wallet with three hw keystores, where only devices d2 (for ks2) and
        d3 (for ks3) are connected and nothing is paired yet:
        - pairing individually, we might start with ks1. It cannot be auto-paired, as
          neither d2 nor d3 matches the stored fingerprint, so the user might get asked
          whether to manually pair ks1 with d2 or d3. That is confusing and error-prone,
          especially for hw devices without label support (such as Ledger), where the
          user cannot easily tell same-type devices apart. (see #4199)
        - with this method, ks2-d2 and ks3-d3 are auto-paired first, and only then
          is the user told that ks1 could not be paired (with no devices left to try).
        """
        from .keystore import Hardware_KeyStore
        hw_keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not hw_keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # pass 1 (always): auto-selection only; pass 2 (optional): manual selections
        interaction_passes = [False, True] if allow_user_interaction else [False]
        for interactive in interaction_passes:
            for ks in hw_keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc