• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6348072453668864

10 Jun 2025 09:02AM UTC coverage: 59.753% (+0.002%) from 59.751%
6348072453668864

push

CirrusCI

ecdsa
plugins dialog: fix is_available, do not show plugins that are not available

1 of 2 new or added lines in 1 file covered. (50.0%)

1 existing line in 1 file now uncovered.

21912 of 36671 relevant lines covered (59.75%)

2.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.89
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem,
5✔
50
                   make_dir, make_aiohttp_session)
51
from . import bip32
5✔
52
from . import plugins
5✔
53
from .simple_config import SimpleConfig
5✔
54
from .logging import get_logger, Logger
5✔
55
from .crypto import sha256
5✔
56
from .network import Network
5✔
57

58
if TYPE_CHECKING:
5✔
59
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
60
    from .keystore import Hardware_KeyStore, KeyStore
×
61
    from .wallet import Abstract_Wallet
×
62

63

64
# module-level logger
_logger = get_logger(__name__)
# NOTE(review): the three registries below are not used in the part of the file shown here;
# presumably they are filled by the hook / loader machinery further down -- confirm.
plugin_loaders = {}
hook_names = set()
hooks = {}
# Maps the module name passed to Plugins.exec_module_from_spec to the exception raised while
# executing it, so that a module that failed once is never executed a second time.
_exec_module_failure = {}  # type: Dict[str, Exception]

# Version byte stored as the first byte of the plugin key
# (written by Plugins.create_new_key, checked by Plugins.get_pubkey_bytes).
PLUGIN_PASSWORD_VERSION = 1
71

72

73
class Plugins(DaemonThread):
    """Finds plugin metadata, loads plugins and keeps track of the loaded instances.

    Built-in plugins are looked up in `pkgpath`; external (zip) plugins in the directory
    returned by `get_external_plugin_dir()`.
    """

    # directory of the built-in plugins package
    pkgpath = os.path.dirname(plugins.__file__)
    # TODO: use XDG Base Directory Specification instead of hardcoding /etc
    # file holding the plugin key on posix-like systems
    keyfile_posix = '/etc/electrum/plugins_key'
    # registry key holding the plugin key on Windows
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
79

80
    @profiler
    def __init__(self, config: SimpleConfig, gui_name: Optional[str] = None, cmd_only: bool = False):
        """Collect plugin metadata and load the enabled plugins.

        :param config: application config; read for the `plugins.<name>.enabled` flags
        :param gui_name: name of the running GUI. Plugins whose manifest does not list it in
            `available_for` are ignored. Not stored when `cmd_only` is set.
        :param cmd_only: if True, only the init modules of enabled plugins are imported;
            no device manager is created and the thread is not started.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        # plugin name -> manifest dict, for built-in and external plugins respectively
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        if cmd_only:
            # only import the command modules of plugins
            # NOTE: DaemonThread.__init__ is not called on this path, only Logger.__init__;
            # `self.plugins`, `self.gui_name` and `self.device_manager` are NOT set here.
            Logger.__init__(self)
            self.find_plugins()
            self.load_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        self.hw_wallets = {}
        # plugin name -> loaded plugin instance
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        # the thread is started from within the constructor
        self.start()
102

103
    @property
5✔
104
    def descriptions(self):
5✔
105
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
106

107
    def find_directory_plugins(self, pkg_path: str, external: bool):
        """Scan `pkg_path` for plugins shipped as directories and record their manifests.

        Every accepted manifest gets a 'path' entry (the plugin directory) and is stored in
        the external or internal metadata dict, depending on `external`.
        """
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            module_path = os.path.join(pkg_path, name)
            # in cmd_only mode only explicitly enabled plugins are considered
            if self.cmd_only and self.config.get(f'plugins.{name}.enabled') is not True:
                continue
            manifest_path = os.path.join(module_path, 'manifest.json')
            try:
                with open(manifest_path, 'r') as f:
                    d = json.load(f)
            except FileNotFoundError:
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
                continue
            if 'fullname' not in d:
                continue
            d['path'] = module_path
            if not self.cmd_only:
                # plugin must support the running GUI
                if self.gui_name not in d.get('available_for', []):
                    continue
                gui_good = True
                if details := d.get('registers_wallet_type'):
                    self.register_wallet_type(name, gui_good, details)
                if details := d.get('registers_keystore'):
                    self.register_keystore(name, gui_good, details)
            already_known = (name in self.internal_plugin_metadata
                             or name in self.external_plugin_metadata)
            if already_known:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                _logger.info(f"duplicate plugins? for {name=}")
                continue
            target = self.external_plugin_metadata if external else self.internal_plugin_metadata
            target[name] = d
146

147
    @staticmethod
    def exec_module_from_spec(spec, path: str):
        """Create the module described by `spec`, register it as `sys.modules[path]` and run it.

        Raises if executing the module fails, or if it already failed in an earlier call
        for the same `path` (no retry is attempted).
        """
        earlier_error = _exec_module_failure.get(path)
        if earlier_error:
            raise Exception(f"exec_module already failed once before, with: {earlier_error!r}")
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as e:
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
            # so the import system knows it failed. If called again for the same plugin, we do not
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
            # might have defined commands).
            _exec_module_failure[path] = e
            sys.modules.pop(path, None)
            raise Exception(f"Error pre-loading {path}: {e!r}") from e
        return module
167

168
    def find_plugins(self):
5✔
169
        internal_plugins_path = (self.pkgpath, False)
5✔
170
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
171
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
172
            if pkg_path and os.path.exists(pkg_path):
5✔
173
                if not external:
5✔
174
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
175
                else:
176
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
177

178
    def load_plugins(self):
5✔
179
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
180
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
181
                try:
×
182
                    if self.cmd_only:  # only load init method to register commands
×
183
                        self.maybe_load_plugin_init_method(name)
×
184
                    else:
185
                        self.load_plugin_by_name(name)
×
186
                except BaseException as e:
×
187
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
188

189
    def _has_root_permissions(self, path):
5✔
190
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
191

192
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
        """Return (location of the plugin key, help text for the user).

        On posix-like systems the help text is empty unless `key_hex` is given; with a key
        it contains the shell command that stores the key manually.
        Raises on Android.
        """
        if sys.platform in ['windows', 'win32']:
            return self.keyfile_windows, _('This file can be edited with Regdit')
        if 'ANDROID_DATA' in os.environ:
            raise Exception('platform not supported')
        # treat unknown platforms and macOS as linux-like
        keyfile_path = self.keyfile_posix
        if not key_hex:
            return keyfile_path, ""
        help_parts = [
            _('The file must have root permissions'),
            ".\n\n",
            _("To set it you can also use the Auto-Setup or run "
              "the following terminal command"),
            ":\n\n",
            f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\"",
        ]
        return keyfile_path, "".join(help_parts)
210

211
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
5✔
212
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
213
        try:
×
214
            if sys.platform in ['windows', 'win32']:
×
215
                self._write_key_to_regedit_windows(pubkey_hex)
×
216
            elif 'ANDROID_DATA' in os.environ:
×
217
                raise Exception('platform not supported')
×
218
            elif sys.platform.startswith('darwin'):  # macOS
×
219
                self._write_key_to_root_file_macos(pubkey_hex)
×
220
            else:
221
                self._write_key_to_root_file_linux(pubkey_hex)
×
222
        except Exception:
×
223
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
224
            return False
×
225
        return True
×
226

227
    def try_auto_key_reset(self) -> bool:
5✔
228
        try:
×
229
            if sys.platform in ['windows', 'win32']:
×
230
                self._delete_plugin_key_from_windows_registry()
×
231
            elif 'ANDROID_DATA' in os.environ:
×
232
                raise Exception('platform not supported')
×
233
            elif sys.platform.startswith('darwin'):  # macOS
×
234
                self._delete_macos_plugin_keyfile()
×
235
            else:
236
                self._delete_linux_plugin_keyfile()
×
237
        except Exception:
×
238
            self.logger.exception(f'auto-reset of plugin key failed')
×
239
            return False
×
240
        return True
×
241

242
    def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
5✔
243
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
244
        dir_path: str = os.path.dirname(self.keyfile_posix)
×
245
        sh_command = (
×
246
                     f"mkdir -p {dir_path} "  # create the /etc/electrum dir
247
                     f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} "  # write the key to the file
248
                     f"&& chmod 644 {self.keyfile_posix} "  # set read permissions for the file
249
                     f"&& chmod 755 {dir_path}"  # set read permissions for the dir
250
        )
251
        return sh_command
×
252

253
    @staticmethod
5✔
254
    def _get_macos_osascript_command(commands: List[str]) -> List[str]:
5✔
255
        """
256
        Inspired by
257
        https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
258
        Wraps the given commands in a macOS osascript command to prompt for root permissions.
259
        """
260
        from shlex import quote
×
261

262
        def quote_shell(args):
×
263
            return " ".join(quote(arg) for arg in args)
×
264

265
        def quote_applescript(string):
×
266
            charmap = {
×
267
                "\n": "\\n",
268
                "\r": "\\r",
269
                "\t": "\\t",
270
                "\"": "\\\"",
271
                "\\": "\\\\",
272
            }
273
            return '"%s"' % "".join(charmap.get(char, char) for char in string)
×
274

275
        commands = [
×
276
            "osascript",
277
            "-e",
278
            "do shell script %s "
279
            "with administrator privileges "
280
            "without altering line endings"
281
            % quote_applescript(quote_shell(commands))
282
        ]
283
        return commands
×
284

285
    @staticmethod
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
        """
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.

        :param reg_exe_command: the argument string passed to reg.exe
        :raises Exception: if ShellExecuteExW reports failure.

        Note: neither the wait result nor the exit code of reg.exe is inspected here.
        """
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
        # for the result of the process (returns no process handle)
        from ctypes import byref, sizeof, windll, Structure, c_ulong
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR

        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
        class SHELLEXECUTEINFO(Structure):
            # NOTE(review): lpIDList is declared as c_ulong here while the linked documentation
            # describes it as a pointer -- confirm the layout is right on 64-bit builds.
            _fields_ = [
                ('cbSize', DWORD),
                ('fMask', c_ulong),
                ('hwnd', HWND),
                ('lpVerb', LPCWSTR),
                ('lpFile', LPCWSTR),
                ('lpParameters', LPCWSTR),
                ('lpDirectory', LPCWSTR),
                ('nShow', c_ulong),
                ('hInstApp', HINSTANCE),
                ('lpIDList', c_ulong),
                ('lpClass', LPCWSTR),
                ('hkeyClass', HKEY),
                ('dwHotKey', DWORD),
                ('hIcon', HANDLE),
                ('hProcess', HANDLE)
            ]

        info = SHELLEXECUTEINFO()
        info.cbSize = sizeof(SHELLEXECUTEINFO)
        info.fMask = 0x00000040  # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
        info.hwnd = None
        info.lpVerb = 'runas'  # run as administrator
        info.lpFile = 'reg.exe'  # the executable to run
        info.lpParameters = reg_exe_command  # the registry edit command
        info.lpDirectory = None
        info.nShow = 1

        # Execute and wait
        if not windll.shell32.ShellExecuteExW(byref(info)):
            error = windll.kernel32.GetLastError()
            raise Exception(f'Error executing registry command: {error}')

        # block until the process is done or 5 sec timeout
        # (0x1338 == 4920, i.e. slightly less than 5 seconds if the unit is milliseconds)
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)

        # Close handle
        windll.kernel32.CloseHandle(info.hProcess)
335

336
    @staticmethod
5✔
337
    def _execute_commands_in_subprocess(commands: List[str]) -> None:
5✔
338
        """
339
        Executes the given commands in a subprocess and asserts that it was successful.
340
        """
341
        import subprocess
×
342
        with subprocess.Popen(
×
343
            commands,
344
            stdout=subprocess.PIPE,
345
            stderr=subprocess.PIPE,
346
            text=True,
347
        ) as process:
348
            stdout, stderr = process.communicate()
×
349
            if process.returncode != 0:
×
350
                raise Exception(f'error executing command ({process.returncode}): {stderr}')
×
351

352
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
5✔
353
        """
354
        Spawns a pkexec subprocess to write the key to a file with root permissions.
355
        This will open an OS dialog asking for the root password. Can only succeed if
356
        the system has polkit installed.
357
        """
358
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
359

360
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
361
        commands = ['pkexec', 'sh', '-c', sh_command]
×
362
        self._execute_commands_in_subprocess(commands)
×
363

364
        # check if the key was written correctly
365
        with open(self.keyfile_posix, 'r') as f:
×
366
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
367
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
368

369
    def _delete_linux_plugin_keyfile(self) -> None:
5✔
370
        """
371
        Deletes the root owned key file at self.keyfile_posix.
372
        """
373
        if not os.path.exists(self.keyfile_posix):
×
374
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
375
            return
×
376
        if not self._has_root_permissions(self.keyfile_posix):
×
377
            os.unlink(self.keyfile_posix)
×
378
            return
×
379

380
        # use pkexec to delete the file as root user
381
        commands = ['pkexec', 'rm', self.keyfile_posix]
×
382
        self._execute_commands_in_subprocess(commands)
×
383
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
384

385
    def _write_key_to_root_file_macos(self, key_hex: str) -> None:
5✔
386
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
387

388
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
389
        macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])
×
390

391
        self._execute_commands_in_subprocess(macos_commands)
×
392
        with open(self.keyfile_posix, 'r') as f:
×
393
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
394
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
395

396
    def _delete_macos_plugin_keyfile(self) -> None:
5✔
397
        if not os.path.exists(self.keyfile_posix):
×
398
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
399
            return
×
400
        if not self._has_root_permissions(self.keyfile_posix):
×
401
            os.unlink(self.keyfile_posix)
×
402
            return
×
403
        # use osascript to delete the file as root user
404
        macos_commands = self._get_macos_osascript_command(["rm", self.keyfile_posix])
×
405
        self._execute_commands_in_subprocess(macos_commands)
×
406
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
407

408
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
        """
        Stores the key in the Windows registry via an elevated reg.exe call (UAC prompt),
        then reads the value back and asserts that it matches.
        """
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE

        value_type = 'REG_SZ'
        reg_args = f'add "{self.keyfile_windows}" /ve /t {value_type} /d "{key_hex}" /f'
        self._run_win_regedit_as_admin(reg_args)

        # check if the key was written correctly
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey, \
                OpenKey(hkey, r'SOFTWARE\Electrum') as key:
            stored = QueryValue(key, 'PluginsKey')
        assert key_hex == stored, "incorrect registry key value"
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
424

425
    def _delete_plugin_key_from_windows_registry(self) -> None:
        """
        Deletes the PluginsKey registry key via an elevated reg.exe call and raises
        if the key can still be opened afterwards.
        """
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE

        reg_args = f'delete "{self.keyfile_windows}" /f'
        self._run_win_regedit_as_admin(reg_args)

        # do a sanity check to see if the key has been deleted:
        # opening it must fail with FileNotFoundError
        still_present = True
        try:
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey, \
                    OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
                pass
        except FileNotFoundError:
            still_present = False
        if still_present:
            raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
441

442
    def create_new_key(self, password:str) -> str:
        """Derive a new plugin key from `password` with a fresh random 32-byte salt.

        Returns hex of: version byte || salt || public key bytes.
        """
        salt = os.urandom(32)
        pubkey = self.derive_privkey(password, salt).get_public_key_bytes()
        version_byte = bytes([PLUGIN_PASSWORD_VERSION])
        return (version_byte + salt + pubkey).hex()
448

449
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
450
        """
451
        returns pubkey, salt
452
        returns None, None if the pubkey has not been set
453
        """
454
        if sys.platform in ['windows', 'win32']:
×
455
            import winreg
×
456
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
457
                try:
×
458
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
459
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
460
                except Exception as e:
×
461
                    self.logger.info(f'winreg error: {e}')
×
462
                    return None, None
×
463
        elif 'ANDROID_DATA' in os.environ:
×
464
            return None, None
×
465
        else:
466
            # treat unknown platforms as linux-like
467
            if not os.path.exists(self.keyfile_posix):
×
468
                return None, None
×
469
            if not self._has_root_permissions(self.keyfile_posix):
×
470
                return
×
471
            with open(self.keyfile_posix) as f:
×
472
                key_hex = f.read()
×
473
        try:
×
474
            key = bytes.fromhex(key_hex)
×
475
            version = key[0]
×
476
        except Exception:
×
477
            self.logger.exception(f'{key_hex=} invalid')
×
478
            return None, None
×
479
        if version != PLUGIN_PASSWORD_VERSION:
×
480
            self.logger.info(f'unknown plugin password version: {version}')
×
481
            return None, None
×
482
        # all good
483
        salt = key[1:1+32]
×
484
        pubkey = key[1+32:]
×
485
        return pubkey, salt
×
486

487
    def get_external_plugin_dir(self) -> str:
        """Return the 'plugins' directory below the electrum path, after passing it to make_dir."""
        plugin_dir = os.path.join(self.config.electrum_path(), 'plugins')
        make_dir(plugin_dir)
        return plugin_dir
491

492
    async def download_external_plugin(self, url: str) -> str:
        """Download the plugin zip at `url` into the external plugin dir and return its path.

        The file name is the last path component of the URL.

        :raises FileExistsError: if a file with that name is already present
        :raises Exception: if the server does not answer with HTTP status 200
            (previously the path of a file that was never written was returned)
        """
        filename = os.path.basename(urlparse(url).path)
        pkg_path = self.get_external_plugin_dir()
        path = os.path.join(pkg_path, filename)
        if os.path.exists(path):
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
        network = Network.get_instance()
        proxy = network.proxy if network else None
        async with make_aiohttp_session(proxy=proxy) as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    raise Exception(f"Could not download plugin from {url}: HTTP status {resp.status}")
                try:
                    with open(path, 'wb') as fd:
                        # 64 KiB chunks instead of 10 bytes per write
                        async for chunk in resp.content.iter_chunked(64 * 1024):
                            fd.write(chunk)
                except BaseException:
                    # do not leave a truncated zip behind: it would make the next
                    # attempt fail with FileExistsError
                    if os.path.exists(path):
                        os.unlink(path)
                    raise
        return path
507

508
    def read_manifest(self, path) -> dict:
        """Return the parsed manifest.json of the plugin zip at `path`.

        The first archive member whose name ends with 'manifest.json' is used. The keys
        'path', 'dirname', 'is_zip' and 'zip_hash_sha256' are added to the result.
        """
        with zipfile_lib.ZipFile(path) as archive:
            manifest_name = next(
                (member for member in archive.namelist() if member.endswith('manifest.json')),
                None,
            )
            if manifest_name is None:
                raise Exception('could not find manifest.json in zip archive')
            with archive.open(manifest_name, 'r') as f:
                manifest = json.load(f)
                manifest['path'] = path  # external, path of the zipfile
                manifest['dirname'] = os.path.dirname(manifest_name)  # internal
                manifest['is_zip'] = True
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
                return manifest
523

524
    def zip_plugin_path(self, name) -> str:
5✔
525
        path = self.get_metadata(name)['path']
×
526
        filename = os.path.basename(path)
×
527
        if name in self.internal_plugin_metadata:
×
528
            pkg_path = self.pkgpath
×
529
        else:
530
            pkg_path = self.get_external_plugin_dir()
×
531
        return os.path.join(pkg_path, filename)
×
532

533
    def find_zip_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts

        A zip is skipped (with a log message where applicable) if its manifest cannot be
        read, its name is already known, it is not enabled (cmd_only mode), its
        min/max_electrum_version constraint is not satisfied or cannot be parsed, or
        (GUI mode) it does not support the running GUI or lacks a 'fullname'.
        """
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            path = os.path.join(pkg_path, filename)
            if not filename.endswith('.zip'):
                continue
            try:
                d = self.read_manifest(path)
                name = d['name']
            except Exception:
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
                continue
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                self.logger.info(f"duplicate plugins for {name=}")
                continue
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
                continue
            min_version = d.get('min_electrum_version')
            max_version = d.get('max_electrum_version')
            try:
                too_new = bool(min_version) and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION)
                too_old = bool(max_version) and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION)
            except (ValueError, TypeError):
                # a malformed version string in one manifest must not abort the
                # discovery of all other plugins
                self.logger.info(f"invalid version constraint in zip plugin {filename}", exc_info=True)
                continue
            if too_new or too_old:
                # no exception is active here, so no exc_info
                self.logger.info(f"version mismatch for zip plugin {filename}")
                continue

            if not self.cmd_only:
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                if 'fullname' not in d:
                    continue
                details = d.get('registers_keystore')
                if details:
                    self.register_keystore(name, gui_good, details)
            if external:
                self.external_plugin_metadata[name] = d
            else:
                self.internal_plugin_metadata[name] = d
574

575
    def get(self, name):
5✔
576
        return self.plugins.get(name)
×
577

578
    def count(self):
5✔
579
        return len(self.plugins)
×
580

581
    def load_plugin(self, name) -> 'BasePlugin':
5✔
582
        """Imports the code of the given plugin.
583
        note: can be called from any thread.
584
        """
585
        if self.get_metadata(name):
5✔
586
            return self.load_plugin_by_name(name)
5✔
587
        else:
588
            raise Exception(f"could not find plugin {name!r}")
×
589

590
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
591
        """Loads the __init__.py module of the plugin if it is not already loaded."""
592
        base_name = ('electrum_external_plugins.' if self.is_external(name) else 'electrum.plugins.') + name
5✔
593
        if base_name not in sys.modules:
5✔
594
            metadata = self.get_metadata(name)
5✔
595
            is_zip = metadata.get('is_zip', False)
5✔
596
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
597
            if not is_zip:
5✔
598
                if self.is_external(name):
5✔
599
                    # this branch is deprecated: external plugins are always zip files
600
                    path = os.path.join(metadata['path'], '__init__.py')
×
601
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
602
                else:
603
                    init_spec = importlib.util.find_spec(base_name)
5✔
604
            else:
605
                zipfile = zipimport.zipimporter(metadata['path'])
×
606
                dirname = metadata['dirname']
×
607
                init_spec = zipfile.find_spec(dirname)
×
608

609
            self.exec_module_from_spec(init_spec, base_name)
5✔
610

611
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
612
        if name in self.plugins:
5✔
613
            return self.plugins[name]
5✔
614
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
615
        self.maybe_load_plugin_init_method(name)
5✔
616
        is_external = self.is_external(name)
5✔
617
        if is_external and not self.is_authorized(name):
5✔
618
            self.logger.info(f'plugin not authorized {name}')
×
619
            return
×
620
        if not is_external:
5✔
621
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
622
        else:
623
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
624

625
        spec = importlib.util.find_spec(full_name)
5✔
626
        if spec is None:
5✔
627
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
628
        try:
5✔
629
            module = self.exec_module_from_spec(spec, full_name)
5✔
630
            plugin = module.Plugin(self, self.config, name)
5✔
631
        except Exception as e:
×
632
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
633
        self.add_jobs(plugin.thread_jobs())
5✔
634
        self.plugins[name] = plugin
5✔
635
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
636
        return plugin
5✔
637

638
    def close_plugin(self, plugin):
5✔
639
        self.remove_jobs(plugin.thread_jobs())
×
640

641
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
        """Derive the private key from the password: PBKDF2-HMAC-SHA256 over the utf-8
        encoded `pw` with `salt` and 10**5 iterations."""
        import hashlib
        password_bytes = pw.encode('utf-8')
        secret = hashlib.pbkdf2_hmac('sha256', password_bytes, salt, iterations=10**5)
        return ECPrivkey(secret)
645

646
    def uninstall(self, name: str):
        """Forget plugin `name`: clear its config entry and delete its zip file.

        The `plugins.<name>` config key is reset only if it currently holds a
        truthy value. The zip file is removed, and the metadata entry dropped,
        only for plugins listed in `self.external_plugin_metadata`.
        """
        config_key = f'plugins.{name}'
        if self.config.get(config_key):
            self.config.set_key(config_key, None)
        if name not in self.external_plugin_metadata:
            return
        zip_path = self.zip_plugin_path(name)
        os.unlink(zip_path)
        del self.external_plugin_metadata[name]
×
653

654
    def is_internal(self, name) -> bool:
        """Return True if `name` is listed in the internal plugin metadata."""
        known_internal = self.internal_plugin_metadata
        return name in known_internal
×
656

657
    def is_external(self, name) -> bool:
        """Return True if `name` is listed in the external plugin metadata."""
        known_external = self.external_plugin_metadata
        return name in known_external
5✔
659

660
    def is_auto_loaded(self, name):
        """Tell whether plugin `name` registers a keystore or a wallet type.

        External metadata takes precedence over internal metadata. The result
        is truthy/falsy rather than a strict bool: the (falsy) metadata lookup
        result when there is no metadata, otherwise the first truthy value of
        'registers_keystore' / 'registers_wallet_type'.
        """
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
        if not metadata:
            return metadata
        return metadata.get('registers_keystore') or metadata.get('registers_wallet_type')
×
663

664
    def is_installed(self, name) -> bool:
        """an external plugin may be installed but not authorized """
        if name in self.internal_plugin_metadata:
            return True
        return name in self.external_plugin_metadata
×
667

668
    def is_authorized(self, name) -> bool:
        """Return True if plugin `name` may be loaded.

        Internal plugins are always authorized. An external plugin is
        authorized only if a plugin pubkey is available, the plugin is a zip
        plugin, and the signature stored under `plugins.<name>.authorized`
        verifies against the hash of the plugin's zip file.

        A missing or malformed stored signature yields False.
        """
        if name in self.internal_plugin_metadata:
            return True
        if name not in self.external_plugin_metadata:
            return False
        pubkey_bytes, _salt = self.get_pubkey_bytes()
        if not pubkey_bytes:
            return False
        if not self.is_plugin_zip(name):
            return False
        # Check the cheap config lookup first: there is no point in reading and
        # hashing the zip file when no signature has been stored for it.
        sig = self.config.get(f'plugins.{name}.authorized')
        if not sig:
            return False
        try:
            sig_bytes = bytes.fromhex(sig)
        except (TypeError, ValueError):
            # a corrupted config value must never authorize a plugin,
            # and must not crash callers that merely ask for the status
            return False
        plugin_hash = get_file_hash256(self.zip_plugin_path(name))
        pubkey = ECPubkey(pubkey_bytes)
        return pubkey.ecdsa_verify(sig_bytes, plugin_hash)
×
685

686
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
        """Sign the plugin file with `privkey` and store the result in the config.

        `privkey` must match the pubkey returned by get_pubkey_bytes(). The
        hex-encoded signature over the file hash is saved under
        `plugins.<name>.authorized`, and `plugins.<name>.enabled` is set to True.
        """
        expected_pubkey, salt = self.get_pubkey_bytes()
        assert expected_pubkey == privkey.get_public_key_bytes()
        signature = privkey.ecdsa_sign(get_file_hash256(filename))
        self.config.set_key(f'plugins.{name}.authorized', signature.hex())
        self.config.set_key(f'plugins.{name}.enabled', True)
×
694

695
    def enable(self, name: str) -> 'BasePlugin':
        """Mark plugin `name` as enabled in the config and return its instance.

        An instance already known to self.get() is reused; otherwise the
        plugin is loaded via load_plugin().
        """
        self.config.enable_plugin(name)
        return self.get(name) or self.load_plugin(name)
×
701

702
    def disable(self, name: str) -> None:
        """Mark plugin `name` as disabled in the config and close it if loaded."""
        self.config.disable_plugin(name)
        loaded = self.get(name)
        if loaded:
            self.plugins.pop(name)
            loaded.close()
            self.logger.info(f"closed {name}")
×
710

711
    @classmethod
5✔
712
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
713
        return key.startswith('plugins.')
×
714

715
    def is_available(self, name: str) -> bool:
        """Return True if plugin `name` is described and all its dependencies import.

        A plugin without a (non-empty) description is unavailable. Each entry
        of the description's 'requires' list is a pair whose first item is a
        module name; the first one that fails to import is logged and makes
        the plugin unavailable.
        """
        description = self.descriptions.get(name)
        if not description:
            return False
        for module_name, _s in description.get('requires', []):
            try:
                __import__(module_name)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        return True
×
727

728
    def get_hardware_support(self):
        """Return a HardwarePluginToScan entry for each usable hardware wallet plugin.

        Only plugins registered with a truthy `gui_good` are considered. A
        plugin that loads and reports is_available() is listed with its
        instance; a plugin that raises while loading is listed with the
        exception instead. Loaded but unavailable plugins are omitted.
        """
        supported = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if not gui_good:
                continue
            try:
                plugin = self.get_plugin(name)
                if plugin.is_available():
                    entry = HardwarePluginToScan(name=name,
                                                 description=details[2],
                                                 plugin=plugin,
                                                 exception=None)
                    supported.append(entry)
            except Exception as e:
                self.logger.exception(f"cannot load plugin for: {name}")
                failure = HardwarePluginToScan(name=name,
                                               description=details[2],
                                               plugin=None,
                                               exception=e)
                supported.append(failure)
        return supported
×
746

747
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` and a lazy loader that fetches plugin `name` on demand.

        The loader is stored in `plugin_loaders[wallet_type]`; when called it
        gets the plugin and registers its `wallet_class` as the constructor
        for the wallet type.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            # the plugin is only loaded when the wallet type is actually needed
            register_constructor(wallet_type, self.get_plugin(name).wallet_class)

        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader
5✔
756

757
    def register_keystore(self, name, gui_good, details):
        """Register the keystore of hardware plugin `name`.

        Nothing is registered unless `details[0]` is 'hardware'. For hardware
        keystores the plugin is recorded in `self.hw_wallets`, and `details[1]`
        is registered with a constructor that loads the plugin lazily.
        """
        from .keystore import register_keystore

        if details[0] != 'hardware':
            return

        def dynamic_constructor(d):
            plugin = self.get_plugin(name)
            return plugin.keystore_class(d)

        self.hw_wallets[name] = (gui_good, details)
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
5✔
766

767
    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the loaded plugin `name`, loading it first if necessary.

        Raises KeyError if the plugin is still absent from `self.plugins`
        after load_plugin() returned.
        """
        needs_loading = name not in self.plugins
        if needs_loading:
            self.load_plugin(name)
        return self.plugins[name]
5✔
771

772
    def is_plugin_zip(self, name: str) -> bool:
        """Returns True if the plugin is a zip file"""
        metadata = self.get_metadata(name)
        # unknown plugins have no metadata and therefore are not zips
        return False if metadata is None else metadata.get('is_zip', False)
×
777

778
    def get_metadata(self, name: str) -> Optional[dict]:
        """Returns the metadata of the plugin"""
        # internal metadata wins; empty or missing metadata is reported as None
        found = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
        return found if found else None
5✔
784

785
    def run(self):
        """Main loop: run the jobs roughly every 0.1s, or sooner when woken up.

        Loops while is_running() is true, then calls on_stop().
        """
        poll_interval = 0.1  # time.sleep(0.1) OR event
        while self.is_running():
            self.wake_up_event.wait(poll_interval)
            self.run_jobs()
        self.on_stop()
5✔
790

791
    def read_file(self, name: str, filename: str) -> Optional[bytes]:
        """Return the contents of `filename` shipped with plugin `name`.

        For zip plugins the file is read from the plugin's zip archive, below
        the directory named by the plugin metadata's 'dirname'. For internal
        plugins it is read from the `plugins/<name>/` directory next to this
        module. Returns None when the plugin is neither.
        """
        if self.is_plugin_zip(name):
            import posixpath
            plugin_filename = self.zip_plugin_path(name)
            dirname = self.external_plugin_metadata[name]['dirname']
            # Zip member names always use forward slashes, whatever the host OS:
            # os.path.join would build a backslash path on Windows that is not
            # found in the archive.
            member = posixpath.join(dirname, filename).replace(os.sep, '/')
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
                with myzip.open(member) as myfile:
                    return myfile.read()
        elif name in self.internal_plugin_metadata:
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
        else:
            # no icon
            return None
×
806

807
def get_file_hash256(path: str) -> bytes:
    '''Get the sha256 hash of a file, similar to `sha256sum`.

    The file is hashed in fixed-size chunks, so memory use does not grow
    with the file size. Returns the raw 32-byte digest.
    '''
    import hashlib
    hasher = hashlib.sha256()
    with open(path, 'rb') as f:
        while chunk := f.read(65536):
            hasher.update(chunk)
    return hasher.digest()
×
811

812

813
def hook(func):
    """Decorator: record the function's name as a hook name and return it unchanged."""
    name = func.__name__
    hook_names.add(name)
    return func
5✔
816

817

818
def run_hook(name, *args):
    """Call hook `name` on every enabled plugin that registered it.

    Exceptions raised by a hook are logged and otherwise ignored. Falsy
    results are discarded. Returns the single truthy result if there is one,
    None if there is none; more than one truthy result is an assertion error.
    """
    results = []
    for p, f in hooks.get(name, []):
        if not p.is_enabled():
            continue
        try:
            r = f(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
            continue
        if r:
            results.append(r)

    if not results:
        return None
    assert len(results) == 1, results
    return results[0]
×
834

835

836
class BasePlugin(Logger):
    """Base class of all plugins.

    On construction, every attribute whose name is a registered hook name is
    added to the global `hooks` table; close() removes those entries again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        Logger.__init__(self)
        # add self to hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            registered = hooks.get(attr_name, [])
            registered.append((self, getattr(self, attr_name)))
            hooks[attr_name] = registered

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, detach it from the parent, then call on_close()."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            # found attribute in self that is also the name of a hook
            registered = hooks.get(attr_name, [])
            try:
                registered.remove((self, getattr(self, attr_name)))
            except ValueError:
                # maybe attr name just collided with hook name and was not hook
                continue
            hooks[attr_name] = registered
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of close(); does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        """Whether the plugin has settings; False by default."""
        return False

    def thread_jobs(self):
        """Jobs to be run by the parent's thread; none by default."""
        return []

    def is_enabled(self):
        """True only if the plugin is available, authorized and enabled in the config."""
        usable = self.is_available() and self.parent.is_authorized(self.name)
        if not usable:
            return False
        return self.config.is_plugin_enabled(self.name)

    def is_available(self):
        """Whether the plugin can be used; True by default."""
        return True

    def can_user_disable(self):
        """Whether the user may disable the plugin; True by default."""
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Read `filename` from this plugin's files, delegating to the parent."""
        return self.parent.read_file(self.name, filename)

    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
        """Returns a dict which is persisted in the per-wallet database."""
        all_plugin_storage = wallet.db.get_plugin_storage()
        return all_plugin_storage.setdefault(self.name, {})
×
903

904
class DeviceUnpairableError(UserFacingException):
    """Raised by DeviceMgr.force_pair_keystore when pairing with the device fails."""


class HardwarePluginLibraryUnavailable(Exception):
    """Raised by DeviceMgr.list_pairable_device_infos when the plugin's libraries are unavailable."""


class CannotAutoSelectDevice(Exception):
    """Raised by DeviceMgr.select_device when user interaction is needed but not allowed."""
5✔
907

908

909
class Device(NamedTuple):
    """Description of one connected hardware device, as produced by a device scan."""
    # logged by DeviceMgr when creating a client for the device fails
    path: Union[str, bytes]
    interface_number: int  # NOTE(review): presumably taken from the HID enumeration -- confirm
    # key under which DeviceMgr tracks the device's client and pairing
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int  # NOTE(review): presumably taken from the HID enumeration -- confirm
    # shown to the user as part of DeviceInfo.label_for_device_select()
    transport_ui_string: str
5✔
916

917

918
class DeviceInfo(NamedTuple):
    """What is known about a connected device after trying to create a client for it."""
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"

    def label_for_device_select(self) -> str:
        """Return the one-line description shown when the user picks a device."""
        shown_label = self.label or _("An unnamed {}").format(self.plugin_name)
        if self.initialized:
            init_state = _("initialized")
        else:
            init_state = _("wiped")
        model_prefix = f"{self.model_name}, " if self.model_name else ""
        template = "{label} ({maybe_model}{init}, {transport})"
        return template.format(
            label=shown_label,
            init=init_state,
            transport=self.device.transport_ui_string,
            maybe_model=model_prefix,
        )
937

938

939
class HardwarePluginToScan(NamedTuple):
    """A hardware wallet plugin entry: either the loaded plugin, or the error from loading it."""
    name: str
    description: str
    # the loaded plugin, or None if loading it raised
    plugin: Optional['HW_PluginBase']
    # the exception raised while loading the plugin, or None on success
    exception: Optional[Exception]
5✔
944

945

946
# Keystore labels that carry no information; DeviceMgr.select_device does not
# use them to match a keystore to a device by label.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
947

948

949
# hidapi is not thread-safe
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
#     https://github.com/libusb/hidapi/issues/45
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
# It is not entirely clear exactly what is safe and what isn't when
# using multiple threads...
# Hence, we use a single thread for all device communications, including
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
# the following thread:
# (a single worker; its thread name prefix is how run_in_hwd_thread and
#  assert_runs_in_hwd_thread recognise that they are already on it)
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=1,
    thread_name_prefix='hwd_comms_thread'
)
963

964
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
# To keep it simple, let's just import it now, as we are likely in the main thread here.
if threading.current_thread() is not threading.main_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # hid is optional; a failed import is silently ignored here
    pass
5✔
973

974

975
# return type of the callable passed to run_in_hwd_thread
T = TypeVar('T')
5✔
976

977

978
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-device communication thread and return its result.

    When already on that thread (recognised by its name prefix) `func` is
    called directly; otherwise it is submitted to the executor and the caller
    blocks until the result is available.
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    return _hwd_comms_executor.submit(func).result()
985

986

987
def runs_in_hwd_thread(func):
    """Decorator: make every call of `func` execute via run_in_hwd_thread."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
992

993

994
def assert_runs_in_hwd_thread():
    """Raise Exception unless the current thread's name starts with 'hwd_comms_thread'."""
    current_name = threading.current_thread().name
    if current_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
997

998

999
class DeviceMgr(ThreadJob):
5✔
1000
    """Manages hardware clients.  A client communicates over a hardware
1001
    channel with the device.
1002

1003
    In addition to tracking device HID IDs, the device manager tracks
1004
    hardware wallets and manages wallet pairing.  A HID ID may be
1005
    paired with a wallet when it is confirmed that the hardware device
1006
    matches the wallet, i.e. they have the same master public key.  A
1007
    HID ID can be unpaired if e.g. it is wiped.
1008

1009
    Because of hotplugging, a wallet must request its client
1010
    dynamically each time it is required, rather than caching it
1011
    itself.
1012

1013
    The device manager is shared across plugins, so just one place
1014
    does hardware scans when needed.  By tracking HID IDs, if a device
1015
    is plugged into a different port the wallet is automatically
1016
    re-paired.
1017

1018
    Wallets are informed on connect / disconnect events.  It must
1019
    implement connected(), disconnected() callbacks.  Being connected
1020
    implies a pairing.  Callbacks can happen in any thread context,
1021
    and we do them without holding the lock.
1022

1023
    Confusingly, the HID ID (serial number) reported by the HID system
1024
    doesn't match the device ID reported by the device itself.  We use
1025
    the HID IDs.
1026

1027
    This plugin is thread-safe.  Currently only devices supported by
1028
    hidapi are implemented."""
1029

1030
    def __init__(self, config: SimpleConfig):
        """Create an empty device manager that reads its settings from `config`."""
        ThreadJob.__init__(self)
        self.config = config
        self.lock = threading.RLock()

        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
        self.pairing_code_to_id: Dict[str, str] = {}
        # A client->id_ map. Needs self.lock.
        self.clients: Dict['HardwareClientBase', str] = {}
        # What we recognise.  (vendor_id, product_id) -> Plugin
        self._recognised_hardware: Dict[Tuple[int, int], 'HW_PluginBase'] = {}
        # vendor_id -> Plugin
        self._recognised_vendor: Dict[int, 'HW_PluginBase'] = {}
        # Custom enumerate functions for devices we don't know about.
        self._enumerate_func = set()  # Needs self.lock.
5✔
1045

1046
    def thread_jobs(self):
        """Return the jobs to schedule: the manager itself, which handles device timeouts."""
        jobs = [self]
        return jobs
5✔
1049

1050
    def run(self):
        '''Handle device timeouts.  Runs in the context of the Plugins
        thread.'''
        # snapshot the clients under the lock, but call into them without holding it
        with self.lock:
            snapshot = list(self.clients)
        cutoff = time.time() - self.config.get_session_timeout()
        for client in snapshot:
            client.timeout(cutoff)
×
1058

1059
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every (vendor_id, product_id) pair in `device_pairs` to `plugin`."""
        self._recognised_hardware.update((pair, plugin) for pair in device_pairs)
×
1062

1063
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every vendor id in `vendor_ids` to `plugin`."""
        self._recognised_vendor.update((vendor_id, plugin) for vendor_id in vendor_ids)
×
1066

1067
    def register_enumerate_func(self, func):
        """Add `func` to the set of custom device enumeration functions."""
        with self.lock:
            enumerators = self._enumerate_func
            enumerators.add(func)
×
1070

1071
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for `device`, creating and registering it if needed.

        A client already registered under the device id is reused. Otherwise
        the plugin is asked to create one; a falsy result is returned without
        being registered.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        created = plugin.create_client(device, handler)
        if not created:
            return created
        self.logger.info(f"Registering {created}")
        with self.lock:
            self.clients[created] = device.id_
        return created
×
1084

1085
    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with `pairing_code`, or None if there is no pairing."""
        with self.lock:
            paired_id = self.pairing_code_to_id.get(pairing_code)
        return paired_id
×
1088

1089
    def pairing_code_by_id(self, id_):
        """Return the first pairing code that maps to device id `id_`, or None."""
        with self.lock:
            matches = (code for code, paired_id in self.pairing_code_to_id.items()
                       if paired_id == id_)
            return next(matches, None)
×
1095

1096
    def unpair_pairing_code(self, pairing_code):
        """Drop the pairing for `pairing_code`, if any, and close the paired client."""
        not_paired = object()
        with self.lock:
            _id = self.pairing_code_to_id.pop(pairing_code, not_paired)
        if _id is not_paired:
            return
        # closing happens outside the lock
        self._close_client(_id)
×
1102

1103
    def unpair_id(self, id_):
        """Unpair device `id_`; if it has no pairing, just close its client."""
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
×
1109

1110
    def _close_client(self, id_):
5✔
1111
        with self.lock:
×
1112
            client = self._client_by_id(id_)
×
1113
            self.clients.pop(client, None)
×
1114
        if client:
×
1115
            client.close()
×
1116

1117
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
1118
        with self.lock:
×
1119
            for client, client_id in self.clients.items():
×
1120
                if client_id == id_:
×
1121
                    return client
×
1122
        return None
×
1123

1124
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        '''Returns a client for the device ID if one is registered.  If
        a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        Unless scan_now is false, the devices are rescanned first.'''
        if scan_now:
            self.scan_devices()
        registered = self._client_by_id(id_)
        return registered
×
1131

1132
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Return a client for the device paired with `keystore`, or None.

        The lookup goes by the keystore's pairing code: first among already
        registered clients, then among `devices` (scanned here if None was
        passed). If that finds nothing and `force_pair` is set, a device is
        selected (possibly asking the user) and paired with the keystore.

        Raises Exception if `handler` is None. Exceptions from select_device
        other than CannotAutoSelectDevice, and from force_pair_keystore,
        propagate to the caller.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()
        client = None
        # search existing clients first (fast-path)
        if not devices:
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
        # search clients again, now allowing a (slow) scan
        if client is None:
            # only None triggers a scan; an explicitly passed empty sequence does not
            if devices is None:
                devices = self.scan_devices()
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # no device could be chosen without asking the user: give up quietly
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
1166

1167
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client of the device paired with `pairing_code`, or None.

        A registered client is reused only if it belongs to the same plugin
        type; its handler is then replaced by `handler`. Without a registered
        client, one is created for the device in `devices` that has the
        paired id.
        """
        paired_id = self.id_by_pairing_code(pairing_code)
        registered = self._client_by_id(paired_id)
        if not registered:
            for device in devices:
                if device.id_ == paired_id:
                    return self.create_client(device, handler, plugin)
            return None
        if type(registered.plugin) != type(plugin):
            return None
        # An unpaired client might have another wallet's handler
        # from a prior scan.  Replace to fix dialog parenting.
        registered.handler = handler
        return registered
×
1184

1185
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair `keystore` with the device described by `info` and return its client.

        The pairing is recorded only if the xpub the device returns for the
        keystore's derivation prefix equals the keystore's xpub.

        Raises DeviceUnpairableError if there is no pairable client of the
        right plugin type, the xpub request is cancelled or fails, or the
        xpub does not match.
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)
        # only an already registered client is considered; none is created here
        client = self._client_by_id(info.device.id_)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # See comment above for same code
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                client_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                client_xpub = None
            if client_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
1221

1222
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        If `devices` is None, the devices are scanned here. A device whose
        client cannot be created or queried is logged and skipped, unless
        `include_failing_clients` is set, in which case it is listed with the
        exception that was raised.

        Raises HardwarePluginLibraryUnavailable if the plugin's libraries are
        not available.
        """
        if not plugin.libraries_available:
            message = plugin.get_library_not_available_message()
            raise HardwarePluginLibraryUnavailable(message)
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                # query everything inside the try: any of these calls may fail
                label = client.label()
                is_initialized = client.is_initialized()
                soft_device_id = client.get_soft_device_id()
                model_name = client.device_model_name()
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device,
                                    label=label,
                                    initialized=is_initialized,
                                    plugin_name=plugin.name,
                                    soft_device_id=soft_device_id,
                                    model_name=model_name))

        return infos
×
1263

1264
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked to connect one
        and the scan is repeated. Among the found devices, one is picked
        automatically if it matches the keystore's soft_device_id, or its
        (unique, non-placeholder) label, or if it is the only device and the
        keystore has neither; otherwise the user is asked to choose.

        Raises CannotAutoSelectDevice if a question would be needed but
        `allow_user_interaction` is False, and UserCancelled if the user
        declines to retry or cancels the choice.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            msg = _('Please insert your {}').format(plugin.device)
            msg += " ("
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                msg += f"label: {keystore.label}, "
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
            msg += ').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            )
            if not handler.yes_no_question(msg):
                raise UserCancelled()
            # forget the passed-in devices, so that the next iteration rescans
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for info in infos:
                if info.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return info
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [info.label for info in infos]
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
                and device_labels.count(keystore.label) == 1):
            for info in infos:
                if info.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return info
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        # the choice key is the index into infos
        choices = [ChoiceItem(key=idx, label=info.label_for_device_select())
                   for (idx, info) in enumerate(infos)]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        c = handler.query_choice(msg, choices)
        if c is None:
            raise UserCancelled()
        info = infos[c]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
1335

1336
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Return the devices found through the `hid` module that a plugin recognises.

        If `hid` cannot be imported, an empty list is returned.
        Each enumerated entry is matched first by its (vendor_id, product_id)
        pair against self._recognised_hardware, and otherwise by vendor_id
        alone against self._recognised_vendor. The matching plugin is asked to
        build a device object from the entry; falsy results are dropped.
        """
        try:
            import hid  # noqa: F811
        except ImportError:
            return []

        found = []
        # NOTE(review): per hidapi, vendor/product filters of 0 should match every device - confirm
        for hid_entry in hid.enumerate(0, 0):
            vid = hid_entry['vendor_id']
            key = (vid, hid_entry['product_id'])
            # an exact product match takes precedence over a vendor-wide match
            if key in self._recognised_hardware:
                owner = self._recognised_hardware[key]
            elif vid in self._recognised_vendor:
                owner = self._recognised_vendor[vid]
            else:
                continue
            if not owner:
                continue
            dev = owner.create_device_from_hid_enumeration(hid_entry, product_key=key)
            if dev:
                found.append(dev)
        return found
×
1357

1358
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Enumerate connected devices and unpair clients that are no longer usable.

        The result is the output of the HID scan, extended with whatever the
        registered custom enumeration functions return. An enumeration function
        that raises is logged and skipped.

        Afterwards self.clients is replaced by the subset of clients whose id
        is among the scanned devices and that still report a usable connection.
        Every other client is unpaired, and its handler (if it has one) is told
        that the device is no longer connected via update_status(False).
        """
        self.logger.info("scanning devices...")

        # Start from the devices the HID scan knows about
        devices = self._scan_devices_with_hid()

        # Snapshot the custom enumerators under the lock; they are called without holding it
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                extra_devices = f()
            except BaseException as e:
                # best-effort: one failing enumerator must not abort the whole scan
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
                continue
            devices.extend(extra_devices)

        # Split the known clients into still-connected and stale ones
        seen_ids = [device.id_ for device in devices]
        stale = []
        with self.lock:
            still_connected = {}
            for client, id_ in self.clients.items():
                if id_ not in seen_ids or not client.has_usable_connection_with_device():
                    stale.append((client, id_))
                    continue
                still_connected[client] = id_
            self.clients = still_connected

        # Unpair the stale clients (done after the lock above has been released)
        for client, id_ in stale:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
1396

1397
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Report the versions of the optional USB/HID libraries.

        Returns a dict with these keys:
        - "libusb.version": dotted version taken from usb1.getVersion(),
          or None if `usb1` could not be imported.
        - "libusb.path": only present when `usb1` was imported; the name of the
          loaded libusb library object, or None if it cannot be looked up.
        - "hidapi.version": hid.__version__ if available, else the version of
          the installed "hidapi" distribution, else None.
        """
        ret = {}
        # add libusb
        try:
            import usb1
        except Exception:
            # importing can fail for reasons other than ImportError, so catch broadly
            ret["libusb.version"] = None
        else:
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
            try:
                ret["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                ret["libusb.path"] = None
        # add hidapi
        try:
            import hid  # noqa: F811
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            # either `hid` is missing or it is too old to expose __version__:
            # fall back to the metadata of the installed distribution
            from importlib.metadata import version
            try:
                ret["hidapi.version"] = version("hidapi")
            except ImportError:
                # PackageNotFoundError is a subclass of ImportError
                ret["hidapi.version"] = None
        return ret
×
1422

1423
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Try to pair each of the given keystores with a connected hardware device.

        Keystores that are not hardware keystores are ignored. If `devices` is
        None, the connected devices are scanned once and that result is reused
        for every pairing attempt.

        Pairing is done in two passes over all keystores: first without user
        interaction (only auto-selectable devices), and then, if
        `allow_user_interaction` is set, again with manual selection allowed.
        A UserCancelled raised for one keystore just skips that keystore.

        Why not simply pair the keystores one by one? Consider this scenario:
        - a 2-of-3 multisig wallet with three hw keystores; devices d2 (for ks2)
          and d3 (for ks3) are connected, and nothing is paired yet
        1. pairing individually, we might start with ks1
           - ks1 cannot be auto-paired, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be asked to manually pair ks1 with d2 or d3, which is
             confusing and error-prone. It is especially problematic if the hw device
             does not support labels (such as Ledger), as then the user cannot easily
             distinguish same-type devices. (see #4199)
        2. with this method, ks2-d2 and ks3-d3 get auto-paired first, and only then
           is the user told that ks1 could not be paired (with no devices left to try)
        """
        from .keystore import Hardware_KeyStore
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # pass 1: auto-selection only; pass 2 (optional): manual selection by the user
        interaction_modes = (False, True) if allow_user_interaction else (False,)
        for interactive in interaction_modes:
            for ks in keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    # skipping one keystore must not prevent pairing the others
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc