• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5501080659820544

09 Apr 2025 07:25AM UTC coverage: 60.963% (-0.04%) from 61.0%
5501080659820544

push

CirrusCI

ecdsa
Userspace plugins:
 - Allow plugins saved as zipfiles in user data dir
 - plugins are authorized with a user chosen password
 - pubkey derived from password is saved with admin permissions

22 of 131 new or added lines in 1 file covered. (16.79%)

16 existing lines in 4 files now uncovered.

21434 of 35159 relevant lines covered (60.96%)

3.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.34
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68

69
class Plugins(DaemonThread):
5✔
70

71
    LOGGING_SHORTCUT = 'p'
5✔
72
    pkgpath = os.path.dirname(plugins.__file__)
5✔
73
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
74
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
75

76
    @profiler
5✔
77
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
78
        self.config = config
5✔
79
        self.cmd_only = cmd_only  # type: bool
5✔
80
        self.internal_plugin_metadata = {}
5✔
81
        self.external_plugin_metadata = {}
5✔
82
        if cmd_only:
5✔
83
            # only import the command modules of plugins
84
            Logger.__init__(self)
×
85
            self.find_plugins()
×
86
            self.load_plugins()
×
87
            return
×
88
        DaemonThread.__init__(self)
5✔
89
        self.device_manager = DeviceMgr(config)
5✔
90
        self.name = 'Plugins'  # set name of thread
5✔
91
        self.hw_wallets = {}
5✔
92
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
93
        self.gui_name = gui_name
5✔
94
        self.find_plugins()
5✔
95
        self.load_plugins()
5✔
96
        self.add_jobs(self.device_manager.thread_jobs())
5✔
97
        self.start()
5✔
98

99
    @property
5✔
100
    def descriptions(self):
5✔
101
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
102

103
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
104
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
105
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
106
        for loader, name, ispkg in iter_modules:
5✔
107
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
108
            #       once as data and once as code. To honor the "no duplicates" rule below,
109
            #       we exclude the ones packaged as *code*, here:
110
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
111
                continue
×
112
            module_path = os.path.join(pkg_path, name)
5✔
113
            if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
5✔
114
                continue
×
115
            try:
5✔
116
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
117
                    d = json.load(f)
5✔
118
            except FileNotFoundError:
5✔
119
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
5✔
120
                continue
5✔
121
            if 'fullname' not in d:
5✔
122
                continue
×
123
            d['path'] = module_path
5✔
124
            if not self.cmd_only:
5✔
125
                gui_good = self.gui_name in d.get('available_for', [])
5✔
126
                if not gui_good:
5✔
127
                    continue
5✔
128
                details = d.get('registers_wallet_type')
5✔
129
                if details:
5✔
130
                    self.register_wallet_type(name, gui_good, details)
5✔
131
                details = d.get('registers_keystore')
5✔
132
                if details:
5✔
133
                    self.register_keystore(name, gui_good, details)
5✔
134
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
135
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
136
                raise Exception(f"duplicate plugins? for {name=}")
×
137
            if not external:
5✔
138
                self.internal_plugin_metadata[name] = d
5✔
139
            else:
140
                self.external_plugin_metadata[name] = d
×
141

142
    @staticmethod
5✔
143
    def exec_module_from_spec(spec, path: str):
5✔
144
        if prev_fail := _exec_module_failure.get(path):
5✔
145
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
146
        try:
5✔
147
            module = importlib.util.module_from_spec(spec)
5✔
148
            # sys.modules needs to be modified for relative imports to work
149
            # see https://stackoverflow.com/a/50395128
150
            sys.modules[path] = module
5✔
151
            spec.loader.exec_module(module)
5✔
152
        except Exception as e:
×
153
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
154
            # so the import system knows it failed. If called again for the same plugin, we do not
155
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
156
            # might have defined commands).
157
            _exec_module_failure[path] = e
×
158
            if path in sys.modules:
×
159
                sys.modules.pop(path, None)
×
160
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
161
        return module
5✔
162

163
    def find_plugins(self):
5✔
164
        internal_plugins_path = (self.pkgpath, False)
5✔
165
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
166
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
167
            if pkg_path and os.path.exists(pkg_path):
5✔
168
                if not external:
5✔
169
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
170
                else:
NEW
171
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
×
172

173
    def load_plugins(self):
5✔
174
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
175
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
176
                try:
×
177
                    if self.cmd_only:  # only load init method to register commands
×
178
                        self.maybe_load_plugin_init_method(name)
×
179
                    else:
180
                        self.load_plugin_by_name(name)
×
181
                except BaseException as e:
×
182
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
183

184
    def _has_root_permissions(self, path):
5✔
185
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
186

187
    def get_keyfile_path(self):
5✔
NEW
188
        if sys.platform in ['win32']:
×
NEW
189
            keyfile_path = self.keyfile_windows
×
NEW
190
            keyfile_help = _('This file can be edited with Regdit')
×
NEW
191
        elif sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd'):
×
NEW
192
            keyfile_path = self.keyfile_linux
×
NEW
193
            keyfile_help = _('The file must have root permissions')
×
194
        else:
NEW
195
            raise Exception('platform not supported')
×
NEW
196
        return keyfile_path, keyfile_help
×
197

198
    def get_pubkey_bytes(self) -> Optional[str]:
5✔
NEW
199
        if sys.platform in ['win32']:
×
NEW
200
            import winreg
×
NEW
201
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
NEW
202
                with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
NEW
203
                    pubkey_hex = winreg.QueryValue(key, "PluginsKey")
×
NEW
204
        elif sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd'):
×
NEW
205
            if not os.path.exists(self.keyfile_linux):
×
NEW
206
                return
×
NEW
207
            if not self._has_root_permissions(self.keyfile_linux):
×
NEW
208
                return
×
NEW
209
            with open(self.keyfile_linux) as f:
×
NEW
210
                pubkey_hex = f.read()
×
211
        else:
NEW
212
            return
×
213
        # all good
NEW
214
        return bytes.fromhex(pubkey_hex)
×
215

216
    def get_external_plugin_dir(self):
5✔
217
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
218
        if not os.path.exists(pkg_path):
5✔
219
            self.logger.info(f'directory {pkg_path} does not exist')
5✔
220
            return
5✔
221
        return pkg_path
×
222

223
    def get_plugin_hash(self, filename: str) -> bytes:
5✔
NEW
224
        from .crypto import sha256
×
NEW
225
        with open(filename, 'rb') as f:
×
NEW
226
            s = f.read()
×
NEW
227
        return sha256(s)
×
228

229
    async def download_external_plugin(self, url):
5✔
NEW
230
        filename = os.path.basename(urlparse(url).path)
×
NEW
231
        pkg_path = self.get_external_plugin_dir()
×
NEW
232
        path = os.path.join(pkg_path, filename)
×
NEW
233
        async with aiohttp.ClientSession() as session:
×
NEW
234
            async with session.get(url) as resp:
×
NEW
235
                if resp.status == 200:
×
NEW
236
                    with open(path, 'wb') as fd:
×
NEW
237
                        async for chunk in resp.content.iter_chunked(10):
×
NEW
238
                            fd.write(chunk)
×
NEW
239
        return path
×
240

241
    def read_manifest(self, path):
5✔
NEW
242
        with zipfile_lib.ZipFile(path) as file:
×
NEW
243
            for filename in file.namelist():
×
NEW
244
                if filename.endswith('manifest.json'):
×
NEW
245
                    break
×
246
            else:
NEW
247
                raise Exception('could not find manifest.json in zip archive')
×
NEW
248
            with file.open(filename, 'r') as f:
×
NEW
249
                manifest = json.load(f)
×
NEW
250
                manifest['path'] = path  # external, path of the zipfile
×
NEW
251
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
NEW
252
                manifest['is_zip'] = True
×
NEW
253
                manifest['zip_hash_sha256'] = get_file_hash256(path)
×
NEW
254
                return manifest
×
255

256
    def zip_plugin_path(self, name):
5✔
NEW
257
        path = self.get_metadata(name)['path']
×
NEW
258
        filename = os.path.basename(path)
×
259
        if name in self.internal_plugin_metadata:
×
260
            pkg_path = self.pkgpath
×
261
        else:
262
            pkg_path = self.get_external_plugin_dir()
×
263
        return os.path.join(pkg_path, filename)
×
264

265
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
266
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
UNCOV
267
        if pkg_path is None:
×
268
            return
×
UNCOV
269
        for filename in os.listdir(pkg_path):
×
UNCOV
270
            path = os.path.join(pkg_path, filename)
×
UNCOV
271
            if not filename.endswith('.zip'):
×
UNCOV
272
                continue
×
273
            try:
×
NEW
274
                d = self.read_manifest(path)
×
NEW
275
            except Exception:
×
NEW
276
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
277
                continue
×
NEW
278
            name = d['name']
×
NEW
279
            if name in self.internal_plugin_metadata:
×
NEW
280
                raise Exception(f"duplicate plugins for name={name}")
×
NEW
281
            if name in self.external_plugin_metadata:
×
NEW
282
                raise Exception(f"duplicate plugins for name={name}")
×
NEW
283
            if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
NEW
284
                continue
×
NEW
285
            min_version = d.get('min_electrum_version')
×
NEW
286
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
NEW
287
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
NEW
288
                continue
×
NEW
289
            max_version = d.get('max_electrum_version')
×
NEW
290
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
NEW
291
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
NEW
292
                continue
×
293

NEW
294
            if not self.cmd_only:
×
NEW
295
                gui_good = self.gui_name in d.get('available_for', [])
×
NEW
296
                if not gui_good:
×
297
                    continue
×
NEW
298
                if 'fullname' not in d:
×
299
                    continue
×
NEW
300
            if external:
×
NEW
301
                self.external_plugin_metadata[name] = d
×
302
            else:
NEW
303
                self.internal_plugin_metadata[name] = d
×
304

305
    def get(self, name):
5✔
306
        return self.plugins.get(name)
×
307

308
    def count(self):
5✔
309
        return len(self.plugins)
×
310

311
    def load_plugin(self, name) -> 'BasePlugin':
5✔
312
        """Imports the code of the given plugin.
313
        note: can be called from any thread.
314
        """
315
        if self.get_metadata(name):
5✔
316
            return self.load_plugin_by_name(name)
5✔
317
        else:
318
            raise Exception(f"could not find plugin {name!r}")
×
319

320
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
321
        """Loads the __init__.py module of the plugin if it is not already loaded."""
322
        is_external = name in self.external_plugin_metadata
5✔
323
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
324
        if base_name not in sys.modules:
5✔
325
            metadata = self.get_metadata(name)
5✔
326
            is_zip = metadata.get('is_zip', False)
5✔
327
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
328
            if not is_zip:
5✔
329
                if is_external:
5✔
330
                    # this branch is deprecated: external plugins are always zip files
331
                    path = os.path.join(metadata['path'], '__init__.py')
×
332
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
333
                else:
334
                    init_spec = importlib.util.find_spec(base_name)
5✔
335
            else:
336
                zipfile = zipimport.zipimporter(metadata['path'])
×
NEW
337
                dirname = metadata['dirname']
×
NEW
338
                init_spec = zipfile.find_spec(dirname)
×
339

340
            self.exec_module_from_spec(init_spec, base_name)
5✔
341
            if name == "trustedcoin":
5✔
342
                # removes trustedcoin after loading to not show it in the list of plugins
343
                del self.internal_plugin_metadata[name]
×
344

345
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
346
        if name in self.plugins:
5✔
347
            return self.plugins[name]
×
348
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
349
        self.maybe_load_plugin_init_method(name)
5✔
350
        is_external = name in self.external_plugin_metadata
5✔
351
        if is_external and not self.is_authorized(name):
5✔
NEW
352
            self.logger.info(f'plugin not authorized {name}')
×
NEW
353
            return
×
354
        if not is_external:
5✔
355
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
356
        else:
357
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
358

359
        spec = importlib.util.find_spec(full_name)
5✔
360
        if spec is None:
5✔
361
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
362
        try:
5✔
363
            module = self.exec_module_from_spec(spec, full_name)
5✔
364
            plugin = module.Plugin(self, self.config, name)
5✔
365
        except Exception as e:
×
366
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
367
        self.add_jobs(plugin.thread_jobs())
5✔
368
        self.plugins[name] = plugin
5✔
369
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
370
        return plugin
5✔
371

372
    def close_plugin(self, plugin):
5✔
373
        self.remove_jobs(plugin.thread_jobs())
×
374

375
    def derive_privkey(self, pw: str) -> ECPrivkey:
5✔
NEW
376
        from hashlib import pbkdf2_hmac
×
NEW
377
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), b'electrum plugins', iterations=1024)
×
NEW
378
        return ECPrivkey(secret)
×
379

380
    def is_installed(self, name):
5✔
NEW
381
        return name in self.internal_plugin_metadata or name in self.external_plugin_metadata
×
382

383
    def is_authorized(self, name):
5✔
NEW
384
        if name in self.internal_plugin_metadata:
×
NEW
385
            return True
×
NEW
386
        if name not in self.external_plugin_metadata:
×
NEW
387
            return False
×
NEW
388
        pubkey_bytes = self.get_pubkey_bytes()
×
NEW
389
        if not pubkey_bytes:
×
NEW
390
            return False
×
NEW
391
        if not self.is_plugin_zip(name):
×
NEW
392
            return False
×
NEW
393
        filename = self.zip_plugin_path(name)
×
NEW
394
        plugin_hash = self.get_plugin_hash(filename)
×
NEW
395
        sig = self.config.get('authorize_plugin_' + name)
×
NEW
396
        if not sig:
×
NEW
397
            return False
×
NEW
398
        pubkey = ECPubkey(pubkey_bytes)
×
NEW
399
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
400

401
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey) -> 'BasePlugin':
5✔
NEW
402
        assert self.get_pubkey_bytes() == privkey.get_public_key_bytes()
×
NEW
403
        plugin_hash = self.get_plugin_hash(filename)
×
NEW
404
        sig = privkey.ecdsa_sign(plugin_hash)
×
NEW
405
        value = sig.hex()
×
NEW
406
        self.config.set_key('authorize_plugin_' + name, value, save=True)
×
407

408
    def enable(self, name: str) -> 'BasePlugin':
5✔
409
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
410
        p = self.get(name)
×
411
        if p:
×
412
            return p
×
413
        return self.load_plugin(name)
×
414

415
    def disable(self, name: str) -> None:
5✔
416
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
417
        p = self.get(name)
×
418
        if not p:
×
419
            return
×
420
        self.plugins.pop(name)
×
421
        p.close()
×
422
        self.logger.info(f"closed {name}")
×
423

424
    @classmethod
5✔
425
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
NEW
426
        return key.startswith('enable_plugin_') or key.startswith('authorize_plugin_')
×
427

428
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
429
        p = self.get(name)
×
430
        return self.disable(name) if p else self.enable(name)
×
431

432
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
433
        d = self.descriptions.get(name)
×
434
        if not d:
×
435
            return False
×
436
        deps = d.get('requires', [])
×
437
        for dep, s in deps:
×
438
            try:
×
439
                __import__(dep)
×
440
            except ImportError as e:
×
441
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
442
                return False
×
443
        requires = d.get('requires_wallet_type', [])
×
444
        return not requires or wallet.wallet_type in requires
×
445

446
    def get_hardware_support(self):
5✔
447
        out = []
×
448
        for name, (gui_good, details) in self.hw_wallets.items():
×
449
            if gui_good:
×
450
                try:
×
451
                    p = self.get_plugin(name)
×
452
                    if p.is_available():
×
453
                        out.append(HardwarePluginToScan(name=name,
×
454
                                                        description=details[2],
455
                                                        plugin=p,
456
                                                        exception=None))
457
                except Exception as e:
×
458
                    self.logger.exception(f"cannot load plugin for: {name}")
×
459
                    out.append(HardwarePluginToScan(name=name,
×
460
                                                    description=details[2],
461
                                                    plugin=None,
462
                                                    exception=e))
463
        return out
×
464

465
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
466
        from .wallet import register_wallet_type, register_constructor
5✔
467
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
468

469
        def loader():
5✔
470
            plugin = self.get_plugin(name)
5✔
471
            register_constructor(wallet_type, plugin.wallet_class)
5✔
472
        register_wallet_type(wallet_type)
5✔
473
        plugin_loaders[wallet_type] = loader
5✔
474

475
    def register_keystore(self, name, gui_good, details):
5✔
476
        from .keystore import register_keystore
5✔
477

478
        def dynamic_constructor(d):
5✔
479
            return self.get_plugin(name).keystore_class(d)
5✔
480
        if details[0] == 'hardware':
5✔
481
            self.hw_wallets[name] = (gui_good, details)
5✔
482
            self.logger.info(f"registering hardware {name}: {details}")
5✔
483
            register_keystore(details[1], dynamic_constructor)
5✔
484

485
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
486
        if name not in self.plugins:
5✔
487
            self.load_plugin(name)
5✔
488
        return self.plugins[name]
5✔
489

490
    def is_plugin_zip(self, name: str) -> bool:
5✔
491
        """Returns True if the plugin is a zip file"""
492
        if (metadata := self.get_metadata(name)) is None:
×
493
            return False
×
494
        return metadata.get('is_zip', False)
×
495

496
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
497
        """Returns the metadata of the plugin"""
498
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
499
        if not metadata:
5✔
500
            return None
×
501
        return metadata
5✔
502

503
    def run(self):
5✔
504
        while self.is_running():
5✔
505
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
506
            self.run_jobs()
5✔
507
        self.on_stop()
5✔
508

509

510
def get_file_hash256(path: str) -> str:
5✔
511
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
512
    with open(path, 'rb') as f:
×
513
        return sha256(f.read()).hex()
×
514

515

516
def hook(func):
5✔
517
    hook_names.add(func.__name__)
5✔
518
    return func
5✔
519

520

521
def run_hook(name, *args):
5✔
522
    results = []
5✔
523
    f_list = hooks.get(name, [])
5✔
524
    for p, f in f_list:
5✔
525
        if p.is_enabled():
5✔
526
            try:
5✔
527
                r = f(*args)
5✔
528
            except Exception:
×
529
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
530
                r = False
×
531
            if r:
5✔
532
                results.append(r)
×
533

534
    if results:
5✔
535
        assert len(results) == 1, results
×
536
        return results[0]
×
537

538

539
class BasePlugin(Logger):
5✔
540

541
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
542
        self.parent = parent  # type: Plugins  # The plugins object
5✔
543
        self.name = name
5✔
544
        self.config = config
5✔
545
        self.wallet = None  # fixme: this field should not exist
5✔
546
        Logger.__init__(self)
5✔
547
        # add self to hooks
548
        for k in dir(self):
5✔
549
            if k in hook_names:
5✔
550
                l = hooks.get(k, [])
5✔
551
                l.append((self, getattr(self, k)))
5✔
552
                hooks[k] = l
5✔
553

554
    def __str__(self):
5✔
555
        return self.name
×
556

557
    def close(self):
5✔
558
        # remove self from hooks
559
        for attr_name in dir(self):
×
560
            if attr_name in hook_names:
×
561
                # found attribute in self that is also the name of a hook
562
                l = hooks.get(attr_name, [])
×
563
                try:
×
564
                    l.remove((self, getattr(self, attr_name)))
×
565
                except ValueError:
×
566
                    # maybe attr name just collided with hook name and was not hook
567
                    continue
×
568
                hooks[attr_name] = l
×
569
        self.parent.close_plugin(self)
×
570
        self.on_close()
×
571

572
    def on_close(self):
5✔
573
        pass
×
574

575
    def requires_settings(self) -> bool:
5✔
576
        return False
×
577

578
    def thread_jobs(self):
5✔
579
        return []
5✔
580

581
    def is_enabled(self):
5✔
582
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
583

584
    def is_available(self):
5✔
585
        return True
×
586

587
    def can_user_disable(self):
5✔
588
        return True
×
589

590
    def settings_widget(self, window):
5✔
591
        raise NotImplementedError()
×
592

593
    def settings_dialog(self, window):
5✔
594
        raise NotImplementedError()
×
595

596
    def read_file(self, filename: str) -> bytes:
5✔
597
        if self.parent.is_plugin_zip(self.name):
×
598
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
NEW
599
            metadata = self.parent.external_plugin_metadata[self.name]
×
NEW
600
            dirname = metadata['dirname']
×
601
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
NEW
602
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
603
                    return myfile.read()
×
604
        else:
605
            if self.name in self.parent.internal_plugin_metadata:
×
606
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
607
            else:
608
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
609
            with open(path, 'rb') as myfile:
×
610
                return myfile.read()
×
611

612

613
class DeviceUnpairableError(UserFacingException): pass
5✔
614
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
615
class CannotAutoSelectDevice(Exception): pass
5✔
616

617

618
class Device(NamedTuple):
5✔
619
    path: Union[str, bytes]
5✔
620
    interface_number: int
5✔
621
    id_: str
5✔
622
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
623
    usage_page: int
5✔
624
    transport_ui_string: str
5✔
625

626

627
class DeviceInfo(NamedTuple):
5✔
628
    device: Device
5✔
629
    label: Optional[str] = None
5✔
630
    initialized: Optional[bool] = None
5✔
631
    exception: Optional[Exception] = None
5✔
632
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
633
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
634
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
635

636

637
class HardwarePluginToScan(NamedTuple):
5✔
638
    name: str
5✔
639
    description: str
5✔
640
    plugin: Optional['HW_PluginBase']
5✔
641
    exception: Optional[Exception]
5✔
642

643

644
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
645

646

647
# hidapi is not thread-safe
648
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
649
#     https://github.com/libusb/hidapi/issues/45
650
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
651
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
652
# It is not entirely clear to me, exactly what is safe and what isn't, when
653
# using multiple threads...
654
# Hence, we use a single thread for all device communications, including
655
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
656
# the following thread:
657
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
658
    max_workers=1,
659
    thread_name_prefix='hwd_comms_thread'
660
)
661

662
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
663
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
664
# To keep it simple, let's just import it now, as we are likely in the main thread here.
665
if threading.current_thread() is not threading.main_thread():
5✔
666
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
667
try:
5✔
668
    import hid
5✔
669
except ImportError:
5✔
670
    pass
5✔
671

672

673
T = TypeVar('T')
5✔
674

675

676
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
677
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
678
        return func()
×
679
    else:
680
        fut = _hwd_comms_executor.submit(func)
×
681
        return fut.result()
×
682
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
683

684

685
def runs_in_hwd_thread(func):
5✔
686
    @wraps(func)
5✔
687
    def wrapper(*args, **kwargs):
5✔
688
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
689
    return wrapper
5✔
690

691

692
def assert_runs_in_hwd_thread():
5✔
693
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
694
        raise Exception("must only be called from HWD communication thread")
×
695

696

697
class DeviceMgr(ThreadJob):
5✔
698
    """Manages hardware clients.  A client communicates over a hardware
699
    channel with the device.
700

701
    In addition to tracking device HID IDs, the device manager tracks
702
    hardware wallets and manages wallet pairing.  A HID ID may be
703
    paired with a wallet when it is confirmed that the hardware device
704
    matches the wallet, i.e. they have the same master public key.  A
705
    HID ID can be unpaired if e.g. it is wiped.
706

707
    Because of hotplugging, a wallet must request its client
708
    dynamically each time it is required, rather than caching it
709
    itself.
710

711
    The device manager is shared across plugins, so just one place
712
    does hardware scans when needed.  By tracking HID IDs, if a device
713
    is plugged into a different port the wallet is automatically
714
    re-paired.
715

716
    Wallets are informed on connect / disconnect events.  It must
717
    implement connected(), disconnected() callbacks.  Being connected
718
    implies a pairing.  Callbacks can happen in any thread context,
719
    and we do them without holding the lock.
720

721
    Confusingly, the HID ID (serial number) reported by the HID system
722
    doesn't match the device ID reported by the device itself.  We use
723
    the HID IDs.
724

725
    This plugin is thread-safe.  Currently only devices supported by
726
    hidapi are implemented."""
727

728
    def __init__(self, config: SimpleConfig):
5✔
729
        ThreadJob.__init__(self)
5✔
730
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
731
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
732
        # A client->id_ map. Needs self.lock.
733
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
734
        # What we recognise.  (vendor_id, product_id) -> Plugin
735
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
736
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
737
        # Custom enumerate functions for devices we don't know about.
738
        self._enumerate_func = set()  # Needs self.lock.
5✔
739

740
        self.lock = threading.RLock()
5✔
741

742
        self.config = config
5✔
743

744
    def thread_jobs(self):
5✔
745
        # Thread job to handle device timeouts
746
        return [self]
5✔
747

748
    def run(self):
5✔
749
        '''Handle device timeouts.  Runs in the context of the Plugins
750
        thread.'''
751
        with self.lock:
5✔
752
            clients = list(self.clients.keys())
5✔
753
        cutoff = time.time() - self.config.get_session_timeout()
5✔
754
        for client in clients:
5✔
755
            client.timeout(cutoff)
×
756

757
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
758
        for pair in device_pairs:
×
759
            self._recognised_hardware[pair] = plugin
×
760

761
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
762
        for vendor_id in vendor_ids:
×
763
            self._recognised_vendor[vendor_id] = plugin
×
764

765
    def register_enumerate_func(self, func):
5✔
766
        with self.lock:
×
767
            self._enumerate_func.add(func)
×
768

769
    @runs_in_hwd_thread
5✔
770
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
771
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
772
        # Get from cache first
773
        client = self._client_by_id(device.id_)
×
774
        if client:
×
775
            return client
×
776
        client = plugin.create_client(device, handler)
×
777
        if client:
×
778
            self.logger.info(f"Registering {client}")
×
779
            with self.lock:
×
780
                self.clients[client] = device.id_
×
781
        return client
×
782

783
    def id_by_pairing_code(self, pairing_code):
5✔
784
        with self.lock:
×
785
            return self.pairing_code_to_id.get(pairing_code)
×
786

787
    def pairing_code_by_id(self, id_):
5✔
788
        with self.lock:
×
789
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
790
                if id2 == id_:
×
791
                    return pairing_code
×
792
            return None
×
793

794
    def unpair_pairing_code(self, pairing_code):
5✔
795
        with self.lock:
×
796
            if pairing_code not in self.pairing_code_to_id:
×
797
                return
×
798
            _id = self.pairing_code_to_id.pop(pairing_code)
×
799
        self._close_client(_id)
×
800

801
    def unpair_id(self, id_):
5✔
802
        pairing_code = self.pairing_code_by_id(id_)
×
803
        if pairing_code:
×
804
            self.unpair_pairing_code(pairing_code)
×
805
        else:
806
            self._close_client(id_)
×
807

808
    def _close_client(self, id_):
5✔
809
        with self.lock:
×
810
            client = self._client_by_id(id_)
×
811
            self.clients.pop(client, None)
×
812
        if client:
×
813
            client.close()
×
814

815
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
816
        with self.lock:
×
817
            for client, client_id in self.clients.items():
×
818
                if client_id == id_:
×
819
                    return client
×
820
        return None
×
821

822
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
823
        '''Returns a client for the device ID if one is registered.  If
824
        a device is wiped or in bootloader mode pairing is impossible;
825
        in such cases we communicate by device ID and not wallet.'''
826
        if scan_now:
×
827
            self.scan_devices()
×
828
        return self._client_by_id(id_)
×
829

830
    @runs_in_hwd_thread
5✔
831
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
832
                            keystore: 'Hardware_KeyStore',
833
                            force_pair: bool, *,
834
                            devices: Sequence['Device'] = None,
835
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
836
        self.logger.info("getting client for keystore")
×
837
        if handler is None:
×
838
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
839
        handler.update_status(False)
×
840
        pcode = keystore.pairing_code()
×
841
        client = None
×
842
        # search existing clients first (fast-path)
843
        if not devices:
×
844
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
845
        # search clients again, now allowing a (slow) scan
846
        if client is None:
×
847
            if devices is None:
×
848
                devices = self.scan_devices()
×
849
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
850
        if client is None and force_pair:
×
851
            try:
×
852
                info = self.select_device(plugin, handler, keystore, devices,
×
853
                                          allow_user_interaction=allow_user_interaction)
854
            except CannotAutoSelectDevice:
×
855
                pass
×
856
            else:
857
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
858
        if client:
×
859
            handler.update_status(True)
×
860
            # note: if select_device was called, we might also update label etc here:
861
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
862
        self.logger.info("end client for keystore")
×
863
        return client
×
864

865
    def client_by_pairing_code(
5✔
866
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
867
        devices: Sequence['Device'],
868
    ) -> Optional['HardwareClientBase']:
869
        _id = self.id_by_pairing_code(pairing_code)
×
870
        client = self._client_by_id(_id)
×
871
        if client:
×
872
            if type(client.plugin) != type(plugin):
×
873
                return
×
874
            # An unpaired client might have another wallet's handler
875
            # from a prior scan.  Replace to fix dialog parenting.
876
            client.handler = handler
×
877
            return client
×
878

879
        for device in devices:
×
880
            if device.id_ == _id:
×
881
                return self.create_client(device, handler, plugin)
×
882

883
    def force_pair_keystore(
5✔
884
        self,
885
        *,
886
        plugin: 'HW_PluginBase',
887
        handler: 'HardwareHandlerBase',
888
        info: 'DeviceInfo',
889
        keystore: 'Hardware_KeyStore',
890
    ) -> 'HardwareClientBase':
891
        xpub = keystore.xpub
×
892
        derivation = keystore.get_derivation_prefix()
×
893
        assert derivation is not None
×
894
        xtype = bip32.xpub_type(xpub)
×
895
        client = self._client_by_id(info.device.id_)
×
896
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
897
            # See comment above for same code
898
            client.handler = handler
×
899
            # This will trigger a PIN/passphrase entry request
900
            try:
×
901
                client_xpub = client.get_xpub(derivation, xtype)
×
902
            except (UserCancelled, RuntimeError):
×
903
                # Bad / cancelled PIN / passphrase
904
                client_xpub = None
×
905
            if client_xpub == xpub:
×
906
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
907
                with self.lock:
×
908
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
909
                return client
×
910

911
        # The user input has wrong PIN or passphrase, or cancelled input,
912
        # or it is not pairable
913
        raise DeviceUnpairableError(
×
914
            _('Electrum cannot pair with your {}.\n\n'
915
              'Before you request bitcoins to be sent to addresses in this '
916
              'wallet, ensure you can pair with your device, or that you have '
917
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
918
              'receive will be unspendable.').format(plugin.device))
919

920
    def list_pairable_device_infos(
5✔
921
        self,
922
        *,
923
        handler: Optional['HardwareHandlerBase'],
924
        plugin: 'HW_PluginBase',
925
        devices: Sequence['Device'] = None,
926
        include_failing_clients: bool = False,
927
    ) -> List['DeviceInfo']:
928
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
929
        Already paired devices are also included, as it is okay to reuse them.
930
        """
931
        if not plugin.libraries_available:
×
932
            message = plugin.get_library_not_available_message()
×
933
            raise HardwarePluginLibraryUnavailable(message)
×
934
        if devices is None:
×
935
            devices = self.scan_devices()
×
936
        infos = []
×
937
        for device in devices:
×
938
            if not plugin.can_recognize_device(device):
×
939
                continue
×
940
            try:
×
941
                client = self.create_client(device, handler, plugin)
×
942
                if not client:
×
943
                    continue
×
944
                label = client.label()
×
945
                is_initialized = client.is_initialized()
×
946
                soft_device_id = client.get_soft_device_id()
×
947
                model_name = client.device_model_name()
×
948
            except Exception as e:
×
949
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
950
                if include_failing_clients:
×
951
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
952
                continue
×
953
            infos.append(DeviceInfo(device=device,
×
954
                                    label=label,
955
                                    initialized=is_initialized,
956
                                    plugin_name=plugin.name,
957
                                    soft_device_id=soft_device_id,
958
                                    model_name=model_name))
959

960
        return infos
×
961

962
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
963
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
964
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
965
        """Select the device to use for keystore."""
966
        # ideally this should not be called from the GUI thread...
967
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
968
        while True:
×
969
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
970
            if infos:
×
971
                break
×
972
            if not allow_user_interaction:
×
973
                raise CannotAutoSelectDevice()
×
974
            msg = _('Please insert your {}').format(plugin.device)
×
975
            msg += " ("
×
976
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
977
                msg += f"label: {keystore.label}, "
×
978
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
979
            msg += ').\n\n{}\n\n{}'.format(
×
980
                _('Verify the cable is connected and that '
981
                  'no other application is using it.'),
982
                _('Try to connect again?')
983
            )
984
            if not handler.yes_no_question(msg):
×
985
                raise UserCancelled()
×
986
            devices = None
×
987

988
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
989
        # method 1: select device by id
990
        if keystore.soft_device_id:
×
991
            for info in infos:
×
992
                if info.soft_device_id == keystore.soft_device_id:
×
993
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
994
                    return info
×
995
        # method 2: select device by label
996
        #           but only if not a placeholder label and only if there is no collision
997
        device_labels = [info.label for info in infos]
×
998
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
999
                and device_labels.count(keystore.label) == 1):
1000
            for info in infos:
×
1001
                if info.label == keystore.label:
×
1002
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1003
                    return info
×
1004
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1005
        #           saved for keystore anyway, select it
1006
        if (len(infos) == 1
×
1007
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1008
                and keystore.soft_device_id is None):
1009
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1010
            return infos[0]
×
1011

1012
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1013
        if not allow_user_interaction:
×
1014
            raise CannotAutoSelectDevice()
×
1015
        # ask user to select device manually
1016
        msg = (
×
1017
                _("Could not automatically pair with device for given keystore.") + "\n"
1018
                + f"(keystore label: {keystore.label!r}, "
1019
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1020
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1021
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1022
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1023
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1024
                                init=(_("initialized") if info.initialized else _("wiped")),
1025
                                transport=info.device.transport_ui_string,
1026
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1027
                        for info in infos]
1028
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1029
                          f"num options: {len(infos)}. options: {infos}")
1030
        c = handler.query_choice(msg, descriptions)
×
1031
        if c is None:
×
1032
            raise UserCancelled()
×
1033
        info = infos[c]
×
1034
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1035
        # note: updated label/soft_device_id will be saved after pairing succeeds
1036
        return info
×
1037

1038
    @runs_in_hwd_thread
5✔
1039
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1040
        try:
×
1041
            import hid  # noqa: F811
×
1042
        except ImportError:
×
1043
            return []
×
1044

1045
        devices = []
×
1046
        for d in hid.enumerate(0, 0):
×
1047
            vendor_id = d['vendor_id']
×
1048
            product_key = (vendor_id, d['product_id'])
×
1049
            plugin = None
×
1050
            if product_key in self._recognised_hardware:
×
1051
                plugin = self._recognised_hardware[product_key]
×
1052
            elif vendor_id in self._recognised_vendor:
×
1053
                plugin = self._recognised_vendor[vendor_id]
×
1054
            if plugin:
×
1055
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1056
                if device:
×
1057
                    devices.append(device)
×
1058
        return devices
×
1059

1060
    @runs_in_hwd_thread
5✔
1061
    @profiler
5✔
1062
    def scan_devices(self) -> Sequence['Device']:
5✔
1063
        self.logger.info("scanning devices...")
×
1064

1065
        # First see what's connected that we know about
1066
        devices = self._scan_devices_with_hid()
×
1067

1068
        # Let plugin handlers enumerate devices we don't know about
1069
        with self.lock:
×
1070
            enumerate_funcs = list(self._enumerate_func)
×
1071
        for f in enumerate_funcs:
×
1072
            try:
×
1073
                new_devices = f()
×
1074
            except BaseException as e:
×
1075
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1076
            else:
1077
                devices.extend(new_devices)
×
1078

1079
        # find out what was disconnected
1080
        client_ids = [dev.id_ for dev in devices]
×
1081
        disconnected_clients = []
×
1082
        with self.lock:
×
1083
            connected = {}
×
1084
            for client, id_ in self.clients.items():
×
1085
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1086
                    connected[client] = id_
×
1087
                else:
1088
                    disconnected_clients.append((client, id_))
×
1089
            self.clients = connected
×
1090

1091
        # Unpair disconnected devices
1092
        for client, id_ in disconnected_clients:
×
1093
            self.unpair_id(id_)
×
1094
            if client.handler:
×
1095
                client.handler.update_status(False)
×
1096

1097
        return devices
×
1098

1099
    @classmethod
5✔
1100
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1101
        ret = {}
×
1102
        # add libusb
1103
        try:
×
1104
            import usb1
×
1105
        except Exception as e:
×
1106
            ret["libusb.version"] = None
×
1107
        else:
1108
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1109
            try:
×
1110
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1111
            except AttributeError:
×
1112
                ret["libusb.path"] = None
×
1113
        # add hidapi
1114
        try:
×
1115
            import hid  # noqa: F811
×
1116
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1117
        except Exception as e:
×
1118
            from importlib.metadata import version
×
1119
            try:
×
1120
                ret["hidapi.version"] = version("hidapi")
×
1121
            except ImportError:
×
1122
                ret["hidapi.version"] = None
×
1123
        return ret
×
1124

1125
    def trigger_pairings(
5✔
1126
            self,
1127
            keystores: Sequence['KeyStore'],
1128
            *,
1129
            allow_user_interaction: bool = True,
1130
            devices: Sequence['Device'] = None,
1131
    ) -> None:
1132
        """Given a list of keystores, try to pair each with a connected hardware device.
1133

1134
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1135
        try to pair each keystore individually. Consider the following scenario:
1136
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1137
        - assume none of the devices are paired yet
1138
        1. if we tried to individually pair keystores, we might try with ks1 first
1139
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1140
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1141
             which is confusing and error-prone. It's especially problematic if the hw device does
1142
             not support labels (such as Ledger), as then the user cannot easily distinguish
1143
             same-type devices. (see #4199)
1144
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1145
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1146
        """
1147
        from .keystore import Hardware_KeyStore
×
1148
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1149
        if not keystores:
×
1150
            return
×
1151
        if devices is None:
×
1152
            devices = self.scan_devices()
×
1153
        # first pair with all devices that can be auto-selected
1154
        for ks in keystores:
×
1155
            try:
×
1156
                ks.get_client(
×
1157
                    force_pair=True,
1158
                    allow_user_interaction=False,
1159
                    devices=devices,
1160
                )
1161
            except UserCancelled:
×
1162
                pass
×
1163
        if allow_user_interaction:
×
1164
            # now do manual selections
1165
            for ks in keystores:
×
1166
                try:
×
1167
                    ks.get_client(
×
1168
                        force_pair=True,
1169
                        allow_user_interaction=True,
1170
                        devices=devices,
1171
                    )
1172
                except UserCancelled:
×
1173
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc