• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4980750673510400

11 Apr 2025 08:05AM UTC coverage: 60.284%. Remained the same
4980750673510400

push

CirrusCI

ecdsa
minor fix (follow-up 737417fb8)

0 of 1 new or added line in 1 file covered. (0.0%)

7 existing lines in 4 files now uncovered.

21566 of 35774 relevant lines covered (60.28%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.7
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import sys
5✔
33
import aiohttp
5✔
34
import zipfile as zipfile_lib
5✔
35
from urllib.parse import urlparse
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from functools import wraps, partial
5✔
42
from itertools import chain
5✔
43

44
from electrum_ecc import ECPrivkey, ECPubkey
5✔
45

46
from ._vendor.distutils.version import StrictVersion
5✔
47
from .version import ELECTRUM_VERSION
5✔
48
from .i18n import _
5✔
49
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
50
from . import bip32
5✔
51
from . import plugins
5✔
52
from .simple_config import SimpleConfig
5✔
53
from .logging import get_logger, Logger
5✔
54
from .crypto import sha256
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
58
    from .keystore import Hardware_KeyStore, KeyStore
×
59
    from .wallet import Abstract_Wallet
×
60

61

62
_logger = get_logger(__name__)  # module-level logger
# wallet_type -> zero-argument loader callable; filled by Plugins.register_wallet_type
plugin_loaders = {}
# names of all functions that were decorated with @hook
hook_names = set()
# hook name -> list of (plugin instance, bound method); maintained by BasePlugin
hooks = {}
# module path -> exception raised by the first failed exec_module attempt for that path
_exec_module_failure = {}  # type: Dict[str, Exception]

# version byte that prefixes the serialized plugin key (see Plugins.create_new_key)
PLUGIN_PASSWORD_VERSION = 1
5✔
69

70

71
class Plugins(DaemonThread):
    """Finds plugins (bundled directories and external zip files), loads them and
    keeps track of the loaded instances.

    Unless created with cmd_only=True, the object also starts itself as a thread
    that periodically runs the queued jobs (see run()).
    """

    LOGGING_SHORTCUT = 'p'
    # directory of the bundled ("internal") plugins package
    pkgpath = os.path.dirname(plugins.__file__)
    # where the hex-encoded plugin authorization key is looked up / described to the user
    keyfile_linux = '/etc/electrum/plugins_key'
    keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
5✔
77

78
    @profiler
    def __init__(self, config: SimpleConfig, gui_name: Optional[str] = None, cmd_only: bool = False):
        """Discover and load plugins; unless cmd_only, also start the plugins thread.

        config: application config; consulted for the 'enable_plugin_<name>' keys.
        gui_name: name of the active GUI, matched against each manifest's 'available_for'.
        cmd_only: if True, only the __init__ modules of enabled plugins are imported;
            no thread is started and no device manager is created.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        if cmd_only:
            # only import the command modules of plugins
            # note: only the Logger part is initialised here; self.plugins, self.hw_wallets,
            # self.gui_name and self.device_manager are NOT set in this mode.
            Logger.__init__(self)
            self.find_plugins()
            self.load_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        self.hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        # start the thread last, once all state above has been set up
        self.start()
5✔
100

101
    @property
5✔
102
    def descriptions(self):
5✔
103
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
104

105
    def find_directory_plugins(self, pkg_path: str, external: bool):
5✔
106
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
107
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
5✔
108
        for loader, name, ispkg in iter_modules:
5✔
109
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
110
            #       once as data and once as code. To honor the "no duplicates" rule below,
111
            #       we exclude the ones packaged as *code*, here:
112
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
5✔
113
                continue
×
114
            module_path = os.path.join(pkg_path, name)
5✔
115
            if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
5✔
116
                continue
×
117
            try:
5✔
118
                with open(os.path.join(module_path, 'manifest.json'), 'r') as f:
5✔
119
                    d = json.load(f)
5✔
120
            except FileNotFoundError:
×
121
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
×
122
                continue
×
123
            if 'fullname' not in d:
5✔
124
                continue
×
125
            d['path'] = module_path
5✔
126
            if not self.cmd_only:
5✔
127
                gui_good = self.gui_name in d.get('available_for', [])
5✔
128
                if not gui_good:
5✔
129
                    continue
5✔
130
                details = d.get('registers_wallet_type')
5✔
131
                if details:
5✔
132
                    self.register_wallet_type(name, gui_good, details)
5✔
133
                details = d.get('registers_keystore')
5✔
134
                if details:
5✔
135
                    self.register_keystore(name, gui_good, details)
5✔
136
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
5✔
137
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
×
138
                raise Exception(f"duplicate plugins? for {name=}")
×
139
            if not external:
5✔
140
                self.internal_plugin_metadata[name] = d
5✔
141
            else:
142
                self.external_plugin_metadata[name] = d
×
143

144
    @staticmethod
    def exec_module_from_spec(spec, path: str):
        """Create the module described by spec, register it as sys.modules[path] and execute it.

        Returns the executed module. A failure is remembered per path; any later call
        for the same path raises immediately instead of retrying.
        """
        prev_fail = _exec_module_failure.get(path)
        if prev_fail:
            raise Exception(f"exec_module already failed once before, with: {prev_fail!r}")
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as e:
            # We can't undo all side-effects, but we at least rm the module from sys.modules,
            # so the import system knows it failed. If called again for the same plugin, we do not
            # retry due to potential interactions with not-undone side-effects (e.g. plugin
            # might have defined commands).
            _exec_module_failure[path] = e
            sys.modules.pop(path, None)
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
        else:
            return module
5✔
164

165
    def find_plugins(self):
5✔
166
        internal_plugins_path = (self.pkgpath, False)
5✔
167
        external_plugins_path = (self.get_external_plugin_dir(), True)
5✔
168
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
5✔
169
            if pkg_path and os.path.exists(pkg_path):
5✔
170
                if not external:
5✔
171
                    self.find_directory_plugins(pkg_path=pkg_path, external=external)
5✔
172
                else:
173
                    self.find_zip_plugins(pkg_path=pkg_path, external=external)
5✔
174

175
    def load_plugins(self):
5✔
176
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
5✔
177
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
5✔
178
                try:
×
179
                    if self.cmd_only:  # only load init method to register commands
×
180
                        self.maybe_load_plugin_init_method(name)
×
181
                    else:
182
                        self.load_plugin_by_name(name)
×
183
                except BaseException as e:
×
184
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
×
185

186
    def _has_root_permissions(self, path):
5✔
187
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
188

189
    def get_keyfile_path(self) -> Tuple[str, str]:
        """Return (keyfile_path, help_text) describing where the plugin key is stored.

        Raises Exception on Android (detected through the ANDROID_DATA environment variable).
        """
        if sys.platform in ['windows', 'win32']:
            keyfile_path = self.keyfile_windows
            # fixed typo in the user-facing text: the Windows tool is called "Regedit"
            keyfile_help = _('This file can be edited with Regedit')
        elif 'ANDROID_DATA' in os.environ:
            raise Exception('platform not supported')
        else:
            # treat unknown platforms as linux-like
            keyfile_path = self.keyfile_linux
            keyfile_help = _('The file must have root permissions')
        return keyfile_path, keyfile_help
×
200

201
    def create_new_key(self, password:str) -> str:
        """Derive a fresh plugin key from password and return it hex-encoded.

        A new random 32-byte salt is generated on every call.
        """
        salt = os.urandom(32)
        pubkey = self.derive_privkey(password, salt).get_public_key_bytes()
        # layout: 1 version byte | 32 salt bytes | public key bytes
        serialized = b''.join((bytes([PLUGIN_PASSWORD_VERSION]), salt, pubkey))
        return serialized.hex()
×
207

208
    def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
5✔
209
        """
210
        returns pubkey, salt
211
        returns None, None if the pubkey has not been set
212
        """
213
        if sys.platform in ['windows', 'win32']:
×
214
            import winreg
×
215
            with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
×
216
                try:
×
217
                    with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
×
218
                        key_hex = winreg.QueryValue(key, "PluginsKey")
×
219
                except Exception as e:
×
220
                    self.logger.info(f'winreg error: {e}')
×
221
                    return None, None
×
222
        elif 'ANDROID_DATA' in os.environ:
×
223
            return None, None
×
224
        else:
225
            # treat unknown platforms as linux-like
226
            if not os.path.exists(self.keyfile_linux):
×
227
                return None, None
×
228
            if not self._has_root_permissions(self.keyfile_linux):
×
229
                return
×
230
            with open(self.keyfile_linux) as f:
×
231
                key_hex = f.read()
×
232
        key = bytes.fromhex(key_hex)
×
233
        version = key[0]
×
234
        if version != PLUGIN_PASSWORD_VERSION:
×
235
            self.logger.info(f'unknown plugin password version: {version}')
×
236
            return None, None
×
237
        # all good
238
        salt = key[1:1+32]
×
239
        pubkey = key[1+32:]
×
240
        return pubkey, salt
×
241

242
    def get_external_plugin_dir(self) -> str:
5✔
243
        pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
5✔
244
        if not os.path.exists(pkg_path):
5✔
245
            os.mkdir(pkg_path)
5✔
246
        return pkg_path
5✔
247

248
    async def download_external_plugin(self, url):
        """Download url into the external plugin directory and return the local file path.

        The file name is the last path component of the URL.
        Raises Exception if the URL has no file name or the server does not answer with
        HTTP 200 (previously the path was returned even though nothing had been written,
        possibly pointing at a stale file from an earlier download).
        """
        filename = os.path.basename(urlparse(url).path)
        if not filename:
            raise Exception(f"cannot determine a file name from url {url!r}")
        pkg_path = self.get_external_plugin_dir()
        path = os.path.join(pkg_path, filename)
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    raise Exception(f"download of {url!r} failed with HTTP status {resp.status}")
                with open(path, 'wb') as fd:
                    # 64 KiB chunks instead of 10-byte chunks
                    async for chunk in resp.content.iter_chunked(64 * 1024):
                        fd.write(chunk)
        return path
×
259

260
    def read_manifest(self, path) -> dict:
        """Return the manifest.json of the zip plugin at path as a dict.

        The first archive member whose name ends with 'manifest.json' is used. The dict is
        extended with 'path', 'dirname', 'is_zip' and 'zip_hash_sha256'.
        Raises Exception if the archive contains no such member.
        """
        with zipfile_lib.ZipFile(path) as archive:
            manifest_name = next(
                (member for member in archive.namelist() if member.endswith('manifest.json')),
                None,
            )
            if manifest_name is None:
                raise Exception('could not find manifest.json in zip archive')
            with archive.open(manifest_name, 'r') as f:
                manifest = json.load(f)
                manifest['path'] = path  # external, path of the zipfile
                manifest['dirname'] = os.path.dirname(manifest_name)  # internal
                manifest['is_zip'] = True
                manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
                return manifest
×
275

276
    def zip_plugin_path(self, name) -> str:
5✔
277
        path = self.get_metadata(name)['path']
×
278
        filename = os.path.basename(path)
×
279
        if name in self.internal_plugin_metadata:
×
280
            pkg_path = self.pkgpath
×
281
        else:
282
            pkg_path = self.get_external_plugin_dir()
×
283
        return os.path.join(pkg_path, filename)
×
284

285
    def find_zip_plugins(self, pkg_path: str, external: bool):
5✔
286
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
287
        if pkg_path is None:
5✔
288
            return
×
289
        for filename in os.listdir(pkg_path):
5✔
290
            path = os.path.join(pkg_path, filename)
×
291
            if not filename.endswith('.zip'):
×
292
                continue
×
293
            try:
×
294
                d = self.read_manifest(path)
×
295
                name = d['name']
×
296
            except Exception:
×
297
                self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
×
298
                continue
×
299
            if name in self.internal_plugin_metadata:
×
300
                raise Exception(f"duplicate plugins for name={name}")
×
301
            if name in self.external_plugin_metadata:
×
302
                raise Exception(f"duplicate plugins for name={name}")
×
303
            if self.cmd_only and not self.config.get('enable_plugin_' + name):
×
304
                continue
×
305
            min_version = d.get('min_electrum_version')
×
306
            if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
×
307
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
308
                continue
×
309
            max_version = d.get('max_electrum_version')
×
310
            if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
×
311
                self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
×
312
                continue
×
313

314
            if not self.cmd_only:
×
315
                gui_good = self.gui_name in d.get('available_for', [])
×
316
                if not gui_good:
×
317
                    continue
×
318
                if 'fullname' not in d:
×
319
                    continue
×
320
                details = d.get('registers_keystore')
×
321
                if details:
×
322
                    self.register_keystore(name, gui_good, details)
×
323
            if external:
×
324
                self.external_plugin_metadata[name] = d
×
325
            else:
326
                self.internal_plugin_metadata[name] = d
×
327

328
    def get(self, name):
        """Return the loaded plugin instance stored under name, or None if there is none."""
        return self.plugins.get(name)
×
330

331
    def count(self):
        """Return the number of currently loaded plugin instances."""
        return len(self.plugins)
×
333

334
    def load_plugin(self, name) -> 'BasePlugin':
5✔
335
        """Imports the code of the given plugin.
336
        note: can be called from any thread.
337
        """
338
        if self.get_metadata(name):
5✔
339
            return self.load_plugin_by_name(name)
5✔
340
        else:
341
            raise Exception(f"could not find plugin {name!r}")
×
342

343
    def maybe_load_plugin_init_method(self, name: str) -> None:
5✔
344
        """Loads the __init__.py module of the plugin if it is not already loaded."""
345
        is_external = name in self.external_plugin_metadata
5✔
346
        base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
5✔
347
        if base_name not in sys.modules:
5✔
348
            metadata = self.get_metadata(name)
5✔
349
            is_zip = metadata.get('is_zip', False)
5✔
350
            # if the plugin was not enabled on startup the init module hasn't been loaded yet
351
            if not is_zip:
5✔
352
                if is_external:
5✔
353
                    # this branch is deprecated: external plugins are always zip files
354
                    path = os.path.join(metadata['path'], '__init__.py')
×
355
                    init_spec = importlib.util.spec_from_file_location(base_name, path)
×
356
                else:
357
                    init_spec = importlib.util.find_spec(base_name)
5✔
358
            else:
359
                zipfile = zipimport.zipimporter(metadata['path'])
×
360
                dirname = metadata['dirname']
×
361
                init_spec = zipfile.find_spec(dirname)
×
362

363
            self.exec_module_from_spec(init_spec, base_name)
5✔
364
            if name == "trustedcoin":
5✔
365
                # removes trustedcoin after loading to not show it in the list of plugins
366
                del self.internal_plugin_metadata[name]
×
367

368
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
5✔
369
        if name in self.plugins:
5✔
370
            return self.plugins[name]
×
371
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
372
        self.maybe_load_plugin_init_method(name)
5✔
373
        is_external = name in self.external_plugin_metadata
5✔
374
        if is_external and not self.is_authorized(name):
5✔
375
            self.logger.info(f'plugin not authorized {name}')
×
376
            return
×
377
        if not is_external:
5✔
378
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
5✔
379
        else:
380
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
×
381

382
        spec = importlib.util.find_spec(full_name)
5✔
383
        if spec is None:
5✔
384
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
×
385
        try:
5✔
386
            module = self.exec_module_from_spec(spec, full_name)
5✔
387
            plugin = module.Plugin(self, self.config, name)
5✔
388
        except Exception as e:
×
389
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
×
390
        self.add_jobs(plugin.thread_jobs())
5✔
391
        self.plugins[name] = plugin
5✔
392
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
5✔
393
        return plugin
5✔
394

395
    def close_plugin(self, plugin):
        """Unregister the jobs returned by plugin.thread_jobs() via remove_jobs()."""
        self.remove_jobs(plugin.thread_jobs())
×
397

398
    def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
        """Stretch pw with PBKDF2-HMAC-SHA256 (10**5 iterations, given salt) into an EC private key."""
        from hashlib import pbkdf2_hmac
        password_bytes = pw.encode('utf-8')
        secret = pbkdf2_hmac('sha256', password_bytes, salt, iterations=10**5)
        return ECPrivkey(secret)
×
402

403
    def is_installed(self, name) -> bool:
5✔
404
        """an external plugin may be installed but not authorized """
405
        return name in self.internal_plugin_metadata or name in self.external_plugin_metadata
×
406

407
    def is_authorized(self, name) -> bool:
5✔
408
        if name in self.internal_plugin_metadata:
×
409
            return True
×
410
        if name not in self.external_plugin_metadata:
×
411
            return False
×
412
        pubkey_bytes, salt = self.get_pubkey_bytes()
×
413
        if not pubkey_bytes:
×
414
            return False
×
415
        if not self.is_plugin_zip(name):
×
416
            return False
×
417
        filename = self.zip_plugin_path(name)
×
418
        plugin_hash = get_file_hash256(filename)
×
419
        sig = self.config.get('authorize_plugin_' + name)
×
420
        if not sig:
×
421
            return False
×
422
        pubkey = ECPubkey(pubkey_bytes)
×
423
        return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
×
424

425
    def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
        """Sign the hash of filename with privkey and persist the signature in the config.

        The signature is stored hex-encoded under 'authorize_plugin_<name>'.
        privkey must correspond to the configured plugin pubkey (asserted).
        """
        pubkey_bytes, salt = self.get_pubkey_bytes()
        assert pubkey_bytes == privkey.get_public_key_bytes()
        signature = privkey.ecdsa_sign(get_file_hash256(filename))
        self.config.set_key('authorize_plugin_' + name, signature.hex(), save=True)
×
432

433
    def enable(self, name: str) -> 'BasePlugin':
5✔
434
        self.config.set_key('enable_plugin_' + name, True, save=True)
×
435
        p = self.get(name)
×
436
        if p:
×
437
            return p
×
438
        return self.load_plugin(name)
×
439

440
    def disable(self, name: str) -> None:
5✔
441
        self.config.set_key('enable_plugin_' + name, False, save=True)
×
442
        p = self.get(name)
×
443
        if not p:
×
444
            return
×
445
        self.plugins.pop(name)
×
446
        p.close()
×
447
        self.logger.info(f"closed {name}")
×
448

449
    @classmethod
5✔
450
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
451
        return key.startswith('enable_plugin_') or key.startswith('authorize_plugin_')
×
452

453
    def toggle(self, name: str) -> Optional['BasePlugin']:
5✔
454
        p = self.get(name)
×
455
        return self.disable(name) if p else self.enable(name)
×
456

457
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
5✔
458
        d = self.descriptions.get(name)
×
459
        if not d:
×
460
            return False
×
461
        deps = d.get('requires', [])
×
462
        for dep, s in deps:
×
463
            try:
×
464
                __import__(dep)
×
465
            except ImportError as e:
×
466
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
×
467
                return False
×
468
        requires = d.get('requires_wallet_type', [])
×
469
        return not requires or wallet.wallet_type in requires
×
470

471
    def get_hardware_support(self):
5✔
472
        out = []
×
473
        for name, (gui_good, details) in self.hw_wallets.items():
×
474
            if gui_good:
×
475
                try:
×
476
                    p = self.get_plugin(name)
×
477
                    if p.is_available():
×
478
                        out.append(HardwarePluginToScan(name=name,
×
479
                                                        description=details[2],
480
                                                        plugin=p,
481
                                                        exception=None))
482
                except Exception as e:
×
483
                    self.logger.exception(f"cannot load plugin for: {name}")
×
484
                    out.append(HardwarePluginToScan(name=name,
×
485
                                                    description=details[2],
486
                                                    plugin=None,
487
                                                    exception=e))
488
        return out
×
489

490
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register wallet_type and remember a lazy loader for it in plugin_loaders.

        The loader, when called, fetches (loading if needed) the plugin and registers its
        wallet_class as constructor for the wallet type. gui_good is not used here.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            wallet_class = self.get_plugin(name).wallet_class
            register_constructor(wallet_type, wallet_class)

        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader
5✔
499

500
    def register_keystore(self, name, gui_good, details):
        """Register a lazily-constructed keystore for a hardware plugin.

        Only manifests whose details[0] is 'hardware' are handled; details[1] is passed
        to register_keystore() as its first argument. Other kinds are ignored.
        """
        from .keystore import register_keystore

        if details[0] != 'hardware':
            return

        def dynamic_constructor(d):
            keystore_class = self.get_plugin(name).keystore_class
            return keystore_class(d)

        self.hw_wallets[name] = (gui_good, details)
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
5✔
509

510
    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the plugin instance for name, loading it first if it is not loaded yet.

        Raises KeyError if load_plugin() returned without registering an instance
        in self.plugins.
        """
        if name not in self.plugins:
            self.load_plugin(name)
        return self.plugins[name]
5✔
514

515
    def is_plugin_zip(self, name: str) -> bool:
5✔
516
        """Returns True if the plugin is a zip file"""
517
        if (metadata := self.get_metadata(name)) is None:
×
518
            return False
×
519
        return metadata.get('is_zip', False)
×
520

521
    def get_metadata(self, name: str) -> Optional[dict]:
5✔
522
        """Returns the metadata of the plugin"""
523
        metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
5✔
524
        if not metadata:
5✔
525
            return None
×
526
        return metadata
5✔
527

528
    def run(self):
        """Thread body: run the queued jobs until the thread is asked to stop, then call on_stop()."""
        while self.is_running():
            # wait up to 0.1s between rounds; setting wake_up_event ends the wait early
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
5✔
533

534

535
def get_file_hash256(path: str) -> bytes:
    """Return the sha256 hash of the file's full contents, similar to `sha256sum`."""
    with open(path, 'rb') as f:
        contents = f.read()
    return sha256(contents)
×
539

540

541
def hook(func):
    """Decorator: record the function's name in hook_names and return the function unchanged."""
    name = func.__name__
    hook_names.add(name)
    return func
5✔
544

545

546
def run_hook(name, *args):
    """Call every enabled plugin's implementation of the hook `name` with *args.

    Exceptions raised by a plugin are logged and ignored. At most one implementation
    may return a truthy value (asserted); that value is returned, otherwise None.
    """
    results = []
    for p, f in hooks.get(name, []):
        if not p.is_enabled():
            continue
        try:
            r = f(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
            continue
        if r:
            results.append(r)

    if not results:
        return None
    assert len(results) == 1, results
    return results[0]
×
562

563

564
class BasePlugin(Logger):
5✔
565

566
    def __init__(self, parent, config: 'SimpleConfig', name):
5✔
567
        self.parent = parent  # type: Plugins  # The plugins object
5✔
568
        self.name = name
5✔
569
        self.config = config
5✔
570
        self.wallet = None  # fixme: this field should not exist
5✔
571
        Logger.__init__(self)
5✔
572
        # add self to hooks
573
        for k in dir(self):
5✔
574
            if k in hook_names:
5✔
575
                l = hooks.get(k, [])
5✔
576
                l.append((self, getattr(self, k)))
5✔
577
                hooks[k] = l
5✔
578

579
    def __str__(self):
5✔
580
        return self.name
×
581

582
    def close(self):
5✔
583
        # remove self from hooks
584
        for attr_name in dir(self):
×
585
            if attr_name in hook_names:
×
586
                # found attribute in self that is also the name of a hook
587
                l = hooks.get(attr_name, [])
×
588
                try:
×
589
                    l.remove((self, getattr(self, attr_name)))
×
590
                except ValueError:
×
591
                    # maybe attr name just collided with hook name and was not hook
592
                    continue
×
593
                hooks[attr_name] = l
×
594
        self.parent.close_plugin(self)
×
595
        self.on_close()
×
596

597
    def on_close(self):
5✔
598
        pass
×
599

600
    def requires_settings(self) -> bool:
5✔
601
        return False
×
602

603
    def thread_jobs(self):
5✔
604
        return []
5✔
605

606
    def is_enabled(self):
5✔
607
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True
×
608

609
    def is_available(self):
5✔
610
        return True
×
611

612
    def can_user_disable(self):
5✔
613
        return True
×
614

615
    def settings_widget(self, window):
5✔
616
        raise NotImplementedError()
×
617

618
    def settings_dialog(self, window):
5✔
619
        raise NotImplementedError()
×
620

621
    def read_file(self, filename: str) -> bytes:
5✔
622
        if self.parent.is_plugin_zip(self.name):
×
623
            plugin_filename = self.parent.zip_plugin_path(self.name)
×
624
            metadata = self.parent.external_plugin_metadata[self.name]
×
625
            dirname = metadata['dirname']
×
626
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
×
627
                with myzip.open(os.path.join(dirname, filename)) as myfile:
×
628
                    return myfile.read()
×
629
        else:
630
            if self.name in self.parent.internal_plugin_metadata:
×
631
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
×
632
            else:
633
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
×
634
            with open(path, 'rb') as myfile:
×
635
                return myfile.read()
×
636

637

638
# Exception types for hardware-device handling. Their raise sites are outside this
# section; note that only DeviceUnpairableError derives from UserFacingException.
class DeviceUnpairableError(UserFacingException): pass
class HardwarePluginLibraryUnavailable(Exception): pass
class CannotAutoSelectDevice(Exception): pass
5✔
641

642

643
class Device(NamedTuple):
    """Immutable record describing one hardware device.

    NOTE(review): the code that fills these fields is outside this section; field
    meanings are only what the names and inline comments state -- confirm with the scanners.
    """
    path: Union[str, bytes]
    interface_number: int
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
5✔
650

651

652
class DeviceInfo(NamedTuple):
    """A Device plus optional details about it; every field except `device` defaults to None."""
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
5✔
660

661

662
class HardwarePluginToScan(NamedTuple):
    """Result entry of Plugins.get_hardware_support().

    Either `plugin` is the loaded plugin and `exception` is None, or loading failed and
    `plugin` is None while `exception` holds the error.
    """
    name: str
    description: str  # taken from the third element of the manifest's 'registers_keystore' details
    plugin: Optional['HW_PluginBase']
    exception: Optional[Exception]
5✔
667

668

669
# NOTE(review): presumably the label values that count as "no real label set";
# the users of this constant are outside this section -- confirm.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
5✔
670

671

672
# hidapi is not thread-safe
673
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
674
#     https://github.com/libusb/hidapi/issues/45
675
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
676
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
677
# It is not entirely clear to me, exactly what is safe and what isn't, when
678
# using multiple threads...
679
# Hence, we use a single thread for all device communications, including
680
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
681
# the following thread:
682
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
5✔
683
    max_workers=1,
684
    thread_name_prefix='hwd_comms_thread'
685
)
686

687
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
688
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
689
# To keep it simple, let's just import it now, as we are likely in the main thread here.
690
# Warn if this module is being imported from a non-main thread, because the `hid`
# import below is then not happening on the main thread.
if threading.current_thread() is not threading.main_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # hidapi may be missing; its absence is ignored at import time
    pass
5✔
696

697

698
# generic result type, used in the signature of run_in_hwd_thread
T = TypeVar('T')
5✔
699

700

701
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Execute *func* on the hardware-device communication thread.

    When the caller is already running on a thread whose name starts with
    "hwd_comms_thread", *func* is called directly. Otherwise it is submitted
    to ``_hwd_comms_executor`` and this call blocks until the result is
    available.

    Returns whatever *func* returns; an exception raised by *func* is
    re-raised to the caller (directly, or via ``Future.result()``).
    """
    already_on_hwd_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_hwd_thread:
        return func()
    return _hwd_comms_executor.submit(func).result()
708

709

710
def runs_in_hwd_thread(func):
    """Decorator that routes every call of *func* through run_in_hwd_thread().

    The positional and keyword arguments of each call are bound to *func*
    with functools.partial, and the resulting zero-argument callable is
    handed to run_in_hwd_thread(). The wrapper returns that call's result.
    """
    @wraps(func)
    def _dispatch(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return _dispatch
5✔
715

716

717
def assert_runs_in_hwd_thread():
    """Raise unless the current thread's name starts with "hwd_comms_thread".

    Raises:
        Exception: if called from any other thread.
    """
    current_name = threading.current_thread().name
    if current_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
×
720

721

722
class DeviceMgr(ThreadJob):
5✔
723
    """Manages hardware clients.  A client communicates over a hardware
724
    channel with the device.
725

726
    In addition to tracking device HID IDs, the device manager tracks
727
    hardware wallets and manages wallet pairing.  A HID ID may be
728
    paired with a wallet when it is confirmed that the hardware device
729
    matches the wallet, i.e. they have the same master public key.  A
730
    HID ID can be unpaired if e.g. it is wiped.
731

732
    Because of hotplugging, a wallet must request its client
733
    dynamically each time it is required, rather than caching it
734
    itself.
735

736
    The device manager is shared across plugins, so just one place
737
    does hardware scans when needed.  By tracking HID IDs, if a device
738
    is plugged into a different port the wallet is automatically
739
    re-paired.
740

741
    Wallets are informed on connect / disconnect events.  It must
742
    implement connected(), disconnected() callbacks.  Being connected
743
    implies a pairing.  Callbacks can happen in any thread context,
744
    and we do them without holding the lock.
745

746
    Confusingly, the HID ID (serial number) reported by the HID system
747
    doesn't match the device ID reported by the device itself.  We use
748
    the HID IDs.
749

750
    This plugin is thread-safe.  Currently only devices supported by
751
    hidapi are implemented."""
752

753
    def __init__(self, config: SimpleConfig):
        """Set up empty registries and the lock that guards them.

        Args:
            config: configuration object; run() reads the session timeout from it.
        """
        super().__init__()
        self.config = config
        self.lock = threading.RLock()

        # pairing_code -> id_. Item only present if we have active pairing. Needs self.lock.
        self.pairing_code_to_id: Dict[str, str] = {}
        # client -> id_. Needs self.lock.
        self.clients: Dict[HardwareClientBase, str] = {}
        # What we recognise.  (vendor_id, product_id) -> Plugin
        self._recognised_hardware: Dict[Tuple[int, int], HW_PluginBase] = {}
        # vendor_id -> Plugin
        self._recognised_vendor: Dict[int, HW_PluginBase] = {}
        # Custom enumerate functions for devices we don't know about. Needs self.lock.
        self._enumerate_func = set()
5✔
768

769
    def thread_jobs(self):
        """Return the thread jobs provided by this object.

        The manager itself is the only job: its run() handles device timeouts.
        """
        jobs = [self]
        return jobs
5✔
772

773
    def run(self):
        """Handle device timeouts.  Runs in the context of the Plugins thread.

        Takes a snapshot of the registered clients under the lock, then calls
        ``timeout(cutoff)`` on each of them without holding the lock, where
        cutoff is the current time minus the configured session timeout.
        """
        with self.lock:
            snapshot = tuple(self.clients)
        cutoff = time.time() - self.config.get_session_timeout()
        for hw_client in snapshot:
            hw_client.timeout(cutoff)
×
781

782
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Map every key in *device_pairs* to *plugin* in the recognised-hardware table.

        An existing entry for the same key is overwritten.
        """
        self._recognised_hardware.update((pair, plugin) for pair in device_pairs)
×
785

786
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Map every id in *vendor_ids* to *plugin* in the recognised-vendor table.

        An existing entry for the same vendor id is overwritten.
        """
        self._recognised_vendor.update((vid, plugin) for vid in vendor_ids)
×
789

790
    def register_enumerate_func(self, func):
        """Add *func* to the set of custom enumerate functions.

        scan_devices() calls each registered function with no arguments and
        extends its device list with the result. The set is modified under
        self.lock.
        """
        registry = self._enumerate_func
        with self.lock:
            registry.add(func)
×
793

794
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return a client for *device*, creating and registering one if needed.

        A client already registered under ``device.id_`` is returned as-is.
        Otherwise ``plugin.create_client(device, handler)`` is asked for one;
        a truthy result is recorded in self.clients, a falsy result is
        returned unchanged without being registered.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        fresh = plugin.create_client(device, handler)
        if not fresh:
            return fresh
        self.logger.info(f"Registering {fresh}")
        with self.lock:
            self.clients[fresh] = device.id_
        return fresh
×
807

808
    def id_by_pairing_code(self, pairing_code):
        """Return the device id paired with *pairing_code*, or None if there is no such pairing."""
        with self.lock:
            device_id = self.pairing_code_to_id.get(pairing_code)
        return device_id
×
811

812
    def pairing_code_by_id(self, id_):
        """Return the first pairing code mapped to device id *id_*, or None if there is none."""
        with self.lock:
            return next(
                (code for code, known_id in self.pairing_code_to_id.items() if known_id == id_),
                None,
            )
×
818

819
    def unpair_pairing_code(self, pairing_code):
        """Forget the pairing for *pairing_code* and close the paired device's client.

        Does nothing when the pairing code is not currently paired. The
        client is closed after the lock has been released.
        """
        with self.lock:
            try:
                device_id = self.pairing_code_to_id.pop(pairing_code)
            except KeyError:
                return
        self._close_client(device_id)
×
825

826
    def unpair_id(self, id_):
        """Unpair the device with id *id_*.

        If a pairing code is mapped to the id, that pairing is removed via
        unpair_pairing_code(); otherwise only the client registered for the
        id is closed.
        """
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
×
832

833
    def _close_client(self, id_):
5✔
834
        with self.lock:
×
835
            client = self._client_by_id(id_)
×
836
            self.clients.pop(client, None)
×
837
        if client:
×
838
            client.close()
×
839

840
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
841
        with self.lock:
×
842
            for client, client_id in self.clients.items():
×
843
                if client_id == id_:
×
844
                    return client
×
845
        return None
×
846

847
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        """Returns a client for the device ID if one is registered.  If
        a device is wiped or in bootloader mode pairing is impossible;
        in such cases we communicate by device ID and not wallet.

        With ``scan_now`` set (the default), scan_devices() is run before the
        lookup.
        """
        if scan_now:
            self.scan_devices()
        hw_client = self._client_by_id(id_)
        return hw_client
×
854

855
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Find (and optionally force-pair) the client belonging to *keystore*.

        Lookup order:
          1. if no device list was given, the already-registered clients (no scan);
          2. the given devices, or the result of a fresh scan_devices();
          3. if ``force_pair`` is set, a device picked by select_device() and
             paired via force_pair_keystore().

        Returns the client, or None if none could be found or paired.

        Raises:
            Exception: if *handler* is None.
            Anything raised by select_device() other than CannotAutoSelectDevice,
            and anything raised by force_pair_keystore(), propagates to the caller.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        lookup = partial(
            self.client_by_pairing_code,
            plugin=plugin,
            pairing_code=keystore.pairing_code(),
            handler=handler,
        )
        # search existing clients first (fast-path)
        client = None if devices else lookup(devices=[])
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = lookup(devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # no unambiguous candidate and we may not ask the user: give up quietly
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
889

890
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client for the device currently paired with *pairing_code*.

        If a client is already registered for the paired device id, it is
        returned with its handler replaced by *handler* -- unless its plugin
        is of a different type than *plugin*, in which case None is returned.
        Otherwise a client is created for the first entry of *devices* whose
        id matches the paired id. Returns None when nothing matches.
        """
        _id = self.id_by_pairing_code(pairing_code)
        existing = self._client_by_id(_id)
        if existing:
            if type(existing.plugin) is not type(plugin):
                return None
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            existing.handler = handler
            return existing

        matching_device = next((dev for dev in devices if dev.id_ == _id), None)
        if matching_device is None:
            return None
        return self.create_client(matching_device, handler, plugin)
×
907

908
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair *keystore* with the device described by *info*.

        The pairing is recorded only if the registered client for that device
        is pairable, belongs to the same plugin type, and returns the same
        xpub as the keystore for the keystore's derivation prefix.

        Returns the paired client.

        Raises:
            DeviceUnpairableError: if there is no suitable client, the xpub
                does not match, or fetching the xpub raised UserCancelled or
                RuntimeError.
        """
        expected_xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(expected_xpub)
        client = self._client_by_id(info.device.id_)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                device_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                device_xpub = None
            if device_xpub == expected_xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
944

945
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        When *devices* is None a fresh scan is performed. A device for which
        no client could be created is skipped. If creating or querying the
        client raises, the error is logged and the device is skipped, unless
        ``include_failing_clients`` is set, in which case a DeviceInfo
        carrying the exception is included instead.

        Raises:
            HardwarePluginLibraryUnavailable: if the plugin reports that its
                libraries are not available.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                details = dict(
                    label=client.label(),
                    initialized=client.is_initialized(),
                    soft_device_id=client.get_soft_device_id(),
                    model_name=client.device_model_name(),
                )
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device, plugin_name=plugin.name, **details))

        return infos
×
986

987
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked to connect one
        and a new scan is made. A device is then chosen automatically when
        that is reasonably safe:
          1. its soft_device_id equals the one stored in the keystore;
          2. its label equals the keystore's (non-placeholder) label and no
             other candidate has that label;
          3. it is the only candidate and the keystore stores neither a
             useful label nor a soft_device_id.
        Otherwise the user is asked to choose.

        Raises:
            CannotAutoSelectDevice: automatic selection was not possible and
                ``allow_user_interaction`` is False.
            UserCancelled: the user declined to retry or cancelled the choice.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            prompt_parts = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                prompt_parts.append(f"label: {keystore.label}, ")
            prompt_parts.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            prompt_parts.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            ))
            if not handler.yes_no_question("".join(prompt_parts)):
                raise UserCancelled()
            # drop the caller-supplied list so the next attempt rescans
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            by_id = next(
                (cand for cand in infos if cand.soft_device_id == keystore.soft_device_id),
                None,
            )
            if by_id is not None:
                self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                return by_id
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        label_is_useful = keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
        if label_is_useful and [cand.label for cand in infos].count(keystore.label) == 1:
            by_label = next((cand for cand in infos if cand.label == keystore.label), None)
            if by_label is not None:
                self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                return by_label
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        descriptions = []
        for cand in infos:
            shown_label = cand.label or _("An unnamed {}").format(cand.plugin_name)
            init_state = _("initialized") if cand.initialized else _("wiped")
            transport = cand.device.transport_ui_string
            maybe_model = f"{cand.model_name}, " if cand.model_name else ""
            descriptions.append(
                "{label} ({maybe_model}{init}, {transport})".format(
                    label=shown_label, init=init_state, transport=transport, maybe_model=maybe_model))
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        choice = handler.query_choice(msg, descriptions)
        if choice is None:
            raise UserCancelled()
        chosen = infos[choice]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {chosen}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return chosen
×
1062

1063
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those that a registered plugin accepts.

        For each enumerated entry, a plugin registered for the exact
        (vendor_id, product_id) pair takes precedence over one registered for
        the vendor id alone. The plugin then builds the Device; entries for
        which it returns a falsy value are skipped. Returns an empty list
        when the ``hid`` module cannot be imported.
        """
        try:
            import hid  # noqa: F811
        except ImportError:
            return []

        found = []
        for d in hid.enumerate(0, 0):
            vendor_id = d['vendor_id']
            product_key = (vendor_id, d['product_id'])
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            else:
                continue
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(d, product_key=product_key)
            if device:
                found.append(device)
        return found
×
1084

1085
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients whose device is gone.

        Devices are collected from the HID enumeration and from every
        registered custom enumerate function; a failing enumerate function is
        logged and skipped. A registered client is kept only if its device id
        is among the scanned devices and it reports a usable connection. All
        other clients are removed from self.clients, unpaired, and their
        handler (if any) is told via ``update_status(False)``.

        Returns:
            The devices found by this scan.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
            else:
                devices.extend(new_devices)

        # find out what was disconnected
        # (a set, so the membership test below is O(1) per client while the lock is held)
        present_ids = {dev.id_ for dev in devices}
        disconnected_clients = []
        with self.lock:
            connected = {}
            for client, id_ in self.clients.items():
                if id_ in present_ids and client.has_usable_connection_with_device():
                    connected[client] = id_
                else:
                    disconnected_clients.append((client, id_))
            self.clients = connected

        # Unpair disconnected devices (done without holding the lock)
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
1123

1124
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Report the versions of the low-level USB/HID libraries in use.

        Returns a mapping with the keys:
          - "libusb.version": version reported by ``usb1``, or None if
            importing ``usb1`` failed;
          - "libusb.path": only present when ``usb1`` imported; the loaded
            library's ``_name``, or None if that attribute is missing;
          - "hidapi.version": ``hid.__version__`` if available, otherwise the
            installed "hidapi" distribution's version, otherwise None.
        """
        ret = {}
        # add libusb
        try:
            import usb1
        except Exception:
            ret["libusb.version"] = None
        else:
            ret["libusb.version"] = ".".join(str(part) for part in usb1.getVersion()[:4])
            try:
                ret["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                ret["libusb.path"] = None
        # add hidapi
        try:
            import hid  # noqa: F811
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            # fall back to the installed distribution's metadata
            from importlib.metadata import version
            try:
                ret["hidapi.version"] = version("hidapi")
            except ImportError:
                ret["hidapi.version"] = None
        return ret
×
1149

1150
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)

        Keystores that are not hardware keystores are ignored. UserCancelled
        raised while pairing one keystore is swallowed so the remaining
        keystores are still tried.
        """
        from .keystore import Hardware_KeyStore
        hw_keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not hw_keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pass: pair with all devices that can be auto-selected;
        # second pass (only if allowed): manual selections
        passes = (False, True) if allow_user_interaction else (False,)
        for interactive in passes:
            for ks in hw_keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc