• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6117194087530496

09 Apr 2025 08:59AM UTC coverage: 60.965% (-0.04%) from 61.0%
6117194087530496

push

CirrusCI

ecdsa
Userspace plugins:
 - Allow plugins saved as zipfiles in user data dir
 - plugins are authorized with a user chosen password
 - pubkey derived from password is saved with admin permissions

25 of 137 new or added lines in 1 file covered. (18.25%)

4 existing lines in 1 file now uncovered.

21436 of 35161 relevant lines covered (60.97%)

3.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.49
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)
5✔
63
plugin_loaders = {}
5✔
64
hook_names = set()
5✔
65
hooks = {}
5✔
66
_exec_module_failure = {}  # type: Dict[str, Exception]
5✔
67

68

69
class Plugins(DaemonThread):
5✔
70

71
    LOGGING_SHORTCUT = 'p'
5✔
72
    pkgpath = os.path.dirname(plugins.__file__)
5✔
73
    keyfile_linux = '/etc/electrum/plugins_key'
5✔
74
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
75

76
    @profiler
5✔
77
    def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
5✔
78
        self.config = config
5✔
79
        self.cmd_only = cmd_only  # type: bool
5✔
80
        self.internal_plugin_metadata = {}
5✔
81
        self.external_plugin_metadata = {}
5✔
82
        if cmd_only:
5✔
83
            # only import the command modules of plugins
84
            Logger.__init__(self)
×
85
            self.find_plugins()
×
86
            self.load_plugins()
×
87
            return
×
88
        DaemonThread.__init__(self)
5✔
89
        self.device_manager = DeviceMgr(config)
5✔
90
        self.name = 'Plugins'  # set name of thread
5✔
91
        self.hw_wallets = {}
5✔
92
        self.plugins = {}  # type: Dict[str, BasePlugin]
5✔
93
        self.gui_name = gui_name
5✔
94
        self.find_plugins()
5✔
95
        self.load_plugins()
5✔
96
        self.add_jobs(self.device_manager.thread_jobs())
5✔
97
        self.start()
5✔
98

99
    @property
5✔
100
    def descriptions(self):
5✔
101
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
102

103
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
104
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
105
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
106
        for loader, name, ispkg in iter_modules:
5✔
107
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
108
            #       once as data and once as code. To honor the "no duplicates" rule below,
109
            #       we exclude the ones packaged as *code*, here:
110
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
111
                continue
×
112
            module_path = os.path.join(pkg_path, name)
5✔
113
            if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
5✔
114
                continue
×
115
            try:
5✔
116
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
117
                    d = json.load(f)
5✔
118
            except FileNotFoundError:
5✔
119
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
5✔
120
                continue
5✔
121
            if 'fullname' not in d:
5✔
122
                continue
×
123
            d['path'] = module_path
5✔
124
            if not self.cmd_only:
5✔
125
                gui_good = self.gui_name in d.get('available_for', [])
5✔
126
                if not gui_good:
5✔
127
                    continue
5✔
128
                details = d.get('registers_wallet_type')
5✔
129
                if details:
5✔
130
                    self.register_wallet_type(name, gui_good, details)
5✔
131
                details = d.get('registers_keystore')
5✔
132
                if details:
5✔
133
                    self.register_keystore(name, gui_good, details)
5✔
134
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
135
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
136
                raise Exception(f"duplicate plugins? for {name=}")
×
137
            if not external:
5✔
138
                self.internal_plugin_metadata[name] = d
5✔
139
            else:
140
                self.external_plugin_metadata[name] = d
×
141

142
    @staticmethod
5✔
143
    def exec_module_from_spec(spec, path: str):
5✔
144
        if prev_fail := _exec_module_failure.get(path):
5✔
145
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
×
146
        try:
5✔
147
            module = importlib.util.module_from_spec(spec)
5✔
148
            # sys.modules needs to be modified for relative imports to work
149
            # see https://stackoverflow.com/a/50395128
150
            sys.modules[path] = module
5✔
151
            spec.loader.exec_module(module)
5✔
152
        except Exception as e:
×
153
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
154
            # so the import system knows it failed. If called again for the same plugin, we do not
155
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
156
            # might have defined commands).
157
            _exec_module_failure[path] = e
×
158
            if path in sys.modules:
×
159
                sys.modules.pop(path, None)
×
160
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
161
        return module
5✔
162

163
    def find_plugins(self):
5✔
164
        internal_plugins_path = (self.pkgpath, False)
5✔
165
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
166
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
167
            if pkg_path and os.path.exists(pkg_path):
5✔
168
                if not external:
5✔
169
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
170
                else:
171
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
172

173
    def load_plugins(self):
5✔
174
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
175
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
176
                try:
×
177
                    if self.cmd_only:  # only load init method to register commands
×
178
                        self.maybe_load_plugin_init_method(name)
×
179
                    else:
180
                        self.load_plugin_by_name(name)
×
181
                except BaseException as e:
×
182
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
183

184
    def _has_root_permissions(self, path):
5✔
185
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
186

187
    def get_keyfile_path(self):
5✔
NEW
188
        if sys.platform in ['win32']:
×
NEW
189
            keyfile_path = self.keyfile_windows
×
NEW
190
            keyfile_help = _('This file can be edited with Regdit')
×
NEW
191
        elif sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd'):
×
NEW
192
            keyfile_path = self.keyfile_linux
×
NEW
193
            keyfile_help = _('The file must have root permissions')
×
194
        else:
NEW
195
            raise Exception('platform not supported')
×
NEW
196
        return keyfile_path, keyfile_help
×
197

198
    def get_pubkey_bytes(self) -> Optional[str]:
5✔
NEW
199
        if sys.platform in ['win32']:
×
NEW
200
            import winreg
×
NEW
201
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
NEW
202
                try:
×
NEW
203
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
NEW
204
                        pubkey_hex = winreg.QueryValue(key, "PluginsKey")
×
NEW
205
                except Exception as e:
×
NEW
206
                    self.logger.info(f'winreg error: {e}')
×
NEW
207
                    return
×
NEW
208
        elif sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd'):
×
NEW
209
            if not os.path.exists(self.keyfile_linux):
×
NEW
210
                return
×
NEW
211
            if not self._has_root_permissions(self.keyfile_linux):
×
NEW
212
                return
×
NEW
213
            with open(self.keyfile_linux) as f:
×
NEW
214
                pubkey_hex = f.read()
×
215
        else:
UNCOV
216
            return
×
217
        # all good
NEW
218
        return bytes.fromhex(pubkey_hex)
×
219

220
    def get_external_plugin_dir(self) -> str:
5✔
221
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
222
        if not os.path.exists(pkg_path):
5✔
223
            os.mkdir(pkg_path)
5✔
224
        return pkg_path
5✔
225

226
    async def download_external_plugin(self, url):
5✔
NEW
227
        filename = os.path.basename(urlparse(url).path)
×
NEW
228
        pkg_path = self.get_external_plugin_dir()
×
NEW
229
        path = os.path.join(pkg_path, filename)
×
NEW
230
        async with aiohttp.ClientSession() as session:
×
NEW
231
            async with session.get(url) as resp:
×
NEW
232
                if resp.status == 200:
×
NEW
233
                    with open(path, 'wb') as fd:
×
NEW
234
                        async for chunk in resp.content.iter_chunked(10):
×
NEW
235
                            fd.write(chunk)
×
NEW
236
        return path
×
237

238
    def read_manifest(self, path):
5✔
NEW
239
        with zipfile_lib.ZipFile(path) as file:
×
NEW
240
            for filename in file.namelist():
×
NEW
241
                if filename.endswith('manifest.json'):
×
NEW
242
                    break
×
243
            else:
NEW
244
                raise Exception('could not find manifest.json in zip archive')
×
NEW
245
            with file.open(filename, 'r') as f:
×
NEW
246
                manifest = json.load(f)
×
NEW
247
                manifest['path'] = path  # external, path of the zipfile
×
NEW
248
                manifest['dirname'] = os.path.dirname(filename)  # internal
×
NEW
249
                manifest['is_zip'] = True
×
NEW
250
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
×
NEW
251
                return manifest
×
252

253
    def zip_plugin_path(self, name):
5✔
NEW
254
        path = self.get_metadata(name)['path']
×
NEW
255
        filename = os.path.basename(path)
×
256
        if name in self.internal_plugin_metadata:
×
257
            pkg_path = self.pkgpath
×
258
        else:
259
            pkg_path = self.get_external_plugin_dir()
×
260
        return os.path.join(pkg_path, filename)
×
261

262
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
263
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
264
        if pkg_path is None:
5✔
265
            return
×
266
        for filename in os.listdir(pkg_path):
5✔
UNCOV
267
            path = os.path.join(pkg_path, filename)
×
UNCOV
268
            if not filename.endswith('.zip'):
×
UNCOV
269
                continue
×
270
            try:
×
NEW
271
                d = self.read_manifest(path)
×
NEW
272
            except Exception:
×
NEW
273
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
274
                continue
×
NEW
275
            name = d['name']
×
NEW
276
            if name in self.internal_plugin_metadata:
×
NEW
277
                raise Exception(f"duplicate plugins for name={name}")
×
NEW
278
            if name in self.external_plugin_metadata:
×
NEW
279
                raise Exception(f"duplicate plugins for name={name}")
×
NEW
280
            if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
NEW
281
                continue
×
NEW
282
            min_version = d.get('min_electrum_version')
×
NEW
283
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
NEW
284
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
NEW
285
                continue
×
NEW
286
            max_version = d.get('max_electrum_version')
×
NEW
287
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
NEW
288
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
NEW
289
                continue
×
290

NEW
291
            if not self.cmd_only:
×
NEW
292
                gui_good = self.gui_name in d.get('available_for', [])
×
NEW
293
                if not gui_good:
×
294
                    continue
×
NEW
295
                if 'fullname' not in d:
×
296
                    continue
×
NEW
297
            if external:
×
NEW
298
                self.external_plugin_metadata[name] = d
×
299
            else:
NEW
300
                self.internal_plugin_metadata[name] = d
×
301

302
    def get(self, name):
5✔
303
        return self.plugins.get(name)
×
304

305
    def count(self):
5✔
306
        return len(self.plugins)
×
307

308
    def load_plugin(self, name) -> 'BasePlugin':
5✔
309
        """Imports the code of the given plugin.
310
        note: can be called from any thread.
311
        """
312
        if self.get_metadata(name):
5✔
313
            return self.load_plugin_by_name(name)
5✔
314
        else:
315
            raise Exception(f"could not find plugin {name!r}")
×
316

317
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
318
        """Loads the __init__.py module of the plugin if it is not already loaded."""
319
        is_external = name in self.external_plugin_metadata
5✔
320
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
321
        if base_name not in sys.modules:
5✔
322
            metadata = self.get_metadata(name)
5✔
323
            is_zip = metadata.get('is_zip', False)
5✔
324
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
325
            if not is_zip:
5✔
326
                if is_external:
5✔
327
                    # this branch is deprecated: external plugins are always zip files
328
                    path = os.path.join(metadata['path'], '__init__.py')
×
329
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
330
                else:
331
                    init_spec = importlib.util.find_spec(base_name)
5✔
332
            else:
333
                zipfile = zipimport.zipimporter(metadata['path'])
×
NEW
334
                dirname = metadata['dirname']
×
NEW
335
                init_spec = zipfile.find_spec(dirname)
×
336

337
            self.exec_module_from_spec(init_spec, base_name)
5✔
338
            if name == "trustedcoin":
5✔
339
                # removes trustedcoin after loading to not show it in the list of plugins
340
                del self.internal_plugin_metadata[name]
×
341

342
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
343
        if name in self.plugins:
5✔
344
            return self.plugins[name]
×
345
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
346
        self.maybe_load_plugin_init_method(name)
5✔
347
        is_external = name in self.external_plugin_metadata
5✔
348
        if is_external and not self.is_authorized(name):
5✔
NEW
349
            self.logger.info(f'plugin not authorized {name}')
×
NEW
350
            return
×
351
        if not is_external:
5✔
352
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
353
        else:
354
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
355

356
        spec = importlib.util.find_spec(full_name)
5✔
357
        if spec is None:
5✔
358
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
359
        try:
5✔
360
            module = self.exec_module_from_spec(spec, full_name)
5✔
361
            plugin = module.Plugin(self, self.config, name)
5✔
362
        except Exception as e:
×
363
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
364
        self.add_jobs(plugin.thread_jobs())
5✔
365
        self.plugins[name] = plugin
5✔
366
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
367
        return plugin
5✔
368

369
    def close_plugin(self, plugin):
5✔
370
        self.remove_jobs(plugin.thread_jobs())
×
371

372
    def derive_privkey(self, pw: str) -> ECPrivkey:
5✔
NEW
373
        from hashlib import pbkdf2_hmac
×
NEW
374
        salt = self.config.get('plugins_salt')
×
NEW
375
        if salt is None:
×
NEW
376
            salt = os.urandom(32).hex()
×
NEW
377
            self.config.set_key('plugins_salt', salt)
×
NEW
378
        secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), bytes.fromhex(salt), iterations=1024)
×
NEW
379
        return ECPrivkey(secret)
×
380

381
    def is_installed(self, name):
5✔
NEW
382
        return name in self.internal_plugin_metadata or name in self.external_plugin_metadata
×
383

384
    def is_authorized(self, name):
5✔
NEW
385
        if name in self.internal_plugin_metadata:
×
NEW
386
            return True
×
NEW
387
        if name not in self.external_plugin_metadata:
×
NEW
388
            return False
×
NEW
389
        pubkey_bytes = self.get_pubkey_bytes()
×
NEW
390
        if not pubkey_bytes:
×
NEW
391
            return False
×
NEW
392
        if not self.is_plugin_zip(name):
×
NEW
393
            return False
×
NEW
394
        filename = self.zip_plugin_path(name)
×
NEW
395
        plugin_hash = get_file_hash256(filename)
×
NEW
396
        sig = self.config.get('authorize_plugin_' + name)
×
NEW
397
        if not sig:
×
NEW
398
            return False
×
NEW
399
        pubkey = ECPubkey(pubkey_bytes)
×
NEW
400
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
401

402
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey) -> 'BasePlugin':
5✔
NEW
403
        assert self.get_pubkey_bytes() == privkey.get_public_key_bytes()
×
NEW
404
        plugin_hash = get_file_hash256(filename)
×
NEW
405
        sig = privkey.ecdsa_sign(plugin_hash)
×
NEW
406
        value = sig.hex()
×
NEW
407
        self.config.set_key('authorize_plugin_' + name, value, save=True)
×
408

409
    def enable(self, name: str) -> 'BasePlugin':
5✔
410
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
411
        p = self.get(name)
×
412
        if p:
×
413
            return p
×
414
        return self.load_plugin(name)
×
415

416
    def disable(self, name: str) -> None:
5✔
417
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
418
        p = self.get(name)
×
419
        if not p:
×
420
            return
×
421
        self.plugins.pop(name)
×
422
        p.close()
×
423
        self.logger.info(f"closed {name}")
×
424

425
    @classmethod
5✔
426
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
NEW
427
        return key.startswith('enable_plugin_') or key.startswith('authorize_plugin_')
×
428

429
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
430
        p = self.get(name)
×
431
        return self.disable(name) if p else self.enable(name)
×
432

433
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
434
        d = self.descriptions.get(name)
×
435
        if not d:
×
436
            return False
×
437
        deps = d.get('requires', [])
×
438
        for dep, s in deps:
×
439
            try:
×
440
                __import__(dep)
×
441
            except ImportError as e:
×
442
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
443
                return False
×
444
        requires = d.get('requires_wallet_type', [])
×
445
        return not requires or wallet.wallet_type in requires
×
446

447
    def get_hardware_support(self):
5✔
448
        out = []
×
449
        for name, (gui_good, details) in self.hw_wallets.items():
×
450
            if gui_good:
×
451
                try:
×
452
                    p = self.get_plugin(name)
×
453
                    if p.is_available():
×
454
                        out.append(HardwarePluginToScan(name=name,
×
455
                                                        description=details[2],
456
                                                        plugin=p,
457
                                                        exception=None))
458
                except Exception as e:
×
459
                    self.logger.exception(f"cannot load plugin for: {name}")
×
460
                    out.append(HardwarePluginToScan(name=name,
×
461
                                                    description=details[2],
462
                                                    plugin=None,
463
                                                    exception=e))
464
        return out
×
465

466
    def register_wallet_type(self, name, gui_good, wallet_type):
5✔
467
        from .wallet import register_wallet_type, register_constructor
5✔
468
        self.logger.info(f"registering wallet type {(wallet_type, name)}")
5✔
469

470
        def loader():
5✔
471
            plugin = self.get_plugin(name)
5✔
472
            register_constructor(wallet_type, plugin.wallet_class)
5✔
473
        register_wallet_type(wallet_type)
5✔
474
        plugin_loaders[wallet_type] = loader
5✔
475

476
    def register_keystore(self, name, gui_good, details):
5✔
477
        from .keystore import register_keystore
5✔
478

479
        def dynamic_constructor(d):
5✔
480
            return self.get_plugin(name).keystore_class(d)
5✔
481
        if details[0] == 'hardware':
5✔
482
            self.hw_wallets[name] = (gui_good, details)
5✔
483
            self.logger.info(f"registering hardware {name}: {details}")
5✔
484
            register_keystore(details[1], dynamic_constructor)
5✔
485

486
    def get_plugin(self, name: str) -> 'BasePlugin':
5✔
487
        if name not in self.plugins:
5✔
488
            self.load_plugin(name)
5✔
489
        return self.plugins[name]
5✔
490

491
    def is_plugin_zip(self, name: str) -> bool:
5✔
492
        """Returns True if the plugin is a zip file"""
493
        if (metadata := self.get_metadata(name)) is None:
×
494
            return False
×
495
        return metadata.get('is_zip', False)
×
496

497
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
498
        """Returns the metadata of the plugin"""
499
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
500
        if not metadata:
5✔
501
            return None
×
502
        return metadata
5✔
503

504
    def run(self):
5✔
505
        while self.is_running():
5✔
506
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
5✔
507
            self.run_jobs()
5✔
508
        self.on_stop()
5✔
509

510

511
def get_file_hash256(path: str) -> bytes:
5✔
512
    '''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
513
    with open(path, 'rb') as f:
×
NEW
514
        return sha256(f.read())
×
515

516

517
def hook(func):
5✔
518
    hook_names.add(func.__name__)
5✔
519
    return func
5✔
520

521

522
def run_hook(name, *args):
5✔
523
    results = []
5✔
524
    f_list = hooks.get(name, [])
5✔
525
    for p, f in f_list:
5✔
526
        if p.is_enabled():
5✔
527
            try:
5✔
528
                r = f(*args)
5✔
529
            except Exception:
×
530
                _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
×
531
                r = False
×
532
            if r:
5✔
533
                results.append(r)
×
534

535
    if results:
5✔
536
        assert len(results) == 1, results
×
537
        return results[0]
×
538

539

540
class BasePlugin(Logger):
5✔
541

542
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
543
        self.parent = parent  # type: Plugins  # The plugins object
5✔
544
        self.name = name
5✔
545
        self.config = config
5✔
546
        self.wallet = None  # fixme: this field should not exist
5✔
547
        Logger.__init__(self)
5✔
548
        # add self to hooks
549
        for k in dir(self):
5✔
550
            if k in hook_names:
5✔
551
                l = hooks.get(k, [])
5✔
552
                l.append((self, getattr(self, k)))
5✔
553
                hooks[k] = l
5✔
554

555
    def __str__(self):
5✔
556
        return self.name
×
557

558
    def close(self):
5✔
559
        # remove self from hooks
560
        for attr_name in dir(self):
×
561
            if attr_name in hook_names:
×
562
                # found attribute in self that is also the name of a hook
563
                l = hooks.get(attr_name, [])
×
564
                try:
×
565
                    l.remove((self, getattr(self, attr_name)))
×
566
                except ValueError:
×
567
                    # maybe attr name just collided with hook name and was not hook
568
                    continue
×
569
                hooks[attr_name] = l
×
570
        self.parent.close_plugin(self)
×
571
        self.on_close()
×
572

573
    def on_close(self):
5✔
574
        pass
×
575

576
    def requires_settings(self) -> bool:
5✔
577
        return False
×
578

579
    def thread_jobs(self):
5✔
580
        return []
5✔
581

582
    def is_enabled(self):
5✔
583
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
584

585
    def is_available(self):
5✔
586
        return True
×
587

588
    def can_user_disable(self):
5✔
589
        return True
×
590

591
    def settings_widget(self, window):
5✔
592
        raise NotImplementedError()
×
593

594
    def settings_dialog(self, window):
5✔
595
        raise NotImplementedError()
×
596

597
    def read_file(self, filename: str) -> bytes:
5✔
598
        if self.parent.is_plugin_zip(self.name):
×
599
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
NEW
600
            metadata = self.parent.external_plugin_metadata[self.name]
×
NEW
601
            dirname = metadata['dirname']
×
602
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
NEW
603
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
604
                    return myfile.read()
×
605
        else:
606
            if self.name in self.parent.internal_plugin_metadata:
×
607
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
608
            else:
609
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
610
            with open(path, 'rb') as myfile:
×
611
                return myfile.read()
×
612

613

614
class DeviceUnpairableError(UserFacingException): pass
5✔
615
class HardwarePluginLibraryUnavailable(Exception): pass
5✔
616
class CannotAutoSelectDevice(Exception): pass
5✔
617

618

619
class Device(NamedTuple):
5✔
620
    path: Union[str, bytes]
5✔
621
    interface_number: int
5✔
622
    id_: str
5✔
623
    product_key: Any   # when using hid, often Tuple[int, int]
5✔
624
    usage_page: int
5✔
625
    transport_ui_string: str
5✔
626

627

628
class DeviceInfo(NamedTuple):
5✔
629
    device: Device
5✔
630
    label: Optional[str] = None
5✔
631
    initialized: Optional[bool] = None
5✔
632
    exception: Optional[Exception] = None
5✔
633
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
5✔
634
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
5✔
635
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
636

637

638
class HardwarePluginToScan(NamedTuple):
5✔
639
    name: str
5✔
640
    description: str
5✔
641
    plugin: Optional['HW_PluginBase']
5✔
642
    exception: Optional[Exception]
5✔
643

644

645
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
646

647

648
# hidapi is not thread-safe
649
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
650
#     https://github.com/libusb/hidapi/issues/45
651
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
652
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
653
# It is not entirely clear to me, exactly what is safe and what isn't, when
654
# using multiple threads...
655
# Hence, we use a single thread for all device communications, including
656
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
657
# the following thread:
658
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
659
    max_workers=1,
660
    thread_name_prefix='hwd_comms_thread'
661
)
662

663
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
664
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
665
# To keep it simple, let's just import it now, as we are likely in the main thread here.
666
if threading.current_thread() is not threading.main_thread():
5✔
667
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
668
try:
5✔
669
    import hid
5✔
670
except ImportError:
5✔
671
    pass
5✔
672

673

674
T = TypeVar('T')
5✔
675

676

677
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
678
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
679
        return func()
×
680
    else:
681
        fut = _hwd_comms_executor.submit(func)
×
682
        return fut.result()
×
683
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
684

685

686
def runs_in_hwd_thread(func):
5✔
687
    @wraps(func)
5✔
688
    def wrapper(*args, **kwargs):
5✔
689
        return run_in_hwd_thread(partial(func, *args, **kwargs))
×
690
    return wrapper
5✔
691

692

693
def assert_runs_in_hwd_thread():
5✔
694
    if not threading.current_thread().name.startswith("hwd_comms_thread"):
×
695
        raise Exception("must only be called from HWD communication thread")
×
696

697

698
class DeviceMgr(ThreadJob):
5✔
699
    """Manages hardware clients.  A client communicates over a hardware
700
    channel with the device.
701

702
    In addition to tracking device HID IDs, the device manager tracks
703
    hardware wallets and manages wallet pairing.  A HID ID may be
704
    paired with a wallet when it is confirmed that the hardware device
705
    matches the wallet, i.e. they have the same master public key.  A
706
    HID ID can be unpaired if e.g. it is wiped.
707

708
    Because of hotplugging, a wallet must request its client
709
    dynamically each time it is required, rather than caching it
710
    itself.
711

712
    The device manager is shared across plugins, so just one place
713
    does hardware scans when needed.  By tracking HID IDs, if a device
714
    is plugged into a different port the wallet is automatically
715
    re-paired.
716

717
    Wallets are informed on connect / disconnect events.  It must
718
    implement connected(), disconnected() callbacks.  Being connected
719
    implies a pairing.  Callbacks can happen in any thread context,
720
    and we do them without holding the lock.
721

722
    Confusingly, the HID ID (serial number) reported by the HID system
723
    doesn't match the device ID reported by the device itself.  We use
724
    the HID IDs.
725

726
    This plugin is thread-safe.  Currently only devices supported by
727
    hidapi are implemented."""
728

729
    def __init__(self, config: SimpleConfig):
5✔
730
        ThreadJob.__init__(self)
5✔
731
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
732
        self.pairing_code_to_id = {}  # type: Dict[str, str]
5✔
733
        # A client->id_ map. Needs self.lock.
734
        self.clients = {}  # type: Dict[HardwareClientBase, str]
5✔
735
        # What we recognise.  (vendor_id, product_id) -> Plugin
736
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
5✔
737
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
5✔
738
        # Custom enumerate functions for devices we don't know about.
739
        self._enumerate_func = set()  # Needs self.lock.
5✔
740

741
        self.lock = threading.RLock()
5✔
742

743
        self.config = config
5✔
744

745
    def thread_jobs(self):
5✔
746
        # Thread job to handle device timeouts
747
        return [self]
5✔
748

749
    def run(self):
5✔
750
        '''Handle device timeouts.  Runs in the context of the Plugins
751
        thread.'''
752
        with self.lock:
5✔
753
            clients = list(self.clients.keys())
5✔
754
        cutoff = time.time() - self.config.get_session_timeout()
5✔
755
        for client in clients:
5✔
756
            client.timeout(cutoff)
×
757

758
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
759
        for pair in device_pairs:
×
760
            self._recognised_hardware[pair] = plugin
×
761

762
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
763
        for vendor_id in vendor_ids:
×
764
            self._recognised_vendor[vendor_id] = plugin
×
765

766
    def register_enumerate_func(self, func):
5✔
767
        with self.lock:
×
768
            self._enumerate_func.add(func)
×
769

770
    @runs_in_hwd_thread
5✔
771
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
5✔
772
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
773
        # Get from cache first
774
        client = self._client_by_id(device.id_)
×
775
        if client:
×
776
            return client
×
777
        client = plugin.create_client(device, handler)
×
778
        if client:
×
779
            self.logger.info(f"Registering {client}")
×
780
            with self.lock:
×
781
                self.clients[client] = device.id_
×
782
        return client
×
783

784
    def id_by_pairing_code(self, pairing_code):
5✔
785
        with self.lock:
×
786
            return self.pairing_code_to_id.get(pairing_code)
×
787

788
    def pairing_code_by_id(self, id_):
5✔
789
        with self.lock:
×
790
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
791
                if id2 == id_:
×
792
                    return pairing_code
×
793
            return None
×
794

795
    def unpair_pairing_code(self, pairing_code):
5✔
796
        with self.lock:
×
797
            if pairing_code not in self.pairing_code_to_id:
×
798
                return
×
799
            _id = self.pairing_code_to_id.pop(pairing_code)
×
800
        self._close_client(_id)
×
801

802
    def unpair_id(self, id_):
5✔
803
        pairing_code = self.pairing_code_by_id(id_)
×
804
        if pairing_code:
×
805
            self.unpair_pairing_code(pairing_code)
×
806
        else:
807
            self._close_client(id_)
×
808

809
    def _close_client(self, id_):
5✔
810
        with self.lock:
×
811
            client = self._client_by_id(id_)
×
812
            self.clients.pop(client, None)
×
813
        if client:
×
814
            client.close()
×
815

816
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
817
        with self.lock:
×
818
            for client, client_id in self.clients.items():
×
819
                if client_id == id_:
×
820
                    return client
×
821
        return None
×
822

823
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
824
        '''Returns a client for the device ID if one is registered.  If
825
        a device is wiped or in bootloader mode pairing is impossible;
826
        in such cases we communicate by device ID and not wallet.'''
827
        if scan_now:
×
828
            self.scan_devices()
×
829
        return self._client_by_id(id_)
×
830

831
    @runs_in_hwd_thread
5✔
832
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
5✔
833
                            keystore: 'Hardware_KeyStore',
834
                            force_pair: bool, *,
835
                            devices: Sequence['Device'] = None,
836
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
837
        self.logger.info("getting client for keystore")
×
838
        if handler is None:
×
839
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
×
840
        handler.update_status(False)
×
841
        pcode = keystore.pairing_code()
×
842
        client = None
×
843
        # search existing clients first (fast-path)
844
        if not devices:
×
845
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
×
846
        # search clients again, now allowing a (slow) scan
847
        if client is None:
×
848
            if devices is None:
×
849
                devices = self.scan_devices()
×
850
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
×
851
        if client is None and force_pair:
×
852
            try:
×
853
                info = self.select_device(plugin, handler, keystore, devices,
×
854
                                          allow_user_interaction=allow_user_interaction)
855
            except CannotAutoSelectDevice:
×
856
                pass
×
857
            else:
858
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
×
859
        if client:
×
860
            handler.update_status(True)
×
861
            # note: if select_device was called, we might also update label etc here:
862
            keystore.opportunistically_fill_in_missing_info_from_device(client)
×
863
        self.logger.info("end client for keystore")
×
864
        return client
×
865

866
    def client_by_pairing_code(
5✔
867
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
868
        devices: Sequence['Device'],
869
    ) -> Optional['HardwareClientBase']:
870
        _id = self.id_by_pairing_code(pairing_code)
×
871
        client = self._client_by_id(_id)
×
872
        if client:
×
873
            if type(client.plugin) != type(plugin):
×
874
                return
×
875
            # An unpaired client might have another wallet's handler
876
            # from a prior scan.  Replace to fix dialog parenting.
877
            client.handler = handler
×
878
            return client
×
879

880
        for device in devices:
×
881
            if device.id_ == _id:
×
882
                return self.create_client(device, handler, plugin)
×
883

884
    def force_pair_keystore(
5✔
885
        self,
886
        *,
887
        plugin: 'HW_PluginBase',
888
        handler: 'HardwareHandlerBase',
889
        info: 'DeviceInfo',
890
        keystore: 'Hardware_KeyStore',
891
    ) -> 'HardwareClientBase':
892
        xpub = keystore.xpub
×
893
        derivation = keystore.get_derivation_prefix()
×
894
        assert derivation is not None
×
895
        xtype = bip32.xpub_type(xpub)
×
896
        client = self._client_by_id(info.device.id_)
×
897
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
×
898
            # See comment above for same code
899
            client.handler = handler
×
900
            # This will trigger a PIN/passphrase entry request
901
            try:
×
902
                client_xpub = client.get_xpub(derivation, xtype)
×
903
            except (UserCancelled, RuntimeError):
×
904
                # Bad / cancelled PIN / passphrase
905
                client_xpub = None
×
906
            if client_xpub == xpub:
×
907
                keystore.opportunistically_fill_in_missing_info_from_device(client)
×
908
                with self.lock:
×
909
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
×
910
                return client
×
911

912
        # The user input has wrong PIN or passphrase, or cancelled input,
913
        # or it is not pairable
914
        raise DeviceUnpairableError(
×
915
            _('Electrum cannot pair with your {}.\n\n'
916
              'Before you request bitcoins to be sent to addresses in this '
917
              'wallet, ensure you can pair with your device, or that you have '
918
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
919
              'receive will be unspendable.').format(plugin.device))
920

921
    def list_pairable_device_infos(
5✔
922
        self,
923
        *,
924
        handler: Optional['HardwareHandlerBase'],
925
        plugin: 'HW_PluginBase',
926
        devices: Sequence['Device'] = None,
927
        include_failing_clients: bool = False,
928
    ) -> List['DeviceInfo']:
929
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
930
        Already paired devices are also included, as it is okay to reuse them.
931
        """
932
        if not plugin.libraries_available:
×
933
            message = plugin.get_library_not_available_message()
×
934
            raise HardwarePluginLibraryUnavailable(message)
×
935
        if devices is None:
×
936
            devices = self.scan_devices()
×
937
        infos = []
×
938
        for device in devices:
×
939
            if not plugin.can_recognize_device(device):
×
940
                continue
×
941
            try:
×
942
                client = self.create_client(device, handler, plugin)
×
943
                if not client:
×
944
                    continue
×
945
                label = client.label()
×
946
                is_initialized = client.is_initialized()
×
947
                soft_device_id = client.get_soft_device_id()
×
948
                model_name = client.device_model_name()
×
949
            except Exception as e:
×
950
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
×
951
                if include_failing_clients:
×
952
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
×
953
                continue
×
954
            infos.append(DeviceInfo(device=device,
×
955
                                    label=label,
956
                                    initialized=is_initialized,
957
                                    plugin_name=plugin.name,
958
                                    soft_device_id=soft_device_id,
959
                                    model_name=model_name))
960

961
        return infos
×
962

963
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
5✔
964
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
965
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
966
        """Select the device to use for keystore."""
967
        # ideally this should not be called from the GUI thread...
968
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
969
        while True:
×
970
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
×
971
            if infos:
×
972
                break
×
973
            if not allow_user_interaction:
×
974
                raise CannotAutoSelectDevice()
×
975
            msg = _('Please insert your {}').format(plugin.device)
×
976
            msg += " ("
×
977
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
×
978
                msg += f"label: {keystore.label}, "
×
979
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
×
980
            msg += ').\n\n{}\n\n{}'.format(
×
981
                _('Verify the cable is connected and that '
982
                  'no other application is using it.'),
983
                _('Try to connect again?')
984
            )
985
            if not handler.yes_no_question(msg):
×
986
                raise UserCancelled()
×
987
            devices = None
×
988

989
        # select device automatically. (but only if we have reasonable expectation it is the correct one)
990
        # method 1: select device by id
991
        if keystore.soft_device_id:
×
992
            for info in infos:
×
993
                if info.soft_device_id == keystore.soft_device_id:
×
994
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
×
995
                    return info
×
996
        # method 2: select device by label
997
        #           but only if not a placeholder label and only if there is no collision
998
        device_labels = [info.label for info in infos]
×
999
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
×
1000
                and device_labels.count(keystore.label) == 1):
1001
            for info in infos:
×
1002
                if info.label == keystore.label:
×
1003
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
×
1004
                    return info
×
1005
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
1006
        #           saved for keystore anyway, select it
1007
        if (len(infos) == 1
×
1008
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
1009
                and keystore.soft_device_id is None):
1010
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
×
1011
            return infos[0]
×
1012

1013
        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
×
1014
        if not allow_user_interaction:
×
1015
            raise CannotAutoSelectDevice()
×
1016
        # ask user to select device manually
1017
        msg = (
×
1018
                _("Could not automatically pair with device for given keystore.") + "\n"
1019
                + f"(keystore label: {keystore.label!r}, "
1020
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
1021
        msg += _("Please select which {} device to use:").format(plugin.device)
×
1022
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
×
1023
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
×
1024
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
1025
                                init=(_("initialized") if info.initialized else _("wiped")),
1026
                                transport=info.device.transport_ui_string,
1027
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
1028
                        for info in infos]
1029
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
×
1030
                          f"num options: {len(infos)}. options: {infos}")
1031
        c = handler.query_choice(msg, descriptions)
×
1032
        if c is None:
×
1033
            raise UserCancelled()
×
1034
        info = infos[c]
×
1035
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
×
1036
        # note: updated label/soft_device_id will be saved after pairing succeeds
1037
        return info
×
1038

1039
    @runs_in_hwd_thread
5✔
1040
    def _scan_devices_with_hid(self) -> List['Device']:
5✔
1041
        try:
×
1042
            import hid  # noqa: F811
×
1043
        except ImportError:
×
1044
            return []
×
1045

1046
        devices = []
×
1047
        for d in hid.enumerate(0, 0):
×
1048
            vendor_id = d['vendor_id']
×
1049
            product_key = (vendor_id, d['product_id'])
×
1050
            plugin = None
×
1051
            if product_key in self._recognised_hardware:
×
1052
                plugin = self._recognised_hardware[product_key]
×
1053
            elif vendor_id in self._recognised_vendor:
×
1054
                plugin = self._recognised_vendor[vendor_id]
×
1055
            if plugin:
×
1056
                device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
×
1057
                if device:
×
1058
                    devices.append(device)
×
1059
        return devices
×
1060

1061
    @runs_in_hwd_thread
5✔
1062
    @profiler
5✔
1063
    def scan_devices(self) -> Sequence['Device']:
5✔
1064
        self.logger.info("scanning devices...")
×
1065

1066
        # First see what's connected that we know about
1067
        devices = self._scan_devices_with_hid()
×
1068

1069
        # Let plugin handlers enumerate devices we don't know about
1070
        with self.lock:
×
1071
            enumerate_funcs = list(self._enumerate_func)
×
1072
        for f in enumerate_funcs:
×
1073
            try:
×
1074
                new_devices = f()
×
1075
            except BaseException as e:
×
1076
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
×
1077
            else:
1078
                devices.extend(new_devices)
×
1079

1080
        # find out what was disconnected
1081
        client_ids = [dev.id_ for dev in devices]
×
1082
        disconnected_clients = []
×
1083
        with self.lock:
×
1084
            connected = {}
×
1085
            for client, id_ in self.clients.items():
×
1086
                if id_ in client_ids and client.has_usable_connection_with_device():
×
1087
                    connected[client] = id_
×
1088
                else:
1089
                    disconnected_clients.append((client, id_))
×
1090
            self.clients = connected
×
1091

1092
        # Unpair disconnected devices
1093
        for client, id_ in disconnected_clients:
×
1094
            self.unpair_id(id_)
×
1095
            if client.handler:
×
1096
                client.handler.update_status(False)
×
1097

1098
        return devices
×
1099

1100
    @classmethod
5✔
1101
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1102
        ret = {}
×
1103
        # add libusb
1104
        try:
×
1105
            import usb1
×
1106
        except Exception as e:
×
1107
            ret["libusb.version"] = None
×
1108
        else:
1109
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1110
            try:
×
1111
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1112
            except AttributeError:
×
1113
                ret["libusb.path"] = None
×
1114
        # add hidapi
1115
        try:
×
1116
            import hid  # noqa: F811
×
1117
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1118
        except Exception as e:
×
1119
            from importlib.metadata import version
×
1120
            try:
×
1121
                ret["hidapi.version"] = version("hidapi")
×
1122
            except ImportError:
×
1123
                ret["hidapi.version"] = None
×
1124
        return ret
×
1125

1126
    def trigger_pairings(
5✔
1127
            self,
1128
            keystores: Sequence['KeyStore'],
1129
            *,
1130
            allow_user_interaction: bool = True,
1131
            devices: Sequence['Device'] = None,
1132
    ) -> None:
1133
        """Given a list of keystores, try to pair each with a connected hardware device.
1134

1135
        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
1136
        try to pair each keystore individually. Consider the following scenario:
1137
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
1138
        - assume none of the devices are paired yet
1139
        1. if we tried to individually pair keystores, we might try with ks1 first
1140
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
1141
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
1142
             which is confusing and error-prone. It's especially problematic if the hw device does
1143
             not support labels (such as Ledger), as then the user cannot easily distinguish
1144
             same-type devices. (see #4199)
1145
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
1146
           and then tell the user ks1 could not be paired (and there are no devices left to try)
1147
        """
1148
        from .keystore import Hardware_KeyStore
×
1149
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
×
1150
        if not keystores:
×
1151
            return
×
1152
        if devices is None:
×
1153
            devices = self.scan_devices()
×
1154
        # first pair with all devices that can be auto-selected
1155
        for ks in keystores:
×
1156
            try:
×
1157
                ks.get_client(
×
1158
                    force_pair=True,
1159
                    allow_user_interaction=False,
1160
                    devices=devices,
1161
                )
1162
            except UserCancelled:
×
1163
                pass
×
1164
        if allow_user_interaction:
×
1165
            # now do manual selections
1166
            for ks in keystores:
×
1167
                try:
×
1168
                    ks.get_client(
×
1169
                        force_pair=True,
1170
                        allow_user_interaction=True,
1171
                        devices=devices,
1172
                    )
1173
                except UserCancelled:
×
1174
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc