• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5288265869164544

17 Mar 2025 03:18PM UTC coverage: 61.181% (+0.02%) from 61.158%
5288265869164544

Pull #9646

CirrusCI

f321x
allow all plugins to be either zip or directory based
Pull Request #9646: Allow all plugins to be either zip or directory based

39 of 73 new or added lines in 1 file covered. (53.42%)

125 existing lines in 7 files now uncovered.

21371 of 34931 relevant lines covered (61.18%)

2.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.83
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
4✔
27
import pkgutil
4✔
28
import importlib.util
4✔
29
import importlib.machinery
4✔
30
import time
4✔
31
import threading
4✔
32
import traceback
4✔
33
import sys
4✔
34
import aiohttp
4✔
35

36
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
4✔
37
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
38
import concurrent
4✔
39
import zipimport
4✔
40
from concurrent import futures
4✔
41
from functools import wraps, partial
4✔
42
from itertools import chain
4✔
43

44
from .i18n import _
4✔
45
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
4✔
46
from . import bip32
4✔
47
from . import plugins
4✔
48
from .simple_config import SimpleConfig
4✔
49
from .logging import get_logger, Logger
4✔
50
from .crypto import sha256
4✔
51

52
if TYPE_CHECKING:
4✔
53
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
54
    from .keystore import Hardware_KeyStore, KeyStore
×
UNCOV
55
    from .wallet import Abstract_Wallet
×
56

57

58
# Module-level logger.
_logger = get_logger(__name__)

# Maps a wallet type to a zero-argument loader; filled by Plugins.register_wallet_type().
plugin_loaders = dict()

# Names of all functions that were decorated with @hook.
hook_names = set()

# Maps a hook name to a list of (plugin, bound method) pairs; maintained by
# BasePlugin.__init__() and BasePlugin.close().
hooks = dict()
62

63

64
class Plugins(DaemonThread):
    """Discovers, loads and keeps track of plugins.

    Plugins are searched for in the built-in ``plugins`` package (internal) and
    in the directory returned by get_external_plugin_dir() (external). In both
    places a plugin may be a package directory or a zip file. Metadata read from
    a plugin's top-level module is stored in ``internal_plugin_metadata`` or
    ``external_plugin_metadata``, keyed by plugin name.

    With ``cmd_only=True`` only the ``commands`` modules of enabled plugins are
    imported (for zip plugins the plugin package is executed first), no plugin
    objects are created and the thread is not started.
    """

    LOGGING_SHORTCUT = 'p'
    pkgpath = os.path.dirname(plugins.__file__)

    @profiler
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
        """Scan for plugins and, unless cmd_only is set, load enabled ones and start the thread.

        config: configuration used to look up the 'enable_plugin_<name>' keys.
        gui_name: name of the GUI; only plugins listing it in 'available_for' are registered.
        cmd_only: if True, only import the command modules of enabled plugins and return.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        self.loaded_command_modules = set()  # type: set[str]
        if cmd_only:
            # only import the command modules of plugins
            Logger.__init__(self)
            self.find_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        self.hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        self.start()

    @property
    def descriptions(self):
        """Merged copy of internal and external metadata (external wins on a name clash)."""
        return {**self.internal_plugin_metadata, **self.external_plugin_metadata}

    def find_directory_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
        # materialised as a list because it is also logged when a duplicate is found
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
                continue
            base_name = 'electrum.plugins' if not external else 'electrum_external_plugins'
            full_name = f'{base_name}.{name}' + ('.commands' if self.cmd_only else '')
            if external:
                # construct ModuleSpec manually as importlib won't find it in the external dir
                module_path = os.path.join(pkg_path, name)
                if not self._has_recursive_root_permissions(module_path):
                    self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
                    continue
                module_path = os.path.join(module_path, 'commands.py' if self.cmd_only else '__init__.py')
                if not os.path.exists(module_path):
                    continue
                spec = importlib.util.spec_from_file_location(full_name, module_path)
            else:
                spec = importlib.util.find_spec(full_name)
            if spec is None:
                if self.cmd_only:
                    continue # no commands module in this plugin
                raise Exception(f"Error pre-loading {full_name}: no spec")
            module = self.exec_module_from_spec(spec, full_name)
            if self.cmd_only:
                assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
                self.loaded_command_modules.add(name)
                continue
            d = module.__dict__
            if 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            gui_good = self.gui_name in d.get('available_for', [])
            if not gui_good:
                continue
            details = d.get('registers_wallet_type')
            if details:
                self.register_wallet_type(name, gui_good, details)
            details = d.get('registers_keystore')
            if details:
                self.register_keystore(name, gui_good, details)
            if d.get('requires_wallet_type'):
                # trustedcoin will not be added to list
                continue
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            if not external:
                self.internal_plugin_metadata[name] = d
            else:
                self.external_plugin_metadata[name] = d

    @staticmethod
    def exec_module_from_spec(spec, path):
        """Create the module described by spec, register it as sys.modules[path] and execute it.

        Returns the executed module. Any Exception raised on the way is re-raised
        as Exception("Error pre-loading ...") chained to the original. On failure,
        the module registered here is removed from sys.modules again so that a
        half-initialised module is not left behind.
        """
        module = None
        try:
            module = importlib.util.module_from_spec(spec)
            # sys.modules needs to be modified for relative imports to work
            # see https://stackoverflow.com/a/50395128
            sys.modules[path] = module
            spec.loader.exec_module(module)
        except Exception as e:
            # only undo our own registration; leave any other entry untouched
            if module is not None and sys.modules.get(path) is module:
                del sys.modules[path]
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
        return module

    def find_plugins(self):
        """Scan the internal and the external plugin locations for directory and zip plugins."""
        internal_plugins_path = (self.pkgpath, False)
        external_plugins_path = (self.get_external_plugin_dir(), True)
        for pkg_path, external in (internal_plugins_path, external_plugins_path):
            # external plugins enforce root permissions on the directory
            if pkg_path and os.path.exists(pkg_path):
                self.find_directory_plugins(pkg_path=pkg_path, external=external)
                self.find_zip_plugins(pkg_path=pkg_path, external=external)

    def load_plugins(self):
        """Load every discovered plugin that is enabled in the config.

        Plugins with 'requires_wallet_type' metadata are skipped. A failure to
        load one plugin is logged and does not stop the others.
        """
        for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
                try:
                    self.load_plugin_by_name(name)
                except BaseException as e:
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")

    def _has_root_permissions(self, path):
        """True if path is owned by uid 0 and is not writable according to os.access()."""
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)

    @profiler(min_threshold=0.5)
    def _has_recursive_root_permissions(self, path):
        """Check if a directory and all its subdirectories have root permissions"""
        for root, _dirs, files in os.walk(path):
            if not self._has_root_permissions(root):
                return False
            for f in files:
                if not self._has_root_permissions(os.path.join(root, f)):
                    return False
        return True

    def get_external_plugin_dir(self):
        """Return the external plugin directory, or None if it cannot be used.

        None is returned on platforms other than linux/darwin/freebsd, when the
        directory does not exist, or when it fails the root-permission check.
        """
        if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
            return
        pkg_path = '/opt/electrum_plugins'
        if not os.path.exists(pkg_path):
            self.logger.info(f'directory {pkg_path} does not exist')
            return
        if not self._has_root_permissions(pkg_path):
            self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
            return
        return pkg_path

    def zip_plugin_path(self, name):
        """Return the path of the zip file that the zip-based plugin `name` was found in."""
        filename = self.get_metadata(name)['filename']
        if name in self.internal_plugin_metadata:
            pkg_path = self.pkgpath
        else:
            pkg_path = self.get_external_plugin_dir()
        return os.path.join(pkg_path, filename)

    def find_zip_plugins(self, pkg_path: str, external: bool):
        """Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            if not filename.endswith('.zip'):
                continue
            path = os.path.join(pkg_path, filename)
            if external and not self._has_root_permissions(path):
                self.logger.info(f'not loading {path}: file has user write permissions')
                continue
            try:
                zip_importer = zipimport.zipimporter(path)
            except zipimport.ZipImportError:
                self.logger.exception(f"unable to load zip plugin '{filename}'")
                continue
            for name, ispkg in pkgutil.iter_zipimport_modules(zip_importer):
                if not ispkg:
                    continue
                if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                # In cmd_only mode skip disabled plugins *before* running any of their
                # code, consistent with find_directory_plugins().
                if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
                    continue
                module_path = f'electrum_external_plugins.{name}' if external else f'electrum.plugins.{name}'
                spec = zip_importer.find_spec(name)
                module = self.exec_module_from_spec(spec, module_path)
                if self.cmd_only:
                    spec2 = importlib.util.find_spec(module_path + '.commands')
                    if spec2 is None:  # no commands module in this plugin
                        continue
                    self.exec_module_from_spec(spec2, module_path + '.commands')
                    assert name not in self.loaded_command_modules, f"duplicate command modules for: {name}"
                    self.loaded_command_modules.add(name)
                    continue
                d = module.__dict__
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                d['filename'] = filename
                if 'fullname' not in d:
                    continue
                d['display_name'] = d['fullname']
                d['zip_hash_sha256'] = get_file_hash256(path)
                d['is_zip'] = True
                if external:
                    self.external_plugin_metadata[name] = d
                else:
                    self.internal_plugin_metadata[name] = d

    def get(self, name):
        """Return the loaded plugin object for name, or None if it is not loaded."""
        return self.plugins.get(name)

    def count(self):
        """Return the number of loaded plugins."""
        return len(self.plugins)

    def load_plugin(self, name) -> 'BasePlugin':
        """Imports the code of the given plugin.
        note: can be called from any thread.

        Raises Exception if no metadata is known for name.
        """
        if self.get_metadata(name):
            return self.load_plugin_by_name(name)
        else:
            raise Exception(f"could not find plugin {name!r}")

    def load_plugin_by_name(self, name) -> 'BasePlugin':
        """Import the GUI-specific module of plugin `name` and instantiate its ``Plugin`` class.

        Returns the already loaded instance if there is one. Raises RuntimeError
        if no module for the current gui_name exists, and Exception if importing
        or instantiating fails.
        """
        if name in self.plugins:
            return self.plugins[name]

        is_zip = self.is_plugin_zip(name)
        is_external = name in self.external_plugin_metadata
        if not is_external:
            full_name = f'electrum.plugins.{name}.{self.gui_name}'
        else:
            full_name = f'electrum_external_plugins.{name}.{self.gui_name}'

        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        try:
            if is_zip:
                module = self.exec_module_from_spec(spec, full_name)
            else:
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
            plugin = module.Plugin(self, self.config, name)
        except Exception as e:
            raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
        self.add_jobs(plugin.thread_jobs())
        self.plugins[name] = plugin
        self.logger.info(f"loaded plugin {name!r}. (from thread: {threading.current_thread().name!r})")
        return plugin

    def close_plugin(self, plugin):
        """Remove the thread jobs of plugin from this thread."""
        self.remove_jobs(plugin.thread_jobs())

    def enable(self, name: str) -> 'BasePlugin':
        """Persist the plugin as enabled and return its instance, loading it if needed."""
        self.config.set_key('enable_plugin_' + name, True, save=True)
        p = self.get(name)
        if p:
            return p
        return self.load_plugin(name)

    def disable(self, name: str) -> None:
        """Persist the plugin as disabled and close its instance if it is loaded."""
        self.config.set_key('enable_plugin_' + name, False, save=True)
        p = self.get(name)
        if not p:
            return
        self.plugins.pop(name)
        p.close()
        self.logger.info(f"closed {name}")

    @classmethod
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
        """True if key is one of the 'enable_plugin_<name>' config keys."""
        return key.startswith('enable_plugin_')

    def toggle(self, name: str) -> Optional['BasePlugin']:
        """Disable the plugin if it is loaded (returns None), otherwise enable and return it."""
        p = self.get(name)
        return self.disable(name) if p else self.enable(name)

    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
        """True if the plugin is known, its 'requires' modules import, and the wallet type fits."""
        d = self.descriptions.get(name)
        if not d:
            return False
        deps = d.get('requires', [])
        # each entry is a pair; only the first item (module name) is used here
        for dep, _desc in deps:
            try:
                __import__(dep)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        requires = d.get('requires_wallet_type', [])
        return not requires or wallet.wallet_type in requires

    def get_hardware_support(self):
        """Return a HardwarePluginToScan entry for each registered hardware wallet plugin.

        Plugins that fail to load are included with plugin=None and the exception
        attached; plugins that load but report is_available() False are omitted.
        """
        out = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if gui_good:
                try:
                    p = self.get_plugin(name)
                    if p.is_available():
                        out.append(HardwarePluginToScan(name=name,
                                                        description=details[2],
                                                        plugin=p,
                                                        exception=None))
                except Exception as e:
                    self.logger.exception(f"cannot load plugin for: {name}")
                    out.append(HardwarePluginToScan(name=name,
                                                    description=details[2],
                                                    plugin=None,
                                                    exception=e))
        return out

    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register wallet_type and a lazy loader that binds it to the plugin's wallet_class."""
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            plugin = self.get_plugin(name)
            register_constructor(wallet_type, plugin.wallet_class)
        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader

    def register_keystore(self, name, gui_good, details):
        """Register a hardware keystore; does nothing unless details[0] == 'hardware'."""
        from .keystore import register_keystore

        def dynamic_constructor(d):
            # the plugin is only loaded when a keystore is actually constructed
            return self.get_plugin(name).keystore_class(d)
        if details[0] == 'hardware':
            self.hw_wallets[name] = (gui_good, details)
            self.logger.info(f"registering hardware {name}: {details}")
            register_keystore(details[1], dynamic_constructor)

    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the plugin instance for name, loading it first if necessary."""
        if name not in self.plugins:
            self.load_plugin(name)
        return self.plugins[name]

    def is_plugin_zip(self, name: str) -> bool:
        """Returns True if the plugin is a zip file"""
        if (metadata := self.get_metadata(name)) is None:
            return False
        return metadata.get('is_zip', False)

    def get_metadata(self, name: str) -> Optional[dict]:
        """Returns the metadata of the plugin"""
        # internal metadata takes precedence; empty/missing metadata yields None
        return self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name) or None

    def run(self):
        """Thread body: run the registered jobs until the thread is stopped."""
        while self.is_running():
            self.wake_up_event.wait(0.1)  # time.sleep(0.1) OR event
            self.run_jobs()
        self.on_stop()
414

415

416
def get_file_hash256(path: str) -> str:
    """Get the sha256 hash of a file in hex, similar to `sha256sum`.

    The file is hashed in fixed-size chunks, so it is never held in memory
    as a whole.
    """
    import hashlib  # local import: the module header does not import hashlib

    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        while chunk := f.read(1 << 20):
            digest.update(chunk)
    return digest.hexdigest()
420

421
def hook(func):
    """Decorator that records the function's name in hook_names and returns it unchanged."""
    registered_name = func.__name__
    hook_names.add(registered_name)
    return func
424

425

426
def run_hook(name, *args):
    """Call every enabled plugin's callback registered for hook `name`.

    A callback that raises is logged and treated as returning False. Returns
    the single truthy result if there is one, None if there is none; more than
    one truthy result trips an assertion.
    """
    results = []
    for p, callback in hooks.get(name, []):
        if not p.is_enabled():
            continue
        try:
            outcome = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {p}, hook: {name}")
            outcome = False
        if outcome:
            results.append(outcome)

    if not results:
        return None
    assert len(results) == 1, results
    return results[0]
442

443

444
class BasePlugin(Logger):
    """Base class for plugin objects.

    On construction, every attribute whose name is a registered hook name is
    added to the module-level ``hooks`` registry; close() removes them again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        """Store parent/config/name and register this instance's hook methods."""
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for k in dir(self):
            if k in hook_names:
                hooks.setdefault(k, []).append((self, getattr(self, k)))

    def __str__(self):
        return self.name

    def close(self):
        """Unregister this plugin's hooks, detach it from the parent and call on_close()."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name in hook_names:
                # found attribute in self that is also the name of a hook
                l = hooks.get(attr_name, [])
                try:
                    l.remove((self, getattr(self, attr_name)))
                except ValueError:
                    # maybe attr name just collided with hook name and was not hook
                    continue
                hooks[attr_name] = l
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of close(); does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        """Returns False by default."""
        return False

    def thread_jobs(self):
        """Jobs to be run by the parent's thread; none by default."""
        return []

    def is_enabled(self):
        """True if the plugin is available and its 'enable_plugin_<name>' config value is True."""
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        """Returns True by default."""
        return True

    def can_user_disable(self):
        """Returns True by default."""
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the content of a file that ships with this plugin.

        filename is relative to the plugin's own directory. For zip plugins the
        file is read from inside the zip; otherwise it is read from the internal
        plugins directory or from the external plugin directory, depending on
        where this plugin was registered.
        """
        import zipfile
        if self.parent.is_plugin_zip(self.name):
            plugin_filename = self.parent.zip_plugin_path(self.name)
            with zipfile.ZipFile(plugin_filename) as myzip:
                # zip member names use '/' on every platform, so do not use os.path.join
                with myzip.open(f"{self.name}/{filename}") as myfile:
                    return myfile.read()
        # the metadata dicts are keyed by plugin name, not by file name
        if self.name in self.parent.internal_plugin_metadata:
            path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
        else:
            path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
        with open(path, 'rb') as myfile:
            return myfile.read()
515

516

517
class DeviceUnpairableError(UserFacingException):
    pass


class HardwarePluginLibraryUnavailable(Exception):
    pass


class CannotAutoSelectDevice(Exception):
    pass
520

521

522
class Device(NamedTuple):
    # Immutable record describing one enumerated device.
    path: Union[str, bytes]
    interface_number: int
    id_: str
    # when using hid, often Tuple[int, int]
    product_key: Any
    usage_page: int
    transport_ui_string: str
529

530

531
class DeviceInfo(NamedTuple):
    # A Device plus optional details; every field except `device` defaults to None.
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    # manufacturer, e.g. "trezor"
    plugin_name: Optional[str] = None
    # if available, used to distinguish same-type hw devices
    soft_device_id: Optional[str] = None
    # e.g. "Ledger Nano S"
    model_name: Optional[str] = None
539

540

541
class HardwarePluginToScan(NamedTuple):
    # One entry of the list built by Plugins.get_hardware_support():
    # either `plugin` is set and `exception` is None, or the other way round.
    name: str
    description: str
    plugin: Optional['HW_PluginBase']
    exception: Optional[Exception]
546

547

548
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}
4✔
549

550

551
# hidapi is not thread-safe
552
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
553
#     https://github.com/libusb/hidapi/issues/45
554
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
555
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
556
# It is not entirely clear to me, exactly what is safe and what isn't, when
557
# using multiple threads...
558
# Hence, we use a single thread for all device communications, including
559
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
560
# the following thread:
561
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
4✔
562
    max_workers=1,
563
    thread_name_prefix='hwd_comms_thread'
564
)
565

566
# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
567
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
568
# To keep it simple, let's just import it now, as we are likely in the main thread here.
569
if threading.current_thread() is not threading.main_thread():
4✔
UNCOV
570
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
×
571
try:
4✔
572
    import hid
4✔
573
except ImportError:
4✔
574
    pass
4✔
575

576

577
T = TypeVar('T')
4✔
578

579

580
def run_in_hwd_thread(func: Callable[[], T]) -> T:
    """Run func on the hardware-comms thread and return its result.

    If the caller is already on a thread whose name starts with
    "hwd_comms_thread", func is called directly; otherwise it is submitted to
    the single-worker executor and this call blocks until the result is ready.
    """
    already_on_comms_thread = threading.current_thread().name.startswith("hwd_comms_thread")
    if already_on_comms_thread:
        return func()
    pending = _hwd_comms_executor.submit(func)
    return pending.result()
587

588

589
def runs_in_hwd_thread(func):
    """Decorator: every call of func is routed through run_in_hwd_thread()."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
594

595

596
def assert_runs_in_hwd_thread():
    """Raise Exception unless the current thread's name starts with "hwd_comms_thread"."""
    current_name = threading.current_thread().name
    if current_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
599

600

601
class DeviceMgr(ThreadJob):
4✔
602
    """Manages hardware clients.  A client communicates over a hardware
603
    channel with the device.
604

605
    In addition to tracking device HID IDs, the device manager tracks
606
    hardware wallets and manages wallet pairing.  A HID ID may be
607
    paired with a wallet when it is confirmed that the hardware device
608
    matches the wallet, i.e. they have the same master public key.  A
609
    HID ID can be unpaired if e.g. it is wiped.
610

611
    Because of hotplugging, a wallet must request its client
612
    dynamically each time it is required, rather than caching it
613
    itself.
614

615
    The device manager is shared across plugins, so just one place
616
    does hardware scans when needed.  By tracking HID IDs, if a device
617
    is plugged into a different port the wallet is automatically
618
    re-paired.
619

620
    Wallets are informed on connect / disconnect events.  It must
621
    implement connected(), disconnected() callbacks.  Being connected
622
    implies a pairing.  Callbacks can happen in any thread context,
623
    and we do them without holding the lock.
624

625
    Confusingly, the HID ID (serial number) reported by the HID system
626
    doesn't match the device ID reported by the device itself.  We use
627
    the HID IDs.
628

629
    This plugin is thread-safe.  Currently only devices supported by
630
    hidapi are implemented."""
631

632
    def __init__(self, config: SimpleConfig):
        """Create an empty device manager.

        `config` is kept on the instance; run() reads the session timeout
        from it. All registries start out empty.
        """
        ThreadJob.__init__(self)
        self.config = config
        # Guards the attributes below that are marked "Needs self.lock".
        self.lock = threading.RLock()

        # pairing_code -> id_. An entry exists only while the pairing is active. Needs self.lock.
        self.pairing_code_to_id = {}  # type: Dict[str, str]
        # client -> id_. Needs self.lock.
        self.clients = {}  # type: Dict[HardwareClientBase, str]

        # Hardware we recognise: (vendor_id, product_id) -> Plugin
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
        # Hardware we recognise by vendor only: vendor_id -> Plugin
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]
        # Custom enumerate functions for devices we don't know about. Needs self.lock.
        self._enumerate_func = set()
647

648
    def thread_jobs(self):
        """Return the jobs to schedule: only this manager, whose run() handles device timeouts."""
        jobs = [self]
        return jobs
651

652
    def run(self):
        """Handle device timeouts.  Runs in the context of the Plugins thread.

        Every registered client is handed the cutoff timestamp
        (now minus the configured session timeout) via client.timeout().
        """
        # Snapshot the clients under the lock; timeout() is then called without holding it.
        with self.lock:
            snapshot = [*self.clients]
        cutoff = time.time() - self.config.get_session_timeout()
        for client in snapshot:
            client.timeout(cutoff)
660

661
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
        """Record `plugin` as the handler for each entry of `device_pairs`.

        The entries become keys of the recognised-hardware table, which the
        HID scan looks up with (vendor_id, product_id) tuples. An existing
        entry for the same key is overwritten.
        """
        self._recognised_hardware.update((pair, plugin) for pair in device_pairs)
664

665
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
        """Record `plugin` as the handler for every vendor id in `vendor_ids`.

        The HID scan consults this table only when the exact
        (vendor_id, product_id) pair is not registered. An existing entry
        for the same vendor id is overwritten.
        """
        self._recognised_vendor.update((vendor_id, plugin) for vendor_id in vendor_ids)
668

669
    def register_enumerate_func(self, func):
        """Register `func` as a custom device enumerator.

        scan_devices() calls every registered function without arguments and
        extends its device list with whatever the function returns.
        Registering the same function twice has no further effect, as the
        functions are kept in a set.
        """
        with self.lock:
            self._enumerate_func.add(func)
672

673
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for `device`, creating and registering it if needed.

        A client already registered under device.id_ is returned as-is.
        Otherwise the plugin is asked to create one; a truthy result is
        stored in self.clients, a falsy result is returned unchanged and
        nothing is registered.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        fresh = plugin.create_client(device, handler)
        if not fresh:
            return fresh
        self.logger.info(f"Registering {fresh}")
        with self.lock:
            self.clients[fresh] = device.id_
        return fresh
686

687
    def id_by_pairing_code(self, pairing_code):
        """Return the device id currently paired with `pairing_code`, or None if there is no such pairing."""
        with self.lock:
            paired_id = self.pairing_code_to_id.get(pairing_code)
        return paired_id
690

691
    def pairing_code_by_id(self, id_):
        """Reverse lookup: return the first pairing code mapped to device id `id_`, or None."""
        with self.lock:
            matching_codes = (
                code
                for code, paired_id in self.pairing_code_to_id.items()
                if paired_id == id_
            )
            return next(matching_codes, None)
697

698
    def unpair_pairing_code(self, pairing_code):
        """Drop the pairing stored under `pairing_code` and close the client of the paired device.

        Does nothing if no such pairing exists.
        """
        with self.lock:
            try:
                device_id = self.pairing_code_to_id.pop(pairing_code)
            except KeyError:
                return
        # The client is closed after the lock has been released.
        self._close_client(device_id)
704

705
    def unpair_id(self, id_):
        """Unpair the device with id `id_`.

        If a pairing code maps to the id, the pairing is removed (which also
        closes the client); otherwise only the client is closed.
        """
        pairing_code = self.pairing_code_by_id(id_)
        if not pairing_code:
            self._close_client(id_)
            return
        self.unpair_pairing_code(pairing_code)
711

712
    def _close_client(self, id_):
        """Unregister the client stored for device id `id_` and close it, if there is one."""
        with self.lock:
            client = self._client_by_id(id_)
            # With the None default this is a harmless no-op when no client was found.
            self.clients.pop(client, None)
        if not client:
            return
        # close() is called after the lock has been released.
        client.close()
718

719
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
        """Return the first registered client whose device id equals `id_`, or None. Never scans."""
        with self.lock:
            registered = (
                client
                for client, client_id in self.clients.items()
                if client_id == id_
            )
            return next(registered, None)
725

726
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
        """Return the client registered for device id `id_`, or None.

        Pairing is impossible for a device that is wiped or in bootloader
        mode; in such cases we communicate by device ID and not wallet.

        With `scan_now` (the default) the connected devices are rescanned
        before the lookup.
        """
        if scan_now:
            self.scan_devices()
        client = self._client_by_id(id_)
        return client
733

734
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Return the client paired with `keystore`, or None if there is none.

        Lookup order:
        1. when no `devices` were passed in, an already registered client
           (fast path, no scan);
        2. the same lookup against `devices`, scanning first if `devices`
           is None;
        3. if `force_pair` is set and still nothing was found, select a
           device and pair it with the keystore.

        The handler status is set to False on entry and to True once a
        client has been found.

        Raises:
            Exception: if `handler` is None.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            not_found = _("Handler not found for {}").format(plugin.name)
            raise Exception(not_found + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()

        def lookup(candidates):
            return self.client_by_pairing_code(
                plugin=plugin, pairing_code=pcode, handler=handler, devices=candidates)

        # search existing clients first (fast-path)
        client = lookup([]) if not devices else None
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = lookup(devices)
        if force_pair and client is None:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # no device could be selected without asking the user: give up quietly
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
768

769
    def client_by_pairing_code(
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
        devices: Sequence['Device'],
    ) -> Optional['HardwareClientBase']:
        """Return the client for the device paired under `pairing_code`.

        An already registered client is reused, but only if it belongs to
        exactly the same plugin type as `plugin`; its handler is replaced
        by `handler`. If no client is registered, one is created from the
        entry of `devices` whose id matches the pairing.

        Returns None if a registered client belongs to a different plugin
        type, or if no registered client and no matching device exist.
        """
        _id = self.id_by_pairing_code(pairing_code)
        client = self._client_by_id(_id)
        if client:
            # exact type match is intended here, hence an identity check on the types
            if type(client.plugin) is not type(plugin):
                return None
            # An unpaired client might have another wallet's handler
            # from a prior scan.  Replace to fix dialog parenting.
            client.handler = handler
            return client

        for device in devices:
            if device.id_ == _id:
                return self.create_client(device, handler, plugin)
        return None
786

787
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair `keystore` with the device described by `info` and return its client.

        The pairing succeeds only if a pairable client of the same plugin
        type is registered for the device and the xpub it derives at the
        keystore's derivation prefix equals the keystore's xpub.

        Raises:
            DeviceUnpairableError: in every other case.
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)
        device_id = info.device.id_
        client = self._client_by_id(device_id)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # The registered client might still carry another wallet's handler
            # from a prior scan.  Replace it to fix dialog parenting.
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                client_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                client_xpub = None
            if client_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = device_id
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        error_text = _(
            'Electrum cannot pair with your {}.\n\n'
            'Before you request bitcoins to be sent to addresses in this '
            'wallet, ensure you can pair with your device, or that you have '
            'its seed (and passphrase, if any).  Otherwise all bitcoins you '
            'receive will be unspendable.').format(plugin.device)
        raise DeviceUnpairableError(error_text)
823

824
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        `devices` defaults to a fresh scan. A device whose client cannot be
        created or queried is logged and skipped; with
        `include_failing_clients` it is instead reported as a DeviceInfo
        carrying the exception.

        Raises:
            HardwarePluginLibraryUnavailable: if the plugin's libraries are not available.
        """
        if not plugin.libraries_available:
            raise HardwarePluginLibraryUnavailable(plugin.get_library_not_available_message())
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                # each of these queries the client and may raise
                details = {
                    'label': client.label(),
                    'initialized': client.is_initialized(),
                    'soft_device_id': client.get_soft_device_id(),
                    'model_name': client.device_model_name(),
                }
            except Exception as e:
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device, plugin_name=plugin.name, **details))

        return infos
865

866
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        While no pairable device is found, the user is asked to insert one
        and the devices are rescanned. A device is then picked automatically
        if its soft device id matches, if its non-placeholder label matches
        exactly one device, or if it is the only device and the keystore has
        neither a useful label nor a soft device id. Failing that, the user
        is asked to choose.

        Raises:
            CannotAutoSelectDevice: if a prompt would be needed but
                `allow_user_interaction` is False.
            UserCancelled: if the user declines a prompt.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
        while not infos:
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            pieces = [_('Please insert your {}').format(plugin.device), " ("]
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                pieces.append(f"label: {keystore.label}, ")
            pieces.append(f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}")
            pieces.append(').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            ))
            if not handler.yes_no_question("".join(pieces)):
                raise UserCancelled()
            # devices=None makes the listing rescan the connected devices
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=None)

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        wanted_soft_id = keystore.soft_device_id
        if wanted_soft_id:
            by_soft_id = [info for info in infos if info.soft_device_id == wanted_soft_id]
            if by_soft_id:
                self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                return by_soft_id[0]
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        by_label = [info for info in infos if info.label == keystore.label]
        if keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS and len(by_label) == 1:
            self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
            return by_label[0]
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        nothing_saved = keystore.label in PLACEHOLDER_HW_CLIENT_LABELS and wanted_soft_id is None
        if len(infos) == 1 and nothing_saved:
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()

        # ask user to select device manually
        def describe(info):
            # one line of text per selectable device
            return "{label} ({maybe_model}{init}, {transport})".format(
                label=info.label or _("An unnamed {}").format(info.plugin_name),
                init=(_("initialized") if info.initialized else _("wiped")),
                transport=info.device.transport_ui_string,
                maybe_model=f"{info.model_name}, " if info.model_name else "")

        msg = (
            _("Could not automatically pair with device for given keystore.") + "\n"
            + f"(keystore label: {keystore.label!r}, "
            + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n"
            + _("Please select which {} device to use:").format(plugin.device)
            + "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        )
        descriptions = [describe(info) for info in infos]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        choice = handler.query_choice(msg, descriptions)
        if choice is None:
            raise UserCancelled()
        chosen = infos[choice]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {chosen}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return chosen
941

942
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those that a registered plugin turns into a Device.

        The plugin registered for the exact (vendor_id, product_id) pair
        takes precedence over one registered for the vendor id only.
        Returns an empty list if the hid module cannot be imported.
        """
        try:
            import hid
        except ImportError:
            return []

        found = []
        for entry in hid.enumerate(0, 0):
            vendor_id = entry['vendor_id']
            product_key = (vendor_id, entry['product_id'])
            # exact product match first, vendor-wide registration as the fallback
            plugin = self._recognised_hardware.get(
                product_key, self._recognised_vendor.get(vendor_id))
            if not plugin:
                continue
            device = plugin.create_device_from_hid_enumeration(entry, product_key=product_key)
            if device:
                found.append(device)
        return found
963

964
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients whose device is gone.

        Devices are collected from the HID enumeration and from every
        registered custom enumerate function; a failing custom function is
        logged and skipped. A registered client is kept only if its device
        id is among the scanned devices and it still reports a usable
        connection. Every other client is removed from self.clients, its id
        is unpaired and its handler (if any) gets update_status(False).

        Returns the list of devices found by this scan.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            except BaseException as e:
                # NOTE(review): BaseException also covers KeyboardInterrupt/SystemExit;
                # presumably deliberate best-effort for plugin code -- confirm.
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
            else:
                devices.extend(new_devices)

        # find out what was disconnected
        # (a set, so that the membership test below is O(1) for each client)
        present_ids = {dev.id_ for dev in devices}
        disconnected_clients = []
        with self.lock:
            connected = {}
            for client, id_ in self.clients.items():
                if id_ in present_ids and client.has_usable_connection_with_device():
                    connected[client] = id_
                else:
                    disconnected_clients.append((client, id_))
            self.clients = connected

        # Unpair disconnected devices
        # NOTE(review): these clients were already dropped from self.clients above, so
        # unpair_id() will not find them to close() -- presumably intentional; confirm.
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
1002

1003
    @classmethod
    def version_info(cls) -> Mapping[str, Optional[str]]:
        """Report the versions of the USB/HID libraries that can be imported.

        Keys:
            "libusb.version": dotted version string, or None if usb1 cannot be imported.
            "libusb.path": name of the loaded libusb library, or None if it
                cannot be determined. Only present if usb1 was imported.
            "hidapi.version": hid.__version__, else the installed "hidapi"
                package version, else None.
        """
        info = {}
        # add libusb
        try:
            import usb1
        except Exception:
            info["libusb.version"] = None
        else:
            info["libusb.version"] = ".".join(str(part) for part in usb1.getVersion()[:4])
            try:
                info["libusb.path"] = usb1.libusb1.libusb._name
            except AttributeError:
                info["libusb.path"] = None
        # add hidapi
        try:
            import hid
            info["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
        except Exception:
            # fall back to the version recorded in the package metadata
            from importlib.metadata import version
            try:
                info["hidapi.version"] = version("hidapi")
            except ImportError:
                # importlib.metadata.PackageNotFoundError is a subclass of ImportError
                info["hidapi.version"] = None
        return info
1028

1029
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Given a list of keystores, try to pair each with a connected hardware device.

        Keystores that are not hardware keystores are ignored. All keystores
        are first tried without user interaction; only afterwards, and only
        if `allow_user_interaction` is set, are they tried again with
        prompts allowed. `devices` defaults to a fresh scan.

        E.g. for a multisig-wallet, it is more user-friendly to use this method than to
        try to pair each keystore individually. Consider the following scenario:
        - three hw keystores in a 2-of-3 multisig wallet, devices d2 (for ks2) and d3 (for ks3) are connected
        - assume none of the devices are paired yet
        1. if we tried to individually pair keystores, we might try with ks1 first
           - but ks1 cannot be paired automatically, as neither d2 nor d3 matches the stored fingerprint
           - the user might then be prompted if they want to manually pair ks1 with either d2 or d3,
             which is confusing and error-prone. It's especially problematic if the hw device does
             not support labels (such as Ledger), as then the user cannot easily distinguish
             same-type devices. (see #4199)
        2. instead, if using this method, we would auto-pair ks2-d2 and ks3-d3 first,
           and then tell the user ks1 could not be paired (and there are no devices left to try)
        """
        from .keystore import Hardware_KeyStore
        keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pair with all devices that can be auto-selected,
        # then (if allowed) do the manual selections
        passes = (False, True) if allow_user_interaction else (False,)
        for interactive in passes:
            for ks in keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc