• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5650251131912192

31 Jul 2025 04:32PM UTC coverage: 60.09% (+0.002%) from 60.088%
5650251131912192

push

CirrusCI

web-flow
Merge pull request #10083 from f321x/format_cli_help

cli: set formatter_class for command descriptions

22073 of 36733 relevant lines covered (60.09%)

3.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.03
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem,
5✔
50
                   make_dir, make_aiohttp_session)
51
from . import bip32
5✔
52
from . import plugins
5✔
53
from .simple_config import SimpleConfig
5✔
54
from .logging import get_logger, Logger
5✔
55
from .crypto import sha256
5✔
56
from .network import Network
5✔
57

58
if TYPE_CHECKING:
2✔
59
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
60
    from .keystore import Hardware_KeyStore, KeyStore
61
    from .wallet import Abstract_Wallet
62

63

64
_logger = get_logger(__name__)
5✔
65
plugin_loaders = {}
5✔
66
hook_names = set()
5✔
67
hooks = {}
5✔
68
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
69

70
PLUGIN_PASSWORD_VERSION = 1
5✔
71

72

73
class Plugins(DaemonThread):
5✔
74

75
    pkgpath = os.path.dirname(plugins.__file__)
5✔
76
    # TODO: use XDG Base Directory Specification instead of hardcoding /etc
77
    keyfile_posix = '/etc/electrum/plugins_key'
5✔
78
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
5✔
79

80
    @profiler
5✔
81
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
82
        self.config = config
5✔
83
        self.cmd_only = cmd_only  # type: bool
5✔
84
        self.internal_plugin_metadata = {}
5✔
85
        self.external_plugin_metadata = {}
5✔
86
        if cmd_only:
5✔
87
            # only import the command modules of plugins
88
            Logger.__init__(self)
×
89
            self.find_plugins()
×
90
            self.load_plugins()
×
91
            return
×
92
        DaemonThread.__init__(self)
5✔
93
        self.device_manager = DeviceMgr(config)
5✔
94
        self.name = 'Plugins'  # set name of thread
5✔
95
        self._hw_wallets = {}
5✔
96
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
97
        self.gui_name = gui_name
5✔
98
        self.find_plugins()
5✔
99
        self.load_plugins()
5✔
100
        self.add_jobs(self.device_manager.thread_jobs())
5✔
101
        self.start()
5✔
102

103
    @property
5✔
104
    def descriptions(self):
5✔
105
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
106

107
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
108
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
109
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
110
        for loader, name, ispkg in iter_modules:
5✔
111
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
112
            #       once as data and once as code. To honor the "no duplicates" rule below,
113
            #       we exclude the ones packaged as *code*, here:
114
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
115
                continue
×
116
            module_path = os.path.join(pkg_path, name)
5✔
117
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
5✔
118
                continue
×
119
            try:
5✔
120
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
121
                    d = json.load(f)
5✔
122
            except FileNotFoundError:
×
123
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
124
                continue
×
125
            if 'fullname' not in d:
5✔
126
                continue
×
127
            d['path'] = module_path
5✔
128
            if not self.cmd_only:
5✔
129
                gui_good = self.gui_name in d.get('available_for', [])
5✔
130
                if not gui_good:
5✔
131
                    continue
5✔
132
                details = d.get('registers_wallet_type')
5✔
133
                if details:
5✔
134
                    self.register_wallet_type(name, gui_good, details)
5✔
135
                details = d.get('registers_keystore')
5✔
136
                if details:
5✔
137
                    self.register_keystore(name, details)
5✔
138
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
139
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
140
                _logger.info(f"duplicate plugins? for {name=}")
×
141
                continue
×
142
            if not external:
5✔
143
                self.internal_plugin_metadata[name] = d
5✔
144
            else:
145
                self.external_plugin_metadata[name] = d
×
146

147
    @staticmethod
5✔
148
    def exec_module_from_spec(spec, path: str):
5✔
149
        if prev_fail := _exec_module_failure.get(path):
5✔
150
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
151
        try:
5✔
152
            module = importlib.util.module_from_spec(spec)
5✔
153
            # sys.modules needs to be modified for relative imports to work
154
            # see https://stackoverflow.com/a/50395128
155
            sys.modules[path] = module
5✔
156
            spec.loader.exec_module(module)
5✔
157
        except Exception as e:
×
158
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
159
            # so the import system knows it failed. If called again for the same plugin, we do not
160
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
161
            # might have defined commands).
162
            _exec_module_failure[path] = e
×
163
            if path in sys.modules:
×
164
                sys.modules.pop(path, None)
×
165
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
166
        return module
5✔
167

168
    def find_plugins(self):
5✔
169
        internal_plugins_path = (self.pkgpath, False)
5✔
170
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
171
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
172
            if pkg_path and os.path.exists(pkg_path):
5✔
173
                if not external:
5✔
174
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
175
                else:
176
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
177

178
    def load_plugins(self):
5✔
179
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
180
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
181
                try:
×
182
                    if self.cmd_only:  # only load init method to register commands
×
183
                        self.maybe_load_plugin_init_method(name)
×
184
                    else:
185
                        self.load_plugin_by_name(name)
×
186
                except BaseException as e:
×
187
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
188

189
    def _has_root_permissions(self, path):
5✔
190
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
191

192
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
5✔
193
        if sys.platform in ['windows', 'win32']:
×
194
            keyfile_path = self.keyfile_windows
×
195
            keyfile_help = _('This file can be edited with Regdit')
×
196
        elif 'ANDROID_DATA' in os.environ:
×
197
            raise Exception('platform not supported')
×
198
        else:
199
            # treat unknown platforms and macOS as linux-like
200
            keyfile_path = self.keyfile_posix
×
201
            keyfile_help = "" if not key_hex else "".join([
×
202
                                         _('The file must have root permissions'),
203
                                         ".\n\n",
204
                                         _("To set it you can also use the Auto-Setup or run "
205
                                           "the following terminal command"),
206
                                         ":\n\n",
207
                                         f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\"",
208
            ])
209
        return keyfile_path, keyfile_help
×
210

211
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
5✔
212
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
213
        try:
×
214
            if sys.platform in ['windows', 'win32']:
×
215
                self._write_key_to_regedit_windows(pubkey_hex)
×
216
            elif 'ANDROID_DATA' in os.environ:
×
217
                raise Exception('platform not supported')
×
218
            elif sys.platform.startswith('darwin'):  # macOS
×
219
                self._write_key_to_root_file_macos(pubkey_hex)
×
220
            else:
221
                self._write_key_to_root_file_linux(pubkey_hex)
×
222
        except Exception:
×
223
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
224
            return False
×
225
        return True
×
226

227
    def try_auto_key_reset(self) -> bool:
5✔
228
        try:
×
229
            if sys.platform in ['windows', 'win32']:
×
230
                self._delete_plugin_key_from_windows_registry()
×
231
            elif 'ANDROID_DATA' in os.environ:
×
232
                raise Exception('platform not supported')
×
233
            elif sys.platform.startswith('darwin'):  # macOS
×
234
                self._delete_macos_plugin_keyfile()
×
235
            else:
236
                self._delete_linux_plugin_keyfile()
×
237
        except Exception:
×
238
            self.logger.exception(f'auto-reset of plugin key failed')
×
239
            return False
×
240
        return True
×
241

242
    def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
5✔
243
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
244
        dir_path: str = os.path.dirname(self.keyfile_posix)
×
245
        sh_command = (
×
246
                     f"mkdir -p {dir_path} "  # create the /etc/electrum dir
247
                     f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} "  # write the key to the file
248
                     f"&& chmod 644 {self.keyfile_posix} "  # set read permissions for the file
249
                     f"&& chmod 755 {dir_path}"  # set read permissions for the dir
250
        )
251
        return sh_command
×
252

253
    @staticmethod
5✔
254
    def _get_macos_osascript_command(commands: List[str]) -> List[str]:
5✔
255
        """
256
        Inspired by
257
        https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
258
        Wraps the given commands in a macOS osascript command to prompt for root permissions.
259
        """
260
        from shlex import quote
×
261

262
        def quote_shell(args):
×
263
            return " ".join(quote(arg) for arg in args)
×
264

265
        def quote_applescript(string):
×
266
            charmap = {
×
267
                "\n": "\\n",
268
                "\r": "\\r",
269
                "\t": "\\t",
270
                "\"": "\\\"",
271
                "\\": "\\\\",
272
            }
273
            return '"%s"' % "".join(charmap.get(char, char) for char in string)
×
274

275
        commands = [
×
276
            "osascript",
277
            "-e",
278
            "do shell script %s "
279
            "with administrator privileges "
280
            "without altering line endings"
281
            % quote_applescript(quote_shell(commands))
282
        ]
283
        return commands
×
284

285
    @staticmethod
5✔
286
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
5✔
287
        """
288
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.
289
        """
290
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
291
        # for the result of the process (returns no process handle)
292
        from ctypes import byref, sizeof, windll, Structure, c_ulong
×
293
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR
×
294

295
        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
296
        class SHELLEXECUTEINFO(Structure):
×
297
            _fields_ = [
×
298
                ('cbSize', DWORD),
299
                ('fMask', c_ulong),
300
                ('hwnd', HWND),
301
                ('lpVerb', LPCWSTR),
302
                ('lpFile', LPCWSTR),
303
                ('lpParameters', LPCWSTR),
304
                ('lpDirectory', LPCWSTR),
305
                ('nShow', c_ulong),
306
                ('hInstApp', HINSTANCE),
307
                ('lpIDList', c_ulong),
308
                ('lpClass', LPCWSTR),
309
                ('hkeyClass', HKEY),
310
                ('dwHotKey', DWORD),
311
                ('hIcon', HANDLE),
312
                ('hProcess', HANDLE)
313
            ]
314

315
        info = SHELLEXECUTEINFO()
×
316
        info.cbSize = sizeof(SHELLEXECUTEINFO)
×
317
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
×
318
        info.hwnd = None
×
319
        info.lpVerb = 'runas'  # run as administrator
×
320
        info.lpFile = 'reg.exe'  # the executable to run
×
321
        info.lpParameters = reg_exe_command  # the registry edit command
×
322
        info.lpDirectory = None
×
323
        info.nShow = 1
×
324

325
        # Execute and wait
326
        if not windll.shell32.ShellExecuteExW(byref(info)):
×
327
            error = windll.kernel32.GetLastError()
×
328
            raise Exception(f'Error executing registry command: {error}')
×
329

330
        # block until the process is done or 5 sec timeout
331
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)
×
332

333
        # Close handle
334
        windll.kernel32.CloseHandle(info.hProcess)
×
335

336
    @staticmethod
5✔
337
    def _execute_commands_in_subprocess(commands: List[str]) -> None:
5✔
338
        """
339
        Executes the given commands in a subprocess and asserts that it was successful.
340
        """
341
        import subprocess
×
342
        with subprocess.Popen(
×
343
            commands,
344
            stdout=subprocess.PIPE,
345
            stderr=subprocess.PIPE,
346
            text=True,
347
        ) as process:
348
            stdout, stderr = process.communicate()
×
349
            if process.returncode != 0:
×
350
                raise Exception(f'error executing command ({process.returncode}): {stderr}')
×
351

352
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
5✔
353
        """
354
        Spawns a pkexec subprocess to write the key to a file with root permissions.
355
        This will open an OS dialog asking for the root password. Can only succeed if
356
        the system has polkit installed.
357
        """
358
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
359

360
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
361
        commands = ['pkexec', 'sh', '-c', sh_command]
×
362
        self._execute_commands_in_subprocess(commands)
×
363

364
        # check if the key was written correctly
365
        with open(self.keyfile_posix, 'r') as f:
×
366
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
367
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
368

369
    def _delete_linux_plugin_keyfile(self) -> None:
5✔
370
        """
371
        Deletes the root owned key file at self.keyfile_posix.
372
        """
373
        if not os.path.exists(self.keyfile_posix):
×
374
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
375
            return
×
376
        if not self._has_root_permissions(self.keyfile_posix):
×
377
            os.unlink(self.keyfile_posix)
×
378
            return
×
379

380
        # use pkexec to delete the file as root user
381
        commands = ['pkexec', 'rm', self.keyfile_posix]
×
382
        self._execute_commands_in_subprocess(commands)
×
383
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
384

385
    def _write_key_to_root_file_macos(self, key_hex: str) -> None:
5✔
386
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
387

388
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
389
        macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])
×
390

391
        self._execute_commands_in_subprocess(macos_commands)
×
392
        with open(self.keyfile_posix, 'r') as f:
×
393
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
394
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
395

396
    def _delete_macos_plugin_keyfile(self) -> None:
5✔
397
        if not os.path.exists(self.keyfile_posix):
×
398
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
399
            return
×
400
        if not self._has_root_permissions(self.keyfile_posix):
×
401
            os.unlink(self.keyfile_posix)
×
402
            return
×
403
        # use osascript to delete the file as root user
404
        macos_commands = self._get_macos_osascript_command(["rm", self.keyfile_posix])
×
405
        self._execute_commands_in_subprocess(macos_commands)
×
406
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
407

408
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
5✔
409
        """
410
        Writes the key to the Windows registry with windows UAC prompt.
411
        """
412
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE
×
413

414
        value_type = 'REG_SZ'
×
415
        command = f'add "{self.keyfile_windows}" /ve /t {value_type} /d "{key_hex}" /f'
×
416

417
        self._run_win_regedit_as_admin(command)
×
418

419
        # check if the key was written correctly
420
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
421
            with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
×
422
                assert key_hex == QueryValue(key, 'PluginsKey'), "incorrect registry key value"
×
423
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
×
424

425
    def _delete_plugin_key_from_windows_registry(self) -> None:
5✔
426
        """
427
        Deletes the PluginsKey dir in the Windows registry.
428
        """
429
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE
×
430

431
        command = f'delete "{self.keyfile_windows}" /f'
×
432
        self._run_win_regedit_as_admin(command)
×
433

434
        try:
×
435
            # do a sanity check to see if the key has been deleted
436
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
437
                with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
×
438
                    raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
×
439
        except FileNotFoundError:
×
440
            pass
×
441

442
    def create_new_key(self, password:str) -> str:
5✔
443
        salt = os.urandom(32)
×
444
        privkey = self.derive_privkey(password, salt)
×
445
        pubkey = privkey.get_public_key_bytes()
×
446
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
447
        return key.hex()
×
448

449
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
450
        """
451
        returns pubkey, salt
452
        returns None, None if the pubkey has not been set
453
        """
454
        if sys.platform in ['windows', 'win32']:
×
455
            import winreg
×
456
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
457
                try:
×
458
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
459
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
460
                except Exception as e:
×
461
                    self.logger.info(f'winreg error: {e}')
×
462
                    return None, None
×
463
        elif 'ANDROID_DATA' in os.environ:
×
464
            return None, None
×
465
        else:
466
            # treat unknown platforms as linux-like
467
            if not os.path.exists(self.keyfile_posix):
×
468
                return None, None
×
469
            if not self._has_root_permissions(self.keyfile_posix):
×
470
                return
×
471
            with open(self.keyfile_posix) as f:
×
472
                key_hex = f.read()
×
473
        try:
×
474
            key = bytes.fromhex(key_hex)
×
475
            version = key[0]
×
476
        except Exception:
×
477
            self.logger.exception(f'{key_hex=} invalid')
×
478
            return None, None
×
479
        if version != PLUGIN_PASSWORD_VERSION:
×
480
            self.logger.info(f'unknown plugin password version: {version}')
×
481
            return None, None
×
482
        # all good
483
        salt = key[1:1+32]
×
484
        pubkey = key[1+32:]
×
485
        return pubkey, salt
×
486

487
    def get_external_plugin_dir(self) -> str:
5✔
488
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
489
        make_dir(pkg_path)
5✔
490
        return pkg_path
5✔
491

492
    async def download_external_plugin(self, url: str) -> str:
5✔
493
        filename = os.path.basename(urlparse(url).path)
×
494
        pkg_path = self.get_external_plugin_dir()
×
495
        path = os.path.join(pkg_path, filename)
×
496
        if os.path.exists(path):
×
497
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
×
498
        network = Network.get_instance()
×
499
        proxy = network.proxy if network else None
×
500
        async with make_aiohttp_session(proxy=proxy) as session:
×
501
            async with session.get(url) as resp:
×
502
                if resp.status == 200:
×
503
                    with open(path, 'wb') as fd:
×
504
                        async for chunk in resp.content.iter_chunked(10):
×
505
                            fd.write(chunk)
×
506
        return path
×
507

508
    def read_manifest(self, path) -> dict:
5✔
509
        """ return json dict """
510
        with zipfile_lib.ZipFile(path) as file:
×
511
            for filename in file.namelist():
×
512
                if filename.endswith('manifest.json'):
×
513
                    break
×
514
            else:
515
                raise Exception('could not find manifest.json in zip archive')
×
516
            with file.open(filename, 'r') as f:
×
517
                manifest = json.load(f)
×
518
                manifest['path'] = path  # external, path of the zipfile
×
519
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
520
                manifest['is_zip'] = True
×
521
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
522
                return manifest
×
523

524
    def zip_plugin_path(self, name) -> str:
5✔
525
        path = self.get_metadata(name)['path']
×
526
        filename = os.path.basename(path)
×
527
        if name in self.internal_plugin_metadata:
×
528
            pkg_path = self.pkgpath
×
529
        else:
530
            pkg_path = self.get_external_plugin_dir()
×
531
        return os.path.join(pkg_path, filename)
×
532

533
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
534
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
535
        if pkg_path is None:
5✔
536
            return
×
537
        for filename in os.listdir(pkg_path):
5✔
538
            path = os.path.join(pkg_path, filename)
×
539
            if not filename.endswith('.zip'):
×
540
                continue
×
541
            try:
×
542
                d = self.read_manifest(path)
×
543
                name = d['name']
×
544
            except Exception:
×
545
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
546
                continue
×
547
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
548
                self.logger.info(f"duplicate plugins for {name=}")
×
549
                continue
×
550
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
551
                continue
×
552
            min_version = d.get('min_electrum_version')
×
553
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
554
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
555
                continue
×
556
            max_version = d.get('max_electrum_version')
×
557
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
558
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
559
                continue
×
560

561
            if not self.cmd_only:
×
562
                gui_good = self.gui_name in d.get('available_for', [])
×
563
                if not gui_good:
×
564
                    continue
×
565
                if 'fullname' not in d:
×
566
                    continue
×
567
                details = d.get('registers_keystore')
×
568
                if details:
×
569
                    self.register_keystore(name, details)
×
570
            if external:
×
571
                self.external_plugin_metadata[name] = d
×
572
            else:
573
                self.internal_plugin_metadata[name] = d
×
574

575
    def get(self, name):
5✔
576
        return self.plugins.get(name)
×
577

578
    def count(self):
5✔
579
        return len(self.plugins)
×
580

581
    def load_plugin(self, name) -> 'BasePlugin':
5✔
582
        """Imports the code of the given plugin.
583
        note: can be called from any thread.
584
        """
585
        if self.get_metadata(name):
5✔
586
            return self.load_plugin_by_name(name)
5✔
587
        else:
588
            raise Exception(f"could not find plugin {name!r}")
×
589

590
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
591
        """Loads the __init__.py module of the plugin if it is not already loaded."""
592
        base_name = ('electrum_external_plugins.' if self.is_external(name) else 'electrum.plugins.') + name
5✔
593
        if base_name not in sys.modules:
5✔
594
            metadata = self.get_metadata(name)
5✔
595
            is_zip = metadata.get('is_zip', False)
5✔
596
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
597
            if not is_zip:
5✔
598
                if self.is_external(name):
5✔
599
                    # this branch is deprecated: external plugins are always zip files
600
                    path = os.path.join(metadata['path'], '__init__.py')
×
601
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
602
                else:
603
                    init_spec = importlib.util.find_spec(base_name)
5✔
604
            else:
605
                zipfile = zipimport.zipimporter(metadata['path'])
×
606
                dirname = metadata['dirname']
×
607
                init_spec = zipfile.find_spec(dirname)
×
608

609
            self.exec_module_from_spec(init_spec, base_name)
5✔
610

611
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
612
        if name in self.plugins:
5✔
613
            return self.plugins[name]
5✔
614
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
615
        self.maybe_load_plugin_init_method(name)
5✔
616
        is_external = self.is_external(name)
5✔
617
        if is_external and not self.is_authorized(name):
5✔
618
            self.logger.info(f'plugin not authorized {name}')
×
619
            return
×
620
        if not is_external:
5✔
621
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
622
        else:
623
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
624

625
        spec = importlib.util.find_spec(full_name)
5✔
626
        if spec is None:
5✔
627
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
628
        try:
5✔
629
            module = self.exec_module_from_spec(spec, full_name)
5✔
630
            plugin = module.Plugin(self, self.config, name)
5✔
631
        except Exception as e:
×
632
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
633
        self.add_jobs(plugin.thread_jobs())
5✔
634
        self.plugins[name] = plugin
5✔
635
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
636
        return plugin
5✔
637

638
    def close_plugin(self, plugin):
5✔
639
        self.remove_jobs(plugin.thread_jobs())
×
640

641
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
642
        from hashlib import pbkdf2_hmac
×
643
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
644
        return ECPrivkey(secret)
×
645

646
    def uninstall(self, name: str):
5✔
647
        if self.config.get(f'plugins.{name}'):
×
648
            self.config.set_key(f'plugins.{name}', None)
×
649
        if name in self.external_plugin_metadata:
×
650
            zipfile = self.zip_plugin_path(name)
×
651
            os.unlink(zipfile)
×
652
            self.external_plugin_metadata.pop(name)
×
653

654
    def is_internal(self, name) -> bool:
5✔
655
        return name in self.internal_plugin_metadata
×
656

657
    def is_external(self, name) -> bool:
5✔
658
        return name in self.external_plugin_metadata
5✔
659

660
    def is_auto_loaded(self, name):
5✔
661
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
662
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
663

664
    def is_installed(self, name) -> bool:
5✔
665
        """an external plugin may be installed but not authorized """
666
        return (name in self.internal_plugin_metadata or name in self.external_plugin_metadata)
×
667

668
    def is_authorized(self, name) -> bool:
5✔
669
        if name in self.internal_plugin_metadata:
×
670
            return True
×
671
        if name not in self.external_plugin_metadata:
×
672
            return False
×
673
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
674
        if not pubkey_bytes:
×
675
            return False
×
676
        if not self.is_plugin_zip(name):
×
677
            return False
×
678
        filename = self.zip_plugin_path(name)
×
679
        plugin_hash = get_file_hash256(filename)
×
680
        sig = self.config.get(f'plugins.{name}.authorized')
×
681
        if not sig:
×
682
            return False
×
683
        pubkey = ECPubkey(pubkey_bytes)
×
684
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
685

686
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
687
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
688
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
689
        plugin_hash = get_file_hash256(filename)
×
690
        sig = privkey.ecdsa_sign(plugin_hash)
×
691
        value = sig.hex()
×
692
        self.config.set_key(f'plugins.{name}.authorized', value)
×
693
        self.config.set_key(f'plugins.{name}.enabled', True)
×
694

695
    def enable(self, name: str) -> 'BasePlugin':
5✔
696
        self.config.enable_plugin(name)
×
697
        p = self.get(name)
×
698
        if p:
×
699
            return p
×
700
        return self.load_plugin(name)
×
701

702
    def disable(self, name: str) -> None:
5✔
703
        self.config.disable_plugin(name)
×
704
        p = self.get(name)
×
705
        if not p:
×
706
            return
×
707
        self.plugins.pop(name)
×
708
        p.close()
×
709
        self.logger.info(f"closed {name}")
×
710

711
    @classmethod
5✔
712
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
713
        return key.startswith('plugins.')
×
714

715
    def is_available(self, name: str) -> bool:
5✔
716
        d = self.descriptions.get(name)
×
717
        if not d:
×
718
            return False
×
719
        deps = d.get('requires', [])
×
720
        for dep, s in deps:
×
721
            try:
×
722
                __import__(dep)
×
723
            except ImportError as e:
×
724
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
725
                return False
×
726
        return True
×
727

728
    def get_hardware_support(self):
5✔
729
        out = []
×
730
        for name, details in self._hw_wallets.items():
×
731
            try:
×
732
                p = self.get_plugin(name)
×
733
                if p.is_available():
×
734
                    out.append(HardwarePluginToScan(
×
735
                        name=name,
736
                        description=details[2],
737
                        plugin=p,
738
                        exception=None))
739
            except Exception as e:
×
740
                self.logger.exception(f"cannot load plugin for: {name}")
×
741
                out.append(HardwarePluginToScan(
×
742
                    name=name,
743
                    description=details[2],
744
                    plugin=None,
745
                    exception=e))
746
        return out
×
747

748
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
749
        from .wallet import register_wallet_type, register_constructor
5✔
750
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
751

752
        def loader():
5✔
753
            plugin = self.get_plugin(name)
5✔
754
            register_constructor(wallet_type, plugin.wallet_class)
5✔
755
        register_wallet_type(wallet_type)
5✔
756
        plugin_loaders[wallet_type] = loader
5✔
757

758
    def register_keystore(self, name, details):
5✔
759
        from .keystore import register_keystore
5✔
760

761
        def dynamic_constructor(d):
5✔
762
            return self.get_plugin(name).keystore_class(d)
5✔
763
        if details[0] == 'hardware':
5✔
764
            self._hw_wallets[name] = details
5✔
765
            self.logger.info(f"registering hardware {name}: {details}")
5✔
766
            register_keystore(details[1], dynamic_constructor)
5✔
767

768
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
769
        if name not in self.plugins:
5✔
770
            self.load_plugin(name)
5✔
771
        return self.plugins[name]
5✔
772

773
    def is_plugin_zip(self, name: str) -> bool:
5✔
774
        """Returns True if the plugin is a zip file"""
775
        if (metadata := self.get_metadata(name)) is None:
×
776
            return False
×
777
        return metadata.get('is_zip', False)
×
778

779
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
780
        """Returns the metadata of the plugin"""
781
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
782
        if not metadata:
5✔
783
            return None
×
784
        return metadata
5✔
785

786
    def run(self):
5✔
787
        while self.is_running():
5✔
788
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
789
            self.run_jobs()
5✔
790
        self.on_stop()
5✔
791

792
    def read_file(self, name: str, filename: str) -> bytes:
5✔
793
        if self.is_plugin_zip(name):
×
794
            plugin_filename = self.zip_plugin_path(name)
×
795
            metadata = self.external_plugin_metadata[name]
×
796
            dirname = metadata['dirname']
×
797
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
798
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
799
                    return myfile.read()
×
800
        elif name in self.internal_plugin_metadata:
×
801
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
802
            with open(path, 'rb') as myfile:
×
803
                return myfile.read()
×
804
        else:
805
            # no icon
806
            return None
×
807

808
def get_file_hash256(path: str) -> bytes:
5✔
809
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
810
    with open(path, 'rb') as f:
×
811
        return sha256(f.read())
×
812

813

814
def hook(func):
5✔
815
    hook_names.add(func.__name__)
5✔
816
    return func
5✔
817

818

819
def run_hook(name, *args):
5✔
820
    results = []
5✔
821
    f_list = hooks.get(name, [])
5✔
822
    for p, f in f_list:
5✔
823
        if p.is_enabled():
5✔
824
            try:
5✔
825
                r = f(*args)
5✔
826
            except Exception:
×
827
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
828
                r = False
×
829
            if r:
5✔
830
                results.append(r)
×
831

832
    if results:
5✔
833
        assert len(results) == 1, results
×
834
        return results[0]
×
835

836

837
class BasePlugin(Logger):
5✔
838

839
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
840
        self.parent = parent  # type: Plugins  # The plugins object
5✔
841
        self.name = name
5✔
842
        self.config = config
5✔
843
        Logger.__init__(self)
5✔
844
        # add self to hooks
845
        for k in dir(self):
5✔
846
            if k in hook_names:
5✔
847
                l = hooks.get(k, [])
5✔
848
                l.append((self, getattr(self, k)))
5✔
849
                hooks[k] = l
5✔
850

851
    def __str__(self):
5✔
852
        return self.name
×
853

854
    def close(self):
5✔
855
        # remove self from hooks
856
        for attr_name in dir(self):
×
857
            if attr_name in hook_names:
×
858
                # found attribute in self that is also the name of a hook
859
                l = hooks.get(attr_name, [])
×
860
                try:
×
861
                    l.remove((self, getattr(self, attr_name)))
×
862
                except ValueError:
×
863
                    # maybe attr name just collided with hook name and was not hook
864
                    continue
×
865
                hooks[attr_name] = l
×
866
        self.parent.close_plugin(self)
×
867
        self.on_close()
×
868

869
    def on_close(self):
5✔
870
        pass
×
871

872
    def requires_settings(self) -> bool:
5✔
873
        return False
×
874

875
    def thread_jobs(self):
5✔
876
        return []
5✔
877

878
    def is_enabled(self):
5✔
879
        if not self.is_available():
×
880
            return False
×
881
        if not self.parent.is_authorized(self.name):
×
882
            return False
×
883
        return self.config.is_plugin_enabled(self.name)
×
884

885
    def is_available(self):
5✔
886
        return True
×
887

888
    def can_user_disable(self):
5✔
889
        return True
×
890

891
    def settings_widget(self, window):
5✔
892
        raise NotImplementedError()
×
893

894
    def settings_dialog(self, window):
5✔
895
        raise NotImplementedError()
×
896

897
    def read_file(self, filename: str) -> bytes:
5✔
898
        return self.parent.read_file(self.name, filename)
×
899

900
    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
5✔
901
        """Returns a dict which is persisted in the per-wallet database."""
902
        plugin_storage = wallet.db.get_plugin_storage()
×
903
        return plugin_storage.setdefault(self.name, {})
×
904

905
class DeviceUnpairableError(UserFacingException): pass
5✔
906
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
907
class CannotAutoSelectDevice(Exception): pass
5✔
908

909

910
class Device(NamedTuple):
5✔
911
    path: Union[str, bytes]
5✔
912
    interface_number: int
5✔
913
    id_: str
5✔
914
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
915
    usage_page: int
5✔
916
    transport_ui_string: str
5✔
917

918

919
class DeviceInfo(NamedTuple):
5✔
920
    device: Device
5✔
921
    label: Optional[str] = None
5✔
922
    initialized: Optional[bool] = None
5✔
923
    exception: Optional[Exception] = None
5✔
924
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
925
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
926
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
927

928
    def label_for_device_select(self) -> str:
5✔
929
        return (
×
930
            "{label} ({maybe_model}{init}, {transport})"
931
            .format(
932
                label=self.label or _("An unnamed {}").format(self.plugin_name),
933
                init=(_("initialized") if self.initialized else _("wiped")),
934
                transport=self.device.transport_ui_string,
935
                maybe_model=f"{self.model_name}, " if self.model_name else ""
936
            )
937
        )
938

939

940
class HardwarePluginToScan(NamedTuple):
5✔
941
    name: str
5✔
942
    description: str
5✔
943
    plugin: Optional['HW_PluginBase']
5✔
944
    exception: Optional[Exception]
5✔
945

946

947
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
948

949

950
# hidapi is not thread-safe
951
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
952
#     https://github.com/libusb/hidapi/issues/45
953
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
954
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
955
# It is not entirely clear to me, exactly what is safe and what isn't, when
956
# using multiple threads...
957
# Hence, we use a single thread for all device communications, including
958
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
959
# the following thread:
960
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
961
    max_workers=1,
962
    thread_name_prefix='hwd_comms_thread'
963
)
964

965
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
966
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
967
# To keep it simple, let's just import it now, as we are likely in the main thread here.
968
if threading.current_thread() is not threading.main_thread():
5✔
969
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
970
try:
5✔
971
    import hid
5✔
972
except ImportError:
5✔
973
    pass
5✔
974

975

976
T = TypeVar('T')
5✔
977

978

979
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
980
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
981
        return func()
×
982
    else:
983
        fut = _hwd_comms_executor.submit(func)
×
984
        return fut.result()
×
985
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
986

987

988
def runs_in_hwd_thread(func):
5✔
989
    @wraps(func)
5✔
990
    def wrapper(*args, **kwargs):
5✔
991
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
992
    return wrapper
5✔
993

994

995
def assert_runs_in_hwd_thread():
5✔
996
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
997
        raise Exception("must only be called from HWD communication thread")
×
998

999

1000
class DeviceMgr(ThreadJob):
5✔
1001
    """Manages hardware clients.  A client communicates over a hardware
1002
    channel with the device.
1003

1004
    In addition to tracking device HID IDs, the device manager tracks
1005
    hardware wallets and manages wallet pairing.  A HID ID may be
1006
    paired with a wallet when it is confirmed that the hardware device
1007
    matches the wallet, i.e. they have the same master public key.  A
1008
    HID ID can be unpaired if e.g. it is wiped.
1009

1010
    Because of hotplugging, a wallet must request its client
1011
    dynamically each time it is required, rather than caching it
1012
    itself.
1013

1014
    The device manager is shared across plugins, so just one place
1015
    does hardware scans when needed.  By tracking HID IDs, if a device
1016
    is plugged into a different port the wallet is automatically
1017
    re-paired.
1018

1019
    Wallets are informed on connect / disconnect events.  It must
1020
    implement connected(), disconnected() callbacks.  Being connected
1021
    implies a pairing.  Callbacks can happen in any thread context,
1022
    and we do them without holding the lock.
1023

1024
    Confusingly, the HID ID (serial number) reported by the HID system
1025
    doesn't match the device ID reported by the device itself.  We use
1026
    the HID IDs.
1027

1028
    This plugin is thread-safe.  Currently only devices supported by
1029
    hidapi are implemented."""
1030

1031
    def __init__(self, config: SimpleConfig):
5✔
1032
        ThreadJob.__init__(self)
5✔
1033
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
1034
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
1035
        # A client->id_ map. Needs self.lock.
1036
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
1037
        # What we recognise.  (vendor_id, product_id) -> Plugin
1038
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
1039
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
1040
        # Custom enumerate functions for devices we don't know about.
1041
        self._enumerate_func = set()  # Needs self.lock.
5✔
1042

1043
        self.lock = threading.RLock()
5✔
1044

1045
        self.config = config
5✔
1046

1047
    def thread_jobs(self):
5✔
1048
        # Thread job to handle device timeouts
1049
        return [self]
5✔
1050

1051
    def run(self):
5✔
1052
        '''Handle device timeouts.  Runs in the context of the Plugins
1053
        thread.'''
1054
        with self.lock:
5✔
1055
            clients = list(self.clients.keys())
5✔
1056
        cutoff = time.time() - self.config.get_session_timeout()
5✔
1057
        for client in clients:
5✔
1058
            client.timeout(cutoff)
×
1059

1060
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
1061
        for pair in device_pairs:
×
1062
            self._recognised_hardware[pair] = plugin
×
1063

1064
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
1065
        for vendor_id in vendor_ids:
×
1066
            self._recognised_vendor[vendor_id] = plugin
×
1067

1068
    def register_enumerate_func(self, func):
5✔
1069
        with self.lock:
×
1070
            self._enumerate_func.add(func)
×
1071

1072
    @runs_in_hwd_thread
5✔
1073
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
1074
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
1075
        # Get from cache first
1076
        client = self._client_by_id(device.id_)
×
1077
        if client:
×
1078
            return client
×
1079
        client = plugin.create_client(device, handler)
×
1080
        if client:
×
1081
            self.logger.info(f"Registering {client}")
×
1082
            with self.lock:
×
1083
                self.clients[client] = device.id_
×
1084
        return client
×
1085

1086
    def id_by_pairing_code(self, pairing_code):
5✔
1087
        with self.lock:
×
1088
            return self.pairing_code_to_id.get(pairing_code)
×
1089

1090
    def pairing_code_by_id(self, id_):
5✔
1091
        with self.lock:
×
1092
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
1093
                if id2 == id_:
×
1094
                    return pairing_code
×
1095
            return None
×
1096

1097
    def unpair_pairing_code(self, pairing_code):
5✔
1098
        with self.lock:
×
1099
            if pairing_code not in self.pairing_code_to_id:
×
1100
                return
×
1101
            _id = self.pairing_code_to_id.pop(pairing_code)
×
1102
        self._close_client(_id)
×
1103

1104
    def unpair_id(self, id_):
5✔
1105
        pairing_code = self.pairing_code_by_id(id_)
×
1106
        if pairing_code:
×
1107
            self.unpair_pairing_code(pairing_code)
×
1108
        else:
1109
            self._close_client(id_)
×
1110

1111
    def _close_client(self, id_):
5✔
1112
        with self.lock:
×
1113
            client = self._client_by_id(id_)
×
1114
            self.clients.pop(client, None)
×
1115
        if client:
×
1116
            client.close()
×
1117

1118
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
1119
        with self.lock:
×
1120
            for client, client_id in self.clients.items():
×
1121
                if client_id == id_:
×
1122
                    return client
×
1123
        return None
×
1124

1125
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
1126
        '''Returns a client for the device ID if one is registered.  If
1127
        a device is wiped or in bootloader mode pairing is impossible;
1128
        in such cases we communicate by device ID and not wallet.'''
1129
        if scan_now:
×
1130
            self.scan_devices()
×
1131
        return self._client_by_id(id_)
×
1132

1133
    @runs_in_hwd_thread
5✔
1134
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
1135
                            keystore: 'Hardware_KeyStore',
1136
                            force_pair: bool, *,
1137
                            devices: Sequence['Device'] = None,
1138
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
1139
        self.logger.info("getting client for keystore")
×
1140
        if handler is None:
×
1141
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
1142
        handler.update_status(False)
×
1143
        pcode = keystore.pairing_code()
×
1144
        client = None
×
1145
        # search existing clients first (fast-path)
1146
        if not devices:
×
1147
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
1148
        # search clients again, now allowing a (slow) scan
1149
        if client is None:
×
1150
            if devices is None:
×
1151
                devices = self.scan_devices()
×
1152
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
1153
        if client is None and force_pair:
×
1154
            try:
×
1155
                info = self.select_device(plugin, handler, keystore, devices,
×
1156
                                          allow_user_interaction=allow_user_interaction)
1157
            except CannotAutoSelectDevice:
×
1158
                pass
×
1159
            else:
1160
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
1161
        if client:
×
1162
            handler.update_status(True)
×
1163
            # note: if select_device was called, we might also update label etc here:
1164
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1165
        self.logger.info("end client for keystore")
×
1166
        return client
×
1167

1168
    def client_by_pairing_code(
5✔
1169
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
1170
        devices: Sequence['Device'],
1171
    ) -> Optional['HardwareClientBase']:
1172
        _id = self.id_by_pairing_code(pairing_code)
×
1173
        client = self._client_by_id(_id)
×
1174
        if client:
×
1175
            if type(client.plugin) != type(plugin):
×
1176
                return
×
1177
            # An unpaired client might have another wallet's handler
1178
            # from a prior scan.  Replace to fix dialog parenting.
1179
            client.handler = handler
×
1180
            return client
×
1181

1182
        for device in devices:
×
1183
            if device.id_ == _id:
×
1184
                return self.create_client(device, handler, plugin)
×
1185

1186
    def force_pair_keystore(
5✔
1187
        self,
1188
        *,
1189
        plugin: 'HW_PluginBase',
1190
        handler: 'HardwareHandlerBase',
1191
        info: 'DeviceInfo',
1192
        keystore: 'Hardware_KeyStore',
1193
    ) -> 'HardwareClientBase':
1194
        xpub = keystore.xpub
×
1195
        derivation = keystore.get_derivation_prefix()
×
1196
        assert derivation is not None
×
1197
        xtype = bip32.xpub_type(xpub)
×
1198
        client = self._client_by_id(info.device.id_)
×
1199
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
1200
            # See comment above for same code
1201
            client.handler = handler
×
1202
            # This will trigger a PIN/passphrase entry request
1203
            try:
×
1204
                client_xpub = client.get_xpub(derivation, xtype)
×
1205
            except (UserCancelled, RuntimeError):
×
1206
                # Bad / cancelled PIN / passphrase
1207
                client_xpub = None
×
1208
            if client_xpub == xpub:
×
1209
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1210
                with self.lock:
×
1211
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
1212
                return client
×
1213

1214
        # The user input has wrong PIN or passphrase, or cancelled input,
1215
        # or it is not pairable
1216
        raise DeviceUnpairableError(
×
1217
            _('Electrum cannot pair with your {}.\n\n'
1218
              'Before you request bitcoins to be sent to addresses in this '
1219
              'wallet, ensure you can pair with your device, or that you have '
1220
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
1221
              'receive will be unspendable.').format(plugin.device))
1222

1223
    def list_pairable_device_infos(
5✔
1224
        self,
1225
        *,
1226
        handler: Optional['HardwareHandlerBase'],
1227
        plugin: 'HW_PluginBase',
1228
        devices: Sequence['Device'] = None,
1229
        include_failing_clients: bool = False,
1230
    ) -> List['DeviceInfo']:
1231
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
1232
        Already paired devices are also included, as it is okay to reuse them.
1233
        """
1234
        if not plugin.libraries_available:
×
1235
            message = plugin.get_library_not_available_message()
×
1236
            raise HardwarePluginLibraryUnavailable(message)
×
1237
        if devices is None:
×
1238
            devices = self.scan_devices()
×
1239
        infos = []
×
1240
        for device in devices:
×
1241
            if not plugin.can_recognize_device(device):
×
1242
                continue
×
1243
            try:
×
1244
                client = self.create_client(device, handler, plugin)
×
1245
                if not client:
×
1246
                    continue
×
1247
                label = client.label()
×
1248
                is_initialized = client.is_initialized()
×
1249
                soft_device_id = client.get_soft_device_id()
×
1250
                model_name = client.device_model_name()
×
1251
            except Exception as e:
×
1252
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
1253
                if include_failing_clients:
×
1254
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1255
                continue
×
1256
            infos.append(DeviceInfo(device=device,
×
1257
                                    label=label,
1258
                                    initialized=is_initialized,
1259
                                    plugin_name=plugin.name,
1260
                                    soft_device_id=soft_device_id,
1261
                                    model_name=model_name))
1262

1263
        return infos
×
1264

1265
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
1266
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1267
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1268
        """Select the device to use for keystore."""
1269
        # ideally this should not be called from the GUI thread...
1270
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
1271
        while True:
×
1272
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1273
            if infos:
×
1274
                break
×
1275
            if not allow_user_interaction:
×
1276
                raise CannotAutoSelectDevice()
×
1277
            msg = _('Please insert your {}').format(plugin.device)
×
1278
            msg += " ("
×
1279
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1280
                msg += f"label: {keystore.label}, "
×
1281
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1282
            msg += ').\n\n{}\n\n{}'.format(
×
1283
                _('Verify the cable is connected and that '
1284
                  'no other application is using it.'),
1285
                _('Try to connect again?')
1286
            )
1287
            if not handler.yes_no_question(msg):
×
1288
                raise UserCancelled()
×
1289
            devices = None
×
1290

1291
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1292
        # method 1: select device by id
1293
        if keystore.soft_device_id:
×
1294
            for info in infos:
×
1295
                if info.soft_device_id == keystore.soft_device_id:
×
1296
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1297
                    return info
×
1298
        # method 2: select device by label
1299
        #           but only if not a placeholder label and only if there is no collision
1300
        device_labels = [info.label for info in infos]
×
1301
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1302
                and device_labels.count(keystore.label) == 1):
1303
            for info in infos:
×
1304
                if info.label == keystore.label:
×
1305
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1306
                    return info
×
1307
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1308
        #           saved for keystore anyway, select it
1309
        if (len(infos) == 1
×
1310
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1311
                and keystore.soft_device_id is None):
1312
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1313
            return infos[0]
×
1314

1315
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1316
        if not allow_user_interaction:
×
1317
            raise CannotAutoSelectDevice()
×
1318
        # ask user to select device manually
1319
        msg = (
×
1320
                _("Could not automatically pair with device for given keystore.") + "\n"
1321
                + f"(keystore label: {keystore.label!r}, "
1322
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1323
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1324
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1325
        choices = [ChoiceItem(key=idx, label=info.label_for_device_select())
×
1326
                   for (idx, info) in enumerate(infos)]
1327
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1328
                          f"num options: {len(infos)}. options: {infos}")
1329
        c = handler.query_choice(msg, choices)
×
1330
        if c is None:
×
1331
            raise UserCancelled()
×
1332
        info = infos[c]
×
1333
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1334
        # note: updated label/soft_device_id will be saved after pairing succeeds
1335
        return info
×
1336

1337
    @runs_in_hwd_thread
5✔
1338
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1339
        try:
×
1340
            import hid  # noqa: F811
×
1341
        except ImportError:
×
1342
            return []
×
1343

1344
        devices = []
×
1345
        for d in hid.enumerate(0, 0):
×
1346
            vendor_id = d['vendor_id']
×
1347
            product_key = (vendor_id, d['product_id'])
×
1348
            plugin = None
×
1349
            if product_key in self._recognised_hardware:
×
1350
                plugin = self._recognised_hardware[product_key]
×
1351
            elif vendor_id in self._recognised_vendor:
×
1352
                plugin = self._recognised_vendor[vendor_id]
×
1353
            if plugin:
×
1354
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1355
                if device:
×
1356
                    devices.append(device)
×
1357
        return devices
×
1358

1359
    @runs_in_hwd_thread
5✔
1360
    @profiler
5✔
1361
    def scan_devices(self) -> Sequence['Device']:
5✔
1362
        self.logger.info("scanning devices...")
×
1363

1364
        # First see what's connected that we know about
1365
        devices = self._scan_devices_with_hid()
×
1366

1367
        # Let plugin handlers enumerate devices we don't know about
1368
        with self.lock:
×
1369
            enumerate_funcs = list(self._enumerate_func)
×
1370
        for f in enumerate_funcs:
×
1371
            try:
×
1372
                new_devices = f()
×
1373
            except BaseException as e:
×
1374
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1375
            else:
1376
                devices.extend(new_devices)
×
1377

1378
        # find out what was disconnected
1379
        client_ids = [dev.id_ for dev in devices]
×
1380
        disconnected_clients = []
×
1381
        with self.lock:
×
1382
            connected = {}
×
1383
            for client, id_ in self.clients.items():
×
1384
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1385
                    connected[client] = id_
×
1386
                else:
1387
                    disconnected_clients.append((client, id_))
×
1388
            self.clients = connected
×
1389

1390
        # Unpair disconnected devices
1391
        for client, id_ in disconnected_clients:
×
1392
            self.unpair_id(id_)
×
1393
            if client.handler:
×
1394
                client.handler.update_status(False)
×
1395

1396
        return devices
×
1397

1398
    @classmethod
5✔
1399
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1400
        ret = {}
×
1401
        # add libusb
1402
        try:
×
1403
            import usb1
×
1404
        except Exception as e:
×
1405
            ret["libusb.version"] = None
×
1406
        else:
1407
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1408
            try:
×
1409
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1410
            except AttributeError:
×
1411
                ret["libusb.path"] = None
×
1412
        # add hidapi
1413
        try:
×
1414
            import hid  # noqa: F811
×
1415
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1416
        except Exception as e:
×
1417
            from importlib.metadata import version
×
1418
            try:
×
1419
                ret["hidapi.version"] = version("hidapi")
×
1420
            except ImportError:
×
1421
                ret["hidapi.version"] = None
×
1422
        return ret
×
1423

1424
    def trigger_pairings(
5✔
1425
            self,
1426
            keystores: Sequence['KeyStore'],
1427
            *,
1428
            allow_user_interaction: bool = True,
1429
            devices: Sequence['Device'] = None,
1430
    ) -> None:
1431
        """Given a list of keystores, try to pair each with a connected hardware device.
1432

1433
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1434
        try to pair each keystore individually. Consider the following scenario:
1435
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1436
        - assume none of the devices are paired yet
1437
        1. if we tried to individually pair keystores, we might try with ks1 first
1438
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1439
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1440
             which is confusing and error-prone. It's especially problematic if the hw device does
1441
             not support labels (such as Ledger), as then the user cannot easily distinguish
1442
             same-type devices. (see #4199)
1443
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1444
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1445
        """
1446
        from .keystore import Hardware_KeyStore
×
1447
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1448
        if not keystores:
×
1449
            return
×
1450
        if devices is None:
×
1451
            devices = self.scan_devices()
×
1452
        # first pair with all devices that can be auto-selected
1453
        for ks in keystores:
×
1454
            try:
×
1455
                ks.get_client(
×
1456
                    force_pair=True,
1457
                    allow_user_interaction=False,
1458
                    devices=devices,
1459
                )
1460
            except UserCancelled:
×
1461
                pass
×
1462
        if allow_user_interaction:
×
1463
            # now do manual selections
1464
            for ks in keystores:
×
1465
                try:
×
1466
                    ks.get_client(
×
1467
                        force_pair=True,
1468
                        allow_user_interaction=True,
1469
                        devices=devices,
1470
                    )
1471
                except UserCancelled:
×
1472
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc