• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5855187086082048

26 Mar 2026 06:13PM UTC coverage: 64.84% (+0.004%) from 64.836%
5855187086082048

push

CirrusCI

web-flow
Merge pull request #10552 from spesmilo/authorized_decorator

plugins: use decorator to early return if plugin not authorized

3 of 7 new or added lines in 1 file covered. (42.86%)

1 existing line in 1 file now uncovered.

24416 of 37656 relevant lines covered (64.84%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.1
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
1✔
27
import os
1✔
28
import pkgutil
1✔
29
import importlib.util
1✔
30
import time
1✔
31
import threading
1✔
32
import sys
1✔
33
import zipfile as zipfile_lib
1✔
34
from urllib.parse import urlparse
1✔
35

36
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
1✔
37
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
38
import concurrent
1✔
39
from concurrent.futures import Future
1✔
40
import zipimport
1✔
41
from functools import wraps, partial
1✔
42
from itertools import chain
1✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
1✔
45

46
from ._vendor.distutils.version import StrictVersion
1✔
47
from .version import ELECTRUM_VERSION
1✔
48
from .i18n import _
1✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem,
1✔
50
                   make_dir, make_aiohttp_session)
51
from . import bip32
1✔
52
from . import plugins
1✔
53
from .simple_config import SimpleConfig
1✔
54
from .logging import get_logger, Logger
1✔
55
from .crypto import sha256
1✔
56
from .network import Network
1✔
57

58
if TYPE_CHECKING:
59
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
60
    from .keystore import Hardware_KeyStore, KeyStore
61
    from .wallet import Abstract_Wallet
62

63

64
_logger = get_logger(__name__)
1✔
65
plugin_loaders = {}
1✔
66
hook_names = set()
1✔
67
hooks = {}
1✔
68
_exec_module_failure = {}  # type: Dict[str, Exception]
1✔
69

70
PLUGIN_PASSWORD_VERSION = 1
1✔
71

72

73
class Plugins(DaemonThread):
1✔
74

75
    pkgpath = os.path.dirname(plugins.__file__)
1✔
76
    # TODO: use XDG Base Directory Specification instead of hardcoding /etc
77
    keyfile_posix = '/etc/electrum/plugins_key'
1✔
78
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
1✔
79

80
    @profiler
1✔
81
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
1✔
82
        self.config = config
1✔
83
        self.cmd_only = cmd_only  # type: bool
1✔
84
        self.internal_plugin_metadata = {}
1✔
85
        self.external_plugin_metadata = {}
1✔
86
        if cmd_only:
1✔
87
            # only import the command modules of plugins
88
            Logger.__init__(self)
×
89
            self.find_plugins()
×
90
            self.load_plugins()
×
91
            return
×
92
        DaemonThread.__init__(self)
1✔
93
        self.device_manager = DeviceMgr(config)
1✔
94
        self.name = 'Plugins'  # set name of thread
1✔
95
        self._hw_wallets = {}
1✔
96
        self.plugins = {}  # type: Dict[str, BasePlugin]
1✔
97
        self.gui_name = gui_name
1✔
98
        self.find_plugins()
1✔
99
        self.load_plugins()
1✔
100
        self.add_jobs(self.device_manager.thread_jobs())
1✔
101
        self.start()
1✔
102

103
    @property
1✔
104
    def descriptions(self):
1✔
105
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
106

107
    def find_directory_plugins(self, pkg_path: str, external: bool):
1✔
108
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
109
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
1✔
110
        for loader, name, ispkg in iter_modules:
1✔
111
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
112
            #       once as data and once as code. To honor the "no duplicates" rule below,
113
            #       we exclude the ones packaged as *code*, here:
114
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
1✔
115
                continue
×
116
            module_path = os.path.join(pkg_path, name)
1✔
117
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
1✔
118
                continue
×
119
            try:
1✔
120
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
1✔
121
                    d = json.load(f)
1✔
122
            except FileNotFoundError:
×
123
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
124
                continue
×
125
            if 'fullname' not in d:
1✔
126
                continue
×
127
            d['path'] = module_path
1✔
128
            if not self.cmd_only:
1✔
129
                gui_good = self.gui_name in d.get('available_for', [])
1✔
130
                if not gui_good:
1✔
131
                    continue
1✔
132
                details = d.get('registers_wallet_type')
1✔
133
                if details:
1✔
134
                    self.register_wallet_type(name, gui_good, details)
1✔
135
                details = d.get('registers_keystore')
1✔
136
                if details:
1✔
137
                    self.register_keystore(name, details)
1✔
138
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
1✔
139
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
140
                _logger.info(f"duplicate plugins? for {name=}")
×
141
                continue
×
142
            if not external:
1✔
143
                self.internal_plugin_metadata[name] = d
1✔
144
            else:
145
                self.external_plugin_metadata[name] = d
×
146

147
    @staticmethod
1✔
148
    def exec_module_from_spec(spec, path: str):
1✔
149
        if prev_fail := _exec_module_failure.get(path):
1✔
150
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
151
        try:
1✔
152
            module = importlib.util.module_from_spec(spec)
1✔
153
            # sys.modules needs to be modified for relative imports to work
154
            # see https://stackoverflow.com/a/50395128
155
            sys.modules[path] = module
1✔
156
            spec.loader.exec_module(module)
1✔
157
        except Exception as e:
×
158
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
159
            # so the import system knows it failed. If called again for the same plugin, we do not
160
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
161
            # might have defined commands).
162
            _exec_module_failure[path] = e
×
163
            if path in sys.modules:
×
164
                sys.modules.pop(path, None)
×
165
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
166
        return module
1✔
167

168
    def find_plugins(self):
1✔
169
        internal_plugins_path = (self.pkgpath, False)
1✔
170
        external_plugins_path = (self.get_external_plugin_dir(), True)
1✔
171
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
1✔
172
            if pkg_path and os.path.exists(pkg_path):
1✔
173
                if not external:
1✔
174
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
1✔
175
                else:
176
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
1✔
177

178
    def load_plugins(self):
1✔
179
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
1✔
180
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
1✔
181
                try:
×
182
                    if self.cmd_only:  # only load init method to register commands
×
183
                        self.maybe_load_plugin_init_method(name)
×
184
                    else:
185
                        self.load_plugin_by_name(name)
×
186
                except BaseException as e:
×
187
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
188

189
    def _has_root_permissions(self, path):
1✔
190
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
191

192
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
1✔
193
        if sys.platform in ['windows', 'win32']:
×
194
            keyfile_path = self.keyfile_windows
×
195
            keyfile_help = _('This file can be edited with Regedit')
×
196
        elif 'ANDROID_DATA' in os.environ:
×
197
            raise Exception('platform not supported')
×
198
        else:
199
            # treat unknown platforms and macOS as linux-like
200
            keyfile_path = self.keyfile_posix
×
201
            keyfile_help = "" if not key_hex else "".join([
×
202
                                         _('The file must have root permissions'),
203
                                         ".\n\n",
204
                                         _("To set it you can also use the Auto-Setup or run "
205
                                           "the following terminal command"),
206
                                         ":\n\n",
207
                                         f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\"",
208
            ])
209
        return keyfile_path, keyfile_help
×
210

211
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
1✔
212
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
213
        try:
×
214
            if sys.platform in ['windows', 'win32']:
×
215
                self._write_key_to_regedit_windows(pubkey_hex)
×
216
            elif 'ANDROID_DATA' in os.environ:
×
217
                raise Exception('platform not supported')
×
218
            elif sys.platform.startswith('darwin'):  # macOS
×
219
                self._write_key_to_root_file_macos(pubkey_hex)
×
220
            else:
221
                self._write_key_to_root_file_linux(pubkey_hex)
×
222
        except Exception:
×
223
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
224
            return False
×
225
        return True
×
226

227
    def try_auto_key_reset(self) -> bool:
1✔
228
        try:
×
229
            if sys.platform in ['windows', 'win32']:
×
230
                self._delete_plugin_key_from_windows_registry()
×
231
            elif 'ANDROID_DATA' in os.environ:
×
232
                raise Exception('platform not supported')
×
233
            elif sys.platform.startswith('darwin'):  # macOS
×
234
                self._delete_macos_plugin_keyfile()
×
235
            else:
236
                self._delete_linux_plugin_keyfile()
×
237
        except Exception:
×
238
            self.logger.exception(f'auto-reset of plugin key failed')
×
239
            return False
×
240
        return True
×
241

242
    def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
1✔
243
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
244
        dir_path: str = os.path.dirname(self.keyfile_posix)
×
245
        sh_command = (
×
246
                     f"mkdir -p {dir_path} "  # create the /etc/electrum dir
247
                     f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} "  # write the key to the file
248
                     f"&& chmod 644 {self.keyfile_posix} "  # set read permissions for the file
249
                     f"&& chmod 755 {dir_path}"  # set read permissions for the dir
250
        )
251
        return sh_command
×
252

253
    @staticmethod
1✔
254
    def _get_macos_osascript_command(commands: List[str]) -> List[str]:
1✔
255
        """
256
        Inspired by
257
        https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
258
        Wraps the given commands in a macOS osascript command to prompt for root permissions.
259
        """
260
        from shlex import quote
×
261

262
        def quote_shell(args):
×
263
            return " ".join(quote(arg) for arg in args)
×
264

265
        def quote_applescript(string):
×
266
            charmap = {
×
267
                "\n": "\\n",
268
                "\r": "\\r",
269
                "\t": "\\t",
270
                "\"": "\\\"",
271
                "\\": "\\\\",
272
            }
273
            return '"%s"' % "".join(charmap.get(char, char) for char in string)
×
274

275
        commands = [
×
276
            "osascript",
277
            "-e",
278
            "do shell script %s "
279
            "with administrator privileges "
280
            "without altering line endings"
281
            % quote_applescript(quote_shell(commands))
282
        ]
283
        return commands
×
284

285
    @staticmethod
1✔
286
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
1✔
287
        """
288
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.
289
        """
290
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
291
        # for the result of the process (returns no process handle)
292
        from ctypes import byref, sizeof, windll, Structure, c_ulong
×
293
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR
×
294

295
        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
296
        class SHELLEXECUTEINFO(Structure):
×
297
            _fields_ = [
×
298
                ('cbSize', DWORD),
299
                ('fMask', c_ulong),
300
                ('hwnd', HWND),
301
                ('lpVerb', LPCWSTR),
302
                ('lpFile', LPCWSTR),
303
                ('lpParameters', LPCWSTR),
304
                ('lpDirectory', LPCWSTR),
305
                ('nShow', c_ulong),
306
                ('hInstApp', HINSTANCE),
307
                ('lpIDList', c_ulong),
308
                ('lpClass', LPCWSTR),
309
                ('hkeyClass', HKEY),
310
                ('dwHotKey', DWORD),
311
                ('hIcon', HANDLE),
312
                ('hProcess', HANDLE)
313
            ]
314

315
        info = SHELLEXECUTEINFO()
×
316
        info.cbSize = sizeof(SHELLEXECUTEINFO)
×
317
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
×
318
        info.hwnd = None
×
319
        info.lpVerb = 'runas'  # run as administrator
×
320
        info.lpFile = 'reg.exe'  # the executable to run
×
321
        info.lpParameters = reg_exe_command  # the registry edit command
×
322
        info.lpDirectory = None
×
323
        info.nShow = 1
×
324

325
        # Execute and wait
326
        if not windll.shell32.ShellExecuteExW(byref(info)):
×
327
            error = windll.kernel32.GetLastError()
×
328
            raise Exception(f'Error executing registry command: {error}')
×
329

330
        # block until the process is done or 5 sec timeout
331
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)
×
332

333
        # Close handle
334
        windll.kernel32.CloseHandle(info.hProcess)
×
335

336
    @staticmethod
1✔
337
    def _execute_commands_in_subprocess(commands: List[str]) -> None:
1✔
338
        """
339
        Executes the given commands in a subprocess and asserts that it was successful.
340
        """
341
        import subprocess
×
342
        with subprocess.Popen(
×
343
            commands,
344
            stdout=subprocess.PIPE,
345
            stderr=subprocess.PIPE,
346
            text=True,
347
        ) as process:
348
            stdout, stderr = process.communicate()
×
349
            if process.returncode != 0:
×
350
                raise Exception(f'error executing command ({process.returncode}): {stderr}')
×
351

352
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
1✔
353
        """
354
        Spawns a pkexec subprocess to write the key to a file with root permissions.
355
        This will open an OS dialog asking for the root password. Can only succeed if
356
        the system has polkit installed.
357
        """
358
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
359

360
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
361
        commands = ['pkexec', 'sh', '-c', sh_command]
×
362
        self._execute_commands_in_subprocess(commands)
×
363

364
        # check if the key was written correctly
365
        with open(self.keyfile_posix, 'r') as f:
×
366
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
367
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
368

369
    def _delete_linux_plugin_keyfile(self) -> None:
1✔
370
        """
371
        Deletes the root owned key file at self.keyfile_posix.
372
        """
373
        if not os.path.exists(self.keyfile_posix):
×
374
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
375
            return
×
376
        if not self._has_root_permissions(self.keyfile_posix):
×
377
            os.unlink(self.keyfile_posix)
×
378
            return
×
379

380
        # use pkexec to delete the file as root user
381
        commands = ['pkexec', 'rm', self.keyfile_posix]
×
382
        self._execute_commands_in_subprocess(commands)
×
383
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
384

385
    def _write_key_to_root_file_macos(self, key_hex: str) -> None:
1✔
386
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
387

388
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
389
        macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])
×
390

391
        self._execute_commands_in_subprocess(macos_commands)
×
392
        with open(self.keyfile_posix, 'r') as f:
×
393
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
394
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
395

396
    def _delete_macos_plugin_keyfile(self) -> None:
1✔
397
        if not os.path.exists(self.keyfile_posix):
×
398
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
399
            return
×
400
        if not self._has_root_permissions(self.keyfile_posix):
×
401
            os.unlink(self.keyfile_posix)
×
402
            return
×
403
        # use osascript to delete the file as root user
404
        macos_commands = self._get_macos_osascript_command(["rm", self.keyfile_posix])
×
405
        self._execute_commands_in_subprocess(macos_commands)
×
406
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
407

408
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
1✔
409
        """
410
        Writes the key to the Windows registry with windows UAC prompt.
411
        """
412
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE
×
413

414
        value_type = 'REG_SZ'
×
415
        command = f'add "{self.keyfile_windows}" /ve /t {value_type} /d "{key_hex}" /f'
×
416

417
        self._run_win_regedit_as_admin(command)
×
418

419
        # check if the key was written correctly
420
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
421
            with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
×
422
                assert key_hex == QueryValue(key, 'PluginsKey'), "incorrect registry key value"
×
423
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
×
424

425
    def _delete_plugin_key_from_windows_registry(self) -> None:
1✔
426
        """
427
        Deletes the PluginsKey dir in the Windows registry.
428
        """
429
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE
×
430

431
        command = f'delete "{self.keyfile_windows}" /f'
×
432
        self._run_win_regedit_as_admin(command)
×
433

434
        try:
×
435
            # do a sanity check to see if the key has been deleted
436
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
437
                with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
×
438
                    raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
×
439
        except FileNotFoundError:
×
440
            pass
×
441

442
    def create_new_key(self, password:str) -> str:
1✔
443
        salt = os.urandom(32)
×
444
        privkey = self.derive_privkey(password, salt)
×
445
        pubkey = privkey.get_public_key_bytes()
×
446
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
447
        return key.hex()
×
448

449
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], Optional[bytes]]:
1✔
450
        """
451
        returns pubkey, salt
452
        returns None, None if the pubkey has not been set
453
        """
454
        if sys.platform in ['windows', 'win32']:
×
455
            import winreg
×
456
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
457
                try:
×
458
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
459
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
460
                except Exception as e:
×
461
                    self.logger.info(f'winreg error: {e}')
×
462
                    return None, None
×
463
        elif 'ANDROID_DATA' in os.environ:
×
464
            return None, None
×
465
        else:
466
            # treat unknown platforms as linux-like
467
            if not os.path.exists(self.keyfile_posix):
×
468
                return None, None
×
469
            if not self._has_root_permissions(self.keyfile_posix):
×
470
                return None, None
×
471
            with open(self.keyfile_posix) as f:
×
472
                key_hex = f.read()
×
473
        try:
×
474
            key = bytes.fromhex(key_hex)
×
475
            version = key[0]
×
476
        except Exception:
×
477
            self.logger.exception(f'{key_hex=} invalid')
×
478
            return None, None
×
479
        if version != PLUGIN_PASSWORD_VERSION:
×
480
            self.logger.info(f'unknown plugin password version: {version}')
×
481
            return None, None
×
482
        # all good
483
        salt = key[1:1+32]
×
484
        pubkey = key[1+32:]
×
485
        return pubkey, salt
×
486

487
    def get_external_plugin_dir(self) -> str:
1✔
488
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
1✔
489
        make_dir(pkg_path)
1✔
490
        return pkg_path
1✔
491

492
    async def download_external_plugin(self, url: str) -> str:
1✔
493
        filename = os.path.basename(urlparse(url).path)
×
494
        pkg_path = self.get_external_plugin_dir()
×
495
        path = os.path.join(pkg_path, filename)
×
496
        if os.path.exists(path):
×
497
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
×
498
        network = Network.get_instance()
×
499
        proxy = network.proxy if network else None
×
500
        async with make_aiohttp_session(proxy=proxy) as session:
×
501
            async with session.get(url) as resp:
×
502
                if resp.status == 200:
×
503
                    with open(path, 'wb') as fd:
×
504
                        async for chunk in resp.content.iter_chunked(10):
×
505
                            fd.write(chunk)
×
506
        return path
×
507

508
    def read_manifest(self, path) -> dict:
1✔
509
        """ return json dict """
510
        with zipfile_lib.ZipFile(path) as file:
×
511
            for filename in file.namelist():
×
512
                if filename.endswith('manifest.json'):
×
513
                    break
×
514
            else:
515
                raise Exception('could not find manifest.json in zip archive')
×
516
            with file.open(filename, 'r') as f:
×
517
                manifest = json.load(f)
×
518
                manifest['path'] = path  # external, path of the zipfile
×
519
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
520
                manifest['is_zip'] = True
×
521
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
522
                return manifest
×
523

524
    def zip_plugin_path(self, name) -> str:
1✔
525
        path = self.get_metadata(name)['path']
×
526
        filename = os.path.basename(path)
×
527
        if name in self.internal_plugin_metadata:
×
528
            pkg_path = self.pkgpath
×
529
        else:
530
            pkg_path = self.get_external_plugin_dir()
×
531
        return os.path.join(pkg_path, filename)
×
532

533
    def find_zip_plugins(self, pkg_path: str, external: bool):
1✔
534
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
535
        if pkg_path is None:
1✔
536
            return
×
537
        for filename in os.listdir(pkg_path):
1✔
538
            path = os.path.join(pkg_path, filename)
×
539
            if not filename.endswith('.zip'):
×
540
                continue
×
541
            try:
×
542
                d = self.read_manifest(path)
×
543
                name = d['name']
×
544
            except Exception:
×
545
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
546
                continue
×
547
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
548
                self.logger.info(f"duplicate plugins for {name=}")
×
549
                continue
×
550
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
551
                continue
×
552
            min_version = d.get('min_electrum_version')
×
553
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
554
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
555
                continue
×
556
            max_version = d.get('max_electrum_version')
×
557
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
558
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
559
                continue
×
560

561
            if not self.cmd_only:
×
562
                gui_good = self.gui_name in d.get('available_for', [])
×
563
                if not gui_good:
×
564
                    continue
×
565
                if 'fullname' not in d:
×
566
                    continue
×
567
                details = d.get('registers_keystore')
×
568
                if details:
×
569
                    self.register_keystore(name, details)
×
570
            if external:
×
571
                self.external_plugin_metadata[name] = d
×
572
            else:
573
                self.internal_plugin_metadata[name] = d
×
574

575
    def get(self, name):
1✔
576
        return self.plugins.get(name)
×
577

578
    def count(self):
1✔
579
        return len(self.plugins)
×
580

581
    def load_plugin(self, name) -> 'BasePlugin':
1✔
582
        """Imports the code of the given plugin.
583
        note: can be called from any thread.
584
        """
585
        if self.get_metadata(name):
1✔
586
            return self.load_plugin_by_name(name)
1✔
587
        else:
588
            raise Exception(f"could not find plugin {name!r}")
×
589

590
    def maybe_load_plugin_init_method(self, name: str) -> None:
1✔
591
        """Loads the __init__.py module of the plugin if it is not already loaded."""
592
        if not self.is_authorized(name):
1✔
NEW
593
            return
×
594
        base_name = ('electrum_external_plugins.' if self.is_external(name) else 'electrum.plugins.') + name
1✔
595
        if base_name not in sys.modules:
1✔
596
            metadata = self.get_metadata(name)
1✔
597
            is_zip = metadata.get('is_zip', False)
1✔
598
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
599
            if not is_zip:
1✔
600
                if self.is_external(name):
1✔
601
                    # this branch is deprecated: external plugins are always zip files
602
                    path = os.path.join(metadata['path'], '__init__.py')
×
603
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
604
                else:
605
                    init_spec = importlib.util.find_spec(base_name)
1✔
606
            else:
607
                zipfile = zipimport.zipimporter(metadata['path'])
×
608
                dirname = metadata['dirname']
×
609
                init_spec = zipfile.find_spec(dirname)
×
610

611
            self.exec_module_from_spec(init_spec, base_name)
1✔
612

613
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
1✔
614
        if not self.is_authorized(name):
1✔
NEW
615
            return
×
616
        if name in self.plugins:
1✔
617
            return self.plugins[name]
1✔
618
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
619
        self.maybe_load_plugin_init_method(name)
1✔
620
        is_external = self.is_external(name)
1✔
621
        if not is_external:
1✔
622
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
1✔
623
        else:
624
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
625

626
        spec = importlib.util.find_spec(full_name)
1✔
627
        if spec is None:
1✔
628
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
629
        try:
1✔
630
            module = self.exec_module_from_spec(spec, full_name)
1✔
631
            plugin = module.Plugin(self, self.config, name)
1✔
632
        except Exception as e:
×
633
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
634
        self.add_jobs(plugin.thread_jobs())
1✔
635
        self.plugins[name] = plugin
1✔
636
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
1✔
637
        return plugin
1✔
638

639
    def close_plugin(self, plugin):
1✔
640
        self.remove_jobs(plugin.thread_jobs())
×
641

642
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
1✔
643
        from hashlib import pbkdf2_hmac
×
644
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
645
        return ECPrivkey(secret)
×
646

647
    def uninstall(self, name: str):
1✔
648
        if self.config.get(f'plugins.{name}'):
×
649
            self.config.set_key(f'plugins.{name}', None)
×
650
        if name in self.external_plugin_metadata:
×
651
            zipfile = self.zip_plugin_path(name)
×
652
            os.unlink(zipfile)
×
653
            self.external_plugin_metadata.pop(name)
×
654

655
    def is_internal(self, name) -> bool:
1✔
656
        return name in self.internal_plugin_metadata
×
657

658
    def is_external(self, name) -> bool:
1✔
659
        return name in self.external_plugin_metadata
1✔
660

661
    def is_auto_loaded(self, name):
1✔
662
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
663
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
664

665
    def is_installed(self, name) -> bool:
1✔
666
        """an external plugin may be installed but not authorized """
667
        return (name in self.internal_plugin_metadata or name in self.external_plugin_metadata)
×
668

669
    def is_authorized(self, name) -> bool:
1✔
670
        if name in self.internal_plugin_metadata:
1✔
671
            return True
1✔
672
        if name not in self.external_plugin_metadata:
×
673
            return False
×
674
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
675
        if not pubkey_bytes:
×
676
            return False
×
677
        if not self.is_plugin_zip(name):
×
678
            return False
×
679
        filename = self.zip_plugin_path(name)
×
680
        plugin_hash = get_file_hash256(filename)
×
681
        sig = self.config.get(f'plugins.{name}.authorized')
×
682
        if not sig:
×
683
            return False
×
684
        pubkey = ECPubkey(pubkey_bytes)
×
685
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
686

687
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
1✔
688
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
689
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
690
        plugin_hash = get_file_hash256(filename)
×
691
        sig = privkey.ecdsa_sign(plugin_hash)
×
692
        value = sig.hex()
×
693
        self.config.set_key(f'plugins.{name}.authorized', value)
×
694
        self.config.set_key(f'plugins.{name}.enabled', True)
×
695

696
    def enable(self, name: str) -> 'BasePlugin':
1✔
697
        self.config.enable_plugin(name)
×
698
        p = self.get(name)
×
699
        if p:
×
700
            return p
×
701
        return self.load_plugin(name)
×
702

703
    def disable(self, name: str) -> None:
1✔
704
        self.config.disable_plugin(name)
×
705
        p = self.get(name)
×
706
        if not p:
×
707
            return
×
708
        self.plugins.pop(name)
×
709
        p.close()
×
710
        self.logger.info(f"closed {name}")
×
711

712
    @classmethod
1✔
713
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
1✔
714
        return key.startswith('plugins.')
×
715

716
    def is_available(self, name: str) -> bool:
1✔
717
        d = self.descriptions.get(name)
×
718
        if not d:
×
719
            return False
×
720
        deps = d.get('requires', [])
×
721
        for dep, s in deps:
×
722
            try:
×
723
                __import__(dep)
×
724
            except ImportError as e:
×
725
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
726
                return False
×
727
        return True
×
728

729
    def get_hardware_support(self):
1✔
730
        out = []
×
731
        for name, details in self._hw_wallets.items():
×
NEW
732
            if not self.is_authorized(name):
×
733
                # we allow non-authorized plugins to populate self._hw_wallets
734
                # so that a plugin can be loaded and authorized without having
735
                # to start a new session
NEW
736
                continue
×
737
            try:
×
738
                p = self.get_plugin(name)
×
739
                if p.is_available():
×
740
                    out.append(HardwarePluginToScan(
×
741
                        name=name,
742
                        description=details[2],
743
                        plugin=p,
744
                        exception=None))
745
            except Exception as e:
×
746
                self.logger.exception(f"cannot load plugin for: {name}")
×
747
                out.append(HardwarePluginToScan(
×
748
                    name=name,
749
                    description=details[2],
750
                    plugin=None,
751
                    exception=e))
752
        return out
×
753

754
    def register_wallet_type(self, name, gui_good, wallet_type):
1✔
755
        from .wallet import register_wallet_type, register_constructor
1✔
756
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
1✔
757

758
        def loader():
1✔
759
            plugin = self.get_plugin(name)
1✔
760
            register_constructor(wallet_type, plugin.wallet_class)
1✔
761
        register_wallet_type(wallet_type)
1✔
762
        plugin_loaders[wallet_type] = loader
1✔
763

764
    def register_keystore(self, name, details):
1✔
765
        from .keystore import register_keystore
1✔
766

767
        def dynamic_constructor(d):
1✔
768
            return self.get_plugin(name).keystore_class(d)
1✔
769
        if details[0] == 'hardware':
1✔
770
            self._hw_wallets[name] = details
1✔
771
            self.logger.info(f"registering hardware {name}: {details}")
1✔
772
            register_keystore(details[1], dynamic_constructor)
1✔
773

774
    def get_plugin(self, name: str) -> 'BasePlugin':
1✔
775
        assert self.is_authorized(name)
1✔
776
        if name not in self.plugins:
1✔
777
            self.load_plugin(name)
1✔
778
        return self.plugins[name]
1✔
779

780
    def is_plugin_zip(self, name: str) -> bool:
1✔
781
        """Returns True if the plugin is a zip file"""
782
        if (metadata := self.get_metadata(name)) is None:
×
783
            return False
×
784
        return metadata.get('is_zip', False)
×
785

786
    def get_metadata(self, name: str) -> Optional[dict]:
1✔
787
        """Returns the metadata of the plugin"""
788
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
1✔
789
        if not metadata:
1✔
790
            return None
×
791
        return metadata
1✔
792

793
    def run(self):
1✔
794
        while self.is_running():
1✔
795
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
1✔
796
            self.run_jobs()
1✔
797
        self.on_stop()
1✔
798

799
    def read_file(self, name: str, filename: str) -> bytes:
1✔
800
        if self.is_plugin_zip(name):
×
801
            plugin_filename = self.zip_plugin_path(name)
×
802
            metadata = self.external_plugin_metadata[name]
×
803
            dirname = metadata['dirname']
×
804
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
805
                with myzip.open("/".join([dirname,filename])) as myfile:
×
806
                    return myfile.read()
×
807
        elif name in self.internal_plugin_metadata:
×
808
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
809
            with open(path, 'rb') as myfile:
×
810
                return myfile.read()
×
811
        else:
812
            # no icon
813
            return None
×
814

815

816
def get_file_hash256(path: str) -> bytes:
1✔
817
    """Get the sha256 hash of a file, similar to `sha256sum`."""
818
    with open(path, 'rb') as f:
×
819
        return sha256(f.read())
×
820

821

822
def hook(func):
1✔
823
    hook_names.add(func.__name__)
1✔
824
    return func
1✔
825

826

827
def run_hook(name, *args):
1✔
828
    results = []
1✔
829
    f_list = hooks.get(name, [])
1✔
830
    for p, f in f_list:
1✔
831
        if p.is_enabled():
1✔
832
            try:
1✔
833
                r = f(*args)
1✔
834
            except Exception:
×
835
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
836
                r = False
×
837
            if r:
1✔
838
                results.append(r)
×
839

840
    if results:
1✔
841
        assert len(results) == 1, results
×
842
        return results[0]
×
843

844

845
class BasePlugin(Logger):
1✔
846

847
    def __init__(self, parent, config: 'SimpleConfig', name):
1✔
848
        self.parent = parent  # type: Plugins  # The plugins object
1✔
849
        self.name = name
1✔
850
        self.config = config
1✔
851
        Logger.__init__(self)
1✔
852
        # add self to hooks
853
        for k in dir(self):
1✔
854
            if k in hook_names:
1✔
855
                l = hooks.get(k, [])
1✔
856
                l.append((self, getattr(self, k)))
1✔
857
                hooks[k] = l
1✔
858

859
    def __str__(self):
1✔
860
        return self.name
×
861

862
    def close(self):
1✔
863
        # remove self from hooks
864
        for attr_name in dir(self):
×
865
            if attr_name in hook_names:
×
866
                # found attribute in self that is also the name of a hook
867
                l = hooks.get(attr_name, [])
×
868
                try:
×
869
                    l.remove((self, getattr(self, attr_name)))
×
870
                except ValueError:
×
871
                    # maybe attr name just collided with hook name and was not hook
872
                    continue
×
873
                hooks[attr_name] = l
×
874
        self.parent.close_plugin(self)
×
875
        self.on_close()
×
876

877
    def on_close(self):
1✔
878
        pass
×
879

880
    def requires_settings(self) -> bool:
1✔
881
        return False
×
882

883
    def thread_jobs(self):
1✔
884
        return []
1✔
885

886
    def is_enabled(self):
1✔
887
        if not self.is_available():
×
888
            return False
×
889
        if not self.parent.is_authorized(self.name):
×
890
            return False
×
891
        return self.config.is_plugin_enabled(self.name)
×
892

893
    def is_available(self):
1✔
894
        return True
×
895

896
    def can_user_disable(self):
1✔
897
        return True
×
898

899
    def settings_widget(self, window):
1✔
900
        raise NotImplementedError()
×
901

902
    def settings_dialog(self, window):
1✔
903
        raise NotImplementedError()
×
904

905
    def read_file(self, filename: str) -> bytes:
1✔
906
        return self.parent.read_file(self.name, filename)
×
907

908
    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
1✔
909
        """Returns a dict which is persisted in the per-wallet database."""
910
        plugin_storage = wallet.db.get_plugin_storage()
×
911
        return plugin_storage.setdefault(self.name, {})
×
912

913

914
class DeviceUnpairableError(UserFacingException): pass
1✔
915
class HardwarePluginLibraryUnavailable(Exception): pass
1✔
916
class CannotAutoSelectDevice(Exception): pass
1✔
917

918

919
class Device(NamedTuple):
1✔
920
    path: Union[str, bytes]
1✔
921
    interface_number: int
1✔
922
    id_: str
1✔
923
    product_key: Any   # when using hid, often Tuple[int, int]
1✔
924
    usage_page: int
1✔
925
    transport_ui_string: str
1✔
926

927

928
class DeviceInfo(NamedTuple):
1✔
929
    device: Device
1✔
930
    label: Optional[str] = None
1✔
931
    initialized: Optional[bool] = None
1✔
932
    exception: Optional[Exception] = None
1✔
933
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
1✔
934
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
1✔
935
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
1✔
936

937
    def label_for_device_select(self) -> str:
1✔
938
        return (
×
939
            "{label} ({maybe_model}{init}, {transport})"
940
            .format(
941
                label=self.label or _("An unnamed {}").format(self.plugin_name),
942
                init=(_("initialized") if self.initialized else _("wiped")),
943
                transport=self.device.transport_ui_string,
944
                maybe_model=f"{self.model_name}, " if self.model_name else ""
945
            )
946
        )
947

948

949
class HardwarePluginToScan(NamedTuple):
1✔
950
    name: str
1✔
951
    description: str
1✔
952
    plugin: Optional['HW_PluginBase']
1✔
953
    exception: Optional[Exception]
1✔
954

955

956
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
1✔
957

958

959
# hidapi is not thread-safe
960
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
961
#     https://github.com/libusb/hidapi/issues/45
962
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
963
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
964
# It is not entirely clear to me, exactly what is safe and what isn't, when
965
# using multiple threads...
966
# Hence, we use a single thread for all device communications, including
967
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
968
# the following thread:
969
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
1✔
970
    max_workers=1,
971
    thread_name_prefix='hwd_comms_thread'
972
)
973

974
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
975
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
976
# To keep it simple, let's just import it now, as we are likely in the main thread here.
977
if threading.current_thread() is not threading.main_thread():
1✔
978
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
979
try:
1✔
980
    import hid
1✔
981
except ImportError:
1✔
982
    pass
1✔
983

984

985
T = TypeVar('T')
1✔
986

987

988
def run_in_hwd_thread(func: Callable[[], T]) -> T:
1✔
989
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
990
        return func()
×
991
    else:
992
        fut = _hwd_comms_executor.submit(func)
×
993
        return fut.result()
×
994
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
995

996

997
def runs_in_hwd_thread(func):
1✔
998
    @wraps(func)
1✔
999
    def wrapper(*args, **kwargs):
1✔
1000
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
1001
    return wrapper
1✔
1002

1003

1004
def assert_runs_in_hwd_thread():
1✔
1005
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
1006
        raise Exception("must only be called from HWD communication thread")
×
1007

1008

1009
class DeviceMgr(ThreadJob):
1✔
1010
    """Manages hardware clients.  A client communicates over a hardware
1011
    channel with the device.
1012

1013
    In addition to tracking device HID IDs, the device manager tracks
1014
    hardware wallets and manages wallet pairing.  A HID ID may be
1015
    paired with a wallet when it is confirmed that the hardware device
1016
    matches the wallet, i.e. they have the same master public key.  A
1017
    HID ID can be unpaired if e.g. it is wiped.
1018

1019
    Because of hotplugging, a wallet must request its client
1020
    dynamically each time it is required, rather than caching it
1021
    itself.
1022

1023
    The device manager is shared across plugins, so just one place
1024
    does hardware scans when needed.  By tracking HID IDs, if a device
1025
    is plugged into a different port the wallet is automatically
1026
    re-paired.
1027

1028
    Wallets are informed on connect / disconnect events.  It must
1029
    implement connected(), disconnected() callbacks.  Being connected
1030
    implies a pairing.  Callbacks can happen in any thread context,
1031
    and we do them without holding the lock.
1032

1033
    Confusingly, the HID ID (serial number) reported by the HID system
1034
    doesn't match the device ID reported by the device itself.  We use
1035
    the HID IDs.
1036

1037
    This plugin is thread-safe.  Currently only devices supported by
1038
    hidapi are implemented."""
1039

1040
    def __init__(self, config: SimpleConfig):
1✔
1041
        ThreadJob.__init__(self)
1✔
1042
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
1043
        self.pairing_code_to_id = {}  # type: Dict[str, str]
1✔
1044
        # A client->id_ map. Needs self.lock.
1045
        self.clients = {}  # type: Dict[HardwareClientBase, str]
1✔
1046
        # What we recognise.  (vendor_id, product_id) -> Plugin
1047
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
1✔
1048
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
1✔
1049
        # Custom enumerate functions for devices we don't know about.
1050
        self._enumerate_func = set()  # Needs self.lock.
1✔
1051
        self._ongoing_timeout_checks = {}  # type: Dict[str, Future]
1✔
1052

1053
        self.lock = threading.RLock()
1✔
1054

1055
        self.config = config
1✔
1056

1057
    def thread_jobs(self):
1✔
1058
        # Thread job to handle device timeouts
1059
        return [self]
1✔
1060

1061
    def run(self):
1✔
1062
        """Handle device timeouts.  Runs in the context of the Plugins
1063
        thread."""
1064
        with self.lock:
1✔
1065
            clients = list(self.clients.items())
1✔
1066
        cutoff = time.time() - self.config.get_session_timeout()
1✔
1067
        for client, client_id in clients:
1✔
1068
            if fut := self._ongoing_timeout_checks.get(client_id):
×
1069
                if not fut.done():
×
1070
                    continue
×
1071
            # scheduling the timeout check prevents blocking the Plugins DaemonThread if the
1072
            # _hwd_comms_executor Thread is blocked (e.g. due to it awaiting user input).
1073
            fut = _hwd_comms_executor.submit(client.timeout, cutoff)
×
1074
            self._ongoing_timeout_checks[client_id] = fut
×
1075

1076
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
1✔
1077
        for pair in device_pairs:
×
1078
            self._recognised_hardware[pair] = plugin
×
1079

1080
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
1✔
1081
        for vendor_id in vendor_ids:
×
1082
            self._recognised_vendor[vendor_id] = plugin
×
1083

1084
    def register_enumerate_func(self, func):
1✔
1085
        with self.lock:
×
1086
            self._enumerate_func.add(func)
×
1087

1088
    @runs_in_hwd_thread
1✔
1089
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
1✔
1090
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
1091
        # Get from cache first
1092
        client = self._client_by_id(device.id_)
×
1093
        if client:
×
1094
            return client
×
1095
        client = plugin.create_client(device, handler)
×
1096
        if client:
×
1097
            self.logger.info(f"Registering {client}")
×
1098
            with self.lock:
×
1099
                self.clients[client] = device.id_
×
1100
        return client
×
1101

1102
    def id_by_pairing_code(self, pairing_code):
1✔
1103
        with self.lock:
×
1104
            return self.pairing_code_to_id.get(pairing_code)
×
1105

1106
    def pairing_code_by_id(self, id_):
1✔
1107
        with self.lock:
×
1108
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
1109
                if id2 == id_:
×
1110
                    return pairing_code
×
1111
            return None
×
1112

1113
    def unpair_pairing_code(self, pairing_code):
1✔
1114
        with self.lock:
×
1115
            if pairing_code not in self.pairing_code_to_id:
×
1116
                return
×
1117
            _id = self.pairing_code_to_id.pop(pairing_code)
×
1118
        self._close_client(_id)
×
1119

1120
    def unpair_id(self, id_):
1✔
1121
        pairing_code = self.pairing_code_by_id(id_)
×
1122
        if pairing_code:
×
1123
            self.unpair_pairing_code(pairing_code)
×
1124
        else:
1125
            self._close_client(id_)
×
1126

1127
    def _close_client(self, id_):
1✔
1128
        with self.lock:
×
1129
            client = self._client_by_id(id_)
×
1130
            self.clients.pop(client, None)
×
1131
            if fut := self._ongoing_timeout_checks.pop(id_, None):
×
1132
                fut.cancel()
×
1133
        if client:
×
1134
            client.close()
×
1135

1136
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
1✔
1137
        with self.lock:
×
1138
            for client, client_id in self.clients.items():
×
1139
                if client_id == id_:
×
1140
                    return client
×
1141
        return None
×
1142

1143
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
1✔
1144
        """Returns a client for the device ID if one is registered.  If
1145
        a device is wiped or in bootloader mode pairing is impossible;
1146
        in such cases we communicate by device ID and not wallet."""
1147
        if scan_now:
×
1148
            self.scan_devices()
×
1149
        return self._client_by_id(id_)
×
1150

1151
    @runs_in_hwd_thread
1✔
1152
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
1✔
1153
                            keystore: 'Hardware_KeyStore',
1154
                            force_pair: bool, *,
1155
                            devices: Sequence['Device'] = None,
1156
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
1157
        self.logger.info("getting client for keystore")
×
1158
        if handler is None:
×
1159
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
1160
        handler.update_status(False)
×
1161
        pcode = keystore.pairing_code()
×
1162
        client = None
×
1163
        # search existing clients first (fast-path)
1164
        if not devices:
×
1165
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
1166
        # search clients again, now allowing a (slow) scan
1167
        if client is None:
×
1168
            if devices is None:
×
1169
                devices = self.scan_devices()
×
1170
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
1171
        if client is None and force_pair:
×
1172
            try:
×
1173
                info = self.select_device(plugin, handler, keystore, devices,
×
1174
                                          allow_user_interaction=allow_user_interaction)
1175
            except CannotAutoSelectDevice:
×
1176
                pass
×
1177
            else:
1178
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
1179
        if client:
×
1180
            handler.update_status(True)
×
1181
            # note: if select_device was called, we might also update label etc here:
1182
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1183
        self.logger.info("end client for keystore")
×
1184
        return client
×
1185

1186
    def client_by_pairing_code(
1✔
1187
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
1188
        devices: Sequence['Device'],
1189
    ) -> Optional['HardwareClientBase']:
1190
        _id = self.id_by_pairing_code(pairing_code)
×
1191
        client = self._client_by_id(_id)
×
1192
        if client:
×
1193
            if type(client.plugin) != type(plugin):
×
1194
                return
×
1195
            # An unpaired client might have another wallet's handler
1196
            # from a prior scan.  Replace to fix dialog parenting.
1197
            client.handler = handler
×
1198
            return client
×
1199

1200
        for device in devices:
×
1201
            if device.id_ == _id:
×
1202
                return self.create_client(device, handler, plugin)
×
1203

1204
    def force_pair_keystore(
1✔
1205
        self,
1206
        *,
1207
        plugin: 'HW_PluginBase',
1208
        handler: 'HardwareHandlerBase',
1209
        info: 'DeviceInfo',
1210
        keystore: 'Hardware_KeyStore',
1211
    ) -> 'HardwareClientBase':
1212
        xpub = keystore.xpub
×
1213
        derivation = keystore.get_derivation_prefix()
×
1214
        assert derivation is not None
×
1215
        xtype = bip32.xpub_type(xpub)
×
1216
        client = self._client_by_id(info.device.id_)
×
1217
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
1218
            # See comment above for same code
1219
            client.handler = handler
×
1220
            # This will trigger a PIN/passphrase entry request
1221
            try:
×
1222
                client_xpub = client.get_xpub(derivation, xtype)
×
1223
            except (UserCancelled, RuntimeError):
×
1224
                # Bad / cancelled PIN / passphrase
1225
                client_xpub = None
×
1226
            if client_xpub == xpub:
×
1227
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1228
                with self.lock:
×
1229
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
1230
                return client
×
1231

1232
        # The user input has wrong PIN or passphrase, or cancelled input,
1233
        # or it is not pairable
1234
        raise DeviceUnpairableError(
×
1235
            _('Electrum cannot pair with your {}.\n\n'
1236
              'Before you request bitcoins to be sent to addresses in this '
1237
              'wallet, ensure you can pair with your device, or that you have '
1238
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
1239
              'receive will be unspendable.').format(plugin.device))
1240

1241
    def list_pairable_device_infos(
1✔
1242
        self,
1243
        *,
1244
        handler: Optional['HardwareHandlerBase'],
1245
        plugin: 'HW_PluginBase',
1246
        devices: Sequence['Device'] = None,
1247
        include_failing_clients: bool = False,
1248
    ) -> List['DeviceInfo']:
1249
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
1250
        Already paired devices are also included, as it is okay to reuse them.
1251
        """
1252
        if not plugin.libraries_available:
×
1253
            message = plugin.get_library_not_available_message()
×
1254
            raise HardwarePluginLibraryUnavailable(message)
×
1255
        if devices is None:
×
1256
            devices = self.scan_devices()
×
1257
        infos = []
×
1258
        for device in devices:
×
1259
            if not plugin.can_recognize_device(device):
×
1260
                continue
×
1261
            try:
×
1262
                client = self.create_client(device, handler, plugin)
×
1263
                if not client:
×
1264
                    continue
×
1265
                label = client.label()
×
1266
                is_initialized = client.is_initialized()
×
1267
                soft_device_id = client.get_soft_device_id()
×
1268
                model_name = client.device_model_name()
×
1269
            except Exception as e:
×
1270
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
1271
                if include_failing_clients:
×
1272
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1273
                continue
×
1274
            infos.append(DeviceInfo(device=device,
×
1275
                                    label=label,
1276
                                    initialized=is_initialized,
1277
                                    plugin_name=plugin.name,
1278
                                    soft_device_id=soft_device_id,
1279
                                    model_name=model_name))
1280

1281
        return infos
×
1282

1283
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
1✔
1284
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1285
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1286
        """Select the device to use for keystore."""
1287
        # ideally this should not be called from the GUI thread...
1288
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
1289
        while True:
×
1290
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1291
            if infos:
×
1292
                break
×
1293
            if not allow_user_interaction:
×
1294
                raise CannotAutoSelectDevice()
×
1295
            msg = _('Please insert your {}').format(plugin.device)
×
1296
            msg += " ("
×
1297
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1298
                msg += f"label: {keystore.label}, "
×
1299
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1300
            msg += ').\n\n{}\n\n{}'.format(
×
1301
                _('Verify the cable is connected and that '
1302
                  'no other application is using it.'),
1303
                _('Try to connect again?')
1304
            )
1305
            if not handler.yes_no_question(msg):
×
1306
                raise UserCancelled()
×
1307
            devices = None
×
1308

1309
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1310
        # method 1: select device by id
1311
        if keystore.soft_device_id:
×
1312
            for info in infos:
×
1313
                if info.soft_device_id == keystore.soft_device_id:
×
1314
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1315
                    return info
×
1316
        # method 2: select device by label
1317
        #           but only if not a placeholder label and only if there is no collision
1318
        device_labels = [info.label for info in infos]
×
1319
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1320
                and device_labels.count(keystore.label) == 1):
1321
            for info in infos:
×
1322
                if info.label == keystore.label:
×
1323
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1324
                    return info
×
1325
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1326
        #           saved for keystore anyway, select it
1327
        if (len(infos) == 1
×
1328
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1329
                and keystore.soft_device_id is None):
1330
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1331
            return infos[0]
×
1332

1333
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1334
        if not allow_user_interaction:
×
1335
            raise CannotAutoSelectDevice()
×
1336
        # ask user to select device manually
1337
        msg = (
×
1338
                _("Could not automatically pair with device for given keystore.") + "\n"
1339
                + f"(keystore label: {keystore.label!r}, "
1340
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1341
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1342
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1343
        choices = [ChoiceItem(key=idx, label=info.label_for_device_select())
×
1344
                   for (idx, info) in enumerate(infos)]
1345
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1346
                          f"num options: {len(infos)}. options: {infos}")
1347
        c = handler.query_choice(msg, choices)
×
1348
        if c is None:
×
1349
            raise UserCancelled()
×
1350
        info = infos[c]
×
1351
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1352
        # note: updated label/soft_device_id will be saved after pairing succeeds
1353
        return info
×
1354

1355
    @runs_in_hwd_thread
1✔
1356
    def _scan_devices_with_hid(self) -> List['Device']:
1✔
1357
        try:
×
1358
            import hid  # noqa: F811
×
1359
        except ImportError:
×
1360
            return []
×
1361

1362
        devices = []
×
1363
        for d in hid.enumerate(0, 0):
×
1364
            vendor_id = d['vendor_id']
×
1365
            product_key = (vendor_id, d['product_id'])
×
1366
            plugin = None
×
1367
            if product_key in self._recognised_hardware:
×
1368
                plugin = self._recognised_hardware[product_key]
×
1369
            elif vendor_id in self._recognised_vendor:
×
1370
                plugin = self._recognised_vendor[vendor_id]
×
1371
            if plugin:
×
1372
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1373
                if device:
×
1374
                    devices.append(device)
×
1375
        return devices
×
1376

1377
    @runs_in_hwd_thread
1✔
1378
    @profiler
1✔
1379
    def scan_devices(self) -> Sequence['Device']:
1✔
1380
        self.logger.info("scanning devices...")
×
1381

1382
        # First see what's connected that we know about
1383
        devices = self._scan_devices_with_hid()
×
1384

1385
        # Let plugin handlers enumerate devices we don't know about
1386
        with self.lock:
×
1387
            enumerate_funcs = list(self._enumerate_func)
×
1388
        for f in enumerate_funcs:
×
1389
            try:
×
1390
                new_devices = f()
×
1391
            except BaseException as e:
×
1392
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1393
            else:
1394
                devices.extend(new_devices)
×
1395

1396
        # find out what was disconnected
1397
        client_ids = [dev.id_ for dev in devices]
×
1398
        disconnected_clients = []
×
1399
        with self.lock:
×
1400
            connected = {}
×
1401
            for client, id_ in self.clients.items():
×
1402
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1403
                    connected[client] = id_
×
1404
                else:
1405
                    disconnected_clients.append((client, id_))
×
1406
            self.clients = connected
×
1407

1408
        # Unpair disconnected devices
1409
        for client, id_ in disconnected_clients:
×
1410
            self.unpair_id(id_)
×
1411
            if client.handler:
×
1412
                client.handler.update_status(False)
×
1413

1414
        return devices
×
1415

1416
    @classmethod
1✔
1417
    def version_info(cls) -> Mapping[str, Optional[str]]:
1✔
1418
        ret = {}
×
1419
        # add libusb
1420
        try:
×
1421
            import usb1
×
1422
        except Exception as e:
×
1423
            ret["libusb.version"] = None
×
1424
        else:
1425
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1426
            try:
×
1427
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1428
            except AttributeError:
×
1429
                ret["libusb.path"] = None
×
1430
        # add hidapi
1431
        try:
×
1432
            import hid  # noqa: F811
×
1433
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1434
        except Exception as e:
×
1435
            from importlib.metadata import version
×
1436
            try:
×
1437
                ret["hidapi.version"] = version("hidapi")
×
1438
            except ImportError:
×
1439
                ret["hidapi.version"] = None
×
1440
        return ret
×
1441

1442
    def trigger_pairings(
1✔
1443
            self,
1444
            keystores: Sequence['KeyStore'],
1445
            *,
1446
            allow_user_interaction: bool = True,
1447
            devices: Sequence['Device'] = None,
1448
    ) -> None:
1449
        """Given a list of keystores, try to pair each with a connected hardware device.
1450

1451
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1452
        try to pair each keystore individually. Consider the following scenario:
1453
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1454
        - assume none of the devices are paired yet
1455
        1. if we tried to individually pair keystores, we might try with ks1 first
1456
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1457
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1458
             which is confusing and error-prone. It's especially problematic if the hw device does
1459
             not support labels (such as Ledger), as then the user cannot easily distinguish
1460
             same-type devices. (see #4199)
1461
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1462
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1463
        """
1464
        from .keystore import Hardware_KeyStore
×
1465
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1466
        if not keystores:
×
1467
            return
×
1468
        if devices is None:
×
1469
            devices = self.scan_devices()
×
1470
        # first pair with all devices that can be auto-selected
1471
        for ks in keystores:
×
1472
            try:
×
1473
                ks.get_client(
×
1474
                    force_pair=True,
1475
                    allow_user_interaction=False,
1476
                    devices=devices,
1477
                )
1478
            except UserCancelled:
×
1479
                pass
×
1480
        if allow_user_interaction:
×
1481
            # now do manual selections
1482
            for ks in keystores:
×
1483
                try:
×
1484
                    ks.get_client(
×
1485
                        force_pair=True,
1486
                        allow_user_interaction=True,
1487
                        devices=devices,
1488
                    )
1489
                except UserCancelled:
×
1490
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc