• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5537477114462208

25 Aug 2025 12:23PM UTC coverage: 61.329%. Remained the same
5537477114462208

push

CirrusCI

SomberNight
contrib/make_download: sort signers, instead of random fs order

22820 of 37209 relevant lines covered (61.33%)

3.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.96
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import zipfile as zipfile_lib
5✔
34
from urllib.parse import urlparse
5✔
35

36
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
37
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
38
import concurrent
5✔
39
import zipimport
5✔
40
from functools import wraps, partial
5✔
41
from itertools import chain
5✔
42

43
from electrum_ecc import ECPrivkey, ECPubkey
5✔
44

45
from ._vendor.distutils.version import StrictVersion
5✔
46
from .version import ELECTRUM_VERSION
5✔
47
from .i18n import _
5✔
48
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem,
5✔
49
                   make_dir, make_aiohttp_session)
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55
from .network import Network
5✔
56

57
if TYPE_CHECKING:
2✔
58
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
59
    from .keystore import Hardware_KeyStore, KeyStore
60
    from .wallet import Abstract_Wallet
61

62

63
_logger = get_logger(__name__)
5✔
64
plugin_loaders = {}
5✔
65
hook_names = set()
5✔
66
hooks = {}
5✔
67
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
68

69
PLUGIN_PASSWORD_VERSION = 1
5✔
70

71

72
class Plugins(DaemonThread):
5✔
73

74
    pkgpath = os.path.dirname(plugins.__file__)
5✔
75
    # TODO: use XDG Base Directory Specification instead of hardcoding /etc
76
    keyfile_posix = '/etc/electrum/plugins_key'
5✔
77
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
5✔
78

79
    @profiler
    def __init__(self, config: SimpleConfig, gui_name: Optional[str] = None, cmd_only: bool = False):
        """Discover the available plugins and load the enabled ones.

        :param config: configuration consulted for the ``plugins.<name>.enabled`` flags.
        :param gui_name: name of the running GUI; stored on the instance only in the
            full (non cmd_only) mode.
        :param cmd_only: if True, only the plugin ``__init__`` modules are imported
            and the method returns before any thread is started.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        # manifests keyed by plugin name, filled in by find_plugins()
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        if cmd_only:
            # only import the command modules of plugins
            # note: DaemonThread.__init__ is not called on this path, and self.plugins,
            #       self.gui_name and self.device_manager are never assigned.
            Logger.__init__(self)
            self.find_plugins()
            self.load_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        self._hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        # the thread is started from the constructor itself
        self.start()
5✔
101

102
    @property
5✔
103
    def descriptions(self):
5✔
104
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
105

106
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
107
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
108
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
109
        for loader, name, ispkg in iter_modules:
5✔
110
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
111
            #       once as data and once as code. To honor the "no duplicates" rule below,
112
            #       we exclude the ones packaged as *code*, here:
113
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
114
                continue
×
115
            module_path = os.path.join(pkg_path, name)
5✔
116
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
5✔
117
                continue
×
118
            try:
5✔
119
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
120
                    d = json.load(f)
5✔
121
            except FileNotFoundError:
×
122
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
123
                continue
×
124
            if 'fullname' not in d:
5✔
125
                continue
×
126
            d['path'] = module_path
5✔
127
            if not self.cmd_only:
5✔
128
                gui_good = self.gui_name in d.get('available_for', [])
5✔
129
                if not gui_good:
5✔
130
                    continue
5✔
131
                details = d.get('registers_wallet_type')
5✔
132
                if details:
5✔
133
                    self.register_wallet_type(name, gui_good, details)
5✔
134
                details = d.get('registers_keystore')
5✔
135
                if details:
5✔
136
                    self.register_keystore(name, details)
5✔
137
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
138
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
139
                _logger.info(f"duplicate plugins? for {name=}")
×
140
                continue
×
141
            if not external:
5✔
142
                self.internal_plugin_metadata[name] = d
5✔
143
            else:
144
                self.external_plugin_metadata[name] = d
×
145

146
    @staticmethod
    def exec_module_from_spec(spec, path: str):
        """Create the module described by `spec`, register it as sys.modules[path] and execute it.

        A failure is remembered per `path`; any later call for the same path raises
        immediately instead of executing the module a second time.
        """
        prev_fail = _exec_module_failure.get(path)
        if prev_fail:
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as e:
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
            # so the import system knows it failed. If called again for the same plugin, we do not
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
            # might have defined commands).
            _exec_module_failure[path] = e
            sys.modules.pop(path, None)
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
        return module
5✔
166

167
    def find_plugins(self):
5✔
168
        internal_plugins_path = (self.pkgpath, False)
5✔
169
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
170
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
171
            if pkg_path and os.path.exists(pkg_path):
5✔
172
                if not external:
5✔
173
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
174
                else:
175
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
176

177
    def load_plugins(self):
5✔
178
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
179
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
180
                try:
×
181
                    if self.cmd_only:  # only load init method to register commands
×
182
                        self.maybe_load_plugin_init_method(name)
×
183
                    else:
184
                        self.load_plugin_by_name(name)
×
185
                except BaseException as e:
×
186
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
187

188
    def _has_root_permissions(self, path):
5✔
189
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
190

191
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
        """Return (location of the plugin key, help text for the user).

        On Windows the location is a registry path. On other platforms it is
        self.keyfile_posix, and the help text contains the shell command that stores
        key_hex there (empty help text if key_hex is falsy).
        Raises on Android, which is not supported.
        """
        if sys.platform in ['windows', 'win32']:
            return self.keyfile_windows, _('This file can be edited with Regedit')
        if 'ANDROID_DATA' in os.environ:
            raise Exception('platform not supported')
        # treat unknown platforms and macOS as linux-like
        keyfile_path = self.keyfile_posix
        if not key_hex:
            return keyfile_path, ""
        help_parts = [
            _('The file must have root permissions'),
            ".\n\n",
            _("To set it you can also use the Auto-Setup or run "
              "the following terminal command"),
            ":\n\n",
            f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\"",
        ]
        return keyfile_path, "".join(help_parts)
×
209

210
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
5✔
211
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
212
        try:
×
213
            if sys.platform in ['windows', 'win32']:
×
214
                self._write_key_to_regedit_windows(pubkey_hex)
×
215
            elif 'ANDROID_DATA' in os.environ:
×
216
                raise Exception('platform not supported')
×
217
            elif sys.platform.startswith('darwin'):  # macOS
×
218
                self._write_key_to_root_file_macos(pubkey_hex)
×
219
            else:
220
                self._write_key_to_root_file_linux(pubkey_hex)
×
221
        except Exception:
×
222
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
223
            return False
×
224
        return True
×
225

226
    def try_auto_key_reset(self) -> bool:
5✔
227
        try:
×
228
            if sys.platform in ['windows', 'win32']:
×
229
                self._delete_plugin_key_from_windows_registry()
×
230
            elif 'ANDROID_DATA' in os.environ:
×
231
                raise Exception('platform not supported')
×
232
            elif sys.platform.startswith('darwin'):  # macOS
×
233
                self._delete_macos_plugin_keyfile()
×
234
            else:
235
                self._delete_linux_plugin_keyfile()
×
236
        except Exception:
×
237
            self.logger.exception(f'auto-reset of plugin key failed')
×
238
            return False
×
239
        return True
×
240

241
    def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
5✔
242
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
243
        dir_path: str = os.path.dirname(self.keyfile_posix)
×
244
        sh_command = (
×
245
                     f"mkdir -p {dir_path} "  # create the /etc/electrum dir
246
                     f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} "  # write the key to the file
247
                     f"&& chmod 644 {self.keyfile_posix} "  # set read permissions for the file
248
                     f"&& chmod 755 {dir_path}"  # set read permissions for the dir
249
        )
250
        return sh_command
×
251

252
    @staticmethod
5✔
253
    def _get_macos_osascript_command(commands: List[str]) -> List[str]:
5✔
254
        """
255
        Inspired by
256
        https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
257
        Wraps the given commands in a macOS osascript command to prompt for root permissions.
258
        """
259
        from shlex import quote
×
260

261
        def quote_shell(args):
×
262
            return " ".join(quote(arg) for arg in args)
×
263

264
        def quote_applescript(string):
×
265
            charmap = {
×
266
                "\n": "\\n",
267
                "\r": "\\r",
268
                "\t": "\\t",
269
                "\"": "\\\"",
270
                "\\": "\\\\",
271
            }
272
            return '"%s"' % "".join(charmap.get(char, char) for char in string)
×
273

274
        commands = [
×
275
            "osascript",
276
            "-e",
277
            "do shell script %s "
278
            "with administrator privileges "
279
            "without altering line endings"
280
            % quote_applescript(quote_shell(commands))
281
        ]
282
        return commands
×
283

284
    @staticmethod
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
        """
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.

        Raises if ShellExecuteExW reports failure. The exit status of reg.exe and the
        result of the wait are not inspected here.
        """
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
        # for the result of the process (returns no process handle)
        from ctypes import byref, sizeof, windll, Structure, c_ulong
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR

        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
        class SHELLEXECUTEINFO(Structure):
            _fields_ = [
                ('cbSize', DWORD),
                ('fMask', c_ulong),
                ('hwnd', HWND),
                ('lpVerb', LPCWSTR),
                ('lpFile', LPCWSTR),
                ('lpParameters', LPCWSTR),
                ('lpDirectory', LPCWSTR),
                ('nShow', c_ulong),
                ('hInstApp', HINSTANCE),
                ('lpIDList', c_ulong),
                ('lpClass', LPCWSTR),
                ('hkeyClass', HKEY),
                ('dwHotKey', DWORD),
                ('hIcon', HANDLE),
                ('hProcess', HANDLE)
            ]

        info = SHELLEXECUTEINFO()
        info.cbSize = sizeof(SHELLEXECUTEINFO)
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
        info.hwnd = None
        info.lpVerb = 'runas'  # run as administrator
        info.lpFile = 'reg.exe'  # the executable to run
        info.lpParameters = reg_exe_command  # the registry edit command
        info.lpDirectory = None
        info.nShow = 1

        # Execute and wait
        if not windll.shell32.ShellExecuteExW(byref(info)):
            error = windll.kernel32.GetLastError()
            raise Exception(f'Error executing registry command: {error}')

        # block until the process is done or the timeout expires
        # (0x1338 ms = 4920 ms, i.e. just under 5 seconds)
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)

        # Close handle
        windll.kernel32.CloseHandle(info.hProcess)
×
334

335
    @staticmethod
5✔
336
    def _execute_commands_in_subprocess(commands: List[str]) -> None:
5✔
337
        """
338
        Executes the given commands in a subprocess and asserts that it was successful.
339
        """
340
        import subprocess
×
341
        with subprocess.Popen(
×
342
            commands,
343
            stdout=subprocess.PIPE,
344
            stderr=subprocess.PIPE,
345
            text=True,
346
        ) as process:
347
            stdout, stderr = process.communicate()
×
348
            if process.returncode != 0:
×
349
                raise Exception(f'error executing command ({process.returncode}): {stderr}')
×
350

351
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
5✔
352
        """
353
        Spawns a pkexec subprocess to write the key to a file with root permissions.
354
        This will open an OS dialog asking for the root password. Can only succeed if
355
        the system has polkit installed.
356
        """
357
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
358

359
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
360
        commands = ['pkexec', 'sh', '-c', sh_command]
×
361
        self._execute_commands_in_subprocess(commands)
×
362

363
        # check if the key was written correctly
364
        with open(self.keyfile_posix, 'r') as f:
×
365
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
366
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
367

368
    def _delete_linux_plugin_keyfile(self) -> None:
5✔
369
        """
370
        Deletes the root owned key file at self.keyfile_posix.
371
        """
372
        if not os.path.exists(self.keyfile_posix):
×
373
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
374
            return
×
375
        if not self._has_root_permissions(self.keyfile_posix):
×
376
            os.unlink(self.keyfile_posix)
×
377
            return
×
378

379
        # use pkexec to delete the file as root user
380
        commands = ['pkexec', 'rm', self.keyfile_posix]
×
381
        self._execute_commands_in_subprocess(commands)
×
382
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
383

384
    def _write_key_to_root_file_macos(self, key_hex: str) -> None:
5✔
385
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
386

387
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
388
        macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])
×
389

390
        self._execute_commands_in_subprocess(macos_commands)
×
391
        with open(self.keyfile_posix, 'r') as f:
×
392
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
393
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
394

395
    def _delete_macos_plugin_keyfile(self) -> None:
5✔
396
        if not os.path.exists(self.keyfile_posix):
×
397
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
398
            return
×
399
        if not self._has_root_permissions(self.keyfile_posix):
×
400
            os.unlink(self.keyfile_posix)
×
401
            return
×
402
        # use osascript to delete the file as root user
403
        macos_commands = self._get_macos_osascript_command(["rm", self.keyfile_posix])
×
404
        self._execute_commands_in_subprocess(macos_commands)
×
405
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
406

407
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
        """
        Stores the key as the default value of the registry key self.keyfile_windows
        (reg.exe, elevated through a UAC prompt) and reads it back to verify.
        """
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE

        reg_args = f'add "{self.keyfile_windows}" /ve /t REG_SZ /d "{key_hex}" /f'
        self._run_win_regedit_as_admin(reg_args)

        # check if the key was written correctly
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
            with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
                stored_value = QueryValue(key, 'PluginsKey')
                assert key_hex == stored_value, "incorrect registry key value"
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
×
423

424
    def _delete_plugin_key_from_windows_registry(self) -> None:
        """
        Deletes the PluginsKey dir in the Windows registry.

        Runs an elevated "reg.exe delete" and then tries to open the key again:
        if it can still be opened an Exception is raised.
        """
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE

        command = f'delete "{self.keyfile_windows}" /f'
        self._run_win_regedit_as_admin(command)

        try:
            # do a sanity check to see if the key has been deleted
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
                with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
                    # reaching this line means the key could be opened, i.e. it still exists
                    raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
        except FileNotFoundError:
            # the key could not be opened any more: this is the success case
            pass
×
440

441
    def create_new_key(self, password:str) -> str:
        """Derive a fresh plugin key from `password`.

        Returns the hex encoding of: version byte || 32 byte random salt || public key.
        """
        salt = os.urandom(32)
        pubkey = self.derive_privkey(password, salt).get_public_key_bytes()
        version_prefix = bytes([PLUGIN_PASSWORD_VERSION])
        return (version_prefix + salt + pubkey).hex()
×
447

448
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], Optional[bytes]]:
5✔
449
        """
450
        returns pubkey, salt
451
        returns None, None if the pubkey has not been set
452
        """
453
        if sys.platform in ['windows', 'win32']:
×
454
            import winreg
×
455
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
456
                try:
×
457
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
458
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
459
                except Exception as e:
×
460
                    self.logger.info(f'winreg error: {e}')
×
461
                    return None, None
×
462
        elif 'ANDROID_DATA' in os.environ:
×
463
            return None, None
×
464
        else:
465
            # treat unknown platforms as linux-like
466
            if not os.path.exists(self.keyfile_posix):
×
467
                return None, None
×
468
            if not self._has_root_permissions(self.keyfile_posix):
×
469
                return None, None
×
470
            with open(self.keyfile_posix) as f:
×
471
                key_hex = f.read()
×
472
        try:
×
473
            key = bytes.fromhex(key_hex)
×
474
            version = key[0]
×
475
        except Exception:
×
476
            self.logger.exception(f'{key_hex=} invalid')
×
477
            return None, None
×
478
        if version != PLUGIN_PASSWORD_VERSION:
×
479
            self.logger.info(f'unknown plugin password version: {version}')
×
480
            return None, None
×
481
        # all good
482
        salt = key[1:1+32]
×
483
        pubkey = key[1+32:]
×
484
        return pubkey, salt
×
485

486
    def get_external_plugin_dir(self) -> str:
        """Return the 'plugins' directory below the electrum path of the config,
        after passing it to make_dir."""
        plugin_dir = os.path.join(self.config.electrum_path(), 'plugins')
        make_dir(plugin_dir)
        return plugin_dir
5✔
490

491
    async def download_external_plugin(self, url: str) -> str:
        """Download the plugin archive at `url` into the external plugin directory.

        The file keeps the last path component of the URL as its name.

        :returns: path of the downloaded file.
        :raises FileExistsError: if a file with that name is already present.
        :raises Exception: if the server does not answer with HTTP status 200.
        """
        filename = os.path.basename(urlparse(url).path)
        pkg_path = self.get_external_plugin_dir()
        path = os.path.join(pkg_path, filename)
        if os.path.exists(path):
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
        network = Network.get_instance()
        proxy = network.proxy if network else None
        async with make_aiohttp_session(proxy=proxy) as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    # fail loudly instead of returning the path of a file that was never written
                    raise Exception(f"could not download plugin from {url}: HTTP status {resp.status}")
                try:
                    with open(path, 'wb') as fd:
                        async for chunk in resp.content.iter_chunked(64 * 1024):
                            fd.write(chunk)
                except BaseException:
                    # Also covers cancellation. A truncated archive must not stay behind:
                    # it would make every retry fail with FileExistsError.
                    if os.path.exists(path):
                        os.unlink(path)
                    raise
        return path
×
506

507
    def read_manifest(self, path) -> dict:
        """Return the manifest of the plugin zip at `path` as a dict.

        The first archive member whose name ends with 'manifest.json' is parsed as
        json; the keys 'path', 'dirname', 'is_zip' and 'zip_hash_sha256' are added.
        Raises if the archive has no such member.
        """
        with zipfile_lib.ZipFile(path) as archive:
            matching = (entry for entry in archive.namelist() if entry.endswith('manifest.json'))
            manifest_name = next(matching, None)
            if manifest_name is None:
                raise Exception('could not find manifest.json in zip archive')
            with archive.open(manifest_name, 'r') as fp:
                manifest = json.load(fp)
                manifest['path'] = path  # external, path of the zipfile
                manifest['dirname'] = os.path.dirname(manifest_name)  # internal
                manifest['is_zip'] = True
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
                return manifest
×
522

523
    def zip_plugin_path(self, name) -> str:
5✔
524
        path = self.get_metadata(name)['path']
×
525
        filename = os.path.basename(path)
×
526
        if name in self.internal_plugin_metadata:
×
527
            pkg_path = self.pkgpath
×
528
        else:
529
            pkg_path = self.get_external_plugin_dir()
×
530
        return os.path.join(pkg_path, filename)
×
531

532
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
533
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
534
        if pkg_path is None:
5✔
535
            return
×
536
        for filename in os.listdir(pkg_path):
5✔
537
            path = os.path.join(pkg_path, filename)
×
538
            if not filename.endswith('.zip'):
×
539
                continue
×
540
            try:
×
541
                d = self.read_manifest(path)
×
542
                name = d['name']
×
543
            except Exception:
×
544
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
545
                continue
×
546
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
547
                self.logger.info(f"duplicate plugins for {name=}")
×
548
                continue
×
549
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
550
                continue
×
551
            min_version = d.get('min_electrum_version')
×
552
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
553
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
554
                continue
×
555
            max_version = d.get('max_electrum_version')
×
556
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
557
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
558
                continue
×
559

560
            if not self.cmd_only:
×
561
                gui_good = self.gui_name in d.get('available_for', [])
×
562
                if not gui_good:
×
563
                    continue
×
564
                if 'fullname' not in d:
×
565
                    continue
×
566
                details = d.get('registers_keystore')
×
567
                if details:
×
568
                    self.register_keystore(name, details)
×
569
            if external:
×
570
                self.external_plugin_metadata[name] = d
×
571
            else:
572
                self.internal_plugin_metadata[name] = d
×
573

574
    def get(self, name) -> Optional['BasePlugin']:
        """Return the loaded plugin instance registered under `name`, or None."""
        return self.plugins.get(name)
×
576

577
    def count(self) -> int:
        """Return the number of entries in self.plugins (the loaded plugins)."""
        return len(self.plugins)
×
579

580
    def load_plugin(self, name) -> 'BasePlugin':
5✔
581
        """Imports the code of the given plugin.
582
        note: can be called from any thread.
583
        """
584
        if self.get_metadata(name):
5✔
585
            return self.load_plugin_by_name(name)
5✔
586
        else:
587
            raise Exception(f"could not find plugin {name!r}")
×
588

589
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
590
        """Loads the __init__.py module of the plugin if it is not already loaded."""
591
        base_name = ('electrum_external_plugins.' if self.is_external(name) else 'electrum.plugins.') + name
5✔
592
        if base_name not in sys.modules:
5✔
593
            metadata = self.get_metadata(name)
5✔
594
            is_zip = metadata.get('is_zip', False)
5✔
595
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
596
            if not is_zip:
5✔
597
                if self.is_external(name):
5✔
598
                    # this branch is deprecated: external plugins are always zip files
599
                    path = os.path.join(metadata['path'], '__init__.py')
×
600
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
601
                else:
602
                    init_spec = importlib.util.find_spec(base_name)
5✔
603
            else:
604
                zipfile = zipimport.zipimporter(metadata['path'])
×
605
                dirname = metadata['dirname']
×
606
                init_spec = zipfile.find_spec(dirname)
×
607

608
            self.exec_module_from_spec(init_spec, base_name)
5✔
609

610
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
611
        if name in self.plugins:
5✔
612
            return self.plugins[name]
5✔
613
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
614
        self.maybe_load_plugin_init_method(name)
5✔
615
        is_external = self.is_external(name)
5✔
616
        if is_external and not self.is_authorized(name):
5✔
617
            self.logger.info(f'plugin not authorized {name}')
×
618
            return
×
619
        if not is_external:
5✔
620
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
621
        else:
622
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
623

624
        spec = importlib.util.find_spec(full_name)
5✔
625
        if spec is None:
5✔
626
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
627
        try:
5✔
628
            module = self.exec_module_from_spec(spec, full_name)
5✔
629
            plugin = module.Plugin(self, self.config, name)
5✔
630
        except Exception as e:
×
631
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
632
        self.add_jobs(plugin.thread_jobs())
5✔
633
        self.plugins[name] = plugin
5✔
634
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
635
        return plugin
5✔
636

637
    def close_plugin(self, plugin):
        """Hand the thread jobs of `plugin` to remove_jobs (the counterpart of the
        add_jobs call made when the plugin was loaded)."""
        self.remove_jobs(plugin.thread_jobs())
×
639

640
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
        """Derive the plugin authorization private key from a password.

        The secret is PBKDF2-HMAC-SHA256 over the utf-8 encoded password with the
        given salt and 100000 iterations.
        """
        import hashlib
        password_bytes = pw.encode('utf-8')
        secret = hashlib.pbkdf2_hmac('sha256', password_bytes, salt, iterations=100_000)
        return ECPrivkey(secret)
×
644

645
    def uninstall(self, name: str):
        """Clear the config entry of plugin `name` and, if it is external, delete its zip.

        For an external plugin the zip file is unlinked first and the metadata
        entry is dropped afterwards.
        """
        config_key = f'plugins.{name}'
        if self.config.get(config_key):
            self.config.set_key(config_key, None)
        if name not in self.external_plugin_metadata:
            return
        zip_path = self.zip_plugin_path(name)
        os.unlink(zip_path)
        self.external_plugin_metadata.pop(name)
×
652

653
    def is_internal(self, name) -> bool:
        """Return True if `name` has an entry in the internal plugin metadata."""
        internal_plugins = self.internal_plugin_metadata
        return name in internal_plugins
×
655

656
    def is_external(self, name) -> bool:
        """Return True if `name` has an entry in the external plugin metadata."""
        external_plugins = self.external_plugin_metadata
        return name in external_plugins
5✔
658

659
    def is_auto_loaded(self, name) -> bool:
        """Return True if the plugin's metadata registers a keystore or a wallet type.

        The external metadata is consulted first, then the internal one.
        Unknown plugins (or plugins with empty metadata) yield False.  The
        result is always a real bool rather than whatever value happened to
        be stored in the metadata.
        """
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
        if not metadata:
            return False
        return bool(metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
662

663
    def is_installed(self, name) -> bool:
        """Return True if `name` is a known internal or external plugin.

        Note: an external plugin may be installed but not authorized.
        """
        registries = (self.internal_plugin_metadata, self.external_plugin_metadata)
        return any(name in registry for registry in registries)
×
666

667
    def is_authorized(self, name) -> bool:
        """Return whether plugin `name` may be loaded.

        Internal plugins are always authorized.  An external plugin must be a
        zip plugin, and the signature stored under
        `plugins.<name>.authorized` in the config must verify against the
        hash of its zip file with the key from `get_pubkey_bytes()`.
        """
        if name in self.internal_plugin_metadata:
            return True
        if name not in self.external_plugin_metadata:
            return False
        pubkey_bytes, _salt = self.get_pubkey_bytes()
        if not pubkey_bytes or not self.is_plugin_zip(name):
            return False
        # the file is hashed before the signature is looked up
        zip_path = self.zip_plugin_path(name)
        plugin_hash = get_file_hash256(zip_path)
        signature_hex = self.config.get(f'plugins.{name}.authorized')
        if not signature_hex:
            return False
        signature = bytes.fromhex(signature_hex)
        return ECPubkey(pubkey_bytes).ecdsa_verify(signature, plugin_hash)
×
684

685
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
        """Sign the hash of `filename` with `privkey` and store the signature in the config.

        The signature is saved hex-encoded under `plugins.<name>.authorized`
        and the plugin is marked enabled.  `privkey` must correspond to the
        public key returned by `get_pubkey_bytes()`.
        """
        expected_pubkey, _salt = self.get_pubkey_bytes()
        assert expected_pubkey == privkey.get_public_key_bytes()
        signature = privkey.ecdsa_sign(get_file_hash256(filename))
        self.config.set_key(f'plugins.{name}.authorized', signature.hex())
        self.config.set_key(f'plugins.{name}.enabled', True)
×
693

694
    def enable(self, name: str) -> 'BasePlugin':
        """Mark plugin `name` as enabled in the config and return it, loading it if needed."""
        self.config.enable_plugin(name)
        # reuse what self.get() yields; otherwise load the plugin
        return self.get(name) or self.load_plugin(name)
×
700

701
    def disable(self, name: str) -> None:
        """Mark plugin `name` as disabled in the config and close it if `self.get` returns one."""
        self.config.disable_plugin(name)
        plugin = self.get(name)
        if plugin:
            self.plugins.pop(name)
            plugin.close()
            self.logger.info(f"closed {name}")
×
709

710
    @classmethod
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
        """Return True if the config key `key` begins with the 'plugins.' prefix."""
        prefix = 'plugins.'
        return key[:len(prefix)] == prefix
×
713

714
    def is_available(self, name: str) -> bool:
        """Return True if plugin `name` has a description and all its required modules import.

        Each entry of the description's 'requires' list is a pair whose first
        item is a module name; the first failing import is logged as a
        warning and makes the plugin unavailable.
        """
        description = self.descriptions.get(name)
        if not description:
            return False
        for dep, _unused in description.get('requires', []):
            try:
                __import__(dep)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        return True
×
726

727
    def get_hardware_support(self):
        """Return a list of HardwarePluginToScan, one per usable or failing hardware plugin.

        Plugins that load and report themselves available are listed with the
        plugin object; plugins that raise while loading are listed with the
        exception instead.  Plugins that load but are not available are omitted.
        """
        scan_list = []
        for name, details in self._hw_wallets.items():
            try:
                plugin = self.get_plugin(name)
                if plugin.is_available():
                    entry = HardwarePluginToScan(
                        name=name,
                        description=details[2],
                        plugin=plugin,
                        exception=None)
                    scan_list.append(entry)
            except Exception as e:
                self.logger.exception(f"cannot load plugin for: {name}")
                failed_entry = HardwarePluginToScan(
                    name=name,
                    description=details[2],
                    plugin=None,
                    exception=e)
                scan_list.append(failed_entry)
        return scan_list
×
746

747
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register `wallet_type` and install a lazy loader for it in `plugin_loaders`.

        The loader, when called, fetches plugin `name` and registers the
        plugin's `wallet_class` as constructor for the wallet type.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            wallet_plugin = self.get_plugin(name)
            register_constructor(wallet_type, wallet_plugin.wallet_class)

        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader
5✔
756

757
    def register_keystore(self, name, details):
        """Register the keystore described by `details` for plugin `name`.

        Only entries whose first element is 'hardware' are handled: they are
        recorded in `self._hw_wallets` and `details[1]` is registered with a
        constructor that resolves the plugin on first use.
        """
        from .keystore import register_keystore

        if details[0] != 'hardware':
            return

        def dynamic_constructor(d):
            # the plugin is looked up each time a keystore is constructed
            return self.get_plugin(name).keystore_class(d)

        self._hw_wallets[name] = details
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
5✔
766

767
    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return plugin `name` from `self.plugins`, calling `load_plugin` first if it is absent."""
        needs_loading = name not in self.plugins
        if needs_loading:
            self.load_plugin(name)
        return self.plugins[name]
5✔
771

772
    def is_plugin_zip(self, name: str) -> bool:
        """Returns True if the plugin is a zip file"""
        metadata = self.get_metadata(name)
        if metadata is None:
            # unknown plugin
            return False
        return metadata.get('is_zip', False)
×
777

778
    def get_metadata(self, name: str) -> Optional[dict]:
        """Returns the metadata of the plugin"""
        # internal metadata wins; empty or missing metadata is reported as None
        found = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
        return found if found else None
5✔
784

785
    def run(self):
        """Run the registered jobs until `is_running()` turns false, then call `on_stop()`."""
        while True:
            if not self.is_running():
                break
            # wait up to 0.1s, or less if the event gets set
            self.wake_up_event.wait(0.1)
            self.run_jobs()
        self.on_stop()
5✔
790

791
    def read_file(self, name: str, filename: str) -> Optional[bytes]:
        """Return the contents of `filename` shipped with plugin `name`.

        For a zip plugin the file is read from inside the archive, below the
        plugin's `dirname`; for an internal plugin it is read from the
        `plugins/<name>/` directory next to this module.  Returns None if the
        plugin is neither.
        """
        if self.is_plugin_zip(name):
            # zip member names always use '/' as separator, regardless of the OS
            import posixpath
            plugin_filename = self.zip_plugin_path(name)
            metadata = self.external_plugin_metadata[name]
            dirname = metadata['dirname']
            member = posixpath.join(dirname, filename)
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
                with myzip.open(member) as myfile:
                    return myfile.read()
        elif name in self.internal_plugin_metadata:
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
        else:
            # unknown plugin: nothing to read
            return None
×
806

807

808
def get_file_hash256(path: str) -> bytes:
    """Get the sha256 hash of a file, similar to `sha256sum`."""
    with open(path, 'rb') as fd:
        file_contents = fd.read()
    return sha256(file_contents)
×
812

813

814
def hook(func):
    """Decorator that records the function's name in `hook_names`; `func` is returned unchanged."""
    name = func.__name__
    hook_names.add(name)
    return func
5✔
817

818

819
def run_hook(name, *args):
    """Call every callback registered for hook `name` whose plugin is enabled.

    A callback that raises is logged and treated as having returned False.
    At most one callback may return a truthy value; that value is returned.
    Returns None if no callback returned a truthy value.
    """
    results = []
    for plugin, callback in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            outcome = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            outcome = False
        if outcome:
            results.append(outcome)

    if not results:
        return None
    assert len(results) == 1, results
    return results[0]
×
835

836

837
class BasePlugin(Logger):
    """Base class for plugin implementations.

    On construction, every attribute whose name is a registered hook name is
    added to the module-level `hooks` table; `close()` removes it again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        Logger.__init__(self)
        # add self to hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            hooks.setdefault(attr_name, []).append((self, getattr(self, attr_name)))

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, detach it from the parent, then call `on_close()`."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            # found attribute in self that is also the name of a hook
            registered = hooks.get(attr_name, [])
            try:
                registered.remove((self, getattr(self, attr_name)))
            except ValueError:
                # maybe attr name just collided with hook name and was not hook
                continue
            hooks[attr_name] = registered
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of `close()`; does nothing by default."""

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Return the thread jobs of this plugin; none by default."""
        return []

    def is_enabled(self):
        """Return whether the plugin is available, authorized and enabled in the config."""
        if not self.is_available() or not self.parent.is_authorized(self.name):
            return False
        return self.config.is_plugin_enabled(self.name)

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Read `filename` belonging to this plugin, via the parent's `read_file`."""
        return self.parent.read_file(self.name, filename)

    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
        """Returns a dict which is persisted in the per-wallet database."""
        all_plugin_storage = wallet.db.get_plugin_storage()
        return all_plugin_storage.setdefault(self.name, {})
×
904

905

906
class DeviceUnpairableError(UserFacingException):
    """Raised when a hardware device cannot be paired with a keystore."""


class HardwarePluginLibraryUnavailable(Exception):
    """Raised when the libraries needed by a hardware plugin are not available."""


class CannotAutoSelectDevice(Exception):
    """Raised when no device can be selected without asking the user."""
5✔
909

910

911
class Device(NamedTuple):
    """Description of a connected hardware device, as produced by a device scan."""

    path: Union[str, bytes]
    interface_number: int
    id_: str
    # when using hid, often Tuple[int, int]
    product_key: Any
    usage_page: int
    transport_ui_string: str
5✔
918

919

920
class DeviceInfo(NamedTuple):
    """A `Device` together with what could be learned about it from its client."""

    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    # manufacturer, e.g. "trezor"
    plugin_name: Optional[str] = None
    # if available, used to distinguish same-type hw devices
    soft_device_id: Optional[str] = None
    # e.g. "Ledger Nano S"
    model_name: Optional[str] = None

    def label_for_device_select(self) -> str:
        """Return the text shown for this device when the user picks a device."""
        label = self.label or _("An unnamed {}").format(self.plugin_name)
        init = _("initialized") if self.initialized else _("wiped")
        transport = self.device.transport_ui_string
        maybe_model = f"{self.model_name}, " if self.model_name else ""
        template = "{label} ({maybe_model}{init}, {transport})"
        return template.format(
            label=label,
            init=init,
            transport=transport,
            maybe_model=maybe_model,
        )
939

940

941
class HardwarePluginToScan(NamedTuple):
    """A hardware plugin together with the error, if any, that prevented loading it."""

    name: str
    description: str
    # None when the plugin could not be loaded
    plugin: Optional['HW_PluginBase']
    # None when the plugin loaded fine
    exception: Optional[Exception]
5✔
946

947

948
# Hardware client labels that are treated as placeholders, i.e. that are
# not useful for telling devices apart.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}


# hidapi is not thread-safe
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
#     https://github.com/libusb/hidapi/issues/45
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
# It is not entirely clear to me, exactly what is safe and what isn't, when
# using multiple threads...
# Hence, we use a single thread for all device communications, including
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
# the following thread:
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix='hwd_comms_thread',
    max_workers=1,
)

# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
# To keep it simple, let's just import it now, as we are likely in the main thread here.
if threading.main_thread() is not threading.current_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # hidapi is optional
    pass
5✔
975

976

977
T = TypeVar('T')


def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run `func` on the hardware-device communication thread and return its result.

    If the caller already is on that thread, `func` is called directly;
    otherwise it is submitted to `_hwd_comms_executor` and the call blocks
    until the result is available.
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    future = _hwd_comms_executor.submit(func)
    return future.result()
987

988

989
def runs_in_hwd_thread(func):
    """Decorator: every call of `func` is executed through `run_in_hwd_thread`."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
5✔
994

995

996
def assert_runs_in_hwd_thread():
    """Raise unless the current thread's name marks it as the hardware-device communication thread."""
    thread_name = threading.current_thread().name
    if thread_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
999

1000

1001
class DeviceMgr(ThreadJob):
5✔
1002
    """Manages hardware clients.  A client communicates over a hardware
1003
    channel with the device.
1004

1005
    In addition to tracking device HID IDs, the device manager tracks
1006
    hardware wallets and manages wallet pairing.  A HID ID may be
1007
    paired with a wallet when it is confirmed that the hardware device
1008
    matches the wallet, i.e. they have the same master public key.  A
1009
    HID ID can be unpaired if e.g. it is wiped.
1010

1011
    Because of hotplugging, a wallet must request its client
1012
    dynamically each time it is required, rather than caching it
1013
    itself.
1014

1015
    The device manager is shared across plugins, so just one place
1016
    does hardware scans when needed.  By tracking HID IDs, if a device
1017
    is plugged into a different port the wallet is automatically
1018
    re-paired.
1019

1020
    Wallets are informed on connect / disconnect events.  It must
1021
    implement connected(), disconnected() callbacks.  Being connected
1022
    implies a pairing.  Callbacks can happen in any thread context,
1023
    and we do them without holding the lock.
1024

1025
    Confusingly, the HID ID (serial number) reported by the HID system
1026
    doesn't match the device ID reported by the device itself.  We use
1027
    the HID IDs.
1028

1029
    This plugin is thread-safe.  Currently only devices supported by
1030
    hidapi are implemented."""
1031

1032
    def __init__(self, config: SimpleConfig):
        """Set up empty pairing, client and recognition tables; `config` is kept on the instance."""
        ThreadJob.__init__(self)
        self.config = config
        self.lock = threading.RLock()

        # pairing_code -> id_. An item is only present while a pairing is active. Needs self.lock.
        self.pairing_code_to_id = {}  # type: Dict[str, str]
        # client -> id_. Needs self.lock.
        self.clients = {}  # type: Dict[HardwareClientBase, str]
        # What we recognise.  (vendor_id, product_id) -> Plugin
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
        # Custom enumerate functions for devices we don't know about.
        self._enumerate_func = set()  # Needs self.lock.
5✔
1047

1048
    def thread_jobs(self):
        """Return the thread jobs to schedule: the manager itself, which handles device timeouts."""
        jobs = [self]
        return jobs
5✔
1051

1052
    def run(self):
        """Handle device timeouts.  Runs in the context of the Plugins
        thread."""
        # snapshot the clients under the lock; timeouts are handled without holding it
        with self.lock:
            known_clients = list(self.clients)
        cutoff = time.time() - self.config.get_session_timeout()
        for client in known_clients:
            client.timeout(cutoff)
×
1060

1061
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every key in `device_pairs` to `plugin` in the recognised-hardware table."""
        recognised = self._recognised_hardware
        for product_key in device_pairs:
            recognised[product_key] = plugin
×
1064

1065
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every id in `vendor_ids` to `plugin` in the recognised-vendor table."""
        recognised = self._recognised_vendor
        for vid in vendor_ids:
            recognised[vid] = plugin
×
1068

1069
    def register_enumerate_func(self, func):
        """Add `func` to the set of custom device enumeration functions (under the lock)."""
        self.lock.acquire()
        try:
            self._enumerate_func.add(func)
        finally:
            self.lock.release()
×
1072

1073
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for `device`, creating and registering it via `plugin` if needed.

        Returns whatever `plugin.create_client` returned when no client was
        cached for the device id; a falsy result is not registered.
        """
        # Get from cache first
        cached_client = self._client_by_id(device.id_)
        if cached_client:
            return cached_client
        new_client = plugin.create_client(device, handler)
        if new_client:
            self.logger.info(f"Registering {new_client}")
            with self.lock:
                self.clients[new_client] = device.id_
        return new_client
×
1086

1087
    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with `pairing_code`, or None if there is no such pairing."""
        with self.lock:
            paired_id = self.pairing_code_to_id.get(pairing_code)
        return paired_id
×
1090

1091
    def pairing_code_by_id(self, id_):
        """Return the first pairing code that maps to device id `id_`, or None."""
        with self.lock:
            matches = (code for code, paired_id in self.pairing_code_to_id.items()
                       if paired_id == id_)
            return next(matches, None)
×
1097

1098
    def unpair_pairing_code(self, pairing_code):
        """Forget the pairing for `pairing_code` and close the client of the paired device.

        Does nothing if the pairing code is unknown.
        """
        with self.lock:
            try:
                paired_id = self.pairing_code_to_id.pop(pairing_code)
            except KeyError:
                return
        # the client is closed after the lock has been released
        self._close_client(paired_id)
×
1104

1105
    def unpair_id(self, id_):
        """Unpair the device with id `id_`; if it has no pairing, just close its client."""
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
×
1111

1112
    def _close_client(self, id_):
        """Drop the client registered for device id `id_` (if any) and close it."""
        with self.lock:
            client = self._client_by_id(id_)
            self.clients.pop(client, None)
        # close() is called after the lock has been released
        if not client:
            return
        client.close()
×
1118

1119
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
        """Return the first registered client whose device id equals `id_`, or None."""
        with self.lock:
            matching = (client for client, client_id in self.clients.items()
                        if client_id == id_)
            return next(matching, None)
×
1125

1126
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        """Returns a client for the device ID if one is registered.  If
        a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        With `scan_now` (the default) the devices are scanned before the lookup.
        """
        if scan_now:
            self.scan_devices()
        client = self._client_by_id(id_)
        return client
×
1133

1134
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Return a client for `keystore`, or None if none could be obtained.

        Existing clients are searched first; if that fails the devices are
        scanned (unless `devices` was given).  With `force_pair`, a device is
        selected and paired with the keystore as a last resort.

        Raises:
            Exception: `handler` is None.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()

        def lookup(candidate_devices):
            return self.client_by_pairing_code(
                plugin=plugin, pairing_code=pcode, handler=handler, devices=candidate_devices)

        # search existing clients first (fast-path)
        client = lookup([]) if not devices else None
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = lookup(devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # no device could be picked without the user: give up on pairing
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
1168

1169
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client of the device paired with `pairing_code`, or None.

        An already registered client is reused if it belongs to the same
        plugin type (None otherwise).  Without a registered client, a client
        is created for the device in `devices` that has the paired id.
        """
        paired_id = self.id_by_pairing_code(pairing_code)
        client = self._client_by_id(paired_id)
        if not client:
            for device in devices:
                if device.id_ == paired_id:
                    return self.create_client(device, handler, plugin)
            return None

        if type(client.plugin) != type(plugin):
            return None
        # An unpaired client might have another wallet's handler
        # from a prior scan.  Replace to fix dialog parenting.
        client.handler = handler
        return client
×
1186

1187
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair `keystore` with the device described by `info` and return its client.

        The pairing succeeds only if the xpub read from the device at the
        keystore's derivation prefix equals the keystore's xpub.

        Raises:
            DeviceUnpairableError: the device is not pairable, belongs to a
                different plugin type, the user cancelled or failed the
                PIN/passphrase entry, or the xpub does not match.
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)
        device_id = info.device.id_
        client = self._client_by_id(device_id)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                device_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                device_xpub = None
            if device_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
1223

1224
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        Devices are scanned if `devices` is None.  A device whose client
        cannot be created or queried is logged and skipped, unless
        `include_failing_clients` is set, in which case it is listed together
        with the exception.

        Raises:
            HardwarePluginLibraryUnavailable: the plugin's libraries are not available.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                client_details = dict(
                    label=client.label(),
                    initialized=client.is_initialized(),
                    soft_device_id=client.get_soft_device_id(),
                    model_name=client.device_model_name(),
                )
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device, plugin_name=plugin.name, **client_details))

        return infos
×
1265

1266
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked to connect one
        (if interaction is allowed).  The device is then chosen automatically
        by soft device id, by unique label, or because it is the only one;
        failing that, the user is asked to choose.

        Raises:
            CannotAutoSelectDevice: a question for the user would be needed
                but `allow_user_interaction` is False.
            UserCancelled: the user declined to retry or to choose a device.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
        while not infos:
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            prompt_parts = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                prompt_parts.append(f"label: {keystore.label}, ")
            prompt_parts.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            prompt_parts.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            ))
            msg = "".join(prompt_parts)
            if not handler.yes_no_question(msg):
                raise UserCancelled()
            # retry with a fresh scan instead of the device list we were given
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=None)

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for candidate in infos:
                if candidate.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return candidate
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [candidate.label for candidate in infos]
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
                and device_labels.count(keystore.label) == 1):
            for candidate in infos:
                if candidate.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return candidate
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        choices = [ChoiceItem(key=idx, label=candidate.label_for_device_select())
                   for (idx, candidate) in enumerate(infos)]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        chosen_idx = handler.query_choice(msg, choices)
        if chosen_idx is None:
            raise UserCancelled()
        info = infos[chosen_idx]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
1337

1338
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return a Device for each one a registered plugin accepts.

        Returns an empty list if the `hid` module cannot be imported.
        """
        try:
            import hid  # noqa: F811
        except ImportError:
            return []

        found = []
        for d in hid.enumerate(0, 0):
            vendor_id = d['vendor_id']
            product_key = (vendor_id, d['product_id'])
            # an exact (vendor_id, product_id) match takes precedence over a vendor-wide one
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            else:
                plugin = None
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
            if device:
                found.append(device)
        return found
×
1359

1360
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients whose device is gone.

        The result combines the devices found through HID enumeration with the
        ones reported by the registered custom enumeration functions. A custom
        function that raises is logged and skipped.

        As a side effect, every known client whose device id is not in the
        scan result, or which reports no usable connection, is removed from
        `self.clients`, unpaired, and its handler (if any) is told the device
        is no longer available.
        """
        self.logger.info("scanning devices...")

        # Start with the HID devices that registered plugins recognise
        found = self._scan_devices_with_hid()

        # Then add whatever the custom enumeration callbacks report.
        # The callbacks are snapshotted under the lock but invoked outside of it.
        with self.lock:
            custom_enumerators = list(self._enumerate_func)
        for enumerator in custom_enumerators:
            try:
                extra = enumerator()
            # NOTE(review): BaseException also swallows KeyboardInterrupt/SystemExit
            # raised by a callback - confirm this breadth is intended.
            except BaseException as exc:
                self.logger.error(f'custom device enum failed. func {str(enumerator)}, error {exc!r}')
                continue
            found.extend(extra)

        # Split the known clients into still-connected and disconnected ones
        present_ids = [device.id_ for device in found]
        dropped = []
        with self.lock:
            kept = {}
            for client, id_ in self.clients.items():
                if id_ not in present_ids or not client.has_usable_connection_with_device():
                    dropped.append((client, id_))
                else:
                    kept[client] = id_
            self.clients = kept

        # Unpair what disappeared (done after the lock above has been released)
        for client, id_ in dropped:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return found
×
1398

1399
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Report the versions of the USB/HID libraries available at runtime.

        Returned keys:
        - "libusb.version": dotted version string, or None if `usb1` cannot be imported
        - "libusb.path": name of the loaded libusb library, or None if it cannot be
          read; only present when `usb1` was imported
        - "hidapi.version": version string, or None if it cannot be determined
        """
        info = {}

        # libusb, via the usb1 bindings
        try:
            import usb1
        except Exception:
            info["libusb.version"] = None
        else:
            version_parts = usb1.getVersion()[:4]
            info["libusb.version"] = ".".join(str(part) for part in version_parts)
            try:
                libusb_path = usb1.libusb1.libusb._name
            except AttributeError:
                libusb_path = None
            info["libusb.path"] = libusb_path

        # hidapi: ask the module itself first, fall back to the package metadata
        try:
            import hid  # noqa: F811
            hidapi_version = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            from importlib.metadata import version as installed_version
            try:
                hidapi_version = installed_version("hidapi")
            except ImportError:
                hidapi_version = None
        info["hidapi.version"] = hidapi_version

        return info
×
1424

1425
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Optional[Sequence['Device']] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)

        Args:
            keystores: keystores to pair; entries that are not Hardware_KeyStore
                instances are ignored.
            allow_user_interaction: if True, a second pass is made over all
                hardware keystores in which the user may be asked to choose.
            devices: devices to consider; if None, scan_devices() is called once
                and its result is used for every keystore.

        A UserCancelled raised while handling one keystore is swallowed, so the
        remaining keystores are still tried.
        """
        from .keystore import Hardware_KeyStore
        hw_keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not hw_keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # First pass: pair with all devices that can be auto-selected.
        # Second pass (only if allowed): let the user do manual selections.
        interaction_passes = [False]
        if allow_user_interaction:
            interaction_passes.append(True)
        for interactive in interaction_passes:
            for ks in hw_keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    # the user skipped this keystore; carry on with the others
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc