• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5921940256325632

27 Mar 2026 07:02AM UTC coverage: 64.84%. Remained the same
5921940256325632

Pull #10553

CirrusCI

f321x
don't merge: removing unused comments in plugins.py

Totally basic commit removing some outdated comments in plugins.py and doing
some minor simplifications. Should be a no brainer.
Pull Request #10553: ci: add claude code code review

1 of 4 new or added lines in 1 file covered. (25.0%)

3 existing lines in 2 files now uncovered.

24416 of 37656 relevant lines covered (64.84%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.1
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
1✔
27
import os
1✔
28
import pkgutil
1✔
29
import importlib.util
1✔
30
import time
1✔
31
import threading
1✔
32
import sys
1✔
33
import zipfile as zipfile_lib
1✔
34
from urllib.parse import urlparse
1✔
35

36
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
1✔
37
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
38
import concurrent
1✔
39
from concurrent.futures import Future
1✔
40
import zipimport
1✔
41
from functools import wraps, partial
1✔
42
from itertools import chain
1✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
1✔
45

46
from ._vendor.distutils.version import StrictVersion
1✔
47
from .version import ELECTRUM_VERSION
1✔
48
from .i18n import _
1✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem,
1✔
50
                   make_dir, make_aiohttp_session)
51
from . import bip32
1✔
52
from . import plugins
1✔
53
from .simple_config import SimpleConfig
1✔
54
from .logging import get_logger, Logger
1✔
55
from .crypto import sha256
1✔
56
from .network import Network
1✔
57

58
if TYPE_CHECKING:
59
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
60
    from .keystore import Hardware_KeyStore, KeyStore
61
    from .wallet import Abstract_Wallet
62

63

64
_logger = get_logger(__name__)
1✔
65
plugin_loaders = {}
1✔
66
hook_names = set()
1✔
67
hooks = {}
1✔
68
_exec_module_failure = {}  # type: Dict[str, Exception]
1✔
69

70
PLUGIN_PASSWORD_VERSION = 1
1✔
71

72

73
class Plugins(DaemonThread):
1✔
74

75
    pkgpath = os.path.dirname(plugins.__file__)
1✔
76
    keyfile_posix = '/etc/electrum/plugins_key'
1✔
77
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
1✔
78

79
    @profiler
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
        """Discover the available plugins and load the enabled ones.

        config:   configuration object consulted for the per-plugin settings.
        gui_name: name of the running GUI; only stored when cmd_only is False.
        cmd_only: if True, only the Logger base is initialised before plugins are
                  discovered and loaded: no device manager is created and the
                  thread is not started.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        if cmd_only:
            # only import the command modules of plugins
            Logger.__init__(self)
        else:
            DaemonThread.__init__(self)
            self.device_manager = DeviceMgr(config)
            self.name = 'Plugins'  # set name of thread
            self._hw_wallets = {}
            self.plugins = {}  # type: Dict[str, BasePlugin]
            self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        if not cmd_only:
            self.add_jobs(self.device_manager.thread_jobs())
            self.start()
1✔
101

102
    @property
1✔
103
    def descriptions(self):
1✔
104
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
105

106
    def find_directory_plugins(self, pkg_path: str, external: bool):
1✔
107
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
108
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
1✔
109
        for loader, name, ispkg in iter_modules:
1✔
110
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
1✔
UNCOV
111
                continue
×
112
            module_path = os.path.join(pkg_path, name)
1✔
113
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
1✔
114
                continue
×
115
            try:
1✔
116
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
1✔
117
                    d = json.load(f)
1✔
118
            except FileNotFoundError:
×
119
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
120
                continue
×
121
            if 'fullname' not in d:
1✔
122
                continue
×
123
            d['path'] = module_path
1✔
124
            if not self.cmd_only:
1✔
125
                gui_good = self.gui_name in d.get('available_for', [])
1✔
126
                if not gui_good:
1✔
127
                    continue
1✔
128
                details = d.get('registers_wallet_type')
1✔
129
                if details:
1✔
130
                    self.register_wallet_type(name, gui_good, details)
1✔
131
                details = d.get('registers_keystore')
1✔
132
                if details:
1✔
133
                    self.register_keystore(name, details)
1✔
134
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
1✔
135
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
136
                _logger.info(f"duplicate plugins? for {name=}")
×
137
                continue
×
138
            if not external:
1✔
139
                self.internal_plugin_metadata[name] = d
1✔
140
            else:
141
                self.external_plugin_metadata[name] = d
×
142

143
    @staticmethod
    def exec_module_from_spec(spec, path: str):
        """Creates the module described by `spec`, registers it in sys.modules under
        `path`, executes it and returns it.

        If execution fails, the failure is remembered per `path` and any later call
        for the same `path` raises immediately instead of retrying.
        """
        prev_fail = _exec_module_failure.get(path)
        if prev_fail:
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
        try:
            module = importlib.util.module_from_spec(spec)
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as e:
            # Not all side-effects can be undone, but the module is at least removed from
            # sys.modules so that the import system knows it failed. There is no retry on a
            # later call for the same plugin, as that could interact with the side-effects
            # that were not undone (e.g. the plugin might have defined commands).
            _exec_module_failure[path] = e
            sys.modules.pop(path, None)
            raise Exception(f"Error pre-loading {path}: {e!r}") from e
        return module
1✔
161

162
    def find_plugins(self):
        """Populates the metadata dicts: internal plugins are looked up in directory
        form under self.pkgpath, external plugins in zip form in the external plugin dir.
        Locations that are unset or do not exist are ignored.
        """
        internal_dir = self.pkgpath
        external_dir = self.get_external_plugin_dir()
        if internal_dir and os.path.exists(internal_dir):
            self.find_directory_plugins(pkg_path=internal_dir, external=False)
        if external_dir and os.path.exists(external_dir):
            self.find_zip_plugins(pkg_path=external_dir, external=True)
1✔
171

172
    def load_plugins(self):
1✔
173
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
1✔
174
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
1✔
175
                try:
×
176
                    if self.cmd_only:  # only load init method to register commands
×
177
                        self.maybe_load_plugin_init_method(name)
×
178
                    else:
179
                        self.load_plugin_by_name(name)
×
180
                except BaseException as e:
×
181
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
182

183
    def _has_root_permissions(self, path):
1✔
184
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
185

186
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
        """Returns (location of the plugin key, help text for the user).

        On Windows the location is a registry key. On Android an Exception is raised.
        Everywhere else the location is self.keyfile_posix; the help text is empty
        unless key_hex is given, in which case it contains the terminal command that
        stores key_hex there.
        """
        if sys.platform in ['windows', 'win32']:
            return self.keyfile_windows, _('This file can be edited with Regedit')
        if 'ANDROID_DATA' in os.environ:
            raise Exception('platform not supported')
        if not key_hex:
            return self.keyfile_posix, ""
        help_parts = [
            _('The file must have root permissions'),
            ".\n\n",
            _("To set it you can also use the Auto-Setup or run "
              "the following terminal command"),
            ":\n\n",
            f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\"",
        ]
        return self.keyfile_posix, "".join(help_parts)
×
203

204
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
        """Can be called from the GUI to store the plugin pubkey as root/admin user.

        Picks the platform specific writer (Windows registry, macOS, or linux-like for
        everything else; Android is not supported). Returns True on success; any
        failure is logged and reported as False.
        """
        try:
            if sys.platform in ['windows', 'win32']:
                store_key = self._write_key_to_regedit_windows
            elif 'ANDROID_DATA' in os.environ:
                raise Exception('platform not supported')
            elif sys.platform.startswith('darwin'):  # macOS
                store_key = self._write_key_to_root_file_macos
            else:
                store_key = self._write_key_to_root_file_linux
            store_key(pubkey_hex)
        except Exception:
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
            return False
        else:
            return True
×
219

220
    def try_auto_key_reset(self) -> bool:
        """Removes the stored plugin key using the platform specific method
        (Windows registry, macOS, or linux-like for everything else; Android is not
        supported). Returns True on success; any failure is logged and reported as False.
        """
        try:
            if sys.platform in ['windows', 'win32']:
                remove_key = self._delete_plugin_key_from_windows_registry
            elif 'ANDROID_DATA' in os.environ:
                raise Exception('platform not supported')
            elif sys.platform.startswith('darwin'):  # macOS
                remove_key = self._delete_macos_plugin_keyfile
            else:
                remove_key = self._delete_linux_plugin_keyfile
            remove_key()
        except Exception:
            self.logger.exception('auto-reset of plugin key failed')
            return False
        else:
            return True
×
234

235
    def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
1✔
236
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
237
        dir_path: str = os.path.dirname(self.keyfile_posix)
×
238
        sh_command = (
×
239
                     f"mkdir -p {dir_path} "  # create the /etc/electrum dir
240
                     f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} "  # write the key to the file
241
                     f"&& chmod 644 {self.keyfile_posix} "  # set read permissions for the file
242
                     f"&& chmod 755 {dir_path}"  # set read permissions for the dir
243
        )
244
        return sh_command
×
245

246
    @staticmethod
1✔
247
    def _get_macos_osascript_command(commands: List[str]) -> List[str]:
1✔
248
        """
249
        Inspired by
250
        https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
251
        Wraps the given commands in a macOS osascript command to prompt for root permissions.
252
        """
253
        from shlex import quote
×
254

255
        def quote_shell(args):
×
256
            return " ".join(quote(arg) for arg in args)
×
257

258
        def quote_applescript(string):
×
259
            charmap = {
×
260
                "\n": "\\n",
261
                "\r": "\\r",
262
                "\t": "\\t",
263
                "\"": "\\\"",
264
                "\\": "\\\\",
265
            }
266
            return '"%s"' % "".join(charmap.get(char, char) for char in string)
×
267

268
        commands = [
×
269
            "osascript",
270
            "-e",
271
            "do shell script %s "
272
            "with administrator privileges "
273
            "without altering line endings"
274
            % quote_applescript(quote_shell(commands))
275
        ]
276
        return commands
×
277

278
    @staticmethod
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
        """
        Runs `reg.exe <reg_exe_command>` with the 'runas' verb, i.e. requesting admin
        privileges through a UAC prompt, and waits (with a timeout) for it to finish.

        ShellExecuteEx is used because ShellExecuteW (the simpler api) returns no
        process handle and so does not allow waiting for the result of the process.
        Raises an Exception if the process could not be launched. The exit status of
        reg.exe itself is not inspected here.
        """
        from ctypes import byref, sizeof, windll, Structure, c_ulong
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR

        see_mask_nocloseprocess = 0x00000040  # keeps hProcess valid so the process can be waited on
        wait_timeout_ms = 0x1338  # roughly 5 seconds

        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
        class SHELLEXECUTEINFO(Structure):
            _fields_ = [
                ('cbSize', DWORD),
                ('fMask', c_ulong),
                ('hwnd', HWND),
                ('lpVerb', LPCWSTR),
                ('lpFile', LPCWSTR),
                ('lpParameters', LPCWSTR),
                ('lpDirectory', LPCWSTR),
                ('nShow', c_ulong),
                ('hInstApp', HINSTANCE),
                ('lpIDList', c_ulong),
                ('lpClass', LPCWSTR),
                ('hkeyClass', HKEY),
                ('dwHotKey', DWORD),
                ('hIcon', HANDLE),
                ('hProcess', HANDLE)
            ]

        exec_info = SHELLEXECUTEINFO()
        exec_info.cbSize = sizeof(SHELLEXECUTEINFO)
        exec_info.fMask = see_mask_nocloseprocess
        exec_info.hwnd = None
        exec_info.lpVerb = 'runas'  # run as administrator
        exec_info.lpFile = 'reg.exe'  # the executable to run
        exec_info.lpParameters = reg_exe_command  # the registry edit command
        exec_info.lpDirectory = None
        exec_info.nShow = 1

        launched = windll.shell32.ShellExecuteExW(byref(exec_info))
        if not launched:
            error = windll.kernel32.GetLastError()
            raise Exception(f'Error executing registry command: {error}')

        # block until the process is done or the timeout expires, then release the handle
        windll.kernel32.WaitForSingleObject(exec_info.hProcess, wait_timeout_ms)
        windll.kernel32.CloseHandle(exec_info.hProcess)
×
328

329
    @staticmethod
1✔
330
    def _execute_commands_in_subprocess(commands: List[str]) -> None:
1✔
331
        """
332
        Executes the given commands in a subprocess and asserts that it was successful.
333
        """
334
        import subprocess
×
335
        with subprocess.Popen(
×
336
            commands,
337
            stdout=subprocess.PIPE,
338
            stderr=subprocess.PIPE,
339
            text=True,
340
        ) as process:
341
            stdout, stderr = process.communicate()
×
342
            if process.returncode != 0:
×
343
                raise Exception(f'error executing command ({process.returncode}): {stderr}')
×
344

345
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
        """
        Spawns a pkexec subprocess to write the key to a file with root permissions.
        This will open an OS dialog asking for the root password. Can only succeed if
        the system has polkit installed.

        Afterwards the key file is read back and compared against key_hex.
        Raises AssertionError if /etc is missing or the file content does not match,
        and an Exception if the subprocess exits with a non-zero status.
        """
        # explicit raises instead of assert statements, so the checks survive `python -O`
        if not os.path.exists("/etc"):
            raise AssertionError("System does not have /etc directory")

        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
        commands = ['pkexec', 'sh', '-c', sh_command]
        self._execute_commands_in_subprocess(commands)

        # check if the key was written correctly
        # (read once: a second f.read() would return '' and garble the error message)
        with open(self.keyfile_posix, 'r') as f:
            written = f.read()
        if written != key_hex:
            raise AssertionError(f'file content mismatch: {written} != {key_hex}')
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
361

362
    def _delete_linux_plugin_keyfile(self) -> None:
1✔
363
        """
364
        Deletes the root owned key file at self.keyfile_posix.
365
        """
366
        if not os.path.exists(self.keyfile_posix):
×
367
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
368
            return
×
369
        if not self._has_root_permissions(self.keyfile_posix):
×
370
            os.unlink(self.keyfile_posix)
×
371
            return
×
372

373
        # use pkexec to delete the file as root user
374
        commands = ['pkexec', 'rm', self.keyfile_posix]
×
375
        self._execute_commands_in_subprocess(commands)
×
376
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
377

378
    def _write_key_to_root_file_macos(self, key_hex: str) -> None:
        """
        Writes the key to self.keyfile_posix with root permissions, by running the
        key creation command through osascript (which prompts for admin rights).

        Afterwards the key file is read back and compared against key_hex.
        Raises AssertionError if /etc is missing or the file content does not match,
        and an Exception if the subprocess exits with a non-zero status.
        """
        # explicit raises instead of assert statements, so the checks survive `python -O`
        if not os.path.exists("/etc"):
            raise AssertionError("System does not have /etc directory")

        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
        macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])
        self._execute_commands_in_subprocess(macos_commands)

        # check if the key was written correctly
        # (read once: a second f.read() would return '' and garble the error message)
        with open(self.keyfile_posix, 'r') as f:
            written = f.read()
        if written != key_hex:
            raise AssertionError(f'file content mismatch: {written} != {key_hex}')
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
388

389
    def _delete_macos_plugin_keyfile(self) -> None:
1✔
390
        if not os.path.exists(self.keyfile_posix):
×
391
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
392
            return
×
393
        if not self._has_root_permissions(self.keyfile_posix):
×
394
            os.unlink(self.keyfile_posix)
×
395
            return
×
396
        # use osascript to delete the file as root user
397
        macos_commands = self._get_macos_osascript_command(["rm", self.keyfile_posix])
×
398
        self._execute_commands_in_subprocess(macos_commands)
×
399
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
400

401
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
        """
        Writes the key to the Windows registry with windows UAC prompt.

        key_hex is stored as the default value (REG_SZ) of the self.keyfile_windows
        key, then read back from the registry to confirm that it was stored.
        """
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE

        reg_args = f'add "{self.keyfile_windows}" /ve /t REG_SZ /d "{key_hex}" /f'
        self._run_win_regedit_as_admin(reg_args)

        # check if the key was written correctly
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey, \
                OpenKey(hkey, r'SOFTWARE\Electrum') as key:
            stored_value = QueryValue(key, 'PluginsKey')
        assert key_hex == stored_value, "incorrect registry key value"
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
×
417

418
    def _delete_plugin_key_from_windows_registry(self) -> None:
        """
        Deletes the PluginsKey dir in the Windows registry (with UAC prompt).
        Raises an Exception if the key can still be opened afterwards.
        """
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE

        self._run_win_regedit_as_admin(f'delete "{self.keyfile_windows}" /f')

        # sanity check: opening the key is expected to fail now that it has been deleted
        try:
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey, \
                    OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
                raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
        except FileNotFoundError:
            pass
×
434

435
    def create_new_key(self, password:str) -> str:
        """Derives a new plugin key from `password` using a fresh random 32-byte salt.

        Returns the hex encoding of: version byte + salt + public key bytes.
        """
        salt = os.urandom(32)
        pubkey = self.derive_privkey(password, salt).get_public_key_bytes()
        version_byte = bytes([PLUGIN_PASSWORD_VERSION])
        return (version_byte + salt + pubkey).hex()
×
441

442
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], Optional[bytes]]:
1✔
443
        """
444
        returns pubkey, salt
445
        returns None, None if the pubkey has not been set
446
        """
447
        if sys.platform in ['windows', 'win32']:
×
448
            import winreg
×
449
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
450
                try:
×
451
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
452
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
453
                except Exception as e:
×
454
                    self.logger.info(f'winreg error: {e}')
×
455
                    return None, None
×
456
        elif 'ANDROID_DATA' in os.environ:
×
NEW
457
            raise NotImplementedError("not supported")
×
458
        else:
459
            # treat unknown platforms as linux-like
460
            if not os.path.exists(self.keyfile_posix):
×
461
                return None, None
×
462
            if not self._has_root_permissions(self.keyfile_posix):
×
463
                return None, None
×
464
            with open(self.keyfile_posix) as f:
×
465
                key_hex = f.read()
×
466
        try:
×
467
            key = bytes.fromhex(key_hex)
×
468
            version = key[0]
×
469
        except Exception:
×
470
            self.logger.exception(f'{key_hex=} invalid')
×
471
            return None, None
×
472
        if version != PLUGIN_PASSWORD_VERSION:
×
473
            self.logger.info(f'unknown plugin password version: {version}')
×
474
            return None, None
×
475
        # all good
476
        salt = key[1:1+32]
×
477
        pubkey = key[1+32:]
×
478
        return pubkey, salt
×
479

480
    def get_external_plugin_dir(self) -> str:
        """Returns the 'plugins' directory below the electrum path of the config.
        The path is passed to make_dir first (presumably creating it if missing).
        """
        plugins_dir = os.path.join(self.config.electrum_path(), 'plugins')
        make_dir(plugins_dir)
        return plugins_dir
1✔
484

485
    async def download_external_plugin(self, url: str) -> str:
        """Downloads the file at `url` into the external plugin directory.

        The file is stored under the last path component of the url. The network
        proxy is used if a Network instance exists.
        Returns the path of the downloaded file.

        Raises:
            ValueError: no filename can be derived from the url.
            FileExistsError: a file with that name is already present.
            Exception: the server did not answer with HTTP status 200.
        """
        filename = os.path.basename(urlparse(url).path)
        if not filename:
            # otherwise `path` would be the plugin dir itself and we would
            # report a misleading "already exists" error
            raise ValueError(f"cannot derive a plugin filename from url {url!r}")
        pkg_path = self.get_external_plugin_dir()
        path = os.path.join(pkg_path, filename)
        if os.path.exists(path):
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
        network = Network.get_instance()
        proxy = network.proxy if network else None
        async with make_aiohttp_session(proxy=proxy) as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    # do not pretend success by returning the path of a file that was never written
                    raise Exception(f"failed to download plugin from {url}: HTTP status {resp.status}")
                try:
                    with open(path, 'wb') as fd:
                        async for chunk in resp.content.iter_chunked(64 * 1024):
                            fd.write(chunk)
                except BaseException:
                    # BaseException also covers task cancellation. Do not leave a truncated
                    # zip behind: it would block a retry (FileExistsError) and be picked up
                    # as a broken plugin. Cleanup is best-effort; the original error propagates.
                    try:
                        os.unlink(path)
                    except OSError:
                        pass
                    raise
        return path
×
500

501
    def read_manifest(self, path) -> dict:
1✔
502
        """ return json dict """
503
        with zipfile_lib.ZipFile(path) as file:
×
504
            for filename in file.namelist():
×
505
                if filename.endswith('manifest.json'):
×
506
                    break
×
507
            else:
508
                raise Exception('could not find manifest.json in zip archive')
×
509
            with file.open(filename, 'r') as f:
×
510
                manifest = json.load(f)
×
511
                manifest['path'] = path  # external, path of the zipfile
×
512
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
513
                manifest['is_zip'] = True
×
514
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
515
                return manifest
×
516

517
    def zip_plugin_path(self, name) -> str:
1✔
518
        path = self.get_metadata(name)['path']
×
519
        filename = os.path.basename(path)
×
520
        if name in self.internal_plugin_metadata:
×
521
            pkg_path = self.pkgpath
×
522
        else:
523
            pkg_path = self.get_external_plugin_dir()
×
524
        return os.path.join(pkg_path, filename)
×
525

526
    def find_zip_plugins(self, pkg_path: str, external: bool):
1✔
527
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
528
        if pkg_path is None:
1✔
529
            return
×
530
        for filename in os.listdir(pkg_path):
1✔
531
            path = os.path.join(pkg_path, filename)
×
532
            if not filename.endswith('.zip'):
×
533
                continue
×
534
            try:
×
535
                d = self.read_manifest(path)
×
536
                name = d['name']
×
537
            except Exception:
×
NEW
538
                self.logger.info(f"manifest.json not available {filename}", exc_info=True)
×
539
                continue
×
540
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
541
                self.logger.info(f"duplicate plugins for {name=}")
×
542
                continue
×
543
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
544
                continue
×
545
            min_version = d.get('min_electrum_version')
×
546
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
547
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
548
                continue
×
549
            max_version = d.get('max_electrum_version')
×
550
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
551
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
552
                continue
×
553

554
            if not self.cmd_only:
×
555
                gui_good = self.gui_name in d.get('available_for', [])
×
556
                if not gui_good:
×
557
                    continue
×
558
                if 'fullname' not in d:
×
559
                    continue
×
560
                details = d.get('registers_keystore')
×
561
                if details:
×
562
                    self.register_keystore(name, details)
×
563
            if external:
×
564
                self.external_plugin_metadata[name] = d
×
565
            else:
566
                self.internal_plugin_metadata[name] = d
×
567

568
    def get(self, name):
1✔
569
        return self.plugins.get(name)
×
570

571
    def count(self):
1✔
572
        return len(self.plugins)
×
573

574
    def load_plugin(self, name) -> 'BasePlugin':
        """Imports the code of the given plugin.
        note: can be called from any thread.

        Raises an Exception if there is no metadata for `name`.
        """
        if not self.get_metadata(name):
            raise Exception(f"could not find plugin {name!r}")
        return self.load_plugin_by_name(name)
×
582

583
    def maybe_load_plugin_init_method(self, name: str) -> None:
1✔
584
        """Loads the __init__.py module of the plugin if it is not already loaded."""
585
        if not self.is_authorized(name):
1✔
586
            return
×
587
        base_name = ('electrum_external_plugins.' if self.is_external(name) else 'electrum.plugins.') + name
1✔
588
        if base_name not in sys.modules:
1✔
589
            metadata = self.get_metadata(name)
1✔
590
            is_zip = metadata.get('is_zip', False)
1✔
591
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
592
            if not is_zip:
1✔
593
                if self.is_external(name):
1✔
594
                    # this branch is deprecated: external plugins are always zip files
595
                    path = os.path.join(metadata['path'], '__init__.py')
×
596
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
597
                else:
598
                    init_spec = importlib.util.find_spec(base_name)
1✔
599
            else:
600
                zipfile = zipimport.zipimporter(metadata['path'])
×
601
                dirname = metadata['dirname']
×
602
                init_spec = zipfile.find_spec(dirname)
×
603

604
            self.exec_module_from_spec(init_spec, base_name)
1✔
605

606
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
1✔
607
        if name in self.plugins:
1✔
608
            return self.plugins[name]
1✔
609
        self.maybe_load_plugin_init_method(name)
1✔
610
        if not self.is_authorized(name):
1✔
NEW
611
            return
×
612
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
613
        is_external = self.is_external(name)
1✔
614
        if not is_external:
1✔
615
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
1✔
616
        else:
617
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
618

619
        spec = importlib.util.find_spec(full_name)
1✔
620
        if spec is None:
1✔
621
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
622
        try:
1✔
623
            module = self.exec_module_from_spec(spec, full_name)
1✔
624
            plugin = module.Plugin(self, self.config, name)
1✔
625
        except Exception as e:
×
626
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
627
        self.add_jobs(plugin.thread_jobs())
1✔
628
        self.plugins[name] = plugin
1✔
629
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
1✔
630
        return plugin
1✔
631

632
    def close_plugin(self, plugin):
        """Removes the thread jobs of `plugin` from this thread's jobs."""
        plugin_jobs = plugin.thread_jobs()
        self.remove_jobs(plugin_jobs)
×
634

635
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
        """Derives the plugin private key from a password and salt, using
        PBKDF2-HMAC-SHA256 with 100000 iterations over the utf-8 encoded password."""
        import hashlib
        password_bytes = pw.encode('utf-8')
        secret = hashlib.pbkdf2_hmac('sha256', password_bytes, salt, iterations=100_000)
        return ECPrivkey(secret)
×
639

640
    def uninstall(self, name: str):
        """Clears the 'plugins.<name>' config entry (if set) and, for an external
        plugin, deletes its zip file and forgets its metadata."""
        config_key = f'plugins.{name}'
        if self.config.get(config_key):
            self.config.set_key(config_key, None)
        if name not in self.external_plugin_metadata:
            return
        os.unlink(self.zip_plugin_path(name))
        self.external_plugin_metadata.pop(name)
×
647

648
    def is_internal(self, name) -> bool:
        """Return True if ``name`` has an entry in the internal plugin metadata."""
        known_internal = self.internal_plugin_metadata
        return name in known_internal
×
650

651
    def is_external(self, name) -> bool:
        """Return True if ``name`` has an entry in the external plugin metadata."""
        known_external = self.external_plugin_metadata
        return name in known_external
1✔
653

654
    def is_auto_loaded(self, name) -> bool:
        """Return True if the plugin registers a keystore or a wallet type.

        The external metadata is consulted first, then the internal metadata.
        Unknown plugins, and plugins whose metadata has neither a truthy
        ``registers_keystore`` nor ``registers_wallet_type`` entry, give False.
        """
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
        if not metadata:
            return False
        # coerce: the metadata values themselves are not booleans
        return bool(metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
657

658
    def is_installed(self, name) -> bool:
        """Return True if the plugin is known, internally or externally.

        An external plugin may be installed but not authorized.
        """
        if name in self.internal_plugin_metadata:
            return True
        return name in self.external_plugin_metadata
×
661

662
    def is_authorized(self, name) -> bool:
        """Return True if the plugin may be loaded.

        Internal plugins are always authorized. An external plugin is
        authorized only if all of the following hold:

        - a public key is available from ``get_pubkey_bytes()``,
        - the plugin is a zip plugin,
        - the config holds a signature under ``plugins.<name>.authorized``,
        - that signature verifies against the hash of the plugin's zip file.

        A missing or malformed stored signature denies authorization instead
        of raising.
        """
        if name in self.internal_plugin_metadata:
            return True
        if name not in self.external_plugin_metadata:
            return False
        pubkey_bytes, _salt = self.get_pubkey_bytes()
        if not pubkey_bytes:
            return False
        if not self.is_plugin_zip(name):
            return False
        # check the cheap config lookup before reading and hashing the zip file
        sig = self.config.get(f'plugins.{name}.authorized')
        if not sig:
            return False
        try:
            sig_bytes = bytes.fromhex(sig)
        except (TypeError, ValueError):
            # corrupt config value: fail closed
            return False
        plugin_hash = get_file_hash256(self.zip_plugin_path(name))
        pubkey = ECPubkey(pubkey_bytes)
        return pubkey.ecdsa_verify(sig_bytes, plugin_hash)
×
679

680
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
        """Sign a plugin file with ``privkey`` and persist the authorization.

        The hex-encoded signature over the file's hash is stored under
        ``plugins.<name>.authorized`` and ``plugins.<name>.enabled`` is set.
        ``privkey`` must correspond to the public key returned by
        ``get_pubkey_bytes()``; this is asserted.
        """
        expected_pubkey, _salt = self.get_pubkey_bytes()
        assert expected_pubkey == privkey.get_public_key_bytes()
        signature = privkey.ecdsa_sign(get_file_hash256(filename))
        self.config.set_key(f'plugins.{name}.authorized', signature.hex())
        self.config.set_key(f'plugins.{name}.enabled', True)
×
688

689
    def enable(self, name: str) -> 'BasePlugin':
        """Mark the plugin enabled in the config and return its instance.

        An instance already available through ``self.get(name)`` is reused;
        otherwise the result of ``self.load_plugin(name)`` is returned.
        """
        self.config.enable_plugin(name)
        plugin = self.get(name)
        return plugin if plugin else self.load_plugin(name)
×
695

696
    def disable(self, name: str) -> None:
        """Mark the plugin disabled in the config and close it if it is loaded."""
        self.config.disable_plugin(name)
        plugin = self.get(name)
        if plugin:
            # drop it from the registry before closing it
            self.plugins.pop(name)
            plugin.close()
            self.logger.info(f"closed {name}")
×
704

705
    @classmethod
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
        """Return True if ``key`` starts with the ``plugins.`` config prefix."""
        prefix = 'plugins.'
        return key.startswith(prefix)
×
708

709
    def is_available(self, name: str) -> bool:
        """Return True if the plugin is described and all its requirements import.

        Each entry of the description's ``requires`` list is unpacked as a
        pair whose first element is a module name. The first module that
        fails to import is logged as a warning and makes the plugin
        unavailable.
        """
        description = self.descriptions.get(name)
        if not description:
            return False
        for dep, _unused in description.get('requires', []):
            try:
                __import__(dep)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        return True
×
721

722
    def get_hardware_support(self):
        """Return a ``HardwarePluginToScan`` entry per usable hardware plugin.

        Unauthorized plugins are skipped. A plugin that loads and reports
        itself available is listed with ``exception=None``; a plugin whose
        loading or availability check raises is logged and listed with
        ``plugin=None`` and the exception attached. A plugin that loads but
        is not available is omitted.
        """
        scan_list = []
        for name, details in self._hw_wallets.items():
            if not self.is_authorized(name):
                # we allow non-authorized plugins to populate self._hw_wallets
                # so that a plugin can be loaded and authorized without having
                # to start a new session
                continue
            try:
                plugin = self.get_plugin(name)
                if plugin.is_available():
                    entry = HardwarePluginToScan(
                        name=name,
                        description=details[2],
                        plugin=plugin,
                        exception=None)
                    scan_list.append(entry)
            except Exception as e:
                self.logger.exception(f"cannot load plugin for: {name}")
                failed = HardwarePluginToScan(
                    name=name,
                    description=details[2],
                    plugin=None,
                    exception=e)
                scan_list.append(failed)
        return scan_list
×
746

747
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register ``wallet_type`` and a lazy loader for the plugin providing it.

        The loader stored in ``plugin_loaders[wallet_type]`` fetches the plugin
        via ``get_plugin(name)`` when called and registers its
        ``wallet_class`` as the constructor. ``gui_good`` is not used here.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            wallet_class = self.get_plugin(name).wallet_class
            register_constructor(wallet_type, wallet_class)

        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader
1✔
756

757
    def register_keystore(self, name, details):
        """Register a hardware keystore whose class is resolved lazily.

        Only ``details`` whose first element is ``'hardware'`` are handled:
        they are remembered in ``self._hw_wallets`` and a constructor that
        loads the plugin on first use is registered under ``details[1]``.
        Any other kind of ``details`` is ignored.
        """
        from .keystore import register_keystore

        if details[0] != 'hardware':
            return

        def dynamic_constructor(d):
            plugin = self.get_plugin(name)
            return plugin.keystore_class(d)

        self._hw_wallets[name] = details
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
1✔
766

767
    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the loaded plugin ``name``, loading it first if necessary.

        The plugin must be authorized (asserted). Raises ``KeyError`` if the
        plugin is still missing from ``self.plugins`` after ``load_plugin``.
        """
        assert self.is_authorized(name)
        try:
            return self.plugins[name]
        except KeyError:
            pass
        self.load_plugin(name)
        return self.plugins[name]
1✔
772

773
    def is_plugin_zip(self, name: str) -> bool:
        """Return the plugin's ``is_zip`` metadata flag; False if unknown or unset."""
        metadata = self.get_metadata(name)
        if metadata is None:
            return False
        return metadata.get('is_zip', False)
×
778

779
    def get_metadata(self, name: str) -> Optional[dict]:
        """Return the plugin's metadata, preferring the internal registry.

        Returns ``None`` when neither registry holds a non-empty entry.
        """
        metadata = self.internal_plugin_metadata.get(name)
        if not metadata:
            metadata = self.external_plugin_metadata.get(name)
        return metadata if metadata else None
1✔
785

786
    def run(self):
        """Run the registered jobs until ``is_running()`` turns false, then call ``on_stop()``.

        Each iteration first waits on ``wake_up_event`` for up to 0.1 seconds.
        """
        while True:
            if not self.is_running():
                break
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
1✔
791

792
    def read_file(self, name: str, filename: str) -> Optional[bytes]:
        """Return the contents of a file shipped with a plugin.

        For zip plugins the file is read from ``<dirname>/<filename>`` inside
        the plugin's zip archive, where ``dirname`` comes from the external
        plugin metadata. For internal plugins it is read from the
        ``plugins/<name>/`` directory next to this module. For any other
        plugin ``None`` is returned.
        """
        if self.is_plugin_zip(name):
            plugin_filename = self.zip_plugin_path(name)
            dirname = self.external_plugin_metadata[name]['dirname']
            # zip member names always use forward slashes
            member = "/".join([dirname, filename])
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
                with myzip.open(member) as myfile:
                    return myfile.read()
        if name in self.internal_plugin_metadata:
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
        # no icon
        return None
×
807

808

809
def get_file_hash256(path: str) -> bytes:
    """Return the sha256 hash of the file's contents, similar to `sha256sum`.

    The whole file is read into memory before hashing.
    """
    with open(path, 'rb') as f:
        contents = f.read()
    return sha256(contents)
×
813

814

815
def hook(func):
    """Decorator that records ``func.__name__`` as a hook name.

    The function itself is returned unchanged.
    """
    name = func.__name__
    hook_names.add(name)
    return func
1✔
818

819

820
def run_hook(name, *args):
    """Call every enabled plugin's handler for hook ``name`` with ``args``.

    A handler that raises is logged and treated as returning False. Falsy
    results are discarded. Returns the single truthy result, or ``None`` if
    there is none; more than one truthy result trips an assertion.
    """
    results = []
    for plugin, callback in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            result = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            result = False
        if result:
            results.append(result)

    if not results:
        return None
    assert len(results) == 1, results
    return results[0]
×
836

837

838
class BasePlugin(Logger):
    """Base class for plugins.

    On construction, every attribute whose name is a registered hook name is
    added to the module-level ``hooks`` table; ``close()`` removes them again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        Logger.__init__(self)
        # add self to hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            registered = hooks.get(attr_name, [])
            registered.append((self, getattr(self, attr_name)))
            hooks[attr_name] = registered

    def __str__(self):
        return self.name

    def close(self):
        """Unhook this plugin, release its thread jobs and call ``on_close()``."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name not in hook_names:
                continue
            # found attribute in self that is also the name of a hook
            registered = hooks.get(attr_name, [])
            try:
                registered.remove((self, getattr(self, attr_name)))
            except ValueError:
                # maybe attr name just collided with hook name and was not hook
                continue
            hooks[attr_name] = registered
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of ``close()``; does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Return the jobs this plugin contributes; none by default."""
        return []

    def is_enabled(self):
        """Return whether the plugin is available, authorized and enabled in the config."""
        usable = self.is_available() and self.parent.is_authorized(self.name)
        if not usable:
            return False
        return self.config.is_plugin_enabled(self.name)

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Read a file shipped with this plugin via the parent plugins object."""
        return self.parent.read_file(self.name, filename)

    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
        """Returns a dict which is persisted in the per-wallet database."""
        all_plugin_storage = wallet.db.get_plugin_storage()
        return all_plugin_storage.setdefault(self.name, {})
×
905

906

907
class DeviceUnpairableError(UserFacingException):
    """Raised by DeviceMgr.force_pair_keystore when a device cannot be paired."""


class HardwarePluginLibraryUnavailable(Exception):
    """Raised by DeviceMgr.list_pairable_device_infos when a plugin's libraries are unavailable."""


class CannotAutoSelectDevice(Exception):
    """Raised by DeviceMgr.select_device when no device can be chosen without user interaction."""
1✔
910

911

912
class Device(NamedTuple):
    """Description of one hardware device found during a scan."""
    path: Union[str, bytes]
    interface_number: int
    id_: str  # DeviceMgr uses this to look up the client of a device
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str  # shown to the user in device selection labels
1✔
919

920

921
class DeviceInfo(NamedTuple):
    """What is known about a device after trying to create a client for it.

    ``exception`` is set instead of the descriptive fields when client
    creation failed.
    """
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"

    def label_for_device_select(self) -> str:
        """Return the human-readable line shown when the user picks a device."""
        label = self.label or _("An unnamed {}").format(self.plugin_name)
        init = _("initialized") if self.initialized else _("wiped")
        transport = self.device.transport_ui_string
        maybe_model = f"{self.model_name}, " if self.model_name else ""
        template = "{label} ({maybe_model}{init}, {transport})"
        return template.format(
            label=label,
            init=init,
            transport=transport,
            maybe_model=maybe_model,
        )
940

941

942
class HardwarePluginToScan(NamedTuple):
    """A hardware plugin to scan with, or the error that prevented loading it."""
    name: str
    description: str
    plugin: Optional['HW_PluginBase']  # None when loading the plugin failed
    exception: Optional[Exception]  # the loading error, None on success
1✔
947

948

949
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
1✔
950

951

952
# hidapi is not thread-safe
953
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
954
#     https://github.com/libusb/hidapi/issues/45
955
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
956
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
957
# It is not entirely clear to me, exactly what is safe and what isn't, when
958
# using multiple threads...
959
# Hence, we use a single thread for all device communications, including
960
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
961
# the following thread:
962
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
1✔
963
    max_workers=1,
964
    thread_name_prefix='hwd_comms_thread'
965
)
966

967
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
# To keep it simple, let's just import it now, as we are likely in the main thread here.
if threading.main_thread() is not threading.current_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # hidapi is optional: without it, the hid-based device scan finds nothing
    pass
1✔
976

977

978
T = TypeVar('T')


def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run ``func`` on the hardware-device communication thread and return its result.

    When the caller is already on a thread whose name starts with
    "hwd_comms_thread", ``func`` is called directly. Otherwise it is submitted
    to ``_hwd_comms_executor`` and the caller blocks until the result (or the
    exception raised by ``func``) is available.
    """
    on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if on_hwd_thread:
        return func()
    return _hwd_comms_executor.submit(func).result()
988

989

990
def runs_in_hwd_thread(func):
    """Decorator: every call of ``func`` is routed through ``run_in_hwd_thread``."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
1✔
995

996

997
def assert_runs_in_hwd_thread():
    """Raise unless the current thread's name starts with "hwd_comms_thread"."""
    thread_name = threading.current_thread().name
    if thread_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
1000

1001

1002
class DeviceMgr(ThreadJob):
1✔
1003
    """Manages hardware clients.  A client communicates over a hardware
1004
    channel with the device.
1005

1006
    In addition to tracking device HID IDs, the device manager tracks
1007
    hardware wallets and manages wallet pairing.  A HID ID may be
1008
    paired with a wallet when it is confirmed that the hardware device
1009
    matches the wallet, i.e. they have the same master public key.  A
1010
    HID ID can be unpaired if e.g. it is wiped.
1011

1012
    Because of hotplugging, a wallet must request its client
1013
    dynamically each time it is required, rather than caching it
1014
    itself.
1015

1016
    The device manager is shared across plugins, so just one place
1017
    does hardware scans when needed.  By tracking HID IDs, if a device
1018
    is plugged into a different port the wallet is automatically
1019
    re-paired.
1020

1021
    Wallets are informed on connect / disconnect events.  It must
1022
    implement connected(), disconnected() callbacks.  Being connected
1023
    implies a pairing.  Callbacks can happen in any thread context,
1024
    and we do them without holding the lock.
1025

1026
    Confusingly, the HID ID (serial number) reported by the HID system
1027
    doesn't match the device ID reported by the device itself.  We use
1028
    the HID IDs.
1029

1030
    This plugin is thread-safe.  Currently only devices supported by
1031
    hidapi are implemented."""
1032

1033
    def __init__(self, config: SimpleConfig):
        """Create an empty device manager that reads its settings from ``config``."""
        ThreadJob.__init__(self)
        self.config = config
        self.lock = threading.RLock()

        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
        self.pairing_code_to_id = {}  # type: Dict[str, str]
        # A client->id_ map. Needs self.lock.
        self.clients = {}  # type: Dict[HardwareClientBase, str]
        # What we recognise.  (vendor_id, product_id) -> Plugin
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
        # Custom enumerate functions for devices we don't know about.
        self._enumerate_func = set()  # Needs self.lock.
        # Pending timeout checks, keyed by the id_ of the client's device.
        self._ongoing_timeout_checks = {}  # type: Dict[str, Future]
1✔
1049

1050
    def thread_jobs(self):
        """Return this manager as the only job; its run() handles device timeouts."""
        jobs = [self]
        return jobs
1✔
1053

1054
    def run(self):
        """Handle device timeouts.  Runs in the context of the Plugins
        thread.

        For every registered client a ``client.timeout(cutoff)`` call is
        scheduled on the hardware communication thread, unless the check
        scheduled earlier for that client has not finished yet.
        """
        with self.lock:
            clients = list(self.clients.items())
        cutoff = time.time() - self.config.get_session_timeout()
        for client, client_id in clients:
            pending = self._ongoing_timeout_checks.get(client_id)
            if pending and not pending.done():
                continue
            # scheduling the timeout check prevents blocking the Plugins DaemonThread if the
            # _hwd_comms_executor Thread is blocked (e.g. due to it awaiting user input).
            self._ongoing_timeout_checks[client_id] = _hwd_comms_executor.submit(client.timeout, cutoff)
×
1068

1069
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every key in ``device_pairs`` to ``plugin`` in the recognised-hardware table."""
        self._recognised_hardware.update((pair, plugin) for pair in device_pairs)
×
1072

1073
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every vendor id in ``vendor_ids`` to ``plugin`` in the recognised-vendor table."""
        self._recognised_vendor.update(dict.fromkeys(vendor_ids, plugin))
×
1076

1077
    def register_enumerate_func(self, func):
        """Add ``func`` to the custom enumerate functions, holding ``self.lock``."""
        with self.lock:
            registry = self._enumerate_func
            registry.add(func)
×
1080

1081
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for ``device``, creating and registering it if needed.

        A client already registered for ``device.id_`` is reused. Otherwise
        ``plugin.create_client`` is asked for one; a truthy result is stored
        in ``self.clients``, a falsy result is returned without registering.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        client = plugin.create_client(device, handler)
        if not client:
            return client
        self.logger.info(f"Registering {client}")
        with self.lock:
            self.clients[client] = device.id_
        return client
×
1094

1095
    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with ``pairing_code``, or None if unpaired."""
        with self.lock:
            pairings = self.pairing_code_to_id
            return pairings.get(pairing_code)
×
1098

1099
    def pairing_code_by_id(self, id_):
        """Return the first pairing code mapped to device id ``id_``, or None."""
        with self.lock:
            matches = (code for code, paired_id in self.pairing_code_to_id.items()
                       if paired_id == id_)
            return next(matches, None)
×
1105

1106
    def unpair_pairing_code(self, pairing_code):
        """Drop the pairing for ``pairing_code`` and close the paired device's client.

        Does nothing if the code is not currently paired. The client is
        closed after ``self.lock`` has been released.
        """
        not_paired = object()
        with self.lock:
            device_id = self.pairing_code_to_id.pop(pairing_code, not_paired)
        if device_id is not_paired:
            return
        self._close_client(device_id)
×
1112

1113
    def unpair_id(self, id_):
        """Unpair the device ``id_`` if it is paired; otherwise just close its client."""
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
×
1119

1120
    def _close_client(self, id_):
1✔
1121
        with self.lock:
×
1122
            client = self._client_by_id(id_)
×
1123
            self.clients.pop(client, None)
×
1124
            if fut := self._ongoing_timeout_checks.pop(id_, None):
×
1125
                fut.cancel()
×
1126
        if client:
×
1127
            client.close()
×
1128

1129
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
1✔
1130
        with self.lock:
×
1131
            for client, client_id in self.clients.items():
×
1132
                if client_id == id_:
×
1133
                    return client
×
1134
        return None
×
1135

1136
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        """Returns a client for the device ID if one is registered.  If
        a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        Unless ``scan_now`` is false, devices are scanned before the lookup.
        """
        if scan_now:
            self.scan_devices()
        client = self._client_by_id(id_)
        return client
×
1143

1144
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Return the client of the device paired with ``keystore``, if any.

        Already-known clients are tried first, then a device scan (or the
        given ``devices``) is used. If that finds nothing and ``force_pair``
        is set, a device is selected and paired with the keystore. The
        handler's status is set to False on entry and to True once a client
        was found.

        Raises:
            Exception: ``handler`` is None.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        lookup = dict(plugin=plugin, pairing_code=keystore.pairing_code(), handler=handler)
        client = None
        # search existing clients first (fast-path)
        if not devices:
            client = self.client_by_pairing_code(devices=[], **lookup)
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = self.client_by_pairing_code(devices=devices, **lookup)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
1178

1179
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client of the device paired with ``pairing_code``, or None.

        A registered client is reused, with its handler replaced, but only if
        it belongs to the same plugin type. Without a registered client, one
        is created for the first of ``devices`` whose id matches the pairing.
        """
        paired_id = self.id_by_pairing_code(pairing_code)
        client = self._client_by_id(paired_id)
        if not client:
            for device in devices:
                if device.id_ != paired_id:
                    continue
                return self.create_client(device, handler, plugin)
            return None
        if type(client.plugin) != type(plugin):
            return None
        # An unpaired client might have another wallet's handler
        # from a prior scan.  Replace to fix dialog parenting.
        client.handler = handler
        return client
×
1196

1197
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair ``keystore`` with the device described by ``info``.

        The device's client is asked for the xpub at the keystore's
        derivation prefix. If it equals the keystore's xpub, the pairing is
        recorded and the client returned.

        Raises:
            DeviceUnpairableError: no pairable client of this plugin type is
                registered for the device, the xpub request was cancelled or
                failed with RuntimeError, or the xpubs differ.
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)
        client = self._client_by_id(info.device.id_)
        can_try_pairing = client and client.is_pairable() and type(client.plugin) == type(plugin)
        if can_try_pairing:
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                client_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                client_xpub = None
            if client_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        error_message = _(
            'Electrum cannot pair with your {}.\n\n'
            'Before you request bitcoins to be sent to addresses in this '
            'wallet, ensure you can pair with your device, or that you have '
            'its seed (and passphrase, if any).  Otherwise all bitcoins you '
            'receive will be unspendable.').format(plugin.device)
        raise DeviceUnpairableError(error_message)
1233

1234
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        Devices are scanned if ``devices`` is None. A device whose client
        cannot be created or queried is logged and skipped, unless
        ``include_failing_clients`` is set, in which case it is listed with
        the exception attached.

        Raises:
            HardwarePluginLibraryUnavailable: the plugin's libraries are not available.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                label = client.label()
                is_initialized = client.is_initialized()
                soft_device_id = client.get_soft_device_id()
                model_name = client.device_model_name()
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
            else:
                infos.append(DeviceInfo(
                    device=device,
                    label=label,
                    initialized=is_initialized,
                    plugin_name=plugin.name,
                    soft_device_id=soft_device_id,
                    model_name=model_name,
                ))

        return infos
×
1275

1276
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked to connect one
        and retry. A device is then picked automatically when there is
        reasonable expectation it is the right one; otherwise the user
        chooses from a list.

        Raises:
            CannotAutoSelectDevice: user interaction would be needed but
                ``allow_user_interaction`` is false.
            UserCancelled: the user declined to retry or cancelled the choice.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while not (infos := self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)):
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            prompt_parts = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                prompt_parts.append(f"label: {keystore.label}, ")
            prompt_parts.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            prompt_parts.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            ))
            if not handler.yes_no_question("".join(prompt_parts)):
                raise UserCancelled()
            # forget the device list so that the next iteration rescans
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            by_id = next((info for info in infos
                          if info.soft_device_id == keystore.soft_device_id), None)
            if by_id is not None:
                self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                return by_id
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        if keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
            by_label = [info for info in infos if info.label == keystore.label]
            if len(by_label) == 1:
                self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                return by_label[0]
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = "".join([
            _("Could not automatically pair with device for given keystore.") + "\n",
            f"(keystore label: {keystore.label!r}, ",
            f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n",
            _("Please select which {} device to use:").format(plugin.device),
            "\n(" + _("Or click cancel to skip this keystore instead.") + ")",
        ])
        choices = [ChoiceItem(key=idx, label=info.label_for_device_select())
                   for (idx, info) in enumerate(infos)]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        choice = handler.query_choice(msg, choices)
        if choice is None:
            raise UserCancelled()
        info = infos[choice]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
1347

1348
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those a registered plugin recognises.

        Returns an empty list if the ``hid`` module cannot be imported.
        Each enumeration entry is looked up first in ``self._recognised_hardware``
        by its ``(vendor_id, product_id)`` pair and, failing that, in
        ``self._recognised_vendor`` by ``vendor_id`` alone. The matching plugin
        builds the device object; falsy results are dropped.
        """
        try:
            import hid  # noqa: F811
        except ImportError:
            return []

        found = []
        for entry in hid.enumerate(0, 0):
            vid = entry['vendor_id']
            product_key = (vid, entry['product_id'])
            # an exact product match takes precedence over a vendor-wide match
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vid in self._recognised_vendor:
                plugin = self._recognised_vendor[vid]
            else:
                continue
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(entry, product_key=product_key)
            if device:
                found.append(device)
        return found
×
1369

1370
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and unpair clients that are gone.

        Devices are collected from the HID scan and from every callable
        registered in ``self._enumerate_func``. A callable that raises is
        logged and skipped, so the remaining callables still run.

        Side effects: ``self.clients`` is reduced to the clients whose id is
        among the scanned devices and which report a usable connection. Every
        other client is passed to ``self.unpair_id`` and, if it has a handler,
        that handler receives ``update_status(False)``.

        Returns the list of all devices found by this scan.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about.
        # Snapshot the callables under the lock, but call them without holding it.
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            except Exception as e:
                # Only ordinary errors are treated as "this enumerator failed";
                # KeyboardInterrupt/SystemExit/GeneratorExit are not swallowed.
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
            else:
                devices.extend(new_devices)

        # find out what was disconnected
        client_ids = [dev.id_ for dev in devices]
        disconnected_clients = []
        with self.lock:
            connected = {}
            for client, id_ in self.clients.items():
                if id_ in client_ids and client.has_usable_connection_with_device():
                    connected[client] = id_
                else:
                    disconnected_clients.append((client, id_))
            self.clients = connected

        # Unpair disconnected devices (done after releasing the lock above)
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
1408

1409
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Report the versions of the USB/HID support libraries.

        The returned mapping always contains ``"libusb.version"`` and
        ``"hidapi.version"``; ``"libusb.path"`` is added only when ``usb1``
        could be imported. A value is None when it could not be determined.
        """
        info = {}
        # libusb, queried through the usb1 bindings
        try:
            import usb1
        except Exception:
            info["libusb.version"] = None
        else:
            info["libusb.version"] = ".".join(str(part) for part in usb1.getVersion()[:4])
            try:
                info["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                info["libusb.path"] = None
        # hidapi: prefer the module attribute, fall back to the package metadata
        try:
            import hid  # noqa: F811
            info["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            from importlib.metadata import version
            try:
                info["hidapi.version"] = version("hidapi")
            except ImportError:
                info["hidapi.version"] = None
        return info
×
1434

1435
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Optional[Sequence['Device']] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)

        Only Hardware_KeyStore instances in `keystores` are considered; if there are none,
        nothing is done. If `devices` is None, the devices are obtained via scan_devices().
        A UserCancelled raised while pairing one keystore is ignored, and pairing continues
        with the next keystore.
        """
        from .keystore import Hardware_KeyStore
        hw_keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not hw_keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pair with all devices that can be auto-selected,
        # and only then (if allowed) do manual selections
        interaction_passes = (False, True) if allow_user_interaction else (False,)
        for interactive in interaction_passes:
            for ks in hw_keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc