• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4945183021727744

21 Jan 2026 03:28PM UTC coverage: 62.623% (-0.009%) from 62.632%
4945183021727744

push

CirrusCI

web-flow
Merge pull request #10429 from f321x/fix_deadlock

plugin: make DeviceMgr.run non-blocking, fix lock

7 of 15 new or added lines in 2 files covered. (46.67%)

2 existing lines in 2 files now uncovered.

23917 of 38192 relevant lines covered (62.62%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.82
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
1✔
27
import os
1✔
28
import pkgutil
1✔
29
import importlib.util
1✔
30
import time
1✔
31
import threading
1✔
32
import sys
1✔
33
import zipfile as zipfile_lib
1✔
34
from urllib.parse import urlparse
1✔
35

36
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
1✔
37
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
38
import concurrent
1✔
39
from concurrent.futures import Future
1✔
40
import zipimport
1✔
41
from functools import wraps, partial
1✔
42
from itertools import chain
1✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
1✔
45

46
from ._vendor.distutils.version import StrictVersion
1✔
47
from .version import ELECTRUM_VERSION
1✔
48
from .i18n import _
1✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem,
1✔
50
                   make_dir, make_aiohttp_session)
51
from . import bip32
1✔
52
from . import plugins
1✔
53
from .simple_config import SimpleConfig
1✔
54
from .logging import get_logger, Logger
1✔
55
from .crypto import sha256
1✔
56
from .network import Network
1✔
57

58
if TYPE_CHECKING:
59
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
60
    from .keystore import Hardware_KeyStore, KeyStore
61
    from .wallet import Abstract_Wallet
62

63

64
_logger = get_logger(__name__)
1✔
65
plugin_loaders = {}
1✔
66
hook_names = set()
1✔
67
hooks = {}
1✔
68
_exec_module_failure = {}  # type: Dict[str, Exception]
1✔
69

70
PLUGIN_PASSWORD_VERSION = 1
1✔
71

72

73
class Plugins(DaemonThread):
1✔
74

75
    pkgpath = os.path.dirname(plugins.__file__)
1✔
76
    # TODO: use XDG Base Directory Specification instead of hardcoding /etc
77
    keyfile_posix = '/etc/electrum/plugins_key'
1✔
78
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
1✔
79

80
    @profiler
1✔
81
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
1✔
82
        self.config = config
1✔
83
        self.cmd_only = cmd_only  # type: bool
1✔
84
        self.internal_plugin_metadata = {}
1✔
85
        self.external_plugin_metadata = {}
1✔
86
        if cmd_only:
1✔
87
            # only import the command modules of plugins
88
            Logger.__init__(self)
×
89
            self.find_plugins()
×
90
            self.load_plugins()
×
91
            return
×
92
        DaemonThread.__init__(self)
1✔
93
        self.device_manager = DeviceMgr(config)
1✔
94
        self.name = 'Plugins'  # set name of thread
1✔
95
        self._hw_wallets = {}
1✔
96
        self.plugins = {}  # type: Dict[str, BasePlugin]
1✔
97
        self.gui_name = gui_name
1✔
98
        self.find_plugins()
1✔
99
        self.load_plugins()
1✔
100
        self.add_jobs(self.device_manager.thread_jobs())
1✔
101
        self.start()
1✔
102

103
    @property
1✔
104
    def descriptions(self):
1✔
105
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
106

107
    def find_directory_plugins(self, pkg_path: str, external: bool):
1✔
108
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
109
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
1✔
110
        for loader, name, ispkg in iter_modules:
1✔
111
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
112
            #       once as data and once as code. To honor the "no duplicates" rule below,
113
            #       we exclude the ones packaged as *code*, here:
114
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
1✔
115
                continue
×
116
            module_path = os.path.join(pkg_path, name)
1✔
117
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
1✔
118
                continue
×
119
            try:
1✔
120
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
1✔
121
                    d = json.load(f)
1✔
122
            except FileNotFoundError:
×
123
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
124
                continue
×
125
            if 'fullname' not in d:
1✔
126
                continue
×
127
            d['path'] = module_path
1✔
128
            if not self.cmd_only:
1✔
129
                gui_good = self.gui_name in d.get('available_for', [])
1✔
130
                if not gui_good:
1✔
131
                    continue
1✔
132
                details = d.get('registers_wallet_type')
1✔
133
                if details:
1✔
134
                    self.register_wallet_type(name, gui_good, details)
1✔
135
                details = d.get('registers_keystore')
1✔
136
                if details:
1✔
137
                    self.register_keystore(name, details)
1✔
138
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
1✔
139
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
140
                _logger.info(f"duplicate plugins? for {name=}")
×
141
                continue
×
142
            if not external:
1✔
143
                self.internal_plugin_metadata[name] = d
1✔
144
            else:
145
                self.external_plugin_metadata[name] = d
×
146

147
    @staticmethod
1✔
148
    def exec_module_from_spec(spec, path: str):
1✔
149
        if prev_fail := _exec_module_failure.get(path):
1✔
150
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
151
        try:
1✔
152
            module = importlib.util.module_from_spec(spec)
1✔
153
            # sys.modules needs to be modified for relative imports to work
154
            # see https://stackoverflow.com/a/50395128
155
            sys.modules[path] = module
1✔
156
            spec.loader.exec_module(module)
1✔
157
        except Exception as e:
×
158
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
159
            # so the import system knows it failed. If called again for the same plugin, we do not
160
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
161
            # might have defined commands).
162
            _exec_module_failure[path] = e
×
163
            if path in sys.modules:
×
164
                sys.modules.pop(path, None)
×
165
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
166
        return module
1✔
167

168
    def find_plugins(self):
1✔
169
        internal_plugins_path = (self.pkgpath, False)
1✔
170
        external_plugins_path = (self.get_external_plugin_dir(), True)
1✔
171
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
1✔
172
            if pkg_path and os.path.exists(pkg_path):
1✔
173
                if not external:
1✔
174
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
1✔
175
                else:
176
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
1✔
177

178
    def load_plugins(self):
1✔
179
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
1✔
180
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
1✔
181
                try:
×
182
                    if self.cmd_only:  # only load init method to register commands
×
183
                        self.maybe_load_plugin_init_method(name)
×
184
                    else:
185
                        self.load_plugin_by_name(name)
×
186
                except BaseException as e:
×
187
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
188

189
    def _has_root_permissions(self, path):
1✔
190
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
191

192
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
1✔
193
        if sys.platform in ['windows', 'win32']:
×
194
            keyfile_path = self.keyfile_windows
×
195
            keyfile_help = _('This file can be edited with Regedit')
×
196
        elif 'ANDROID_DATA' in os.environ:
×
197
            raise Exception('platform not supported')
×
198
        else:
199
            # treat unknown platforms and macOS as linux-like
200
            keyfile_path = self.keyfile_posix
×
201
            keyfile_help = "" if not key_hex else "".join([
×
202
                                         _('The file must have root permissions'),
203
                                         ".\n\n",
204
                                         _("To set it you can also use the Auto-Setup or run "
205
                                           "the following terminal command"),
206
                                         ":\n\n",
207
                                         f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\"",
208
            ])
209
        return keyfile_path, keyfile_help
×
210

211
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
1✔
212
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
213
        try:
×
214
            if sys.platform in ['windows', 'win32']:
×
215
                self._write_key_to_regedit_windows(pubkey_hex)
×
216
            elif 'ANDROID_DATA' in os.environ:
×
217
                raise Exception('platform not supported')
×
218
            elif sys.platform.startswith('darwin'):  # macOS
×
219
                self._write_key_to_root_file_macos(pubkey_hex)
×
220
            else:
221
                self._write_key_to_root_file_linux(pubkey_hex)
×
222
        except Exception:
×
223
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
224
            return False
×
225
        return True
×
226

227
    def try_auto_key_reset(self) -> bool:
1✔
228
        try:
×
229
            if sys.platform in ['windows', 'win32']:
×
230
                self._delete_plugin_key_from_windows_registry()
×
231
            elif 'ANDROID_DATA' in os.environ:
×
232
                raise Exception('platform not supported')
×
233
            elif sys.platform.startswith('darwin'):  # macOS
×
234
                self._delete_macos_plugin_keyfile()
×
235
            else:
236
                self._delete_linux_plugin_keyfile()
×
237
        except Exception:
×
238
            self.logger.exception(f'auto-reset of plugin key failed')
×
239
            return False
×
240
        return True
×
241

242
    def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
1✔
243
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
244
        dir_path: str = os.path.dirname(self.keyfile_posix)
×
245
        sh_command = (
×
246
                     f"mkdir -p {dir_path} "  # create the /etc/electrum dir
247
                     f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} "  # write the key to the file
248
                     f"&& chmod 644 {self.keyfile_posix} "  # set read permissions for the file
249
                     f"&& chmod 755 {dir_path}"  # set read permissions for the dir
250
        )
251
        return sh_command
×
252

253
    @staticmethod
1✔
254
    def _get_macos_osascript_command(commands: List[str]) -> List[str]:
1✔
255
        """
256
        Inspired by
257
        https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
258
        Wraps the given commands in a macOS osascript command to prompt for root permissions.
259
        """
260
        from shlex import quote
×
261

262
        def quote_shell(args):
×
263
            return " ".join(quote(arg) for arg in args)
×
264

265
        def quote_applescript(string):
×
266
            charmap = {
×
267
                "\n": "\\n",
268
                "\r": "\\r",
269
                "\t": "\\t",
270
                "\"": "\\\"",
271
                "\\": "\\\\",
272
            }
273
            return '"%s"' % "".join(charmap.get(char, char) for char in string)
×
274

275
        commands = [
×
276
            "osascript",
277
            "-e",
278
            "do shell script %s "
279
            "with administrator privileges "
280
            "without altering line endings"
281
            % quote_applescript(quote_shell(commands))
282
        ]
283
        return commands
×
284

285
    @staticmethod
1✔
286
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
1✔
287
        """
288
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.
289
        """
290
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
291
        # for the result of the process (returns no process handle)
292
        from ctypes import byref, sizeof, windll, Structure, c_ulong
×
293
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR
×
294

295
        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
296
        class SHELLEXECUTEINFO(Structure):
×
297
            _fields_ = [
×
298
                ('cbSize', DWORD),
299
                ('fMask', c_ulong),
300
                ('hwnd', HWND),
301
                ('lpVerb', LPCWSTR),
302
                ('lpFile', LPCWSTR),
303
                ('lpParameters', LPCWSTR),
304
                ('lpDirectory', LPCWSTR),
305
                ('nShow', c_ulong),
306
                ('hInstApp', HINSTANCE),
307
                ('lpIDList', c_ulong),
308
                ('lpClass', LPCWSTR),
309
                ('hkeyClass', HKEY),
310
                ('dwHotKey', DWORD),
311
                ('hIcon', HANDLE),
312
                ('hProcess', HANDLE)
313
            ]
314

315
        info = SHELLEXECUTEINFO()
×
316
        info.cbSize = sizeof(SHELLEXECUTEINFO)
×
317
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
×
318
        info.hwnd = None
×
319
        info.lpVerb = 'runas'  # run as administrator
×
320
        info.lpFile = 'reg.exe'  # the executable to run
×
321
        info.lpParameters = reg_exe_command  # the registry edit command
×
322
        info.lpDirectory = None
×
323
        info.nShow = 1
×
324

325
        # Execute and wait
326
        if not windll.shell32.ShellExecuteExW(byref(info)):
×
327
            error = windll.kernel32.GetLastError()
×
328
            raise Exception(f'Error executing registry command: {error}')
×
329

330
        # block until the process is done or 5 sec timeout
331
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)
×
332

333
        # Close handle
334
        windll.kernel32.CloseHandle(info.hProcess)
×
335

336
    @staticmethod
1✔
337
    def _execute_commands_in_subprocess(commands: List[str]) -> None:
1✔
338
        """
339
        Executes the given commands in a subprocess and asserts that it was successful.
340
        """
341
        import subprocess
×
342
        with subprocess.Popen(
×
343
            commands,
344
            stdout=subprocess.PIPE,
345
            stderr=subprocess.PIPE,
346
            text=True,
347
        ) as process:
348
            stdout, stderr = process.communicate()
×
349
            if process.returncode != 0:
×
350
                raise Exception(f'error executing command ({process.returncode}): {stderr}')
×
351

352
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
1✔
353
        """
354
        Spawns a pkexec subprocess to write the key to a file with root permissions.
355
        This will open an OS dialog asking for the root password. Can only succeed if
356
        the system has polkit installed.
357
        """
358
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
359

360
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
361
        commands = ['pkexec', 'sh', '-c', sh_command]
×
362
        self._execute_commands_in_subprocess(commands)
×
363

364
        # check if the key was written correctly
365
        with open(self.keyfile_posix, 'r') as f:
×
366
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
367
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
368

369
    def _delete_linux_plugin_keyfile(self) -> None:
1✔
370
        """
371
        Deletes the root owned key file at self.keyfile_posix.
372
        """
373
        if not os.path.exists(self.keyfile_posix):
×
374
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
375
            return
×
376
        if not self._has_root_permissions(self.keyfile_posix):
×
377
            os.unlink(self.keyfile_posix)
×
378
            return
×
379

380
        # use pkexec to delete the file as root user
381
        commands = ['pkexec', 'rm', self.keyfile_posix]
×
382
        self._execute_commands_in_subprocess(commands)
×
383
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
384

385
    def _write_key_to_root_file_macos(self, key_hex: str) -> None:
1✔
386
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
387

388
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
389
        macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])
×
390

391
        self._execute_commands_in_subprocess(macos_commands)
×
392
        with open(self.keyfile_posix, 'r') as f:
×
393
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
394
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
395

396
    def _delete_macos_plugin_keyfile(self) -> None:
1✔
397
        if not os.path.exists(self.keyfile_posix):
×
398
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
399
            return
×
400
        if not self._has_root_permissions(self.keyfile_posix):
×
401
            os.unlink(self.keyfile_posix)
×
402
            return
×
403
        # use osascript to delete the file as root user
404
        macos_commands = self._get_macos_osascript_command(["rm", self.keyfile_posix])
×
405
        self._execute_commands_in_subprocess(macos_commands)
×
406
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
407

408
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
1✔
409
        """
410
        Writes the key to the Windows registry with windows UAC prompt.
411
        """
412
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE
×
413

414
        value_type = 'REG_SZ'
×
415
        command = f'add "{self.keyfile_windows}" /ve /t {value_type} /d "{key_hex}" /f'
×
416

417
        self._run_win_regedit_as_admin(command)
×
418

419
        # check if the key was written correctly
420
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
421
            with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
×
422
                assert key_hex == QueryValue(key, 'PluginsKey'), "incorrect registry key value"
×
423
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
×
424

425
    def _delete_plugin_key_from_windows_registry(self) -> None:
1✔
426
        """
427
        Deletes the PluginsKey dir in the Windows registry.
428
        """
429
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE
×
430

431
        command = f'delete "{self.keyfile_windows}" /f'
×
432
        self._run_win_regedit_as_admin(command)
×
433

434
        try:
×
435
            # do a sanity check to see if the key has been deleted
436
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
437
                with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
×
438
                    raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
×
439
        except FileNotFoundError:
×
440
            pass
×
441

442
    def create_new_key(self, password:str) -> str:
1✔
443
        salt = os.urandom(32)
×
444
        privkey = self.derive_privkey(password, salt)
×
445
        pubkey = privkey.get_public_key_bytes()
×
446
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
447
        return key.hex()
×
448

449
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], Optional[bytes]]:
1✔
450
        """
451
        returns pubkey, salt
452
        returns None, None if the pubkey has not been set
453
        """
454
        if sys.platform in ['windows', 'win32']:
×
455
            import winreg
×
456
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
457
                try:
×
458
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
459
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
460
                except Exception as e:
×
461
                    self.logger.info(f'winreg error: {e}')
×
462
                    return None, None
×
463
        elif 'ANDROID_DATA' in os.environ:
×
464
            return None, None
×
465
        else:
466
            # treat unknown platforms as linux-like
467
            if not os.path.exists(self.keyfile_posix):
×
468
                return None, None
×
469
            if not self._has_root_permissions(self.keyfile_posix):
×
470
                return None, None
×
471
            with open(self.keyfile_posix) as f:
×
472
                key_hex = f.read()
×
473
        try:
×
474
            key = bytes.fromhex(key_hex)
×
475
            version = key[0]
×
476
        except Exception:
×
477
            self.logger.exception(f'{key_hex=} invalid')
×
478
            return None, None
×
479
        if version != PLUGIN_PASSWORD_VERSION:
×
480
            self.logger.info(f'unknown plugin password version: {version}')
×
481
            return None, None
×
482
        # all good
483
        salt = key[1:1+32]
×
484
        pubkey = key[1+32:]
×
485
        return pubkey, salt
×
486

487
    def get_external_plugin_dir(self) -> str:
1✔
488
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
1✔
489
        make_dir(pkg_path)
1✔
490
        return pkg_path
1✔
491

492
    async def download_external_plugin(self, url: str) -> str:
1✔
493
        filename = os.path.basename(urlparse(url).path)
×
494
        pkg_path = self.get_external_plugin_dir()
×
495
        path = os.path.join(pkg_path, filename)
×
496
        if os.path.exists(path):
×
497
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
×
498
        network = Network.get_instance()
×
499
        proxy = network.proxy if network else None
×
500
        async with make_aiohttp_session(proxy=proxy) as session:
×
501
            async with session.get(url) as resp:
×
502
                if resp.status == 200:
×
503
                    with open(path, 'wb') as fd:
×
504
                        async for chunk in resp.content.iter_chunked(10):
×
505
                            fd.write(chunk)
×
506
        return path
×
507

508
    def read_manifest(self, path) -> dict:
1✔
509
        """ return json dict """
510
        with zipfile_lib.ZipFile(path) as file:
×
511
            for filename in file.namelist():
×
512
                if filename.endswith('manifest.json'):
×
513
                    break
×
514
            else:
515
                raise Exception('could not find manifest.json in zip archive')
×
516
            with file.open(filename, 'r') as f:
×
517
                manifest = json.load(f)
×
518
                manifest['path'] = path  # external, path of the zipfile
×
519
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
520
                manifest['is_zip'] = True
×
521
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
522
                return manifest
×
523

524
    def zip_plugin_path(self, name) -> str:
1✔
525
        path = self.get_metadata(name)['path']
×
526
        filename = os.path.basename(path)
×
527
        if name in self.internal_plugin_metadata:
×
528
            pkg_path = self.pkgpath
×
529
        else:
530
            pkg_path = self.get_external_plugin_dir()
×
531
        return os.path.join(pkg_path, filename)
×
532

533
    def find_zip_plugins(self, pkg_path: str, external: bool):
1✔
534
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
535
        if pkg_path is None:
1✔
536
            return
×
537
        for filename in os.listdir(pkg_path):
1✔
538
            path = os.path.join(pkg_path, filename)
×
539
            if not filename.endswith('.zip'):
×
540
                continue
×
541
            try:
×
542
                d = self.read_manifest(path)
×
543
                name = d['name']
×
544
            except Exception:
×
545
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
546
                continue
×
547
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
548
                self.logger.info(f"duplicate plugins for {name=}")
×
549
                continue
×
550
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
551
                continue
×
552
            min_version = d.get('min_electrum_version')
×
553
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
554
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
555
                continue
×
556
            max_version = d.get('max_electrum_version')
×
557
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
558
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
559
                continue
×
560

561
            if not self.cmd_only:
×
562
                gui_good = self.gui_name in d.get('available_for', [])
×
563
                if not gui_good:
×
564
                    continue
×
565
                if 'fullname' not in d:
×
566
                    continue
×
567
                details = d.get('registers_keystore')
×
568
                if details:
×
569
                    self.register_keystore(name, details)
×
570
            if external:
×
571
                self.external_plugin_metadata[name] = d
×
572
            else:
573
                self.internal_plugin_metadata[name] = d
×
574

575
    def get(self, name):
1✔
576
        return self.plugins.get(name)
×
577

578
    def count(self):
1✔
579
        return len(self.plugins)
×
580

581
    def load_plugin(self, name) -> 'BasePlugin':
1✔
582
        """Imports the code of the given plugin.
583
        note: can be called from any thread.
584
        """
585
        if self.get_metadata(name):
1✔
586
            return self.load_plugin_by_name(name)
1✔
587
        else:
588
            raise Exception(f"could not find plugin {name!r}")
×
589

590
    def maybe_load_plugin_init_method(self, name: str) -> None:
1✔
591
        """Loads the __init__.py module of the plugin if it is not already loaded."""
592
        base_name = ('electrum_external_plugins.' if self.is_external(name) else 'electrum.plugins.') + name
1✔
593
        if base_name not in sys.modules:
1✔
594
            metadata = self.get_metadata(name)
1✔
595
            is_zip = metadata.get('is_zip', False)
1✔
596
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
597
            if not is_zip:
1✔
598
                if self.is_external(name):
1✔
599
                    # this branch is deprecated: external plugins are always zip files
600
                    path = os.path.join(metadata['path'], '__init__.py')
×
601
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
602
                else:
603
                    init_spec = importlib.util.find_spec(base_name)
1✔
604
            else:
605
                zipfile = zipimport.zipimporter(metadata['path'])
×
606
                dirname = metadata['dirname']
×
607
                init_spec = zipfile.find_spec(dirname)
×
608

609
            self.exec_module_from_spec(init_spec, base_name)
1✔
610

611
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
1✔
612
        if name in self.plugins:
1✔
613
            return self.plugins[name]
1✔
614
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
615
        self.maybe_load_plugin_init_method(name)
1✔
616
        is_external = self.is_external(name)
1✔
617
        if is_external and not self.is_authorized(name):
1✔
618
            self.logger.info(f'plugin not authorized {name}')
×
619
            return
×
620
        if not is_external:
1✔
621
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
1✔
622
        else:
623
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
624

625
        spec = importlib.util.find_spec(full_name)
1✔
626
        if spec is None:
1✔
627
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
628
        try:
1✔
629
            module = self.exec_module_from_spec(spec, full_name)
1✔
630
            plugin = module.Plugin(self, self.config, name)
1✔
631
        except Exception as e:
×
632
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
633
        self.add_jobs(plugin.thread_jobs())
1✔
634
        self.plugins[name] = plugin
1✔
635
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
1✔
636
        return plugin
1✔
637

638
    def close_plugin(self, plugin):
1✔
639
        self.remove_jobs(plugin.thread_jobs())
×
640

641
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
1✔
642
        from hashlib import pbkdf2_hmac
×
643
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
644
        return ECPrivkey(secret)
×
645

646
    def uninstall(self, name: str):
1✔
647
        if self.config.get(f'plugins.{name}'):
×
648
            self.config.set_key(f'plugins.{name}', None)
×
649
        if name in self.external_plugin_metadata:
×
650
            zipfile = self.zip_plugin_path(name)
×
651
            os.unlink(zipfile)
×
652
            self.external_plugin_metadata.pop(name)
×
653

654
    def is_internal(self, name) -> bool:
1✔
655
        return name in self.internal_plugin_metadata
×
656

657
    def is_external(self, name) -> bool:
1✔
658
        return name in self.external_plugin_metadata
1✔
659

660
    def is_auto_loaded(self, name):
1✔
661
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
662
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
663

664
    def is_installed(self, name) -> bool:
1✔
665
        """an external plugin may be installed but not authorized """
666
        return (name in self.internal_plugin_metadata or name in self.external_plugin_metadata)
×
667

668
    def is_authorized(self, name) -> bool:
1✔
669
        if name in self.internal_plugin_metadata:
×
670
            return True
×
671
        if name not in self.external_plugin_metadata:
×
672
            return False
×
673
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
674
        if not pubkey_bytes:
×
675
            return False
×
676
        if not self.is_plugin_zip(name):
×
677
            return False
×
678
        filename = self.zip_plugin_path(name)
×
679
        plugin_hash = get_file_hash256(filename)
×
680
        sig = self.config.get(f'plugins.{name}.authorized')
×
681
        if not sig:
×
682
            return False
×
683
        pubkey = ECPubkey(pubkey_bytes)
×
684
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
685

686
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
1✔
687
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
688
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
689
        plugin_hash = get_file_hash256(filename)
×
690
        sig = privkey.ecdsa_sign(plugin_hash)
×
691
        value = sig.hex()
×
692
        self.config.set_key(f'plugins.{name}.authorized', value)
×
693
        self.config.set_key(f'plugins.{name}.enabled', True)
×
694

695
    def enable(self, name: str) -> 'BasePlugin':
1✔
696
        self.config.enable_plugin(name)
×
697
        p = self.get(name)
×
698
        if p:
×
699
            return p
×
700
        return self.load_plugin(name)
×
701

702
    def disable(self, name: str) -> None:
1✔
703
        self.config.disable_plugin(name)
×
704
        p = self.get(name)
×
705
        if not p:
×
706
            return
×
707
        self.plugins.pop(name)
×
708
        p.close()
×
709
        self.logger.info(f"closed {name}")
×
710

711
    @classmethod
1✔
712
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
1✔
713
        return key.startswith('plugins.')
×
714

715
    def is_available(self, name: str) -> bool:
1✔
716
        d = self.descriptions.get(name)
×
717
        if not d:
×
718
            return False
×
719
        deps = d.get('requires', [])
×
720
        for dep, s in deps:
×
721
            try:
×
722
                __import__(dep)
×
723
            except ImportError as e:
×
724
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
725
                return False
×
726
        return True
×
727

728
    def get_hardware_support(self):
1✔
729
        out = []
×
730
        for name, details in self._hw_wallets.items():
×
731
            try:
×
732
                p = self.get_plugin(name)
×
733
                if p.is_available():
×
734
                    out.append(HardwarePluginToScan(
×
735
                        name=name,
736
                        description=details[2],
737
                        plugin=p,
738
                        exception=None))
739
            except Exception as e:
×
740
                self.logger.exception(f"cannot load plugin for: {name}")
×
741
                out.append(HardwarePluginToScan(
×
742
                    name=name,
743
                    description=details[2],
744
                    plugin=None,
745
                    exception=e))
746
        return out
×
747

748
    def register_wallet_type(self, name, gui_good, wallet_type):
1✔
749
        from .wallet import register_wallet_type, register_constructor
1✔
750
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
1✔
751

752
        def loader():
1✔
753
            plugin = self.get_plugin(name)
1✔
754
            register_constructor(wallet_type, plugin.wallet_class)
1✔
755
        register_wallet_type(wallet_type)
1✔
756
        plugin_loaders[wallet_type] = loader
1✔
757

758
    def register_keystore(self, name, details):
1✔
759
        from .keystore import register_keystore
1✔
760

761
        def dynamic_constructor(d):
1✔
762
            return self.get_plugin(name).keystore_class(d)
1✔
763
        if details[0] == 'hardware':
1✔
764
            self._hw_wallets[name] = details
1✔
765
            self.logger.info(f"registering hardware {name}: {details}")
1✔
766
            register_keystore(details[1], dynamic_constructor)
1✔
767

768
    def get_plugin(self, name: str) -> 'BasePlugin':
1✔
769
        if name not in self.plugins:
1✔
770
            self.load_plugin(name)
1✔
771
        return self.plugins[name]
1✔
772

773
    def is_plugin_zip(self, name: str) -> bool:
1✔
774
        """Returns True if the plugin is a zip file"""
775
        if (metadata := self.get_metadata(name)) is None:
×
776
            return False
×
777
        return metadata.get('is_zip', False)
×
778

779
    def get_metadata(self, name: str) -> Optional[dict]:
1✔
780
        """Returns the metadata of the plugin"""
781
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
1✔
782
        if not metadata:
1✔
783
            return None
×
784
        return metadata
1✔
785

786
    def run(self):
1✔
787
        while self.is_running():
1✔
788
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
1✔
789
            self.run_jobs()
1✔
790
        self.on_stop()
1✔
791

792
    def read_file(self, name: str, filename: str) -> bytes:
1✔
793
        if self.is_plugin_zip(name):
×
794
            plugin_filename = self.zip_plugin_path(name)
×
795
            metadata = self.external_plugin_metadata[name]
×
796
            dirname = metadata['dirname']
×
797
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
798
                with myzip.open("/".join([dirname,filename])) as myfile:
×
799
                    return myfile.read()
×
800
        elif name in self.internal_plugin_metadata:
×
801
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
802
            with open(path, 'rb') as myfile:
×
803
                return myfile.read()
×
804
        else:
805
            # no icon
806
            return None
×
807

808

809
def get_file_hash256(path: str) -> bytes:
1✔
810
    """Get the sha256 hash of a file, similar to `sha256sum`."""
811
    with open(path, 'rb') as f:
×
812
        return sha256(f.read())
×
813

814

815
def hook(func):
1✔
816
    hook_names.add(func.__name__)
1✔
817
    return func
1✔
818

819

820
def run_hook(name, *args):
1✔
821
    results = []
1✔
822
    f_list = hooks.get(name, [])
1✔
823
    for p, f in f_list:
1✔
824
        if p.is_enabled():
1✔
825
            try:
1✔
826
                r = f(*args)
1✔
827
            except Exception:
×
828
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
829
                r = False
×
830
            if r:
1✔
831
                results.append(r)
×
832

833
    if results:
1✔
834
        assert len(results) == 1, results
×
835
        return results[0]
×
836

837

838
class BasePlugin(Logger):
1✔
839

840
    def __init__(self, parent, config: 'SimpleConfig', name):
1✔
841
        self.parent = parent  # type: Plugins  # The plugins object
1✔
842
        self.name = name
1✔
843
        self.config = config
1✔
844
        Logger.__init__(self)
1✔
845
        # add self to hooks
846
        for k in dir(self):
1✔
847
            if k in hook_names:
1✔
848
                l = hooks.get(k, [])
1✔
849
                l.append((self, getattr(self, k)))
1✔
850
                hooks[k] = l
1✔
851

852
    def __str__(self):
1✔
853
        return self.name
×
854

855
    def close(self):
1✔
856
        # remove self from hooks
857
        for attr_name in dir(self):
×
858
            if attr_name in hook_names:
×
859
                # found attribute in self that is also the name of a hook
860
                l = hooks.get(attr_name, [])
×
861
                try:
×
862
                    l.remove((self, getattr(self, attr_name)))
×
863
                except ValueError:
×
864
                    # maybe attr name just collided with hook name and was not hook
865
                    continue
×
866
                hooks[attr_name] = l
×
867
        self.parent.close_plugin(self)
×
868
        self.on_close()
×
869

870
    def on_close(self):
1✔
871
        pass
×
872

873
    def requires_settings(self) -> bool:
1✔
874
        return False
×
875

876
    def thread_jobs(self):
1✔
877
        return []
1✔
878

879
    def is_enabled(self):
1✔
880
        if not self.is_available():
×
881
            return False
×
882
        if not self.parent.is_authorized(self.name):
×
883
            return False
×
884
        return self.config.is_plugin_enabled(self.name)
×
885

886
    def is_available(self):
1✔
887
        return True
×
888

889
    def can_user_disable(self):
1✔
890
        return True
×
891

892
    def settings_widget(self, window):
1✔
893
        raise NotImplementedError()
×
894

895
    def settings_dialog(self, window):
1✔
896
        raise NotImplementedError()
×
897

898
    def read_file(self, filename: str) -> bytes:
1✔
899
        return self.parent.read_file(self.name, filename)
×
900

901
    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
1✔
902
        """Returns a dict which is persisted in the per-wallet database."""
903
        plugin_storage = wallet.db.get_plugin_storage()
×
904
        return plugin_storage.setdefault(self.name, {})
×
905

906

907
class DeviceUnpairableError(UserFacingException): pass
1✔
908
class HardwarePluginLibraryUnavailable(Exception): pass
1✔
909
class CannotAutoSelectDevice(Exception): pass
1✔
910

911

912
class Device(NamedTuple):
1✔
913
    path: Union[str, bytes]
1✔
914
    interface_number: int
1✔
915
    id_: str
1✔
916
    product_key: Any   # when using hid, often Tuple[int, int]
1✔
917
    usage_page: int
1✔
918
    transport_ui_string: str
1✔
919

920

921
class DeviceInfo(NamedTuple):
1✔
922
    device: Device
1✔
923
    label: Optional[str] = None
1✔
924
    initialized: Optional[bool] = None
1✔
925
    exception: Optional[Exception] = None
1✔
926
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
1✔
927
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
1✔
928
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
1✔
929

930
    def label_for_device_select(self) -> str:
1✔
931
        return (
×
932
            "{label} ({maybe_model}{init}, {transport})"
933
            .format(
934
                label=self.label or _("An unnamed {}").format(self.plugin_name),
935
                init=(_("initialized") if self.initialized else _("wiped")),
936
                transport=self.device.transport_ui_string,
937
                maybe_model=f"{self.model_name}, " if self.model_name else ""
938
            )
939
        )
940

941

942
class HardwarePluginToScan(NamedTuple):
1✔
943
    name: str
1✔
944
    description: str
1✔
945
    plugin: Optional['HW_PluginBase']
1✔
946
    exception: Optional[Exception]
1✔
947

948

949
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
1✔
950

951

952
# hidapi is not thread-safe
953
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
954
#     https://github.com/libusb/hidapi/issues/45
955
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
956
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
957
# It is not entirely clear to me, exactly what is safe and what isn't, when
958
# using multiple threads...
959
# Hence, we use a single thread for all device communications, including
960
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
961
# the following thread:
962
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
1✔
963
    max_workers=1,
964
    thread_name_prefix='hwd_comms_thread'
965
)
966

967
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
968
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
969
# To keep it simple, let's just import it now, as we are likely in the main thread here.
970
if threading.current_thread() is not threading.main_thread():
1✔
971
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
972
try:
1✔
973
    import hid
1✔
974
except ImportError:
1✔
975
    pass
1✔
976

977

978
T = TypeVar('T')
1✔
979

980

981
def run_in_hwd_thread(func: Callable[[], T]) -> T:
1✔
982
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
983
        return func()
×
984
    else:
985
        fut = _hwd_comms_executor.submit(func)
×
986
        return fut.result()
×
987
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
988

989

990
def runs_in_hwd_thread(func):
1✔
991
    @wraps(func)
1✔
992
    def wrapper(*args, **kwargs):
1✔
993
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
994
    return wrapper
1✔
995

996

997
def assert_runs_in_hwd_thread():
1✔
998
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
999
        raise Exception("must only be called from HWD communication thread")
×
1000

1001

1002
class DeviceMgr(ThreadJob):
1✔
1003
    """Manages hardware clients.  A client communicates over a hardware
1004
    channel with the device.
1005

1006
    In addition to tracking device HID IDs, the device manager tracks
1007
    hardware wallets and manages wallet pairing.  A HID ID may be
1008
    paired with a wallet when it is confirmed that the hardware device
1009
    matches the wallet, i.e. they have the same master public key.  A
1010
    HID ID can be unpaired if e.g. it is wiped.
1011

1012
    Because of hotplugging, a wallet must request its client
1013
    dynamically each time it is required, rather than caching it
1014
    itself.
1015

1016
    The device manager is shared across plugins, so just one place
1017
    does hardware scans when needed.  By tracking HID IDs, if a device
1018
    is plugged into a different port the wallet is automatically
1019
    re-paired.
1020

1021
    Wallets are informed on connect / disconnect events.  It must
1022
    implement connected(), disconnected() callbacks.  Being connected
1023
    implies a pairing.  Callbacks can happen in any thread context,
1024
    and we do them without holding the lock.
1025

1026
    Confusingly, the HID ID (serial number) reported by the HID system
1027
    doesn't match the device ID reported by the device itself.  We use
1028
    the HID IDs.
1029

1030
    This plugin is thread-safe.  Currently only devices supported by
1031
    hidapi are implemented."""
1032

1033
    def __init__(self, config: SimpleConfig):
1✔
1034
        ThreadJob.__init__(self)
1✔
1035
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
1036
        self.pairing_code_to_id = {}  # type: Dict[str, str]
1✔
1037
        # A client->id_ map. Needs self.lock.
1038
        self.clients = {}  # type: Dict[HardwareClientBase, str]
1✔
1039
        # What we recognise.  (vendor_id, product_id) -> Plugin
1040
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
1✔
1041
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
1✔
1042
        # Custom enumerate functions for devices we don't know about.
1043
        self._enumerate_func = set()  # Needs self.lock.
1✔
1044
        self._ongoing_timeout_checks = {}  # type: Dict[str, Future]
1✔
1045

1046
        self.lock = threading.RLock()
1✔
1047

1048
        self.config = config
1✔
1049

1050
    def thread_jobs(self):
1✔
1051
        # Thread job to handle device timeouts
1052
        return [self]
1✔
1053

1054
    def run(self):
1✔
1055
        """Handle device timeouts.  Runs in the context of the Plugins
1056
        thread."""
1057
        with self.lock:
1✔
1058
            clients = list(self.clients.items())
1✔
1059
        cutoff = time.time() - self.config.get_session_timeout()
1✔
1060
        for client, client_id in clients:
1✔
NEW
1061
            if fut := self._ongoing_timeout_checks.get(client_id):
×
NEW
1062
                if not fut.done():
×
NEW
1063
                    continue
×
1064
            # scheduling the timeout check prevents blocking the Plugins DaemonThread if the
1065
            # _hwd_comms_executor Thread is blocked (e.g. due to it awaiting user input).
NEW
1066
            fut = _hwd_comms_executor.submit(client.timeout, cutoff)
×
NEW
1067
            self._ongoing_timeout_checks[client_id] = fut
×
1068

1069
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
1✔
1070
        for pair in device_pairs:
×
1071
            self._recognised_hardware[pair] = plugin
×
1072

1073
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
1✔
1074
        for vendor_id in vendor_ids:
×
1075
            self._recognised_vendor[vendor_id] = plugin
×
1076

1077
    def register_enumerate_func(self, func):
1✔
1078
        with self.lock:
×
1079
            self._enumerate_func.add(func)
×
1080

1081
    @runs_in_hwd_thread
1✔
1082
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
1✔
1083
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
1084
        # Get from cache first
1085
        client = self._client_by_id(device.id_)
×
1086
        if client:
×
1087
            return client
×
1088
        client = plugin.create_client(device, handler)
×
1089
        if client:
×
1090
            self.logger.info(f"Registering {client}")
×
1091
            with self.lock:
×
1092
                self.clients[client] = device.id_
×
1093
        return client
×
1094

1095
    def id_by_pairing_code(self, pairing_code):
1✔
1096
        with self.lock:
×
1097
            return self.pairing_code_to_id.get(pairing_code)
×
1098

1099
    def pairing_code_by_id(self, id_):
1✔
1100
        with self.lock:
×
1101
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
1102
                if id2 == id_:
×
1103
                    return pairing_code
×
1104
            return None
×
1105

1106
    def unpair_pairing_code(self, pairing_code):
1✔
1107
        with self.lock:
×
1108
            if pairing_code not in self.pairing_code_to_id:
×
1109
                return
×
1110
            _id = self.pairing_code_to_id.pop(pairing_code)
×
1111
        self._close_client(_id)
×
1112

1113
    def unpair_id(self, id_):
1✔
1114
        pairing_code = self.pairing_code_by_id(id_)
×
1115
        if pairing_code:
×
1116
            self.unpair_pairing_code(pairing_code)
×
1117
        else:
1118
            self._close_client(id_)
×
1119

1120
    def _close_client(self, id_):
1✔
1121
        with self.lock:
×
1122
            client = self._client_by_id(id_)
×
1123
            self.clients.pop(client, None)
×
NEW
1124
            if fut := self._ongoing_timeout_checks.pop(id_, None):
×
NEW
1125
                fut.cancel()
×
1126
        if client:
×
1127
            client.close()
×
1128

1129
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
1✔
1130
        with self.lock:
×
1131
            for client, client_id in self.clients.items():
×
1132
                if client_id == id_:
×
1133
                    return client
×
1134
        return None
×
1135

1136
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
1✔
1137
        """Returns a client for the device ID if one is registered.  If
1138
        a device is wiped or in bootloader mode pairing is impossible;
1139
        in such cases we communicate by device ID and not wallet."""
1140
        if scan_now:
×
1141
            self.scan_devices()
×
1142
        return self._client_by_id(id_)
×
1143

1144
    @runs_in_hwd_thread
1✔
1145
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
1✔
1146
                            keystore: 'Hardware_KeyStore',
1147
                            force_pair: bool, *,
1148
                            devices: Sequence['Device'] = None,
1149
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
1150
        self.logger.info("getting client for keystore")
×
1151
        if handler is None:
×
1152
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
1153
        handler.update_status(False)
×
1154
        pcode = keystore.pairing_code()
×
1155
        client = None
×
1156
        # search existing clients first (fast-path)
1157
        if not devices:
×
1158
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
1159
        # search clients again, now allowing a (slow) scan
1160
        if client is None:
×
1161
            if devices is None:
×
1162
                devices = self.scan_devices()
×
1163
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
1164
        if client is None and force_pair:
×
1165
            try:
×
1166
                info = self.select_device(plugin, handler, keystore, devices,
×
1167
                                          allow_user_interaction=allow_user_interaction)
1168
            except CannotAutoSelectDevice:
×
1169
                pass
×
1170
            else:
1171
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
1172
        if client:
×
1173
            handler.update_status(True)
×
1174
            # note: if select_device was called, we might also update label etc here:
1175
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1176
        self.logger.info("end client for keystore")
×
1177
        return client
×
1178

1179
    def client_by_pairing_code(
1✔
1180
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
1181
        devices: Sequence['Device'],
1182
    ) -> Optional['HardwareClientBase']:
1183
        _id = self.id_by_pairing_code(pairing_code)
×
1184
        client = self._client_by_id(_id)
×
1185
        if client:
×
1186
            if type(client.plugin) != type(plugin):
×
1187
                return
×
1188
            # An unpaired client might have another wallet's handler
1189
            # from a prior scan.  Replace to fix dialog parenting.
1190
            client.handler = handler
×
1191
            return client
×
1192

1193
        for device in devices:
×
1194
            if device.id_ == _id:
×
1195
                return self.create_client(device, handler, plugin)
×
1196

1197
    def force_pair_keystore(
1✔
1198
        self,
1199
        *,
1200
        plugin: 'HW_PluginBase',
1201
        handler: 'HardwareHandlerBase',
1202
        info: 'DeviceInfo',
1203
        keystore: 'Hardware_KeyStore',
1204
    ) -> 'HardwareClientBase':
1205
        xpub = keystore.xpub
×
1206
        derivation = keystore.get_derivation_prefix()
×
1207
        assert derivation is not None
×
1208
        xtype = bip32.xpub_type(xpub)
×
1209
        client = self._client_by_id(info.device.id_)
×
1210
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
1211
            # See comment above for same code
1212
            client.handler = handler
×
1213
            # This will trigger a PIN/passphrase entry request
1214
            try:
×
1215
                client_xpub = client.get_xpub(derivation, xtype)
×
1216
            except (UserCancelled, RuntimeError):
×
1217
                # Bad / cancelled PIN / passphrase
1218
                client_xpub = None
×
1219
            if client_xpub == xpub:
×
1220
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1221
                with self.lock:
×
1222
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
1223
                return client
×
1224

1225
        # The user input has wrong PIN or passphrase, or cancelled input,
1226
        # or it is not pairable
1227
        raise DeviceUnpairableError(
×
1228
            _('Electrum cannot pair with your {}.\n\n'
1229
              'Before you request bitcoins to be sent to addresses in this '
1230
              'wallet, ensure you can pair with your device, or that you have '
1231
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
1232
              'receive will be unspendable.').format(plugin.device))
1233

1234
    def list_pairable_device_infos(
1✔
1235
        self,
1236
        *,
1237
        handler: Optional['HardwareHandlerBase'],
1238
        plugin: 'HW_PluginBase',
1239
        devices: Sequence['Device'] = None,
1240
        include_failing_clients: bool = False,
1241
    ) -> List['DeviceInfo']:
1242
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
1243
        Already paired devices are also included, as it is okay to reuse them.
1244
        """
1245
        if not plugin.libraries_available:
×
1246
            message = plugin.get_library_not_available_message()
×
1247
            raise HardwarePluginLibraryUnavailable(message)
×
1248
        if devices is None:
×
1249
            devices = self.scan_devices()
×
1250
        infos = []
×
1251
        for device in devices:
×
1252
            if not plugin.can_recognize_device(device):
×
1253
                continue
×
1254
            try:
×
1255
                client = self.create_client(device, handler, plugin)
×
1256
                if not client:
×
1257
                    continue
×
1258
                label = client.label()
×
1259
                is_initialized = client.is_initialized()
×
1260
                soft_device_id = client.get_soft_device_id()
×
1261
                model_name = client.device_model_name()
×
1262
            except Exception as e:
×
1263
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
1264
                if include_failing_clients:
×
1265
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1266
                continue
×
1267
            infos.append(DeviceInfo(device=device,
×
1268
                                    label=label,
1269
                                    initialized=is_initialized,
1270
                                    plugin_name=plugin.name,
1271
                                    soft_device_id=soft_device_id,
1272
                                    model_name=model_name))
1273

1274
        return infos
×
1275

1276
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
1✔
1277
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1278
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1279
        """Select the device to use for keystore."""
1280
        # ideally this should not be called from the GUI thread...
1281
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
1282
        while True:
×
1283
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1284
            if infos:
×
1285
                break
×
1286
            if not allow_user_interaction:
×
1287
                raise CannotAutoSelectDevice()
×
1288
            msg = _('Please insert your {}').format(plugin.device)
×
1289
            msg += " ("
×
1290
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1291
                msg += f"label: {keystore.label}, "
×
1292
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1293
            msg += ').\n\n{}\n\n{}'.format(
×
1294
                _('Verify the cable is connected and that '
1295
                  'no other application is using it.'),
1296
                _('Try to connect again?')
1297
            )
1298
            if not handler.yes_no_question(msg):
×
1299
                raise UserCancelled()
×
1300
            devices = None
×
1301

1302
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1303
        # method 1: select device by id
1304
        if keystore.soft_device_id:
×
1305
            for info in infos:
×
1306
                if info.soft_device_id == keystore.soft_device_id:
×
1307
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1308
                    return info
×
1309
        # method 2: select device by label
1310
        #           but only if not a placeholder label and only if there is no collision
1311
        device_labels = [info.label for info in infos]
×
1312
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1313
                and device_labels.count(keystore.label) == 1):
1314
            for info in infos:
×
1315
                if info.label == keystore.label:
×
1316
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1317
                    return info
×
1318
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1319
        #           saved for keystore anyway, select it
1320
        if (len(infos) == 1
×
1321
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1322
                and keystore.soft_device_id is None):
1323
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1324
            return infos[0]
×
1325

1326
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1327
        if not allow_user_interaction:
×
1328
            raise CannotAutoSelectDevice()
×
1329
        # ask user to select device manually
1330
        msg = (
×
1331
                _("Could not automatically pair with device for given keystore.") + "\n"
1332
                + f"(keystore label: {keystore.label!r}, "
1333
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1334
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1335
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1336
        choices = [ChoiceItem(key=idx, label=info.label_for_device_select())
×
1337
                   for (idx, info) in enumerate(infos)]
1338
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1339
                          f"num options: {len(infos)}. options: {infos}")
1340
        c = handler.query_choice(msg, choices)
×
1341
        if c is None:
×
1342
            raise UserCancelled()
×
1343
        info = infos[c]
×
1344
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1345
        # note: updated label/soft_device_id will be saved after pairing succeeds
1346
        return info
×
1347

1348
    @runs_in_hwd_thread
1✔
1349
    def _scan_devices_with_hid(self) -> List['Device']:
1✔
1350
        try:
×
1351
            import hid  # noqa: F811
×
1352
        except ImportError:
×
1353
            return []
×
1354

1355
        devices = []
×
1356
        for d in hid.enumerate(0, 0):
×
1357
            vendor_id = d['vendor_id']
×
1358
            product_key = (vendor_id, d['product_id'])
×
1359
            plugin = None
×
1360
            if product_key in self._recognised_hardware:
×
1361
                plugin = self._recognised_hardware[product_key]
×
1362
            elif vendor_id in self._recognised_vendor:
×
1363
                plugin = self._recognised_vendor[vendor_id]
×
1364
            if plugin:
×
1365
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1366
                if device:
×
1367
                    devices.append(device)
×
1368
        return devices
×
1369

1370
    @runs_in_hwd_thread
1✔
1371
    @profiler
1✔
1372
    def scan_devices(self) -> Sequence['Device']:
1✔
1373
        self.logger.info("scanning devices...")
×
1374

1375
        # First see what's connected that we know about
1376
        devices = self._scan_devices_with_hid()
×
1377

1378
        # Let plugin handlers enumerate devices we don't know about
1379
        with self.lock:
×
1380
            enumerate_funcs = list(self._enumerate_func)
×
1381
        for f in enumerate_funcs:
×
1382
            try:
×
1383
                new_devices = f()
×
1384
            except BaseException as e:
×
1385
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1386
            else:
1387
                devices.extend(new_devices)
×
1388

1389
        # find out what was disconnected
1390
        client_ids = [dev.id_ for dev in devices]
×
1391
        disconnected_clients = []
×
1392
        with self.lock:
×
1393
            connected = {}
×
1394
            for client, id_ in self.clients.items():
×
1395
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1396
                    connected[client] = id_
×
1397
                else:
1398
                    disconnected_clients.append((client, id_))
×
1399
            self.clients = connected
×
1400

1401
        # Unpair disconnected devices
1402
        for client, id_ in disconnected_clients:
×
1403
            self.unpair_id(id_)
×
1404
            if client.handler:
×
1405
                client.handler.update_status(False)
×
1406

1407
        return devices
×
1408

1409
    @classmethod
1✔
1410
    def version_info(cls) -> Mapping[str, Optional[str]]:
1✔
1411
        ret = {}
×
1412
        # add libusb
1413
        try:
×
1414
            import usb1
×
1415
        except Exception as e:
×
1416
            ret["libusb.version"] = None
×
1417
        else:
1418
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1419
            try:
×
1420
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1421
            except AttributeError:
×
1422
                ret["libusb.path"] = None
×
1423
        # add hidapi
1424
        try:
×
1425
            import hid  # noqa: F811
×
1426
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1427
        except Exception as e:
×
1428
            from importlib.metadata import version
×
1429
            try:
×
1430
                ret["hidapi.version"] = version("hidapi")
×
1431
            except ImportError:
×
1432
                ret["hidapi.version"] = None
×
1433
        return ret
×
1434

1435
    def trigger_pairings(
1✔
1436
            self,
1437
            keystores: Sequence['KeyStore'],
1438
            *,
1439
            allow_user_interaction: bool = True,
1440
            devices: Sequence['Device'] = None,
1441
    ) -> None:
1442
        """Given a list of keystores, try to pair each with a connected hardware device.
1443

1444
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1445
        try to pair each keystore individually. Consider the following scenario:
1446
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1447
        - assume none of the devices are paired yet
1448
        1. if we tried to individually pair keystores, we might try with ks1 first
1449
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1450
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1451
             which is confusing and error-prone. It's especially problematic if the hw device does
1452
             not support labels (such as Ledger), as then the user cannot easily distinguish
1453
             same-type devices. (see #4199)
1454
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1455
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1456
        """
1457
        from .keystore import Hardware_KeyStore
×
1458
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1459
        if not keystores:
×
1460
            return
×
1461
        if devices is None:
×
1462
            devices = self.scan_devices()
×
1463
        # first pair with all devices that can be auto-selected
1464
        for ks in keystores:
×
1465
            try:
×
1466
                ks.get_client(
×
1467
                    force_pair=True,
1468
                    allow_user_interaction=False,
1469
                    devices=devices,
1470
                )
1471
            except UserCancelled:
×
1472
                pass
×
1473
        if allow_user_interaction:
×
1474
            # now do manual selections
1475
            for ks in keystores:
×
1476
                try:
×
1477
                    ks.get_client(
×
1478
                        force_pair=True,
1479
                        allow_user_interaction=True,
1480
                        devices=devices,
1481
                    )
1482
                except UserCancelled:
×
1483
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc