• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6530579539165184

22 May 2025 09:48PM UTC coverage: 59.521%. Remained the same
6530579539165184

push

CirrusCI

SomberNight
plugin: _execute_commands_in_subprocess: make sure pipes get closed

0 of 4 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

21611 of 36308 relevant lines covered (59.52%)

2.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.0
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException, ChoiceItem)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
5✔
72

73
    LOGGING_SHORTCUT = 'p'
5✔
74
    pkgpath = os.path.dirname(plugins.__file__)
5✔
75
    # TODO: use XDG Base Directory Specification instead of hardcoding /etc
76
    keyfile_posix = '/etc/electrum/plugins_key'
5✔
77
    keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
5✔
78

79
    @profiler
5✔
80
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
81
        self.config = config
5✔
82
        self.cmd_only = cmd_only  # type: bool
5✔
83
        self.internal_plugin_metadata = {}
5✔
84
        self.external_plugin_metadata = {}
5✔
85
        if cmd_only:
5✔
86
            # only import the command modules of plugins
87
            Logger.__init__(self)
×
88
            self.find_plugins()
×
89
            self.load_plugins()
×
90
            return
×
91
        DaemonThread.__init__(self)
5✔
92
        self.device_manager = DeviceMgr(config)
5✔
93
        self.name = 'Plugins'  # set name of thread
5✔
94
        self.hw_wallets = {}
5✔
95
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
96
        self.gui_name = gui_name
5✔
97
        self.find_plugins()
5✔
98
        self.load_plugins()
5✔
99
        self.add_jobs(self.device_manager.thread_jobs())
5✔
100
        self.start()
5✔
101

102
    @property
5✔
103
    def descriptions(self):
5✔
104
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
105

106
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
107
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
108
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
109
        for loader, name, ispkg in iter_modules:
5✔
110
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
111
            #       once as data and once as code. To honor the "no duplicates" rule below,
112
            #       we exclude the ones packaged as *code*, here:
113
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
114
                continue
×
115
            module_path = os.path.join(pkg_path, name)
5✔
116
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
5✔
117
                continue
×
118
            try:
5✔
119
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
120
                    d = json.load(f)
5✔
121
            except FileNotFoundError:
×
122
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
123
                continue
×
124
            if 'fullname' not in d:
5✔
125
                continue
×
126
            d['path'] = module_path
5✔
127
            if not self.cmd_only:
5✔
128
                gui_good = self.gui_name in d.get('available_for', [])
5✔
129
                if not gui_good:
5✔
130
                    continue
5✔
131
                details = d.get('registers_wallet_type')
5✔
132
                if details:
5✔
133
                    self.register_wallet_type(name, gui_good, details)
5✔
134
                details = d.get('registers_keystore')
5✔
135
                if details:
5✔
136
                    self.register_keystore(name, gui_good, details)
5✔
137
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
138
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
139
                _logger.info(f"duplicate plugins? for {name=}")
×
140
                continue
×
141
            if not external:
5✔
142
                self.internal_plugin_metadata[name] = d
5✔
143
            else:
144
                self.external_plugin_metadata[name] = d
×
145

146
    @staticmethod
5✔
147
    def exec_module_from_spec(spec, path: str):
5✔
148
        if prev_fail := _exec_module_failure.get(path):
5✔
149
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
150
        try:
5✔
151
            module = importlib.util.module_from_spec(spec)
5✔
152
            # sys.modules needs to be modified for relative imports to work
153
            # see https://stackoverflow.com/a/50395128
154
            sys.modules[path] = module
5✔
155
            spec.loader.exec_module(module)
5✔
156
        except Exception as e:
×
157
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
158
            # so the import system knows it failed. If called again for the same plugin, we do not
159
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
160
            # might have defined commands).
161
            _exec_module_failure[path] = e
×
162
            if path in sys.modules:
×
163
                sys.modules.pop(path, None)
×
164
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
165
        return module
5✔
166

167
    def find_plugins(self):
5✔
168
        internal_plugins_path = (self.pkgpath, False)
5✔
169
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
170
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
171
            if pkg_path and os.path.exists(pkg_path):
5✔
172
                if not external:
5✔
173
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
174
                else:
175
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
176

177
    def load_plugins(self):
5✔
178
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
179
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
180
                try:
×
181
                    if self.cmd_only:  # only load init method to register commands
×
182
                        self.maybe_load_plugin_init_method(name)
×
183
                    else:
184
                        self.load_plugin_by_name(name)
×
185
                except BaseException as e:
×
186
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
187

188
    def _has_root_permissions(self, path):
5✔
189
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
190

191
    def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
5✔
192
        if sys.platform in ['windows', 'win32']:
×
193
            keyfile_path = self.keyfile_windows
×
194
            keyfile_help = _('This file can be edited with Regdit')
×
195
        elif 'ANDROID_DATA' in os.environ:
×
196
            raise Exception('platform not supported')
×
197
        else:
198
            # treat unknown platforms and macOS as linux-like
199
            keyfile_path = self.keyfile_posix
×
200
            keyfile_help = "" if not key_hex else "".join([
×
201
                                         _('The file must have root permissions'),
202
                                         ".\n\n",
203
                                         _("To set it you can also use the Auto-Setup or run "
204
                                           "the following terminal command"),
205
                                         ":\n\n",
206
                                         f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\"",
207
            ])
208
        return keyfile_path, keyfile_help
×
209

210
    def try_auto_key_setup(self, pubkey_hex: str) -> bool:
5✔
211
        """Can be called from the GUI to store the plugin pubkey as root/admin user"""
212
        try:
×
213
            if sys.platform in ['windows', 'win32']:
×
214
                self._write_key_to_regedit_windows(pubkey_hex)
×
215
            elif 'ANDROID_DATA' in os.environ:
×
216
                raise Exception('platform not supported')
×
217
            elif sys.platform.startswith('darwin'):  # macOS
×
218
                self._write_key_to_root_file_macos(pubkey_hex)
×
219
            else:
220
                self._write_key_to_root_file_linux(pubkey_hex)
×
221
        except Exception:
×
222
            self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
×
223
            return False
×
224
        return True
×
225

226
    def try_auto_key_reset(self) -> bool:
5✔
227
        try:
×
228
            if sys.platform in ['windows', 'win32']:
×
229
                self._delete_plugin_key_from_windows_registry()
×
230
            elif 'ANDROID_DATA' in os.environ:
×
231
                raise Exception('platform not supported')
×
232
            elif sys.platform.startswith('darwin'):  # macOS
×
233
                self._delete_macos_plugin_keyfile()
×
234
            else:
235
                self._delete_linux_plugin_keyfile()
×
236
        except Exception:
×
237
            self.logger.exception(f'auto-reset of plugin key failed')
×
238
            return False
×
239
        return True
×
240

241
    def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
5✔
242
        """creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
243
        dir_path: str = os.path.dirname(self.keyfile_posix)
×
244
        sh_command = (
×
245
                     f"mkdir -p {dir_path} "  # create the /etc/electrum dir
246
                     f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} "  # write the key to the file
247
                     f"&& chmod 644 {self.keyfile_posix} "  # set read permissions for the file
248
                     f"&& chmod 755 {dir_path}"  # set read permissions for the dir
249
        )
250
        return sh_command
×
251

252
    @staticmethod
5✔
253
    def _get_macos_osascript_command(commands: List[str]) -> List[str]:
5✔
254
        """
255
        Inspired by
256
        https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
257
        Wraps the given commands in a macOS osascript command to prompt for root permissions.
258
        """
259
        from shlex import quote
×
260

261
        def quote_shell(args):
×
262
            return " ".join(quote(arg) for arg in args)
×
263

264
        def quote_applescript(string):
×
265
            charmap = {
×
266
                "\n": "\\n",
267
                "\r": "\\r",
268
                "\t": "\\t",
269
                "\"": "\\\"",
270
                "\\": "\\\\",
271
            }
272
            return '"%s"' % "".join(charmap.get(char, char) for char in string)
×
273

274
        commands = [
×
275
            "osascript",
276
            "-e",
277
            "do shell script %s "
278
            "with administrator privileges "
279
            "without altering line endings"
280
            % quote_applescript(quote_shell(commands))
281
        ]
282
        return commands
×
283

284
    @staticmethod
5✔
285
    def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
5✔
286
        """
287
        Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.
288
        """
289
        # has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
290
        # for the result of the process (returns no process handle)
291
        from ctypes import byref, sizeof, windll, Structure, c_ulong
×
292
        from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR
×
293

294
        # https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
295
        class SHELLEXECUTEINFO(Structure):
×
296
            _fields_ = [
×
297
                ('cbSize', DWORD),
298
                ('fMask', c_ulong),
299
                ('hwnd', HWND),
300
                ('lpVerb', LPCWSTR),
301
                ('lpFile', LPCWSTR),
302
                ('lpParameters', LPCWSTR),
303
                ('lpDirectory', LPCWSTR),
304
                ('nShow', c_ulong),
305
                ('hInstApp', HINSTANCE),
306
                ('lpIDList', c_ulong),
307
                ('lpClass', LPCWSTR),
308
                ('hkeyClass', HKEY),
309
                ('dwHotKey', DWORD),
310
                ('hIcon', HANDLE),
311
                ('hProcess', HANDLE)
312
            ]
313

314
        info = SHELLEXECUTEINFO()
×
315
        info.cbSize = sizeof(SHELLEXECUTEINFO)
×
316
        info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
×
317
        info.hwnd = None
×
318
        info.lpVerb = 'runas'  # run as administrator
×
319
        info.lpFile = 'reg.exe'  # the executable to run
×
320
        info.lpParameters = reg_exe_command  # the registry edit command
×
321
        info.lpDirectory = None
×
322
        info.nShow = 1
×
323

324
        # Execute and wait
325
        if not windll.shell32.ShellExecuteExW(byref(info)):
×
326
            error = windll.kernel32.GetLastError()
×
327
            raise Exception(f'Error executing registry command: {error}')
×
328

329
        # block until the process is done or 5 sec timeout
330
        windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)
×
331

332
        # Close handle
333
        windll.kernel32.CloseHandle(info.hProcess)
×
334

335
    @staticmethod
5✔
336
    def _execute_commands_in_subprocess(commands: List[str]) -> None:
5✔
337
        """
338
        Executes the given commands in a subprocess and asserts that it was successful.
339
        """
340
        import subprocess
×
NEW
341
        with subprocess.Popen(
×
342
            commands,
343
            stdout=subprocess.PIPE,
344
            stderr=subprocess.PIPE,
345
            text=True,
346
        ) as process:
NEW
347
            stdout, stderr = process.communicate()
×
NEW
348
            if process.returncode != 0:
×
NEW
349
                raise Exception(f'error executing command ({process.returncode}): {stderr}')
×
350

351
    def _write_key_to_root_file_linux(self, key_hex: str) -> None:
5✔
352
        """
353
        Spawns a pkexec subprocess to write the key to a file with root permissions.
354
        This will open an OS dialog asking for the root password. Can only succeed if
355
        the system has polkit installed.
356
        """
357
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
358

359
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
360
        commands = ['pkexec', 'sh', '-c', sh_command]
×
361
        self._execute_commands_in_subprocess(commands)
×
362

363
        # check if the key was written correctly
364
        with open(self.keyfile_posix, 'r') as f:
×
365
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
366
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
367

368
    def _delete_linux_plugin_keyfile(self) -> None:
5✔
369
        """
370
        Deletes the root owned key file at self.keyfile_posix.
371
        """
372
        if not os.path.exists(self.keyfile_posix):
×
373
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
374
            return
×
375
        if not self._has_root_permissions(self.keyfile_posix):
×
376
            os.unlink(self.keyfile_posix)
×
377
            return
×
378

379
        # use pkexec to delete the file as root user
380
        commands = ['pkexec', 'rm', self.keyfile_posix]
×
381
        self._execute_commands_in_subprocess(commands)
×
382
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
383

384
    def _write_key_to_root_file_macos(self, key_hex: str) -> None:
5✔
385
        assert os.path.exists("/etc"), "System does not have /etc directory"
×
386

387
        sh_command: str = self._posix_plugin_key_creation_command(key_hex)
×
388
        macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])
×
389

390
        self._execute_commands_in_subprocess(macos_commands)
×
391
        with open(self.keyfile_posix, 'r') as f:
×
392
            assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
×
393
        self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
×
394

395
    def _delete_macos_plugin_keyfile(self) -> None:
5✔
396
        if not os.path.exists(self.keyfile_posix):
×
397
            self.logger.debug(f'file {self.keyfile_posix} does not exist')
×
398
            return
×
399
        if not self._has_root_permissions(self.keyfile_posix):
×
400
            os.unlink(self.keyfile_posix)
×
401
            return
×
402
        # use osascript to delete the file as root user
403
        macos_commands = self._get_macos_osascript_command(["rm", self.keyfile_posix])
×
404
        self._execute_commands_in_subprocess(macos_commands)
×
405
        assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
×
406

407
    def _write_key_to_regedit_windows(self, key_hex: str) -> None:
5✔
408
        """
409
        Writes the key to the Windows registry with windows UAC prompt.
410
        """
411
        from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE
×
412

413
        value_type = 'REG_SZ'
×
414
        command = f'add "{self.keyfile_windows}" /ve /t {value_type} /d "{key_hex}" /f'
×
415

416
        self._run_win_regedit_as_admin(command)
×
417

418
        # check if the key was written correctly
419
        with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
420
            with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
×
421
                assert key_hex == QueryValue(key, 'PluginsKey'), "incorrect registry key value"
×
422
        self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
×
423

424
    def _delete_plugin_key_from_windows_registry(self) -> None:
5✔
425
        """
426
        Deletes the PluginsKey dir in the Windows registry.
427
        """
428
        from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE
×
429

430
        command = f'delete "{self.keyfile_windows}" /f'
×
431
        self._run_win_regedit_as_admin(command)
×
432

433
        try:
×
434
            # do a sanity check to see if the key has been deleted
435
            with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
×
436
                with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
×
437
                    raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
×
438
        except FileNotFoundError:
×
439
            pass
×
440

441
    def create_new_key(self, password:str) -> str:
5✔
442
        salt = os.urandom(32)
×
443
        privkey = self.derive_privkey(password, salt)
×
444
        pubkey = privkey.get_public_key_bytes()
×
445
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
446
        return key.hex()
×
447

448
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
449
        """
450
        returns pubkey, salt
451
        returns None, None if the pubkey has not been set
452
        """
453
        if sys.platform in ['windows', 'win32']:
×
454
            import winreg
×
455
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
456
                try:
×
457
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
458
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
459
                except Exception as e:
×
460
                    self.logger.info(f'winreg error: {e}')
×
461
                    return None, None
×
462
        elif 'ANDROID_DATA' in os.environ:
×
463
            return None, None
×
464
        else:
465
            # treat unknown platforms as linux-like
466
            if not os.path.exists(self.keyfile_posix):
×
467
                return None, None
×
468
            if not self._has_root_permissions(self.keyfile_posix):
×
469
                return
×
470
            with open(self.keyfile_posix) as f:
×
471
                key_hex = f.read()
×
472
        try:
×
473
            key = bytes.fromhex(key_hex)
×
474
            version = key[0]
×
475
        except Exception:
×
476
            self.logger.exception(f'{key_hex=} invalid')
×
477
            return None, None
×
478
        if version != PLUGIN_PASSWORD_VERSION:
×
479
            self.logger.info(f'unknown plugin password version: {version}')
×
480
            return None, None
×
481
        # all good
482
        salt = key[1:1+32]
×
483
        pubkey = key[1+32:]
×
484
        return pubkey, salt
×
485

486
    def get_external_plugin_dir(self) -> str:
5✔
487
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
488
        if not os.path.exists(pkg_path):
5✔
489
            os.mkdir(pkg_path)
5✔
490
        return pkg_path
5✔
491

492
    async def download_external_plugin(self, url: str) -> str:
5✔
493
        filename = os.path.basename(urlparse(url).path)
×
494
        pkg_path = self.get_external_plugin_dir()
×
495
        path = os.path.join(pkg_path, filename)
×
496
        if os.path.exists(path):
×
497
            raise FileExistsError(f"Plugin {filename} already exists at {path}")
×
498
        async with aiohttp.ClientSession() as session:
×
499
            async with session.get(url) as resp:
×
500
                if resp.status == 200:
×
501
                    with open(path, 'wb') as fd:
×
502
                        async for chunk in resp.content.iter_chunked(10):
×
503
                            fd.write(chunk)
×
504
        return path
×
505

506
    def read_manifest(self, path) -> dict:
5✔
507
        """ return json dict """
508
        with zipfile_lib.ZipFile(path) as file:
×
509
            for filename in file.namelist():
×
510
                if filename.endswith('manifest.json'):
×
511
                    break
×
512
            else:
513
                raise Exception('could not find manifest.json in zip archive')
×
514
            with file.open(filename, 'r') as f:
×
515
                manifest = json.load(f)
×
516
                manifest['path'] = path  # external, path of the zipfile
×
517
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
518
                manifest['is_zip'] = True
×
519
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
520
                return manifest
×
521

522
    def zip_plugin_path(self, name) -> str:
5✔
523
        path = self.get_metadata(name)['path']
×
524
        filename = os.path.basename(path)
×
525
        if name in self.internal_plugin_metadata:
×
526
            pkg_path = self.pkgpath
×
527
        else:
528
            pkg_path = self.get_external_plugin_dir()
×
529
        return os.path.join(pkg_path, filename)
×
530

531
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
532
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
533
        if pkg_path is None:
5✔
534
            return
×
535
        for filename in os.listdir(pkg_path):
5✔
536
            path = os.path.join(pkg_path, filename)
×
537
            if not filename.endswith('.zip'):
×
538
                continue
×
539
            try:
×
540
                d = self.read_manifest(path)
×
541
                name = d['name']
×
542
            except Exception:
×
543
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
544
                continue
×
545
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
546
                self.logger.info(f"duplicate plugins for {name=}")
×
547
                continue
×
548
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
549
                continue
×
550
            min_version = d.get('min_electrum_version')
×
551
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
552
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
553
                continue
×
554
            max_version = d.get('max_electrum_version')
×
555
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
556
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
557
                continue
×
558

559
            if not self.cmd_only:
×
560
                gui_good = self.gui_name in d.get('available_for', [])
×
561
                if not gui_good:
×
562
                    continue
×
563
                if 'fullname' not in d:
×
564
                    continue
×
565
                details = d.get('registers_keystore')
×
566
                if details:
×
567
                    self.register_keystore(name, gui_good, details)
×
568
            if external:
×
569
                self.external_plugin_metadata[name] = d
×
570
            else:
571
                self.internal_plugin_metadata[name] = d
×
572

573
    def get(self, name):
5✔
574
        return self.plugins.get(name)
×
575

576
    def count(self):
5✔
577
        return len(self.plugins)
×
578

579
    def load_plugin(self, name) -> 'BasePlugin':
5✔
580
        """Imports the code of the given plugin.
581
        note: can be called from any thread.
582
        """
583
        if self.get_metadata(name):
5✔
584
            return self.load_plugin_by_name(name)
5✔
585
        else:
586
            raise Exception(f"could not find plugin {name!r}")
×
587

588
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
589
        """Loads the __init__.py module of the plugin if it is not already loaded."""
590
        base_name = ('electrum_external_plugins.' if self.is_external(name) else 'electrum.plugins.') + name
5✔
591
        if base_name not in sys.modules:
5✔
592
            metadata = self.get_metadata(name)
5✔
593
            is_zip = metadata.get('is_zip', False)
5✔
594
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
595
            if not is_zip:
5✔
596
                if self.is_external(name):
5✔
597
                    # this branch is deprecated: external plugins are always zip files
598
                    path = os.path.join(metadata['path'], '__init__.py')
×
599
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
600
                else:
601
                    init_spec = importlib.util.find_spec(base_name)
5✔
602
            else:
603
                zipfile = zipimport.zipimporter(metadata['path'])
×
604
                dirname = metadata['dirname']
×
605
                init_spec = zipfile.find_spec(dirname)
×
606

607
            self.exec_module_from_spec(init_spec, base_name)
5✔
608

609
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
610
        if name in self.plugins:
5✔
611
            return self.plugins[name]
5✔
612
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
613
        self.maybe_load_plugin_init_method(name)
5✔
614
        is_external = self.is_external(name)
5✔
615
        if is_external and not self.is_authorized(name):
5✔
616
            self.logger.info(f'plugin not authorized {name}')
×
617
            return
×
618
        if not is_external:
5✔
619
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
620
        else:
621
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
622

623
        spec = importlib.util.find_spec(full_name)
5✔
624
        if spec is None:
5✔
625
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
626
        try:
5✔
627
            module = self.exec_module_from_spec(spec, full_name)
5✔
628
            plugin = module.Plugin(self, self.config, name)
5✔
629
        except Exception as e:
×
630
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
631
        self.add_jobs(plugin.thread_jobs())
5✔
632
        self.plugins[name] = plugin
5✔
633
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
634
        return plugin
5✔
635

636
    def close_plugin(self, plugin):
5✔
637
        self.remove_jobs(plugin.thread_jobs())
×
638

639
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
640
        from hashlib import pbkdf2_hmac
×
641
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
642
        return ECPrivkey(secret)
×
643

644
    def uninstall(self, name: str):
5✔
645
        if self.config.get(f'plugins.{name}'):
×
646
            self.config.set_key(f'plugins.{name}', None)
×
647
        if name in self.external_plugin_metadata:
×
648
            zipfile = self.zip_plugin_path(name)
×
649
            os.unlink(zipfile)
×
650
            self.external_plugin_metadata.pop(name)
×
651

652
    def is_internal(self, name) -> bool:
5✔
653
        return name in self.internal_plugin_metadata
×
654

655
    def is_external(self, name) -> bool:
5✔
656
        return name in self.external_plugin_metadata
5✔
657

658
    def is_auto_loaded(self, name):
5✔
659
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
660
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
661

662
    def is_installed(self, name) -> bool:
5✔
663
        """an external plugin may be installed but not authorized """
664
        return (name in self.internal_plugin_metadata or name in self.external_plugin_metadata)
×
665

666
    def is_authorized(self, name) -> bool:
5✔
667
        if name in self.internal_plugin_metadata:
×
668
            return True
×
669
        if name not in self.external_plugin_metadata:
×
670
            return False
×
671
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
672
        if not pubkey_bytes:
×
673
            return False
×
674
        if not self.is_plugin_zip(name):
×
675
            return False
×
676
        filename = self.zip_plugin_path(name)
×
677
        plugin_hash = get_file_hash256(filename)
×
678
        sig = self.config.get(f'plugins.{name}.authorized')
×
679
        if not sig:
×
680
            return False
×
681
        pubkey = ECPubkey(pubkey_bytes)
×
682
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
683

684
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
685
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
686
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
687
        plugin_hash = get_file_hash256(filename)
×
688
        sig = privkey.ecdsa_sign(plugin_hash)
×
689
        value = sig.hex()
×
690
        self.config.set_key(f'plugins.{name}.authorized', value)
×
691
        self.config.set_key(f'plugins.{name}.enabled', True)
×
692

693
    def enable(self, name: str) -> 'BasePlugin':
5✔
694
        self.config.enable_plugin(name)
×
695
        p = self.get(name)
×
696
        if p:
×
697
            return p
×
698
        return self.load_plugin(name)
×
699

700
    def disable(self, name: str) -> None:
5✔
701
        self.config.disable_plugin(name)
×
702
        p = self.get(name)
×
703
        if not p:
×
704
            return
×
705
        self.plugins.pop(name)
×
706
        p.close()
×
707
        self.logger.info(f"closed {name}")
×
708

709
    @classmethod
5✔
710
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
711
        return key.startswith('plugins.')
×
712

713
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
714
        d = self.descriptions.get(name)
×
715
        if not d:
×
716
            return False
×
717
        deps = d.get('requires', [])
×
718
        for dep, s in deps:
×
719
            try:
×
720
                __import__(dep)
×
721
            except ImportError as e:
×
722
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
723
                return False
×
724
        requires = d.get('requires_wallet_type', [])
×
725
        return not requires or wallet.wallet_type in requires
×
726

727
    def get_hardware_support(self):
5✔
728
        out = []
×
729
        for name, (gui_good, details) in self.hw_wallets.items():
×
730
            if gui_good:
×
731
                try:
×
732
                    p = self.get_plugin(name)
×
733
                    if p.is_available():
×
734
                        out.append(HardwarePluginToScan(name=name,
×
735
                                                        description=details[2],
736
                                                        plugin=p,
737
                                                        exception=None))
738
                except Exception as e:
×
739
                    self.logger.exception(f"cannot load plugin for: {name}")
×
740
                    out.append(HardwarePluginToScan(name=name,
×
741
                                                    description=details[2],
742
                                                    plugin=None,
743
                                                    exception=e))
744
        return out
×
745

746
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
747
        from .wallet import register_wallet_type, register_constructor
5✔
748
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
749

750
        def loader():
5✔
751
            plugin = self.get_plugin(name)
5✔
752
            register_constructor(wallet_type, plugin.wallet_class)
5✔
753
        register_wallet_type(wallet_type)
5✔
754
        plugin_loaders[wallet_type] = loader
5✔
755

756
    def register_keystore(self, name, gui_good, details):
5✔
757
        from .keystore import register_keystore
5✔
758

759
        def dynamic_constructor(d):
5✔
760
            return self.get_plugin(name).keystore_class(d)
5✔
761
        if details[0] == 'hardware':
5✔
762
            self.hw_wallets[name] = (gui_good, details)
5✔
763
            self.logger.info(f"registering hardware {name}: {details}")
5✔
764
            register_keystore(details[1], dynamic_constructor)
5✔
765

766
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
767
        if name not in self.plugins:
5✔
768
            self.load_plugin(name)
5✔
769
        return self.plugins[name]
5✔
770

771
    def is_plugin_zip(self, name: str) -> bool:
5✔
772
        """Returns True if the plugin is a zip file"""
773
        if (metadata := self.get_metadata(name)) is None:
×
774
            return False
×
775
        return metadata.get('is_zip', False)
×
776

777
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
778
        """Returns the metadata of the plugin"""
779
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
780
        if not metadata:
5✔
781
            return None
×
782
        return metadata
5✔
783

784
    def run(self):
5✔
785
        while self.is_running():
5✔
786
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
787
            self.run_jobs()
5✔
788
        self.on_stop()
5✔
789

790
    def read_file(self, name: str, filename: str) -> bytes:
5✔
791
        if self.is_plugin_zip(name):
×
792
            plugin_filename = self.zip_plugin_path(name)
×
793
            metadata = self.external_plugin_metadata[name]
×
794
            dirname = metadata['dirname']
×
795
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
796
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
797
                    return myfile.read()
×
798
        elif name in self.internal_plugin_metadata:
×
799
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
800
            with open(path, 'rb') as myfile:
×
801
                return myfile.read()
×
802
        else:
803
            # no icon
804
            return None
×
805

806
def get_file_hash256(path: str) -> bytes:
5✔
807
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
808
    with open(path, 'rb') as f:
×
809
        return sha256(f.read())
×
810

811

812
def hook(func):
5✔
813
    hook_names.add(func.__name__)
5✔
814
    return func
5✔
815

816

817
def run_hook(name, *args):
5✔
818
    results = []
5✔
819
    f_list = hooks.get(name, [])
5✔
820
    for p, f in f_list:
5✔
821
        if p.is_enabled():
5✔
822
            try:
5✔
823
                r = f(*args)
5✔
824
            except Exception:
×
825
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
826
                r = False
×
827
            if r:
5✔
828
                results.append(r)
×
829

830
    if results:
5✔
831
        assert len(results) == 1, results
×
832
        return results[0]
×
833

834

835
class BasePlugin(Logger):
5✔
836

837
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
838
        self.parent = parent  # type: Plugins  # The plugins object
5✔
839
        self.name = name
5✔
840
        self.config = config
5✔
841
        Logger.__init__(self)
5✔
842
        # add self to hooks
843
        for k in dir(self):
5✔
844
            if k in hook_names:
5✔
845
                l = hooks.get(k, [])
5✔
846
                l.append((self, getattr(self, k)))
5✔
847
                hooks[k] = l
5✔
848

849
    def __str__(self):
5✔
850
        return self.name
×
851

852
    def close(self):
5✔
853
        # remove self from hooks
854
        for attr_name in dir(self):
×
855
            if attr_name in hook_names:
×
856
                # found attribute in self that is also the name of a hook
857
                l = hooks.get(attr_name, [])
×
858
                try:
×
859
                    l.remove((self, getattr(self, attr_name)))
×
860
                except ValueError:
×
861
                    # maybe attr name just collided with hook name and was not hook
862
                    continue
×
863
                hooks[attr_name] = l
×
864
        self.parent.close_plugin(self)
×
865
        self.on_close()
×
866

867
    def on_close(self):
5✔
868
        pass
×
869

870
    def requires_settings(self) -> bool:
5✔
871
        return False
×
872

873
    def thread_jobs(self):
5✔
874
        return []
5✔
875

876
    def is_enabled(self):
5✔
877
        if not self.is_available():
×
878
            return False
×
879
        if not self.parent.is_authorized(self.name):
×
880
            return False
×
881
        return self.config.is_plugin_enabled(self.name)
×
882

883
    def is_available(self):
5✔
884
        return True
×
885

886
    def can_user_disable(self):
5✔
887
        return True
×
888

889
    def settings_widget(self, window):
5✔
890
        raise NotImplementedError()
×
891

892
    def settings_dialog(self, window):
5✔
893
        raise NotImplementedError()
×
894

895
    def read_file(self, filename: str) -> bytes:
5✔
896
        return self.parent.read_file(self.name, filename)
×
897

898
    def get_storage(self, wallet: 'Abstract_Wallet') -> dict:
5✔
899
        """Returns a dict which is persisted in the per-wallet database."""
900
        plugin_storage = wallet.db.get_plugin_storage()
×
901
        return plugin_storage.setdefault(self.name, {})
×
902

903
class DeviceUnpairableError(UserFacingException): pass
5✔
904
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
905
class CannotAutoSelectDevice(Exception): pass
5✔
906

907

908
class Device(NamedTuple):
5✔
909
    path: Union[str, bytes]
5✔
910
    interface_number: int
5✔
911
    id_: str
5✔
912
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
913
    usage_page: int
5✔
914
    transport_ui_string: str
5✔
915

916

917
class DeviceInfo(NamedTuple):
5✔
918
    device: Device
5✔
919
    label: Optional[str] = None
5✔
920
    initialized: Optional[bool] = None
5✔
921
    exception: Optional[Exception] = None
5✔
922
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
923
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
924
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
925

926
    def label_for_device_select(self) -> str:
5✔
927
        return (
×
928
            "{label} ({maybe_model}{init}, {transport})"
929
            .format(
930
                label=self.label or _("An unnamed {}").format(self.plugin_name),
931
                init=(_("initialized") if self.initialized else _("wiped")),
932
                transport=self.device.transport_ui_string,
933
                maybe_model=f"{self.model_name}, " if self.model_name else ""
934
            )
935
        )
936

937

938
class HardwarePluginToScan(NamedTuple):
5✔
939
    name: str
5✔
940
    description: str
5✔
941
    plugin: Optional['HW_PluginBase']
5✔
942
    exception: Optional[Exception]
5✔
943

944

945
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
946

947

948
# hidapi is not thread-safe
949
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
950
#     https://github.com/libusb/hidapi/issues/45
951
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
952
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
953
# It is not entirely clear to me, exactly what is safe and what isn't, when
954
# using multiple threads...
955
# Hence, we use a single thread for all device communications, including
956
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
957
# the following thread:
958
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
959
    max_workers=1,
960
    thread_name_prefix='hwd_comms_thread'
961
)
962

963
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
964
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
965
# To keep it simple, let's just import it now, as we are likely in the main thread here.
966
if threading.current_thread() is not threading.main_thread():
5✔
967
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
968
try:
5✔
969
    import hid
5✔
970
except ImportError:
5✔
971
    pass
5✔
972

973

974
T = TypeVar('T')
5✔
975

976

977
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
978
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
979
        return func()
×
980
    else:
981
        fut = _hwd_comms_executor.submit(func)
×
982
        return fut.result()
×
983
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
984

985

986
def runs_in_hwd_thread(func):
5✔
987
    @wraps(func)
5✔
988
    def wrapper(*args, **kwargs):
5✔
989
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
990
    return wrapper
5✔
991

992

993
def assert_runs_in_hwd_thread():
5✔
994
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
995
        raise Exception("must only be called from HWD communication thread")
×
996

997

998
class DeviceMgr(ThreadJob):
5✔
999
    """Manages hardware clients.  A client communicates over a hardware
1000
    channel with the device.
1001

1002
    In addition to tracking device HID IDs, the device manager tracks
1003
    hardware wallets and manages wallet pairing.  A HID ID may be
1004
    paired with a wallet when it is confirmed that the hardware device
1005
    matches the wallet, i.e. they have the same master public key.  A
1006
    HID ID can be unpaired if e.g. it is wiped.
1007

1008
    Because of hotplugging, a wallet must request its client
1009
    dynamically each time it is required, rather than caching it
1010
    itself.
1011

1012
    The device manager is shared across plugins, so just one place
1013
    does hardware scans when needed.  By tracking HID IDs, if a device
1014
    is plugged into a different port the wallet is automatically
1015
    re-paired.
1016

1017
    Wallets are informed on connect / disconnect events.  It must
1018
    implement connected(), disconnected() callbacks.  Being connected
1019
    implies a pairing.  Callbacks can happen in any thread context,
1020
    and we do them without holding the lock.
1021

1022
    Confusingly, the HID ID (serial number) reported by the HID system
1023
    doesn't match the device ID reported by the device itself.  We use
1024
    the HID IDs.
1025

1026
    This plugin is thread-safe.  Currently only devices supported by
1027
    hidapi are implemented."""
1028

1029
    def __init__(self, config: SimpleConfig):
5✔
1030
        ThreadJob.__init__(self)
5✔
1031
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
1032
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
1033
        # A client->id_ map. Needs self.lock.
1034
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
1035
        # What we recognise.  (vendor_id, product_id) -> Plugin
1036
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
1037
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
1038
        # Custom enumerate functions for devices we don't know about.
1039
        self._enumerate_func = set()  # Needs self.lock.
5✔
1040

1041
        self.lock = threading.RLock()
5✔
1042

1043
        self.config = config
5✔
1044

1045
    def thread_jobs(self):
5✔
1046
        # Thread job to handle device timeouts
1047
        return [self]
5✔
1048

1049
    def run(self):
5✔
1050
        '''Handle device timeouts.  Runs in the context of the Plugins
1051
        thread.'''
1052
        with self.lock:
5✔
1053
            clients = list(self.clients.keys())
5✔
1054
        cutoff = time.time() - self.config.get_session_timeout()
5✔
1055
        for client in clients:
5✔
1056
            client.timeout(cutoff)
×
1057

1058
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
1059
        for pair in device_pairs:
×
1060
            self._recognised_hardware[pair] = plugin
×
1061

1062
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
1063
        for vendor_id in vendor_ids:
×
1064
            self._recognised_vendor[vendor_id] = plugin
×
1065

1066
    def register_enumerate_func(self, func):
5✔
1067
        with self.lock:
×
1068
            self._enumerate_func.add(func)
×
1069

1070
    @runs_in_hwd_thread
5✔
1071
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
1072
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
1073
        # Get from cache first
1074
        client = self._client_by_id(device.id_)
×
1075
        if client:
×
1076
            return client
×
1077
        client = plugin.create_client(device, handler)
×
1078
        if client:
×
1079
            self.logger.info(f"Registering {client}")
×
1080
            with self.lock:
×
1081
                self.clients[client] = device.id_
×
1082
        return client
×
1083

1084
    def id_by_pairing_code(self, pairing_code):
5✔
1085
        with self.lock:
×
1086
            return self.pairing_code_to_id.get(pairing_code)
×
1087

1088
    def pairing_code_by_id(self, id_):
5✔
1089
        with self.lock:
×
1090
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
1091
                if id2 == id_:
×
1092
                    return pairing_code
×
1093
            return None
×
1094

1095
    def unpair_pairing_code(self, pairing_code):
5✔
1096
        with self.lock:
×
1097
            if pairing_code not in self.pairing_code_to_id:
×
1098
                return
×
1099
            _id = self.pairing_code_to_id.pop(pairing_code)
×
1100
        self._close_client(_id)
×
1101

1102
    def unpair_id(self, id_):
5✔
1103
        pairing_code = self.pairing_code_by_id(id_)
×
1104
        if pairing_code:
×
1105
            self.unpair_pairing_code(pairing_code)
×
1106
        else:
1107
            self._close_client(id_)
×
1108

1109
    def _close_client(self, id_):
5✔
1110
        with self.lock:
×
1111
            client = self._client_by_id(id_)
×
1112
            self.clients.pop(client, None)
×
1113
        if client:
×
1114
            client.close()
×
1115

1116
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
1117
        with self.lock:
×
1118
            for client, client_id in self.clients.items():
×
1119
                if client_id == id_:
×
1120
                    return client
×
1121
        return None
×
1122

1123
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
1124
        '''Returns a client for the device ID if one is registered.  If
1125
        a device is wiped or in bootloader mode pairing is impossible;
1126
        in such cases we communicate by device ID and not wallet.'''
1127
        if scan_now:
×
1128
            self.scan_devices()
×
1129
        return self._client_by_id(id_)
×
1130

1131
    @runs_in_hwd_thread
5✔
1132
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
1133
                            keystore: 'Hardware_KeyStore',
1134
                            force_pair: bool, *,
1135
                            devices: Sequence['Device'] = None,
1136
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
1137
        self.logger.info("getting client for keystore")
×
1138
        if handler is None:
×
1139
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
1140
        handler.update_status(False)
×
1141
        pcode = keystore.pairing_code()
×
1142
        client = None
×
1143
        # search existing clients first (fast-path)
1144
        if not devices:
×
1145
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
1146
        # search clients again, now allowing a (slow) scan
1147
        if client is None:
×
1148
            if devices is None:
×
1149
                devices = self.scan_devices()
×
1150
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
1151
        if client is None and force_pair:
×
1152
            try:
×
1153
                info = self.select_device(plugin, handler, keystore, devices,
×
1154
                                          allow_user_interaction=allow_user_interaction)
1155
            except CannotAutoSelectDevice:
×
1156
                pass
×
1157
            else:
1158
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
1159
        if client:
×
1160
            handler.update_status(True)
×
1161
            # note: if select_device was called, we might also update label etc here:
1162
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1163
        self.logger.info("end client for keystore")
×
1164
        return client
×
1165

1166
    def client_by_pairing_code(
5✔
1167
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
1168
        devices: Sequence['Device'],
1169
    ) -> Optional['HardwareClientBase']:
1170
        _id = self.id_by_pairing_code(pairing_code)
×
1171
        client = self._client_by_id(_id)
×
1172
        if client:
×
1173
            if type(client.plugin) != type(plugin):
×
1174
                return
×
1175
            # An unpaired client might have another wallet's handler
1176
            # from a prior scan.  Replace to fix dialog parenting.
1177
            client.handler = handler
×
1178
            return client
×
1179

1180
        for device in devices:
×
1181
            if device.id_ == _id:
×
1182
                return self.create_client(device, handler, plugin)
×
1183

1184
    def force_pair_keystore(
5✔
1185
        self,
1186
        *,
1187
        plugin: 'HW_PluginBase',
1188
        handler: 'HardwareHandlerBase',
1189
        info: 'DeviceInfo',
1190
        keystore: 'Hardware_KeyStore',
1191
    ) -> 'HardwareClientBase':
1192
        xpub = keystore.xpub
×
1193
        derivation = keystore.get_derivation_prefix()
×
1194
        assert derivation is not None
×
1195
        xtype = bip32.xpub_type(xpub)
×
1196
        client = self._client_by_id(info.device.id_)
×
1197
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
1198
            # See comment above for same code
1199
            client.handler = handler
×
1200
            # This will trigger a PIN/passphrase entry request
1201
            try:
×
1202
                client_xpub = client.get_xpub(derivation, xtype)
×
1203
            except (UserCancelled, RuntimeError):
×
1204
                # Bad / cancelled PIN / passphrase
1205
                client_xpub = None
×
1206
            if client_xpub == xpub:
×
1207
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
1208
                with self.lock:
×
1209
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
1210
                return client
×
1211

1212
        # The user input has wrong PIN or passphrase, or cancelled input,
1213
        # or it is not pairable
1214
        raise DeviceUnpairableError(
×
1215
            _('Electrum cannot pair with your {}.\n\n'
1216
              'Before you request bitcoins to be sent to addresses in this '
1217
              'wallet, ensure you can pair with your device, or that you have '
1218
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
1219
              'receive will be unspendable.').format(plugin.device))
1220

1221
    def list_pairable_device_infos(
5✔
1222
        self,
1223
        *,
1224
        handler: Optional['HardwareHandlerBase'],
1225
        plugin: 'HW_PluginBase',
1226
        devices: Sequence['Device'] = None,
1227
        include_failing_clients: bool = False,
1228
    ) -> List['DeviceInfo']:
1229
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
1230
        Already paired devices are also included, as it is okay to reuse them.
1231
        """
1232
        if not plugin.libraries_available:
×
1233
            message = plugin.get_library_not_available_message()
×
1234
            raise HardwarePluginLibraryUnavailable(message)
×
1235
        if devices is None:
×
1236
            devices = self.scan_devices()
×
1237
        infos = []
×
1238
        for device in devices:
×
1239
            if not plugin.can_recognize_device(device):
×
1240
                continue
×
1241
            try:
×
1242
                client = self.create_client(device, handler, plugin)
×
1243
                if not client:
×
1244
                    continue
×
1245
                label = client.label()
×
1246
                is_initialized = client.is_initialized()
×
1247
                soft_device_id = client.get_soft_device_id()
×
1248
                model_name = client.device_model_name()
×
1249
            except Exception as e:
×
1250
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
1251
                if include_failing_clients:
×
1252
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
1253
                continue
×
1254
            infos.append(DeviceInfo(device=device,
×
1255
                                    label=label,
1256
                                    initialized=is_initialized,
1257
                                    plugin_name=plugin.name,
1258
                                    soft_device_id=soft_device_id,
1259
                                    model_name=model_name))
1260

1261
        return infos
×
1262

1263
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
1264
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
1265
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
1266
        """Select the device to use for keystore."""
1267
        # ideally this should not be called from the GUI thread...
1268
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
1269
        while True:
×
1270
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
1271
            if infos:
×
1272
                break
×
1273
            if not allow_user_interaction:
×
1274
                raise CannotAutoSelectDevice()
×
1275
            msg = _('Please insert your {}').format(plugin.device)
×
1276
            msg += " ("
×
1277
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
1278
                msg += f"label: {keystore.label}, "
×
1279
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
1280
            msg += ').\n\n{}\n\n{}'.format(
×
1281
                _('Verify the cable is connected and that '
1282
                  'no other application is using it.'),
1283
                _('Try to connect again?')
1284
            )
1285
            if not handler.yes_no_question(msg):
×
1286
                raise UserCancelled()
×
1287
            devices = None
×
1288

1289
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
1290
        # method 1: select device by id
1291
        if keystore.soft_device_id:
×
1292
            for info in infos:
×
1293
                if info.soft_device_id == keystore.soft_device_id:
×
1294
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
1295
                    return info
×
1296
        # method 2: select device by label
1297
        #           but only if not a placeholder label and only if there is no collision
1298
        device_labels = [info.label for info in infos]
×
1299
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1300
                and device_labels.count(keystore.label) == 1):
1301
            for info in infos:
×
1302
                if info.label == keystore.label:
×
1303
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1304
                    return info
×
1305
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1306
        #           saved for keystore anyway, select it
1307
        if (len(infos) == 1
×
1308
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1309
                and keystore.soft_device_id is None):
1310
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1311
            return infos[0]
×
1312

1313
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1314
        if not allow_user_interaction:
×
1315
            raise CannotAutoSelectDevice()
×
1316
        # ask user to select device manually
1317
        msg = (
×
1318
                _("Could not automatically pair with device for given keystore.") + "\n"
1319
                + f"(keystore label: {keystore.label!r}, "
1320
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1321
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1322
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1323
        choices = [ChoiceItem(key=idx, label=info.label_for_device_select())
×
1324
                   for (idx, info) in enumerate(infos)]
1325
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1326
                          f"num options: {len(infos)}. options: {infos}")
1327
        c = handler.query_choice(msg, choices)
×
1328
        if c is None:
×
1329
            raise UserCancelled()
×
1330
        info = infos[c]
×
1331
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1332
        # note: updated label/soft_device_id will be saved after pairing succeeds
1333
        return info
×
1334

1335
    @runs_in_hwd_thread
5✔
1336
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1337
        try:
×
1338
            import hid  # noqa: F811
×
1339
        except ImportError:
×
1340
            return []
×
1341

1342
        devices = []
×
1343
        for d in hid.enumerate(0, 0):
×
1344
            vendor_id = d['vendor_id']
×
1345
            product_key = (vendor_id, d['product_id'])
×
1346
            plugin = None
×
1347
            if product_key in self._recognised_hardware:
×
1348
                plugin = self._recognised_hardware[product_key]
×
1349
            elif vendor_id in self._recognised_vendor:
×
1350
                plugin = self._recognised_vendor[vendor_id]
×
1351
            if plugin:
×
1352
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1353
                if device:
×
1354
                    devices.append(device)
×
1355
        return devices
×
1356

1357
    @runs_in_hwd_thread
5✔
1358
    @profiler
5✔
1359
    def scan_devices(self) -> Sequence['Device']:
5✔
1360
        self.logger.info("scanning devices...")
×
1361

1362
        # First see what's connected that we know about
1363
        devices = self._scan_devices_with_hid()
×
1364

1365
        # Let plugin handlers enumerate devices we don't know about
1366
        with self.lock:
×
1367
            enumerate_funcs = list(self._enumerate_func)
×
1368
        for f in enumerate_funcs:
×
1369
            try:
×
1370
                new_devices = f()
×
1371
            except BaseException as e:
×
1372
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1373
            else:
1374
                devices.extend(new_devices)
×
1375

1376
        # find out what was disconnected
1377
        client_ids = [dev.id_ for dev in devices]
×
1378
        disconnected_clients = []
×
1379
        with self.lock:
×
1380
            connected = {}
×
1381
            for client, id_ in self.clients.items():
×
1382
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1383
                    connected[client] = id_
×
1384
                else:
1385
                    disconnected_clients.append((client, id_))
×
1386
            self.clients = connected
×
1387

1388
        # Unpair disconnected devices
1389
        for client, id_ in disconnected_clients:
×
1390
            self.unpair_id(id_)
×
1391
            if client.handler:
×
1392
                client.handler.update_status(False)
×
1393

1394
        return devices
×
1395

1396
    @classmethod
5✔
1397
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1398
        ret = {}
×
1399
        # add libusb
1400
        try:
×
1401
            import usb1
×
1402
        except Exception as e:
×
1403
            ret["libusb.version"] = None
×
1404
        else:
1405
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1406
            try:
×
1407
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1408
            except AttributeError:
×
1409
                ret["libusb.path"] = None
×
1410
        # add hidapi
1411
        try:
×
1412
            import hid  # noqa: F811
×
1413
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1414
        except Exception as e:
×
1415
            from importlib.metadata import version
×
1416
            try:
×
1417
                ret["hidapi.version"] = version("hidapi")
×
1418
            except ImportError:
×
1419
                ret["hidapi.version"] = None
×
1420
        return ret
×
1421

1422
    def trigger_pairings(
5✔
1423
            self,
1424
            keystores: Sequence['KeyStore'],
1425
            *,
1426
            allow_user_interaction: bool = True,
1427
            devices: Sequence['Device'] = None,
1428
    ) -> None:
1429
        """Given a list of keystores, try to pair each with a connected hardware device.
1430

1431
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1432
        try to pair each keystore individually. Consider the following scenario:
1433
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1434
        - assume none of the devices are paired yet
1435
        1. if we tried to individually pair keystores, we might try with ks1 first
1436
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1437
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1438
             which is confusing and error-prone. It's especially problematic if the hw device does
1439
             not support labels (such as Ledger), as then the user cannot easily distinguish
1440
             same-type devices. (see #4199)
1441
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1442
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1443
        """
1444
        from .keystore import Hardware_KeyStore
×
1445
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1446
        if not keystores:
×
1447
            return
×
1448
        if devices is None:
×
1449
            devices = self.scan_devices()
×
1450
        # first pair with all devices that can be auto-selected
1451
        for ks in keystores:
×
1452
            try:
×
1453
                ks.get_client(
×
1454
                    force_pair=True,
1455
                    allow_user_interaction=False,
1456
                    devices=devices,
1457
                )
1458
            except UserCancelled:
×
1459
                pass
×
1460
        if allow_user_interaction:
×
1461
            # now do manual selections
1462
            for ks in keystores:
×
1463
                try:
×
1464
                    ks.get_client(
×
1465
                        force_pair=True,
1466
                        allow_user_interaction=True,
1467
                        devices=devices,
1468
                    )
1469
                except UserCancelled:
×
1470
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc