• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6669814476832768

01 May 2025 09:12AM UTC coverage: 60.203% (+0.001%) from 60.202%
6669814476832768

Pull #9773

CirrusCI

f321x
validate and deduplicate relay config input in qt gui

Adds validation and deduplication of the relay URLs entered in the Qt
settings dialog. This is intended to prevent malformed or duplicated
relay entries.
Also resets the relays to the default value if no (valid) URL
is entered. This prevents the user from getting stuck without relays
(otherwise the user would have to search for relay URLs manually if
they don't know any).
Pull Request #9773: qt: validate and deduplicate relay config input in qt gui

2 of 8 new or added lines in 1 file covered. (25.0%)

273 existing lines in 5 files now uncovered.

21614 of 35902 relevant lines covered (60.2%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.59
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
5✔
72

73
    LOGGING_SHORTCUT = 'p'
5✔
74
    pkgpath = os.path.dirname(plugins.__file__)
5✔
75
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
76
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
77

78
    @profiler
5✔
79
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
80
        self.config = config
5✔
81
        self.cmd_only = cmd_only  # type: bool
5✔
82
        self.internal_plugin_metadata = {}
5✔
83
        self.external_plugin_metadata = {}
5✔
84
        if cmd_only:
5✔
85
            # only import the command modules of plugins
86
            Logger.__init__(self)
×
87
            self.find_plugins()
×
88
            self.load_plugins()
×
89
            return
×
90
        DaemonThread.__init__(self)
5✔
91
        self.device_manager = DeviceMgr(config)
5✔
92
        self.name = 'Plugins'  # set name of thread
5✔
93
        self.hw_wallets = {}
5✔
94
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
95
        self.gui_name = gui_name
5✔
96
        self.find_plugins()
5✔
97
        self.load_plugins()
5✔
98
        self.add_jobs(self.device_manager.thread_jobs())
5✔
99
        self.start()
5✔
100

101
    @property
5✔
102
    def descriptions(self):
5✔
103
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
106
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
107
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
108
        for loader, name, ispkg in iter_modules:
5✔
109
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
110
            #       once as data and once as code. To honor the "no duplicates" rule below,
111
            #       we exclude the ones packaged as *code*, here:
112
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
113
                continue
×
114
            module_path = os.path.join(pkg_path, name)
5✔
115
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled') is True:
5✔
116
                continue
×
117
            try:
5✔
118
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
119
                    d = json.load(f)
5✔
120
            except FileNotFoundError:
×
121
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
122
                continue
×
123
            if 'fullname' not in d:
5✔
124
                continue
×
125
            d['path'] = module_path
5✔
126
            if not self.cmd_only:
5✔
127
                gui_good = self.gui_name in d.get('available_for', [])
5✔
128
                if not gui_good:
5✔
129
                    continue
5✔
130
                details = d.get('registers_wallet_type')
5✔
131
                if details:
5✔
132
                    self.register_wallet_type(name, gui_good, details)
5✔
133
                details = d.get('registers_keystore')
5✔
134
                if details:
5✔
135
                    self.register_keystore(name, gui_good, details)
5✔
136
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
137
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
138
                _logger.info(f"duplicate plugins? for {name=}")
×
139
                continue
×
140
            if not external:
5✔
141
                self.internal_plugin_metadata[name] = d
5✔
142
            else:
143
                self.external_plugin_metadata[name] = d
×
144

145
    @staticmethod
5✔
146
    def exec_module_from_spec(spec, path: str):
5✔
147
        if prev_fail := _exec_module_failure.get(path):
5✔
148
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
149
        try:
5✔
150
            module = importlib.util.module_from_spec(spec)
5✔
151
            # sys.modules needs to be modified for relative imports to work
152
            # see https://stackoverflow.com/a/50395128
153
            sys.modules[path] = module
5✔
154
            spec.loader.exec_module(module)
5✔
155
        except Exception as e:
×
156
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
157
            # so the import system knows it failed. If called again for the same plugin, we do not
158
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
159
            # might have defined commands).
160
            _exec_module_failure[path] = e
×
161
            if path in sys.modules:
×
162
                sys.modules.pop(path, None)
×
163
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
164
        return module
5✔
165

166
    def find_plugins(self):
5✔
167
        internal_plugins_path = (self.pkgpath, False)
5✔
168
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
169
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
170
            if pkg_path and os.path.exists(pkg_path):
5✔
171
                if not external:
5✔
172
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
173
                else:
174
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
175

176
    def load_plugins(self):
5✔
177
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
178
            if not d.get('requires_wallet_type') and self.config.get(f'plugins.{name}.enabled'):
5✔
179
                try:
×
180
                    if self.cmd_only:  # only load init method to register commands
×
181
                        self.maybe_load_plugin_init_method(name)
×
182
                    else:
183
                        self.load_plugin_by_name(name)
×
184
                except BaseException as e:
×
185
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
186

187
    def _has_root_permissions(self, path):
5✔
188
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
189

190
    def get_keyfile_path(self) -> Tuple[str, str]:
5✔
191
        if sys.platform in ['windows', 'win32']:
×
192
            keyfile_path = self.keyfile_windows
×
193
            keyfile_help = _('This file can be edited with Regdit')
×
194
        elif 'ANDROID_DATA' in os.environ:
×
195
            raise Exception('platform not supported')
×
196
        else:
197
            # treat unknown platforms as linux-like
198
            keyfile_path = self.keyfile_linux
×
199
            keyfile_help = _('The file must have root permissions')
×
200
        return keyfile_path, keyfile_help
×
201

202
    def create_new_key(self, password:str) -> str:
5✔
203
        salt = os.urandom(32)
×
204
        privkey = self.derive_privkey(password, salt)
×
205
        pubkey = privkey.get_public_key_bytes()
×
206
        key = bytes([PLUGIN_PASSWORD_VERSION]) + salt + pubkey
×
207
        return key.hex()
×
208

209
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
210
        """
211
        returns pubkey, salt
212
        returns None, None if the pubkey has not been set
213
        """
214
        if sys.platform in ['windows', 'win32']:
×
215
            import winreg
×
216
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
217
                try:
×
218
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
219
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
220
                except Exception as e:
×
221
                    self.logger.info(f'winreg error: {e}')
×
222
                    return None, None
×
223
        elif 'ANDROID_DATA' in os.environ:
×
224
            return None, None
×
225
        else:
226
            # treat unknown platforms as linux-like
227
            if not os.path.exists(self.keyfile_linux):
×
228
                return None, None
×
229
            if not self._has_root_permissions(self.keyfile_linux):
×
230
                return
×
231
            with open(self.keyfile_linux) as f:
×
232
                key_hex = f.read()
×
233
        key = bytes.fromhex(key_hex)
×
234
        version = key[0]
×
235
        if version != PLUGIN_PASSWORD_VERSION:
×
236
            self.logger.info(f'unknown plugin password version: {version}')
×
237
            return None, None
×
238
        # all good
239
        salt = key[1:1+32]
×
240
        pubkey = key[1+32:]
×
241
        return pubkey, salt
×
242

243
    def get_external_plugin_dir(self) -> str:
5✔
244
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
245
        if not os.path.exists(pkg_path):
5✔
246
            os.mkdir(pkg_path)
5✔
247
        return pkg_path
5✔
248

249
    async def download_external_plugin(self, url):
5✔
250
        filename = os.path.basename(urlparse(url).path)
×
251
        pkg_path = self.get_external_plugin_dir()
×
252
        path = os.path.join(pkg_path, filename)
×
253
        async with aiohttp.ClientSession() as session:
×
254
            async with session.get(url) as resp:
×
255
                if resp.status == 200:
×
256
                    with open(path, 'wb') as fd:
×
257
                        async for chunk in resp.content.iter_chunked(10):
×
258
                            fd.write(chunk)
×
259
        return path
×
260

261
    def read_manifest(self, path) -> dict:
5✔
262
        """ return json dict """
UNCOV
263
        with zipfile_lib.ZipFile(path) as file:
×
UNCOV
264
            for filename in file.namelist():
×
265
                if filename.endswith('manifest.json'):
×
266
                    break
×
267
            else:
268
                raise Exception('could not find manifest.json in zip archive')
×
UNCOV
269
            with file.open(filename, 'r') as f:
×
270
                manifest = json.load(f)
×
271
                manifest['path'] = path  # external, path of the zipfile
×
272
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
273
                manifest['is_zip'] = True
×
274
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
275
                return manifest
×
276

277
    def zip_plugin_path(self, name) -> str:
5✔
UNCOV
278
        path = self.get_metadata(name)['path']
×
UNCOV
279
        filename = os.path.basename(path)
×
280
        if name in self.internal_plugin_metadata:
×
281
            pkg_path = self.pkgpath
×
282
        else:
283
            pkg_path = self.get_external_plugin_dir()
×
UNCOV
284
        return os.path.join(pkg_path, filename)
×
285

286
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
287
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
288
        if pkg_path is None:
5✔
UNCOV
289
            return
×
290
        for filename in os.listdir(pkg_path):
5✔
291
            path = os.path.join(pkg_path, filename)
×
UNCOV
292
            if not filename.endswith('.zip'):
×
293
                continue
×
294
            try:
×
295
                d = self.read_manifest(path)
×
296
                name = d['name']
×
297
            except Exception:
×
298
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
299
                continue
×
300
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
×
301
                self.logger.info(f"duplicate plugins for {name=}")
×
302
                continue
×
303
            if self.cmd_only and not self.config.get(f'plugins.{name}.enabled'):
×
304
                continue
×
305
            min_version = d.get('min_electrum_version')
×
306
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
307
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
308
                continue
×
309
            max_version = d.get('max_electrum_version')
×
310
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
311
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
312
                continue
×
313

314
            if not self.cmd_only:
×
UNCOV
315
                gui_good = self.gui_name in d.get('available_for', [])
×
316
                if not gui_good:
×
317
                    continue
×
318
                if 'fullname' not in d:
×
319
                    continue
×
320
                details = d.get('registers_keystore')
×
321
                if details:
×
322
                    self.register_keystore(name, gui_good, details)
×
323
            if external:
×
324
                self.external_plugin_metadata[name] = d
×
325
            else:
326
                self.internal_plugin_metadata[name] = d
×
327

328
    def get(self, name):
5✔
UNCOV
329
        return self.plugins.get(name)
×
330

331
    def count(self):
5✔
UNCOV
332
        return len(self.plugins)
×
333

334
    def load_plugin(self, name) -> 'BasePlugin':
5✔
335
        """Imports the code of the given plugin.
336
        note: can be called from any thread.
337
        """
338
        if self.get_metadata(name):
5✔
339
            return self.load_plugin_by_name(name)
5✔
340
        else:
UNCOV
341
            raise Exception(f"could not find plugin {name!r}")
×
342

343
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
344
        """Loads the __init__.py module of the plugin if it is not already loaded."""
345
        is_external = name in self.external_plugin_metadata
5✔
346
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
347
        if base_name not in sys.modules:
5✔
348
            metadata = self.get_metadata(name)
5✔
349
            is_zip = metadata.get('is_zip', False)
5✔
350
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
351
            if not is_zip:
5✔
352
                if is_external:
5✔
353
                    # this branch is deprecated: external plugins are always zip files
UNCOV
354
                    path = os.path.join(metadata['path'], '__init__.py')
×
UNCOV
355
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
356
                else:
357
                    init_spec = importlib.util.find_spec(base_name)
5✔
358
            else:
UNCOV
359
                zipfile = zipimport.zipimporter(metadata['path'])
×
UNCOV
360
                dirname = metadata['dirname']
×
361
                init_spec = zipfile.find_spec(dirname)
×
362

363
            self.exec_module_from_spec(init_spec, base_name)
5✔
364

365
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
366
        if name in self.plugins:
5✔
367
            return self.plugins[name]
5✔
368
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
369
        self.maybe_load_plugin_init_method(name)
5✔
370
        is_external = name in self.external_plugin_metadata
5✔
371
        if is_external and not self.is_authorized(name):
5✔
UNCOV
372
            self.logger.info(f'plugin not authorized {name}')
×
UNCOV
373
            return
×
374
        if not is_external:
5✔
375
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
376
        else:
UNCOV
377
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
378

379
        spec = importlib.util.find_spec(full_name)
5✔
380
        if spec is None:
5✔
UNCOV
381
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
382
        try:
5✔
383
            module = self.exec_module_from_spec(spec, full_name)
5✔
384
            plugin = module.Plugin(self, self.config, name)
5✔
UNCOV
385
        except Exception as e:
×
UNCOV
386
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
387
        self.add_jobs(plugin.thread_jobs())
5✔
388
        self.plugins[name] = plugin
5✔
389
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
390
        return plugin
5✔
391

392
    def close_plugin(self, plugin):
5✔
UNCOV
393
        self.remove_jobs(plugin.thread_jobs())
×
394

395
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
5✔
UNCOV
396
        from hashlib import pbkdf2_hmac
×
UNCOV
397
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
×
398
        return ECPrivkey(secret)
×
399

400
    def install_internal_plugin(self, name):
5✔
UNCOV
401
        self.config.set_key(f'plugins.{name}.enabled', [])
×
402

403
    def install_external_plugin(self, name, path, privkey, manifest):
5✔
UNCOV
404
        self.external_plugin_metadata[name] = manifest
×
UNCOV
405
        self.authorize_plugin(name, path, privkey)
×
406

407
    def uninstall(self, name: str):
5✔
408
        self.config.set_key(f'plugins.{name}', None)
×
409
        if name in self.external_plugin_metadata:
×
UNCOV
410
            zipfile = self.zip_plugin_path(name)
×
UNCOV
411
            os.unlink(zipfile)
×
412
            self.external_plugin_metadata.pop(name)
×
413

414
    def is_internal(self, name) -> bool:
5✔
415
        return name in self.internal_plugin_metadata
×
416

417
    def is_auto_loaded(self, name):
5✔
UNCOV
418
        metadata = self.external_plugin_metadata.get(name) or self.internal_plugin_metadata.get(name)
×
419
        return metadata and (metadata.get('registers_keystore') or metadata.get('registers_wallet_type'))
×
420

421
    def is_installed(self, name) -> bool:
5✔
422
        """an external plugin may be installed but not authorized """
423
        return (name in self.internal_plugin_metadata and self.config.get(f'plugins.{name}'))\
×
424
            or name in self.external_plugin_metadata
425

426
    def is_authorized(self, name) -> bool:
5✔
427
        if name in self.internal_plugin_metadata:
×
UNCOV
428
            return True
×
UNCOV
429
        if name not in self.external_plugin_metadata:
×
UNCOV
430
            return False
×
431
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
432
        if not pubkey_bytes:
×
433
            return False
×
434
        if not self.is_plugin_zip(name):
×
435
            return False
×
436
        filename = self.zip_plugin_path(name)
×
437
        plugin_hash = get_file_hash256(filename)
×
438
        sig = self.config.get(f'plugins.{name}.authorized')
×
439
        if not sig:
×
440
            return False
×
441
        pubkey = ECPubkey(pubkey_bytes)
×
442
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
443

444
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
5✔
445
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
446
        assert pubkey_bytes == privkey.get_public_key_bytes()
×
UNCOV
447
        plugin_hash = get_file_hash256(filename)
×
UNCOV
448
        sig = privkey.ecdsa_sign(plugin_hash)
×
449
        value = sig.hex()
×
450
        self.config.set_key(f'plugins.{name}.authorized', value, save=True)
×
451

452
    def enable(self, name: str) -> 'BasePlugin':
5✔
453
        self.config.enable_plugin(name)
×
454
        p = self.get(name)
×
UNCOV
455
        if p:
×
UNCOV
456
            return p
×
457
        return self.load_plugin(name)
×
458

459
    def disable(self, name: str) -> None:
5✔
460
        self.config.disable_plugin(name)
×
461
        p = self.get(name)
×
UNCOV
462
        if not p:
×
UNCOV
463
            return
×
464
        self.plugins.pop(name)
×
465
        p.close()
×
466
        self.logger.info(f"closed {name}")
×
467

468
    @classmethod
5✔
469
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
470
        return key.startswith('plugins.')
×
471

472
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
UNCOV
473
        d = self.descriptions.get(name)
×
474
        if not d:
×
UNCOV
475
            return False
×
UNCOV
476
        deps = d.get('requires', [])
×
477
        for dep, s in deps:
×
478
            try:
×
479
                __import__(dep)
×
480
            except ImportError as e:
×
481
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
482
                return False
×
483
        requires = d.get('requires_wallet_type', [])
×
484
        return not requires or wallet.wallet_type in requires
×
485

486
    def get_hardware_support(self):
5✔
487
        out = []
×
488
        for name, (gui_good, details) in self.hw_wallets.items():
×
UNCOV
489
            if gui_good:
×
UNCOV
490
                try:
×
491
                    p = self.get_plugin(name)
×
492
                    if p.is_available():
×
493
                        out.append(HardwarePluginToScan(name=name,
×
494
                                                        description=details[2],
495
                                                        plugin=p,
496
                                                        exception=None))
497
                except Exception as e:
×
UNCOV
498
                    self.logger.exception(f"cannot load plugin for: {name}")
×
UNCOV
499
                    out.append(HardwarePluginToScan(name=name,
×
500
                                                    description=details[2],
501
                                                    plugin=None,
502
                                                    exception=e))
503
        return out
×
504

505
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
506
        from .wallet import register_wallet_type, register_constructor
5✔
507
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
508

509
        def loader():
5✔
510
            plugin = self.get_plugin(name)
5✔
511
            register_constructor(wallet_type, plugin.wallet_class)
5✔
512
        register_wallet_type(wallet_type)
5✔
513
        plugin_loaders[wallet_type] = loader
5✔
514

515
    def register_keystore(self, name, gui_good, details):
5✔
516
        from .keystore import register_keystore
5✔
517

518
        def dynamic_constructor(d):
5✔
519
            return self.get_plugin(name).keystore_class(d)
5✔
520
        if details[0] == 'hardware':
5✔
521
            self.hw_wallets[name] = (gui_good, details)
5✔
522
            self.logger.info(f"registering hardware {name}: {details}")
5✔
523
            register_keystore(details[1], dynamic_constructor)
5✔
524

525
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
526
        if name not in self.plugins:
5✔
527
            self.load_plugin(name)
5✔
528
        return self.plugins[name]
5✔
529

530
    def is_plugin_zip(self, name: str) -> bool:
5✔
531
        """Returns True if the plugin is a zip file"""
UNCOV
532
        if (metadata := self.get_metadata(name)) is None:
×
UNCOV
533
            return False
×
UNCOV
534
        return metadata.get('is_zip', False)
×
535

536
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
537
        """Returns the metadata of the plugin"""
538
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
539
        if not metadata:
5✔
UNCOV
540
            return None
×
541
        return metadata
5✔
542

543
    def run(self):
5✔
544
        while self.is_running():
5✔
545
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
546
            self.run_jobs()
5✔
547
        self.on_stop()
5✔
548

549
    def read_file(self, name: str, filename: str) -> bytes:
5✔
UNCOV
550
        if self.is_plugin_zip(name):
×
UNCOV
551
            plugin_filename = self.zip_plugin_path(name)
×
UNCOV
552
            metadata = self.external_plugin_metadata[name]
×
UNCOV
553
            dirname = metadata['dirname']
×
554
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
555
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
556
                    return myfile.read()
×
557
        else:
558
            assert name in self.internal_plugin_metadata
×
559
            path = os.path.join(os.path.dirname(__file__), 'plugins', name, filename)
×
560
            with open(path, 'rb') as myfile:
×
UNCOV
561
                return myfile.read()
×
562

563
def get_file_hash256(path: str) -> bytes:
5✔
564
    '''Get the sha256 hash of a file, similar to `sha256sum`.'''
565
    with open(path, 'rb') as f:
×
UNCOV
566
        return sha256(f.read())
×
567

568

569
def hook(func):
5✔
570
    hook_names.add(func.__name__)
5✔
571
    return func
5✔
572

573

574
def run_hook(name, *args):
5✔
575
    results = []
5✔
576
    f_list = hooks.get(name, [])
5✔
577
    for p, f in f_list:
5✔
578
        if p.is_enabled():
5✔
579
            try:
5✔
580
                r = f(*args)
5✔
UNCOV
581
            except Exception:
×
UNCOV
582
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
UNCOV
583
                r = False
×
584
            if r:
5✔
585
                results.append(r)
×
586

587
    if results:
5✔
UNCOV
588
        assert len(results) == 1, results
×
589
        return results[0]
×
590

591

592
class BasePlugin(Logger):
5✔
593

594
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
595
        self.parent = parent  # type: Plugins  # The plugins object
5✔
596
        self.name = name
5✔
597
        self.config = config
5✔
598
        Logger.__init__(self)
5✔
599
        # add self to hooks
600
        for k in dir(self):
5✔
601
            if k in hook_names:
5✔
602
                l = hooks.get(k, [])
5✔
603
                l.append((self, getattr(self, k)))
5✔
604
                hooks[k] = l
5✔
605

606
    def __str__(self):
5✔
UNCOV
607
        return self.name
×
608

609
    def close(self):
5✔
610
        # remove self from hooks
611
        for attr_name in dir(self):
×
UNCOV
612
            if attr_name in hook_names:
×
613
                # found attribute in self that is also the name of a hook
UNCOV
614
                l = hooks.get(attr_name, [])
×
615
                try:
×
616
                    l.remove((self, getattr(self, attr_name)))
×
UNCOV
617
                except ValueError:
×
618
                    # maybe attr name just collided with hook name and was not hook
619
                    continue
×
620
                hooks[attr_name] = l
×
621
        self.parent.close_plugin(self)
×
UNCOV
622
        self.on_close()
×
623

624
    def on_close(self):
5✔
625
        pass
×
626

627
    def requires_settings(self) -> bool:
5✔
UNCOV
628
        return False
×
629

630
    def thread_jobs(self):
5✔
631
        return []
5✔
632

633
    def is_enabled(self):
5✔
UNCOV
634
        if not self.is_available():
×
UNCOV
635
            return False
×
UNCOV
636
        return self.config.is_plugin_enabled(self.name)
×
637

638
    def is_available(self):
5✔
639
        return True
×
640

641
    def can_user_disable(self):
5✔
UNCOV
642
        return True
×
643

644
    def settings_widget(self, window):
5✔
UNCOV
645
        raise NotImplementedError()
×
646

647
    def settings_dialog(self, window):
5✔
UNCOV
648
        raise NotImplementedError()
×
649

650
    def read_file(self, filename: str) -> bytes:
5✔
UNCOV
651
        return self.parent.read_file(self.name, filename)
×
652

653

654
class DeviceUnpairableError(UserFacingException): pass
5✔
655
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
656
class CannotAutoSelectDevice(Exception): pass
5✔
657

658

659
class Device(NamedTuple):
5✔
660
    path: Union[str, bytes]
5✔
661
    interface_number: int
5✔
662
    id_: str
5✔
663
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
664
    usage_page: int
5✔
665
    transport_ui_string: str
5✔
666

667

668
class DeviceInfo(NamedTuple):
    """A scanned ``Device`` plus what was learned from (trying to) talk to it.

    Only ``device`` is mandatory; every other field defaults to ``None``.
    """
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None  # set when creating/querying the client failed
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
676

677

678
class HardwarePluginToScan(NamedTuple):
    """Record describing one hardware plugin: its name, description, and either
    the plugin object or an exception (both are Optional)."""
    name: str
    description: str
    plugin: Optional['HW_PluginBase']
    exception: Optional[Exception]
5✔
683

684

685
# Labels that carry no identifying information. DeviceMgr.select_device() does not
# use a keystore label found in this set to match a device by label.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
686

687

688
# hidapi is not thread-safe
689
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
690
#     https://github.com/libusb/hidapi/issues/45
691
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
692
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
693
# It is not entirely clear to me, exactly what is safe and what isn't, when
694
# using multiple threads...
695
# Hence, we use a single thread for all device communications, including
696
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
697
# the following thread:
698
# Single worker on purpose: all device communication is serialised on this one thread.
# The thread name prefix is what run_in_hwd_thread() checks to detect that thread.
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix='hwd_comms_thread',
    max_workers=1,
)
702

703
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
704
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
705
# To keep it simple, let's just import it now, as we are likely in the main thread here.
706
if threading.main_thread() is not threading.current_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # hidapi is optional: without it the hid-based device scan returns no devices
    pass
5✔
712

713

714
# Return type of the callable passed to run_in_hwd_thread().
T = TypeVar('T')
5✔
715

716

717
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run ``func`` on the hardware-device communication thread and return its result.

    If the caller is already on a thread whose name starts with
    "hwd_comms_thread", ``func`` is called directly. Otherwise it is submitted
    to ``_hwd_comms_executor`` and the caller blocks until the result is ready.
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    return _hwd_comms_executor.submit(func).result()
724

725

726
def runs_in_hwd_thread(func):
    """Decorator: every call of ``func`` is routed through ``run_in_hwd_thread``."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # bind the call arguments now; run_in_hwd_thread expects a zero-argument callable
        return run_in_hwd_thread(lambda: func(*args, **kwargs))
    return wrapper
5✔
731

732

733
def assert_runs_in_hwd_thread():
    """Raise ``Exception`` unless the current thread's name starts with "hwd_comms_thread"."""
    thread_name = threading.current_thread().name
    if thread_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
736

737

738
class DeviceMgr(ThreadJob):
5✔
739
    """Manages hardware clients.  A client communicates over a hardware
740
    channel with the device.
741

742
    In addition to tracking device HID IDs, the device manager tracks
743
    hardware wallets and manages wallet pairing.  A HID ID may be
744
    paired with a wallet when it is confirmed that the hardware device
745
    matches the wallet, i.e. they have the same master public key.  A
746
    HID ID can be unpaired if e.g. it is wiped.
747

748
    Because of hotplugging, a wallet must request its client
749
    dynamically each time it is required, rather than caching it
750
    itself.
751

752
    The device manager is shared across plugins, so just one place
753
    does hardware scans when needed.  By tracking HID IDs, if a device
754
    is plugged into a different port the wallet is automatically
755
    re-paired.
756

757
    Wallets are informed on connect / disconnect events.  Each wallet
    must implement connected() and disconnected() callbacks.  Being
    connected implies a pairing.  Callbacks can happen in any thread
    context, and we do them without holding the lock.
761

762
    Confusingly, the HID ID (serial number) reported by the HID system
763
    doesn't match the device ID reported by the device itself.  We use
764
    the HID IDs.
765

766
    This plugin is thread-safe.  Currently only devices supported by
767
    hidapi are implemented."""
768

769
    def __init__(self, config: SimpleConfig):
        """Create a device manager with empty pairing, client and plugin registries."""
        ThreadJob.__init__(self)
        self.config = config
        self.lock = threading.RLock()

        # pairing_code -> id_. An item is only present while a pairing is active. Needs self.lock.
        self.pairing_code_to_id = {}  # type: Dict[str, str]
        # client -> id_. Needs self.lock.
        self.clients = {}  # type: Dict[HardwareClientBase, str]
        # What we recognise: (vendor_id, product_id) -> Plugin
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
        # vendor_id -> Plugin
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]
        # Custom enumerate functions for devices we don't know about. Needs self.lock.
        self._enumerate_func = set()
5✔
784

785
    def thread_jobs(self):
        """Return this object as the only thread job; its ``run()`` handles device timeouts."""
        jobs = [self]
        return jobs
5✔
788

789
    def run(self):
        """Handle device timeouts.  Runs in the context of the Plugins thread.

        The client list is copied while holding the lock; each client is then
        given the same cutoff timestamp (now minus the configured session
        timeout) without the lock held.
        """
        with self.lock:
            snapshot = [*self.clients]
        cutoff = time.time() - self.config.get_session_timeout()
        for hw_client in snapshot:
            hw_client.timeout(cutoff)
×
797

798
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every key in ``device_pairs`` to ``plugin`` in the recognised-hardware table."""
        self._recognised_hardware.update((pair, plugin) for pair in device_pairs)
×
801

802
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every vendor id in ``vendor_ids`` to ``plugin`` in the recognised-vendor table."""
        self._recognised_vendor.update((vendor, plugin) for vendor in vendor_ids)
×
805

806
    def register_enumerate_func(self, func):
        """Add ``func`` to the set of custom enumerate functions, under the lock."""
        with self.lock:
            enumerators = self._enumerate_func
            enumerators.add(func)
×
809

810
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for ``device``, creating and registering one if needed.

        A client already registered under ``device.id_`` is returned as is.
        Otherwise ``plugin.create_client`` is asked for one; a truthy result is
        stored in ``self.clients``, a falsy result is returned unchanged.
        """
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        new_client = plugin.create_client(device, handler)
        if not new_client:
            return new_client
        self.logger.info(f"Registering {new_client}")
        with self.lock:
            self.clients[new_client] = device.id_
        return new_client
×
823

824
    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with ``pairing_code``, or None if there is no pairing."""
        with self.lock:
            device_id = self.pairing_code_to_id.get(pairing_code)
        return device_id
×
827

828
    def pairing_code_by_id(self, id_):
        """Return the first pairing code mapped to device id ``id_``, or None."""
        with self.lock:
            matches = (code for code, paired_id in self.pairing_code_to_id.items()
                       if paired_id == id_)
            return next(matches, None)
×
834

835
    def unpair_pairing_code(self, pairing_code):
        """Forget the pairing for ``pairing_code`` and close the paired device's client.

        Does nothing if the pairing code is unknown.  The client is closed
        after the lock has been released.
        """
        with self.lock:
            try:
                device_id = self.pairing_code_to_id.pop(pairing_code)
            except KeyError:
                return
        self._close_client(device_id)
×
841

842
    def unpair_id(self, id_):
        """Unpair device ``id_`` if it is paired; otherwise just close its client."""
        code = self.pairing_code_by_id(id_)
        if not code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(code)
×
848

849
    def _close_client(self, id_):
5✔
UNCOV
850
        with self.lock:
×
851
            client = self._client_by_id(id_)
×
UNCOV
852
            self.clients.pop(client, None)
×
UNCOV
853
        if client:
×
854
            client.close()
×
855

856
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
857
        with self.lock:
×
858
            for client, client_id in self.clients.items():
×
UNCOV
859
                if client_id == id_:
×
UNCOV
860
                    return client
×
861
        return None
×
862

863
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        """Returns a client for the device ID if one is registered.

        If a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.
        Unless ``scan_now`` is false, devices are rescanned before the lookup.
        """
        if scan_now:
            self.scan_devices()
        found = self._client_by_id(id_)
        return found
×
870

871
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Return a client for ``keystore``, pairing with a device if requested.

        Lookup order: (1) an already known client for the keystore's pairing
        code, without scanning; (2) the same lookup against ``devices``, which
        are scanned first if ``devices`` is None; (3) if ``force_pair`` is set,
        a device is selected and paired with the keystore.  The handler status
        is set to False at the start and to True once a client is found.

        Raises Exception if ``handler`` is None.  Returns None if no client
        could be obtained.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()

        def lookup(candidates):
            return self.client_by_pairing_code(
                plugin=plugin, pairing_code=pcode, handler=handler, devices=candidates)

        # fast path: search existing clients first, without any device list
        client = None if devices else lookup([])
        # slow path: search again, now allowing a scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = lookup(devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
905

906
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client paired under ``pairing_code``, or None.

        An already registered client is reused only if its plugin has the same
        type as ``plugin``; its handler is then replaced by ``handler``.  If no
        client is registered, one is created for the device in ``devices``
        whose id matches the pairing.
        """
        device_id = self.id_by_pairing_code(pairing_code)
        existing = self._client_by_id(device_id)
        if existing:
            if type(existing.plugin) != type(plugin):
                return None
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            existing.handler = handler
            return existing

        for candidate in devices:
            if candidate.id_ == device_id:
                return self.create_client(candidate, handler, plugin)
        return None
×
923

924
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair ``keystore`` with the device described by ``info`` and return its client.

        The pairing is recorded only if the xpub the device reports for the
        keystore's derivation prefix equals the keystore's own xpub.

        Raises DeviceUnpairableError if there is no pairable client of the
        right plugin type, if the xpub request is cancelled or fails with
        RuntimeError, or if the xpubs differ.
        """
        expected_xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(expected_xpub)
        client = self._client_by_id(info.device.id_)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                device_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                device_xpub = None
            if device_xpub == expected_xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        unpairable_msg = _(
            'Electrum cannot pair with your {}.\n\n'
            'Before you request bitcoins to be sent to addresses in this '
            'wallet, ensure you can pair with your device, or that you have '
            'its seed (and passphrase, if any).  Otherwise all bitcoins you '
            'receive will be unspendable.').format(plugin.device)
        raise DeviceUnpairableError(unpairable_msg)
960

961
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        Devices are scanned if ``devices`` is None.  A device whose client
        cannot be created or queried is logged and skipped, unless
        ``include_failing_clients`` is set, in which case it is reported with
        the exception attached.

        Raises HardwarePluginLibraryUnavailable if the plugin's libraries are
        not available.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                label = client.label()
                is_initialized = client.is_initialized()
                soft_device_id = client.get_soft_device_id()
                model_name = client.device_model_name()
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
            else:
                infos.append(DeviceInfo(
                    device=device,
                    label=label,
                    initialized=is_initialized,
                    plugin_name=plugin.name,
                    soft_device_id=soft_device_id,
                    model_name=model_name,
                ))

        return infos
×
1002

1003
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked to connect one
        and the devices are rescanned.  Automatic selection is then tried by
        soft device id, by (unique, non-placeholder) label, and finally by
        being the only connected device when the keystore has no useful
        label or id.  If none applies, the user is asked to choose.

        Raises CannotAutoSelectDevice when a decision would need the user but
        ``allow_user_interaction`` is false, and UserCancelled when the user
        declines to retry or cancels the choice.
        """
        # ideally this should not be called from the GUI thread...
        infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
        while not infos:
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            prompt = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                prompt.append(f"label: {keystore.label}, ")
            prompt.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            prompt.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            ))
            if not handler.yes_no_question("".join(prompt)):
                raise UserCancelled()
            # passing None forces a fresh scan
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=None)

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for candidate in infos:
                if candidate.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return candidate
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        labels_seen = [candidate.label for candidate in infos]
        label_is_useful = keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
        if label_is_useful and labels_seen.count(keystore.label) == 1:
            for candidate in infos:
                if candidate.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return candidate
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = "".join([
            _("Could not automatically pair with device for given keystore."), "\n",
            f"(keystore label: {keystore.label!r}, ",
            f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n",
            _("Please select which {} device to use:").format(plugin.device),
            "\n(", _("Or click cancel to skip this keystore instead."), ")",
        ])
        descriptions = []
        for candidate in infos:
            descriptions.append(
                "{label} ({maybe_model}{init}, {transport})".format(
                    label=candidate.label or _("An unnamed {}").format(candidate.plugin_name),
                    init=(_("initialized") if candidate.initialized else _("wiped")),
                    transport=candidate.device.transport_ui_string,
                    maybe_model=f"{candidate.model_name}, " if candidate.model_name else ""))
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        choice = handler.query_choice(msg, descriptions)
        if choice is None:
            raise UserCancelled()
        info = infos[choice]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
1078

1079
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those that a registered plugin accepts.

        Returns an empty list if the ``hid`` module cannot be imported.  A
        plugin registered for the exact (vendor_id, product_id) pair takes
        precedence over one registered for the vendor id only.
        """
        try:
            import hid  # noqa: F811
        except ImportError:
            return []

        found = []
        for entry in hid.enumerate(0, 0):
            vendor_id = entry['vendor_id']
            product_key = (vendor_id, entry['product_id'])
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            else:
                plugin = None
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(entry, product_key=product_key)
            if device:
                found.append(device)
        return found
×
1100

1101
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients whose device is gone.

        Devices come from the hid enumeration plus every registered custom
        enumerate function; a failing custom function is logged and skipped.
        Clients whose device id was not found, or that report no usable
        connection, are removed from ``self.clients`` and unpaired, and their
        handler (if any) is told the device is no longer connected.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            custom_enumerators = list(self._enumerate_func)
        for enumerate_fn in custom_enumerators:
            try:
                extra_devices = enumerate_fn()
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(enumerate_fn)}, error {e!r}')
            else:
                devices.extend(extra_devices)

        # find out what was disconnected
        present_ids = [dev.id_ for dev in devices]
        gone = []
        with self.lock:
            still_connected = {}
            for client, id_ in self.clients.items():
                if id_ not in present_ids or not client.has_usable_connection_with_device():
                    gone.append((client, id_))
                else:
                    still_connected[client] = id_
            self.clients = still_connected

        # Unpair disconnected devices
        for client, id_ in gone:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
1139

1140
    @classmethod
5✔
1141
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1142
        ret = {}
×
1143
        # add libusb
UNCOV
1144
        try:
×
UNCOV
1145
            import usb1
×
1146
        except Exception as e:
×
UNCOV
1147
            ret["libusb.version"] = None
×
1148
        else:
1149
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1150
            try:
×
1151
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
UNCOV
1152
            except AttributeError:
×
1153
                ret["libusb.path"] = None
×
1154
        # add hidapi
1155
        try:
×
1156
            import hid  # noqa: F811
×
1157
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
UNCOV
1158
        except Exception as e:
×
1159
            from importlib.metadata import version
×
1160
            try:
×
1161
                ret["hidapi.version"] = version("hidapi")
×
1162
            except ImportError:
×
1163
                ret["hidapi.version"] = None
×
1164
        return ret
×
1165

1166
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        Only hardware keystores are considered.  Pairing is attempted in two
        passes: first every keystore without user interaction, then (if
        ``allow_user_interaction``) every keystore again with manual selection
        allowed.  A UserCancelled raised for one keystore is ignored.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)
        """
        from .keystore import Hardware_KeyStore
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pass: everything that can be auto-selected; second pass: manual selections
        passes = (False, True) if allow_user_interaction else (False,)
        for interactive in passes:
            for ks in keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc