• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5119618206924800

19 Mar 2025 09:52AM UTC coverage: 61.133% (-0.05%) from 61.178%
5119618206924800

push

CirrusCI

web-flow
Merge pull request #9651 from f321x/plugin_manifest_json

Use manifest.json instead of loading init file for plugin registration

36 of 69 new or added lines in 2 files covered. (52.17%)

32 existing lines in 5 files now uncovered.

21363 of 34945 relevant lines covered (61.13%)

3.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.18
/electrum/plugin.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2015-2024 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import json
5✔
27
import os
5✔
28
import pkgutil
5✔
29
import importlib.util
5✔
30
import time
5✔
31
import threading
5✔
32
import traceback
5✔
33
import sys
5✔
34
import aiohttp
5✔
35
import zipfile as zipfile_lib
5✔
36

37
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
5✔
38
                    Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
39
import concurrent
5✔
40
import zipimport
5✔
41
from concurrent import futures
5✔
42
from functools import wraps, partial
5✔
43
from itertools import chain
5✔
44

45
from .i18n import _
5✔
46
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
5✔
47
from . import bip32
5✔
48
from . import plugins
5✔
49
from .simple_config import SimpleConfig
5✔
50
from .logging import get_logger, Logger
5✔
51
from .crypto import sha256
5✔
52

53
if TYPE_CHECKING:
5✔
54
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
55
    from .keystore import Hardware_KeyStore, KeyStore
×
56
    from .wallet import Abstract_Wallet
×
57

58

59
# Module-level logger, used by code that has no Logger instance at hand.
_logger = get_logger(__name__)
# wallet type -> zero-argument loader callable; filled by Plugins.register_wallet_type().
plugin_loaders = {}
# Names of the functions that were decorated with @hook.
hook_names = set()
# hook name -> list of (plugin, bound method) pairs; maintained by BasePlugin.__init__()/close().
hooks = {}
# path -> bool; results remembered by Plugins._has_recursive_root_permissions().
_root_permission_cache = {}
64

65

66
class Plugins(DaemonThread):
5✔
67

68
    LOGGING_SHORTCUT = 'p'
5✔
69
    pkgpath = os.path.dirname(plugins.__file__)
5✔
70

71
    @profiler
    def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
        """Discover the available plugins and load those enabled in the config.

        :param config: configuration consulted for the 'enable_plugin_<name>' keys.
        :param gui_name: name matched against each manifest's 'available_for' list.
        :param cmd_only: if True, only the init modules of the enabled plugins are
            imported; no thread is started and no plugin instance is created.
        """
        self.config = config
        self.cmd_only = cmd_only  # type: bool
        # plugin name -> manifest dict; both are populated by find_plugins()
        self.internal_plugin_metadata = {}
        self.external_plugin_metadata = {}
        if cmd_only:
            # only import the command modules of plugins
            # NOTE: in this mode the attributes assigned further down
            # (device_manager, hw_wallets, plugins, gui_name) are never set.
            Logger.__init__(self)
            self.find_plugins()
            self.load_plugins()
            return
        DaemonThread.__init__(self)
        self.device_manager = DeviceMgr(config)
        self.name = 'Plugins'  # set name of thread
        # plugin name -> (gui_good, details); filled by register_keystore()
        self.hw_wallets = {}
        self.plugins = {}  # type: Dict[str, BasePlugin]
        self.gui_name = gui_name
        self.find_plugins()
        self.load_plugins()
        self.add_jobs(self.device_manager.thread_jobs())
        self.start()
93

94
    @property
5✔
95
    def descriptions(self):
5✔
96
        return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
×
97

98
    def find_directory_plugins(self, pkg_path: str, external: bool):
        """Find plugins in directory form under pkg_path and populate the metadata dicts.

        A plugin is skipped (not an error) when its manifest.json is missing,
        unreadable, not valid JSON, not a JSON object, or lacks 'fullname'.

        :param pkg_path: directory that contains one sub-package per plugin.
        :param external: True when pkg_path is the external plugin directory; such
            plugins must pass the recursive root-permission check and are stored in
            external_plugin_metadata instead of internal_plugin_metadata.
        :raises Exception: if a plugin name was already registered.
        """
        iter_modules = list(pkgutil.iter_modules([pkg_path]))
        for loader, name, ispkg in iter_modules:
            # FIXME pyinstaller binaries are packaging each built-in plugin twice:
            #       once as data and once as code. To honor the "no duplicates" rule below,
            #       we exclude the ones packaged as *code*, here:
            if loader.__class__.__qualname__ == "PyiFrozenImporter":
                continue
            module_path = os.path.join(pkg_path, name)
            if external and not self._has_recursive_root_permissions(module_path):
                self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
                continue
            if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
                continue
            try:
                # JSON text is UTF-8; do not depend on the locale's default encoding
                with open(os.path.join(module_path, 'manifest.json'), 'r', encoding='utf-8') as f:
                    d = json.load(f)
            except FileNotFoundError:
                self.logger.info(f"could not find manifest.json of plugin {name}, skipping...")
                continue
            except (OSError, ValueError):
                # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
                # One broken manifest must not abort the discovery of all other plugins.
                self.logger.exception(f"could not load manifest.json of plugin {name}, skipping...")
                continue
            if not isinstance(d, dict) or 'fullname' not in d:
                continue
            d['display_name'] = d['fullname']
            d['path'] = module_path
            if not self.cmd_only:
                gui_good = self.gui_name in d.get('available_for', [])
                if not gui_good:
                    continue
                details = d.get('registers_wallet_type')
                if details:
                    self.register_wallet_type(name, gui_good, details)
                details = d.get('registers_keystore')
                if details:
                    self.register_keystore(name, gui_good, details)
            if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
                _logger.info(f"Found the following plugin modules: {iter_modules=}")
                raise Exception(f"duplicate plugins? for {name=}")
            if not external:
                self.internal_plugin_metadata[name] = d
            else:
                self.external_plugin_metadata[name] = d
140

141
    @staticmethod
5✔
142
    def exec_module_from_spec(spec, path):
5✔
143
        try:
5✔
144
            module = importlib.util.module_from_spec(spec)
5✔
145
            # sys.modules needs to be modified for relative imports to work
146
            # see https://stackoverflow.com/a/50395128
147
            sys.modules[path] = module
5✔
148
            spec.loader.exec_module(module)
5✔
149
        except Exception as e:
×
150
            raise Exception(f"Error pre-loading {path}: {repr(e)}") from e
×
151
        return module
5✔
152

153
    def find_plugins(self):
        """Scan the internal and the external plugin locations for plugins.

        Each existing location is searched for directory plugins first and for
        zip plugins afterwards.
        """
        search_locations = [
            (self.pkgpath, False),
            # external plugins enforce root permissions on the directory
            (self.get_external_plugin_dir(), True),
        ]
        for location, is_external in search_locations:
            if not location or not os.path.exists(location):
                continue
            self.find_directory_plugins(pkg_path=location, external=is_external)
            self.find_zip_plugins(pkg_path=location, external=is_external)
161

162
    def load_plugins(self):
        """Load every discovered plugin that is enabled in the config.

        Plugins whose manifest sets 'requires_wallet_type' are skipped here. In
        cmd_only mode only the plugin's init module is imported; otherwise the
        plugin is fully loaded. A failure to load one plugin is logged and does
        not stop the others.
        """
        # Iterate over a snapshot: maybe_load_plugin_init_method() may delete an
        # entry from the metadata dicts, which would otherwise raise
        # "RuntimeError: dictionary changed size during iteration".
        discovered = list(chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()))
        for name, d in discovered:
            if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
                try:
                    if self.cmd_only:  # only load init method to register commands
                        self.maybe_load_plugin_init_method(name)
                    else:
                        self.load_plugin_by_name(name)
                except BaseException as e:
                    self.logger.exception(f"cannot initialize plugin {name}: {e}")
172

173
    def _has_root_permissions(self, path):
5✔
174
        return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
×
175

176
    @profiler(min_threshold=0.5)
    def _has_recursive_root_permissions(self, path):
        """Check that a directory tree, including all its files, has root permissions.

        The verdict is remembered per path in the module-level cache.
        """
        cached = _root_permission_cache.get(path)
        if cached is not None:
            return cached
        # pessimistic default; only upgraded once the whole tree has been checked
        _root_permission_cache[path] = False
        for dirpath, _subdirs, filenames in os.walk(path):
            candidates = [dirpath] + [os.path.join(dirpath, fn) for fn in filenames]
            if not all(self._has_root_permissions(p) for p in candidates):
                return False
        _root_permission_cache[path] = True
        return True
191

192
    def get_external_plugin_dir(self):
        """Return the external plugin directory, or None if it cannot be used.

        None is returned on unsupported platforms, when the directory does not
        exist, or when it fails the root-permission check.
        """
        platform_supported = sys.platform in ['linux', 'darwin'] or sys.platform.startswith('freebsd')
        if not platform_supported:
            return None
        pkg_path = '/opt/electrum_plugins'
        if not os.path.exists(pkg_path):
            self.logger.info(f'directory {pkg_path} does not exist')
            return None
        if self._has_root_permissions(pkg_path):
            return pkg_path
        self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
        return None
203

204
    def zip_plugin_path(self, name):
        """Return the path of the zip file that contains the plugin called name."""
        filename = self.get_metadata(name)['filename']
        is_internal = name in self.internal_plugin_metadata
        base_dir = self.pkgpath if is_internal else self.get_external_plugin_dir()
        return os.path.join(base_dir, filename)
211

212
    def find_zip_plugins(self, pkg_path: str, external: bool):
        """Find plugins in zip form in the given pkg_path and populate the metadata dicts.

        Every '*.zip' file in pkg_path is inspected; each top-level package in it
        is a plugin whose manifest is read from '<name>/manifest.json' inside the
        archive. Plugins whose manifest cannot be loaded are skipped.

        :param pkg_path: directory to scan; nothing is done if it is None.
        :param external: True when pkg_path is the external plugin directory; such
            zip files must pass the root-permission check and are stored in
            external_plugin_metadata instead of internal_plugin_metadata.
        :raises Exception: if a plugin name was already registered.
        """
        if pkg_path is None:
            return
        for filename in os.listdir(pkg_path):
            path = os.path.join(pkg_path, filename)
            if not filename.endswith('.zip'):
                continue
            if external and not self._has_root_permissions(path):
                self.logger.info(f'not loading {path}: file has user write permissions')
                continue
            try:
                importer = zipimport.zipimporter(path)
            except zipimport.ZipImportError:
                self.logger.exception(f"unable to load zip plugin '{filename}'")
                continue
            for name, is_package in pkgutil.iter_zipimport_modules(importer):
                if is_package is False:
                    continue
                if name in self.internal_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if name in self.external_plugin_metadata:
                    raise Exception(f"duplicate plugins for name={name}")
                if self.cmd_only and not self.config.get('enable_plugin_' + name):
                    continue
                try:
                    with zipfile_lib.ZipFile(path) as file:
                        # zip member names always use '/', regardless of os.sep
                        manifest_path = f'{name}/manifest.json'
                        with file.open(manifest_path, 'r') as f:
                            d = json.load(f)
                    if not isinstance(d, dict):
                        raise ValueError("manifest.json is not a JSON object")
                except Exception:
                    self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
                    continue
                d['filename'] = filename
                d['is_zip'] = True
                d['path'] = path
                if not self.cmd_only:
                    gui_good = self.gui_name in d.get('available_for', [])
                    if not gui_good:
                        continue
                    if 'fullname' not in d:
                        continue
                    d['display_name'] = d['fullname']
                    d['zip_hash_sha256'] = get_file_hash256(path)
                if external:
                    self.external_plugin_metadata[name] = d
                else:
                    self.internal_plugin_metadata[name] = d
260

261
    def get(self, name):
        """Return the loaded plugin instance called name, or None if it is not loaded."""
        loaded = self.plugins
        return loaded.get(name)
263

264
    def count(self):
        """Return the number of currently loaded plugins."""
        loaded = self.plugins
        return len(loaded)
266

267
    def load_plugin(self, name) -> 'BasePlugin':
        """Import the code of the given plugin and return the plugin instance.

        note: can be called from any thread.

        :raises Exception: if no metadata is known for name.
        """
        if not self.get_metadata(name):
            raise Exception(f"could not find plugin {name!r}")
        return self.load_plugin_by_name(name)
275

276
    def maybe_load_plugin_init_method(self, name: str) -> None:
        """Load the __init__.py module of the plugin, unless it is already in sys.modules."""
        is_external = name in self.external_plugin_metadata
        package_prefix = 'electrum_external_plugins.' if is_external else 'electrum.plugins.'
        base_name = package_prefix + name
        if base_name in sys.modules:
            return
        # if the plugin was not enabled on startup the init module hasn't been loaded yet
        metadata = self.get_metadata(name)
        if metadata.get('is_zip', False):
            importer = zipimport.zipimporter(metadata['path'])
            init_spec = importer.find_spec(name)
        elif is_external:
            init_file = os.path.join(metadata['path'], '__init__.py')
            init_spec = importlib.util.spec_from_file_location(base_name, init_file)
        else:
            init_spec = importlib.util.find_spec(base_name)
        self.exec_module_from_spec(init_spec, base_name)
        if name == "trustedcoin":
            # removes trustedcoin after loading to not show it in the list of plugins
            del self.internal_plugin_metadata[name]
297

298
    def load_plugin_by_name(self, name: str) -> 'BasePlugin':
        """Return the plugin instance for name, importing and instantiating it if needed.

        :raises RuntimeError: if the plugin has no module for the current GUI.
        :raises Exception: if importing or instantiating the plugin fails.
        """
        try:
            return self.plugins[name]
        except KeyError:
            pass

        # if the plugin was not enabled on startup the init module hasn't been loaded yet
        self.maybe_load_plugin_init_method(name)

        if name in self.external_plugin_metadata:
            package = 'electrum_external_plugins'
        else:
            package = 'electrum.plugins'
        full_name = f'{package}.{name}.{self.gui_name}'

        spec = importlib.util.find_spec(full_name)
        if spec is None:
            raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
        try:
            gui_module = self.exec_module_from_spec(spec, full_name)
            instance = gui_module.Plugin(self, self.config, name)
        except Exception as e:
            raise Exception(f"Error loading {name} plugin: {e!r}") from e
        self.add_jobs(instance.thread_jobs())
        self.plugins[name] = instance
        thread_name = threading.current_thread().name
        self.logger.info(f"loaded plugin {name!r}. (from thread: {thread_name!r})")
        return instance
323

324
    def close_plugin(self, plugin):
        """Remove the thread jobs of plugin from this thread."""
        jobs = plugin.thread_jobs()
        self.remove_jobs(jobs)
326

327
    def enable(self, name: str) -> 'BasePlugin':
        """Persist the plugin as enabled and return its instance, loading it if needed."""
        self.config.set_key('enable_plugin_' + name, True, save=True)
        return self.get(name) or self.load_plugin(name)
333

334
    def disable(self, name: str) -> None:
        """Persist the plugin as disabled and, if it is loaded, unload and close it."""
        self.config.set_key('enable_plugin_' + name, False, save=True)
        plugin = self.get(name)
        if plugin:
            self.plugins.pop(name)
            plugin.close()
            self.logger.info(f"closed {name}")
342

343
    @classmethod
5✔
344
    def is_plugin_enabler_config_key(cls, key: str) -> bool:
5✔
345
        return key.startswith('enable_plugin_')
×
346

347
    def toggle(self, name: str) -> Optional['BasePlugin']:
        """Disable the plugin if it is loaded, otherwise enable it.

        Returns the plugin instance when it was enabled, None when it was disabled.
        """
        if self.get(name):
            return self.disable(name)
        return self.enable(name)
350

351
    def is_available(self, name: str, wallet: 'Abstract_Wallet') -> bool:
        """Return whether the plugin called name can be used with wallet.

        The plugin must be known, every module listed under 'requires' must be
        importable, and the wallet type must match 'requires_wallet_type' if set.
        """
        manifest = self.descriptions.get(name)
        if not manifest:
            return False
        for dep, _label in manifest.get('requires', []):
            try:
                __import__(dep)
            except ImportError as e:
                self.logger.warning(f'Plugin {name} unavailable: {repr(e)}')
                return False
        wallet_types = manifest.get('requires_wallet_type', [])
        if not wallet_types:
            return True
        return wallet.wallet_type in wallet_types
364

365
    def get_hardware_support(self):
        """Return a HardwarePluginToScan entry for each usable registered hardware plugin.

        Plugins that fail to load are reported with plugin=None and the exception;
        plugins that load but report themselves unavailable are left out.
        """
        scanned = []
        for name, (gui_good, details) in self.hw_wallets.items():
            if not gui_good:
                continue
            try:
                plugin = self.get_plugin(name)
                if not plugin.is_available():
                    continue
                entry = HardwarePluginToScan(name=name,
                                             description=details[2],
                                             plugin=plugin,
                                             exception=None)
            except Exception as e:
                self.logger.exception(f"cannot load plugin for: {name}")
                entry = HardwarePluginToScan(name=name,
                                             description=details[2],
                                             plugin=None,
                                             exception=e)
            scanned.append(entry)
        return scanned
383

384
    def register_wallet_type(self, name, gui_good, wallet_type):
        """Register wallet_type and a lazy loader that installs its constructor.

        The plugin itself is only loaded when the stored loader is called.
        gui_good is accepted but not used here.
        """
        from .wallet import register_wallet_type, register_constructor
        self.logger.info(f"registering wallet type {(wallet_type, name)}")

        def loader():
            # resolved lazily: loads the plugin on first use
            register_constructor(wallet_type, self.get_plugin(name).wallet_class)

        register_wallet_type(wallet_type)
        plugin_loaders[wallet_type] = loader
393

394
    def register_keystore(self, name, gui_good, details):
        """Register the keystore of a hardware plugin.

        Only entries whose first element is 'hardware' are handled; their second
        element is the keystore type passed on to the keystore registry.
        """
        from .keystore import register_keystore
        if details[0] != 'hardware':
            return

        def dynamic_constructor(d):
            # loads the plugin on first use
            return self.get_plugin(name).keystore_class(d)

        self.hw_wallets[name] = (gui_good, details)
        self.logger.info(f"registering hardware {name}: {details}")
        register_keystore(details[1], dynamic_constructor)
403

404
    def get_plugin(self, name: str) -> 'BasePlugin':
        """Return the plugin instance called name, loading it first if necessary."""
        already_loaded = name in self.plugins
        if not already_loaded:
            self.load_plugin(name)
        return self.plugins[name]
408

409
    def is_plugin_zip(self, name: str) -> bool:
        """Return True if the plugin was found inside a zip file."""
        metadata = self.get_metadata(name)
        if metadata is None:
            return False
        return metadata.get('is_zip', False)
414

415
    def get_metadata(self, name: str) -> Optional[dict]:
        """Return the metadata of the plugin, or None if there is none.

        Internal plugins take precedence over external ones.
        """
        for registry in (self.internal_plugin_metadata, self.external_plugin_metadata):
            entry = registry.get(name)
            if entry:
                return entry
        return None
421

422
    def run(self):
        """Thread main loop: run the jobs until stopped, then call on_stop()."""
        poll_interval = 0.1
        while self.is_running():
            # returns early when the event is set, otherwise acts like a sleep
            self.wake_up_event.wait(poll_interval)
            self.run_jobs()
        self.on_stop()
427

428

429
def get_file_hash256(path: str) -> str:
    """Get the sha256 hash of a file in hex, similar to `sha256sum`."""
    with open(path, 'rb') as f:
        contents = f.read()
    return sha256(contents).hex()
433

434
def hook(func):
    """Decorator that records the name of func as a hook name; func is returned unchanged."""
    registered_name = func.__name__
    hook_names.add(registered_name)
    return func
437

438

439
def run_hook(name, *args):
    """Call the hook called name on every enabled plugin that implements it.

    Exceptions raised by a hook are logged and otherwise ignored. At most one
    hook may return a truthy result; that result is returned, else None.
    """
    results = []
    for plugin, callback in hooks.get(name, []):
        if not plugin.is_enabled():
            continue
        try:
            outcome = callback(*args)
        except Exception:
            _logger.exception(f"Plugin error. plugin: {plugin}, hook: {name}")
            continue
        if outcome:
            results.append(outcome)

    if not results:
        return None
    assert len(results) == 1, results
    return results[0]
455

456

457
class BasePlugin(Logger):
    """Base class of plugin implementations.

    On construction, every attribute whose name was registered with @hook is
    added to the module-level hooks table; close() removes these entries again.
    """

    def __init__(self, parent, config: 'SimpleConfig', name):
        self.parent = parent  # type: Plugins  # The plugins object
        self.name = name
        self.config = config
        self.wallet = None  # fixme: this field should not exist
        Logger.__init__(self)
        # add self to hooks
        for k in dir(self):
            if k in hook_names:
                hooks.setdefault(k, []).append((self, getattr(self, k)))

    def __str__(self):
        return self.name

    def close(self):
        """Unregister the hooks of this plugin, detach it from the parent and call on_close()."""
        # remove self from hooks
        for attr_name in dir(self):
            if attr_name in hook_names:
                # found attribute in self that is also the name of a hook
                l = hooks.get(attr_name, [])
                try:
                    l.remove((self, getattr(self, attr_name)))
                except ValueError:
                    # maybe attr name just collided with hook name and was not hook
                    continue
                hooks[attr_name] = l
        self.parent.close_plugin(self)
        self.on_close()

    def on_close(self):
        """Called at the end of close(); does nothing by default."""
        pass

    def requires_settings(self) -> bool:
        return False

    def thread_jobs(self):
        """Return the jobs this plugin wants to have run; none by default."""
        return []

    def is_enabled(self):
        """Return True if the plugin is available and its config key is exactly True."""
        return self.is_available() and self.config.get('enable_plugin_' + self.name) is True

    def is_available(self):
        return True

    def can_user_disable(self):
        return True

    def settings_widget(self, window):
        raise NotImplementedError()

    def settings_dialog(self, window):
        raise NotImplementedError()

    def read_file(self, filename: str) -> bytes:
        """Return the content of a file that is shipped with this plugin.

        :param filename: path of the file, relative to the plugin's package.
        """
        if self.parent.is_plugin_zip(self.name):
            plugin_filename = self.parent.zip_plugin_path(self.name)
            # zip member names always use '/', regardless of os.sep
            member = f"{self.name}/{filename.replace(os.sep, '/')}"
            with zipfile_lib.ZipFile(plugin_filename) as myzip:
                with myzip.open(member) as myfile:
                    return myfile.read()
        else:
            if self.name in self.parent.internal_plugin_metadata:
                path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
            else:
                path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
            with open(path, 'rb') as myfile:
                return myfile.read()
527

528

529
class DeviceUnpairableError(UserFacingException):
    """User-facing error; going by its name, a device could not be paired."""


class HardwarePluginLibraryUnavailable(Exception):
    """Going by its name: a library needed by a hardware plugin is not available."""


class CannotAutoSelectDevice(Exception):
    """Going by its name: no device could be selected automatically."""
532

533

534
class Device(NamedTuple):
    """Immutable record identifying a device (presumably one enumerated hardware device)."""
    path: Union[str, bytes]
    interface_number: int
    id_: str
    product_key: Any   # when using hid, often Tuple[int, int]
    usage_page: int
    transport_ui_string: str
541

542

543
class DeviceInfo(NamedTuple):
    """Immutable record pairing a Device with optional details about it.

    Every field except `device` defaults to None.
    """
    device: Device
    label: Optional[str] = None
    initialized: Optional[bool] = None
    exception: Optional[Exception] = None
    plugin_name: Optional[str] = None  # manufacturer, e.g. "trezor"
    soft_device_id: Optional[str] = None  # if available, used to distinguish same-type hw devices
    model_name: Optional[str] = None  # e.g. "Ledger Nano S"
551

552

553
class HardwarePluginToScan(NamedTuple):
    """Result entry produced by Plugins.get_hardware_support() for one hardware plugin."""
    name: str
    description: str
    # the loaded plugin, or None if loading it failed
    plugin: Optional['HW_PluginBase']
    # the error raised while loading the plugin, or None on success
    exception: Optional[Exception]
558

559

560
# NOTE(review): presumably the label values treated as "no label set"; the users are not visible here.
PLACEHOLDER_HW_CLIENT_LABELS = {None, "", " "}


# hidapi is not thread-safe
# see https://github.com/signal11/hidapi/issues/205#issuecomment-527654560
#     https://github.com/libusb/hidapi/issues/45
#     https://github.com/signal11/hidapi/issues/45#issuecomment-4434598
#     https://github.com/signal11/hidapi/pull/414#issuecomment-445164238
# It is not entirely clear to me, exactly what is safe and what isn't, when
# using multiple threads...
# Hence, we use a single thread for all device communications, including
# enumeration. Everything that uses hidapi, libusb, etc, MUST run on
# the following thread:
_hwd_comms_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=1,
    thread_name_prefix='hwd_comms_thread'
)

# hidapi needs to be imported from the main thread. Otherwise, at least on macOS,
# segfaults will follow. (see https://github.com/trezor/cython-hidapi/pull/150#issuecomment-1542391087)
# To keep it simple, let's just import it now, as we are likely in the main thread here.
if threading.current_thread() is not threading.main_thread():
    _logger.warning("expected to be in main thread... hidapi will not be safe to use now!")
try:
    import hid
except ImportError:
    # a missing hid module is tolerated at this point
    pass


# Return type of the callable handed to run_in_hwd_thread().
T = TypeVar('T')
590

591

592
def run_in_hwd_thread(func: Callable[[], T]) -> T:
5✔
593
    if threading.current_thread().name.startswith("hwd_comms_thread"):
×
594
        return func()
×
595
    else:
596
        fut = _hwd_comms_executor.submit(func)
×
597
        return fut.result()
×
598
        #except (concurrent.futures.CancelledError, concurrent.futures.TimeoutError) as e:
599

600

601
def runs_in_hwd_thread(func):
    """Decorator: every call of func is executed through run_in_hwd_thread()."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_call = partial(func, *args, **kwargs)
        return run_in_hwd_thread(bound_call)
    return wrapper
606

607

608
def assert_runs_in_hwd_thread():
    """Raise an Exception unless called from the hardware-device communication thread."""
    current_name = threading.current_thread().name
    if current_name.startswith("hwd_comms_thread"):
        return
    raise Exception("must only be called from HWD communication thread")
611

612

613
class DeviceMgr(ThreadJob):
5✔
614
    """Manages hardware clients.  A client communicates over a hardware
615
    channel with the device.
616

617
    In addition to tracking device HID IDs, the device manager tracks
618
    hardware wallets and manages wallet pairing.  A HID ID may be
619
    paired with a wallet when it is confirmed that the hardware device
620
    matches the wallet, i.e. they have the same master public key.  A
621
    HID ID can be unpaired if e.g. it is wiped.
622

623
    Because of hotplugging, a wallet must request its client
624
    dynamically each time it is required, rather than caching it
625
    itself.
626

627
    The device manager is shared across plugins, so just one place
628
    does hardware scans when needed.  By tracking HID IDs, if a device
629
    is plugged into a different port the wallet is automatically
630
    re-paired.
631

632
    Wallets are informed on connect / disconnect events.  It must
633
    implement connected(), disconnected() callbacks.  Being connected
634
    implies a pairing.  Callbacks can happen in any thread context,
635
    and we do them without holding the lock.
636

637
    Confusingly, the HID ID (serial number) reported by the HID system
638
    doesn't match the device ID reported by the device itself.  We use
639
    the HID IDs.
640

641
    This plugin is thread-safe.  Currently only devices supported by
642
    hidapi are implemented."""
643

644
    def __init__(self, config: SimpleConfig):
        """Create empty registries and the lock guarding the shared ones.

        :param config: kept as ``self.config``; ``run()`` reads the session
            timeout from it.
        """
        ThreadJob.__init__(self)
        # A pairing_code->id_ map. Item only present if we have active pairing. Needs self.lock.
        self.pairing_code_to_id = {}  # type: Dict[str, str]
        # A client->id_ map. Needs self.lock.
        self.clients = {}  # type: Dict[HardwareClientBase, str]
        # What we recognise.  (vendor_id, product_id) -> Plugin
        self._recognised_hardware = {}  # type: Dict[Tuple[int, int], HW_PluginBase]
        self._recognised_vendor = {}  # type: Dict[int, HW_PluginBase]  # vendor_id -> Plugin
        # Custom enumerate functions for devices we don't know about.
        self._enumerate_func = set()  # Needs self.lock.

        # Re-entrant: several methods call other lock-taking methods while holding it.
        self.lock = threading.RLock()

        self.config = config
5✔
659

660
    def thread_jobs(self):
        """Return the jobs this object contributes: just itself (see ``run``)."""
        # Thread job to handle device timeouts
        return [self]
5✔
663

664
    def run(self):
5✔
665
        '''Handle device timeouts.  Runs in the context of the Plugins
666
        thread.'''
667
        with self.lock:
5✔
668
            clients = list(self.clients.keys())
5✔
669
        cutoff = time.time() - self.config.get_session_timeout()
5✔
670
        for client in clients:
5✔
671
            client.timeout(cutoff)
×
672

673
    def register_devices(self, device_pairs, *, plugin: 'HW_PluginBase'):
5✔
674
        for pair in device_pairs:
×
675
            self._recognised_hardware[pair] = plugin
×
676

677
    def register_vendor_ids(self, vendor_ids: Iterable[int], *, plugin: 'HW_PluginBase'):
5✔
678
        for vendor_id in vendor_ids:
×
679
            self._recognised_vendor[vendor_id] = plugin
×
680

681
    def register_enumerate_func(self, func):
        """Add *func* to the set of custom enumerators called by ``scan_devices``.

        ``scan_devices`` calls each registered function with no arguments and
        extends its device list with the returned iterable.
        """
        with self.lock:
            self._enumerate_func.add(func)
×
684

685
    @runs_in_hwd_thread
    def create_client(self, device: 'Device', handler: Optional['HardwareHandlerBase'],
                      plugin: 'HW_PluginBase') -> Optional['HardwareClientBase']:
        """Return the client for *device*, creating and registering it if needed.

        A client already registered under ``device.id_`` is returned as is.
        Otherwise the plugin is asked to create one; a truthy result is
        recorded in ``self.clients``, a falsy result is returned unchanged.
        """
        # Get from cache first
        cached = self._client_by_id(device.id_)
        if cached:
            return cached
        new_client = plugin.create_client(device, handler)
        if not new_client:
            return new_client
        self.logger.info(f"Registering {new_client}")
        with self.lock:
            self.clients[new_client] = device.id_
        return new_client
×
698

699
    def id_by_pairing_code(self, pairing_code):
5✔
700
        with self.lock:
×
701
            return self.pairing_code_to_id.get(pairing_code)
×
702

703
    def pairing_code_by_id(self, id_):
5✔
704
        with self.lock:
×
705
            for pairing_code, id2 in self.pairing_code_to_id.items():
×
706
                if id2 == id_:
×
707
                    return pairing_code
×
708
            return None
×
709

710
    def unpair_pairing_code(self, pairing_code):
5✔
711
        with self.lock:
×
712
            if pairing_code not in self.pairing_code_to_id:
×
713
                return
×
714
            _id = self.pairing_code_to_id.pop(pairing_code)
×
715
        self._close_client(_id)
×
716

717
    def unpair_id(self, id_):
5✔
718
        pairing_code = self.pairing_code_by_id(id_)
×
719
        if pairing_code:
×
720
            self.unpair_pairing_code(pairing_code)
×
721
        else:
722
            self._close_client(id_)
×
723

724
    def _close_client(self, id_):
5✔
725
        with self.lock:
×
726
            client = self._client_by_id(id_)
×
727
            self.clients.pop(client, None)
×
728
        if client:
×
729
            client.close()
×
730

731
    def _client_by_id(self, id_) -> Optional['HardwareClientBase']:
5✔
732
        with self.lock:
×
733
            for client, client_id in self.clients.items():
×
734
                if client_id == id_:
×
735
                    return client
×
736
        return None
×
737

738
    def client_by_id(self, id_, *, scan_now: bool = True) -> Optional['HardwareClientBase']:
5✔
739
        '''Returns a client for the device ID if one is registered.  If
740
        a device is wiped or in bootloader mode pairing is impossible;
741
        in such cases we communicate by device ID and not wallet.'''
742
        if scan_now:
×
743
            self.scan_devices()
×
744
        return self._client_by_id(id_)
×
745

746
    @runs_in_hwd_thread
    def client_for_keystore(self, plugin: 'HW_PluginBase', handler: Optional['HardwareHandlerBase'],
                            keystore: 'Hardware_KeyStore',
                            force_pair: bool, *,
                            devices: Sequence['Device'] = None,
                            allow_user_interaction: bool = True) -> Optional['HardwareClientBase']:
        """Find, or with *force_pair* establish, the client paired with *keystore*.

        Lookup order:
        1. already-known clients, by the keystore's pairing code (no scan);
        2. the same lookup against *devices*, scanning first if none were given;
        3. if still nothing and *force_pair* is set: select a device and pair.

        :param handler: must not be None; its status is set to False at the
            start and to True once a client has been found.
        :param devices: pre-scanned devices; ``None`` triggers a scan in step 2.
        :param allow_user_interaction: forwarded to ``select_device``.
        :returns: the client, or None if none could be found (including when
            ``select_device`` raised ``CannotAutoSelectDevice``).
        :raises Exception: if *handler* is None.

        Other exceptions raised by ``select_device`` or ``force_pair_keystore``
        are not caught here.
        """
        self.logger.info("getting client for keystore")
        if handler is None:
            raise Exception(_("Handler not found for {}").format(plugin.name) + '\n' + _("A library is probably missing."))
        handler.update_status(False)
        pcode = keystore.pairing_code()
        client = None
        # search existing clients first (fast-path)
        if not devices:
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=[])
        # search clients again, now allowing a (slow) scan
        if client is None:
            if devices is None:
                devices = self.scan_devices()
            client = self.client_by_pairing_code(plugin=plugin, pairing_code=pcode, handler=handler, devices=devices)
        if client is None and force_pair:
            try:
                info = self.select_device(plugin, handler, keystore, devices,
                                          allow_user_interaction=allow_user_interaction)
            except CannotAutoSelectDevice:
                # no unambiguous candidate and we may not ask the user: give up quietly
                pass
            else:
                client = self.force_pair_keystore(plugin=plugin, handler=handler, info=info, keystore=keystore)
        if client:
            handler.update_status(True)
            # note: if select_device was called, we might also update label etc here:
            keystore.opportunistically_fill_in_missing_info_from_device(client)
        self.logger.info("end client for keystore")
        return client
×
780

781
    def client_by_pairing_code(
5✔
782
        self, *, plugin: 'HW_PluginBase', pairing_code: str, handler: 'HardwareHandlerBase',
783
        devices: Sequence['Device'],
784
    ) -> Optional['HardwareClientBase']:
785
        _id = self.id_by_pairing_code(pairing_code)
×
786
        client = self._client_by_id(_id)
×
787
        if client:
×
788
            if type(client.plugin) != type(plugin):
×
789
                return
×
790
            # An unpaired client might have another wallet's handler
791
            # from a prior scan.  Replace to fix dialog parenting.
792
            client.handler = handler
×
793
            return client
×
794

795
        for device in devices:
×
796
            if device.id_ == _id:
×
797
                return self.create_client(device, handler, plugin)
×
798

799
    def force_pair_keystore(
        self,
        *,
        plugin: 'HW_PluginBase',
        handler: 'HardwareHandlerBase',
        info: 'DeviceInfo',
        keystore: 'Hardware_KeyStore',
    ) -> 'HardwareClientBase':
        """Pair *keystore* with the device described by *info*, or raise.

        The pairing is recorded only if the xpub the device returns for the
        keystore's derivation prefix equals the keystore's own xpub.

        :returns: the paired client.
        :raises DeviceUnpairableError: if no client is registered for the
            device, it is not pairable, its plugin type differs from *plugin*,
            the xpub request failed or was cancelled, or the xpubs differ.
        """
        xpub = keystore.xpub
        derivation = keystore.get_derivation_prefix()
        assert derivation is not None
        xtype = bip32.xpub_type(xpub)
        client = self._client_by_id(info.device.id_)
        if client and client.is_pairable() and type(client.plugin) == type(plugin):
            # See comment above for same code
            client.handler = handler
            # This will trigger a PIN/passphrase entry request
            try:
                client_xpub = client.get_xpub(derivation, xtype)
            except (UserCancelled, RuntimeError):
                # Bad / cancelled PIN / passphrase
                client_xpub = None
            if client_xpub == xpub:
                keystore.opportunistically_fill_in_missing_info_from_device(client)
                with self.lock:
                    self.pairing_code_to_id[keystore.pairing_code()] = info.device.id_
                return client

        # The user input has wrong PIN or passphrase, or cancelled input,
        # or it is not pairable
        raise DeviceUnpairableError(
            _('Electrum cannot pair with your {}.\n\n'
              'Before you request bitcoins to be sent to addresses in this '
              'wallet, ensure you can pair with your device, or that you have '
              'its seed (and passphrase, if any).  Otherwise all bitcoins you '
              'receive will be unspendable.').format(plugin.device))
835

836
    def list_pairable_device_infos(
        self,
        *,
        handler: Optional['HardwareHandlerBase'],
        plugin: 'HW_PluginBase',
        devices: Sequence['Device'] = None,
        include_failing_clients: bool = False,
    ) -> List['DeviceInfo']:
        """Returns a list of DeviceInfo objects: one for each connected device accepted by the plugin.
        Already paired devices are also included, as it is okay to reuse them.

        :param devices: pre-scanned devices; ``None`` triggers a scan.
        :param include_failing_clients: if set, a device whose client creation
            or querying raised is still listed, as a DeviceInfo carrying the
            exception; otherwise such devices are only logged and skipped.
        :raises HardwarePluginLibraryUnavailable: if the plugin reports its
            libraries as unavailable.
        """
        if not plugin.libraries_available:
            message = plugin.get_library_not_available_message()
            raise HardwarePluginLibraryUnavailable(message)
        if devices is None:
            devices = self.scan_devices()
        infos = []
        for device in devices:
            if not plugin.can_recognize_device(device):
                continue
            try:
                client = self.create_client(device, handler, plugin)
                if not client:
                    continue
                label = client.label()
                is_initialized = client.is_initialized()
                soft_device_id = client.get_soft_device_id()
                model_name = client.device_model_name()
            except Exception as e:
                # one misbehaving device must not hide the others
                self.logger.error(f'failed to create client for {plugin.name} at {device.path}: {repr(e)}')
                if include_failing_clients:
                    infos.append(DeviceInfo(device=device, exception=e, plugin_name=plugin.name))
                continue
            infos.append(DeviceInfo(device=device,
                                    label=label,
                                    initialized=is_initialized,
                                    plugin_name=plugin.name,
                                    soft_device_id=soft_device_id,
                                    model_name=model_name))

        return infos
×
877

878
    def select_device(self, plugin: 'HW_PluginBase', handler: 'HardwareHandlerBase',
                      keystore: 'Hardware_KeyStore', devices: Sequence['Device'] = None,
                      *, allow_user_interaction: bool = True) -> 'DeviceInfo':
        """Select the device to use for keystore.

        First waits until at least one pairable device is listed, asking the
        user to connect one between attempts. Then tries, in order: matching
        ``soft_device_id``, a unique matching non-placeholder label, or the
        only connected device when the keystore has no useful label/id stored.
        If none applies, the user is asked to choose.

        :param devices: pre-scanned devices for the first attempt; later
            attempts rescan.
        :raises CannotAutoSelectDevice: if a user prompt would be needed but
            *allow_user_interaction* is False.
        :raises UserCancelled: if the user declines to retry or cancels the
            manual selection.
        """
        # ideally this should not be called from the GUI thread...
        # assert handler.get_gui_thread() != threading.current_thread(), 'must not be called from GUI thread'
        while True:
            infos = self.list_pairable_device_infos(handler=handler, plugin=plugin, devices=devices)
            if infos:
                break
            if not allow_user_interaction:
                raise CannotAutoSelectDevice()
            msg = _('Please insert your {}').format(plugin.device)
            msg += " ("
            if keystore.label and keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS:
                msg += f"label: {keystore.label}, "
            msg += f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r}"
            msg += ').\n\n{}\n\n{}'.format(
                _('Verify the cable is connected and that '
                  'no other application is using it.'),
                _('Try to connect again?')
            )
            if not handler.yes_no_question(msg):
                raise UserCancelled()
            # force a fresh scan on the next iteration
            devices = None

        # select device automatically. (but only if we have reasonable expectation it is the correct one)
        # method 1: select device by id
        if keystore.soft_device_id:
            for info in infos:
                if info.soft_device_id == keystore.soft_device_id:
                    self.logger.debug(f"select_device. auto-selected(1) {plugin.device}: soft_device_id matched")
                    return info
        # method 2: select device by label
        #           but only if not a placeholder label and only if there is no collision
        device_labels = [info.label for info in infos]
        if (keystore.label not in PLACEHOLDER_HW_CLIENT_LABELS
                and device_labels.count(keystore.label) == 1):
            for info in infos:
                if info.label == keystore.label:
                    self.logger.debug(f"select_device. auto-selected(2) {plugin.device}: label recognised")
                    return info
        # method 3: if there is only one device connected, and we don't have useful label/soft_device_id
        #           saved for keystore anyway, select it
        if (len(infos) == 1
                and keystore.label in PLACEHOLDER_HW_CLIENT_LABELS
                and keystore.soft_device_id is None):
            self.logger.debug(f"select_device. auto-selected(3) {plugin.device}: only one device")
            return infos[0]

        self.logger.debug(f"select_device. auto-select failed for {plugin.device}. {allow_user_interaction=}")
        if not allow_user_interaction:
            raise CannotAutoSelectDevice()
        # ask user to select device manually
        msg = (
                _("Could not automatically pair with device for given keystore.") + "\n"
                + f"(keystore label: {keystore.label!r}, "
                + f"bip32 root fingerprint: {keystore.get_root_fingerprint()!r})\n\n")
        msg += _("Please select which {} device to use:").format(plugin.device)
        msg += "\n(" + _("Or click cancel to skip this keystore instead.") + ")"
        descriptions = ["{label} ({maybe_model}{init}, {transport})"
                        .format(label=info.label or _("An unnamed {}").format(info.plugin_name),
                                init=(_("initialized") if info.initialized else _("wiped")),
                                transport=info.device.transport_ui_string,
                                maybe_model=f"{info.model_name}, " if info.model_name else "")
                        for info in infos]
        self.logger.debug(f"select_device. prompting user for manual selection of {plugin.device}. "
                          f"num options: {len(infos)}. options: {infos}")
        # the returned value is used as an index into infos; None means cancelled
        c = handler.query_choice(msg, descriptions)
        if c is None:
            raise UserCancelled()
        info = infos[c]
        self.logger.debug(f"select_device. user manually selected {plugin.device}. device info: {info}")
        # note: updated label/soft_device_id will be saved after pairing succeeds
        return info
×
953

954
    @runs_in_hwd_thread
    def _scan_devices_with_hid(self) -> List['Device']:
        """Enumerate HID devices and return those a registered plugin accepts.

        An exact (vendor_id, product_id) registration takes precedence over a
        vendor-only one. Returns an empty list when ``hid`` cannot be imported.
        """
        try:
            import hid
        except ImportError:
            return []

        found = []
        for hid_entry in hid.enumerate(0, 0):
            vendor_id = hid_entry['vendor_id']
            product_key = (vendor_id, hid_entry['product_id'])
            if product_key in self._recognised_hardware:
                plugin = self._recognised_hardware[product_key]
            elif vendor_id in self._recognised_vendor:
                plugin = self._recognised_vendor[vendor_id]
            else:
                continue
            if not plugin:
                continue
            # the plugin may decline the entry by returning a falsy value
            device = plugin.create_device_from_hid_enumeration(hid_entry, product_key=product_key)
            if device:
                found.append(device)
        return found
×
975

976
    @runs_in_hwd_thread
    @profiler
    def scan_devices(self) -> Sequence['Device']:
        """Scan for connected devices and drop clients whose device is gone.

        Devices come from the HID enumeration plus every registered custom
        enumerate function; a failing custom function is logged and skipped.
        A registered client is kept only if its id is among the scanned
        devices and it reports a usable connection. All other clients are
        unpaired and their handler (if any) is told the device is not
        connected.

        :returns: the list of devices found by this scan.
        """
        self.logger.info("scanning devices...")

        # First see what's connected that we know about
        devices = self._scan_devices_with_hid()

        # Let plugin handlers enumerate devices we don't know about
        with self.lock:
            enumerate_funcs = list(self._enumerate_func)
        for f in enumerate_funcs:
            try:
                new_devices = f()
            except BaseException as e:
                self.logger.error(f'custom device enum failed. func {str(f)}, error {e!r}')
            else:
                devices.extend(new_devices)

        # find out what was disconnected
        # (a set makes the per-client membership test below O(1))
        connected_ids = {dev.id_ for dev in devices}
        disconnected_clients = []
        with self.lock:
            connected = {}
            for client, id_ in self.clients.items():
                if id_ in connected_ids and client.has_usable_connection_with_device():
                    connected[client] = id_
                else:
                    disconnected_clients.append((client, id_))
            self.clients = connected

        # Unpair disconnected devices (done without holding the lock)
        for client, id_ in disconnected_clients:
            self.unpair_id(id_)
            if client.handler:
                client.handler.update_status(False)

        return devices
×
1014

1015
    @classmethod
5✔
1016
    def version_info(cls) -> Mapping[str, Optional[str]]:
5✔
1017
        ret = {}
×
1018
        # add libusb
1019
        try:
×
1020
            import usb1
×
1021
        except Exception as e:
×
1022
            ret["libusb.version"] = None
×
1023
        else:
1024
            ret["libusb.version"] = ".".join(map(str, usb1.getVersion()[:4]))
×
1025
            try:
×
1026
                ret["libusb.path"] = usb1.libusb1.libusb._name
×
1027
            except AttributeError:
×
1028
                ret["libusb.path"] = None
×
1029
        # add hidapi
1030
        try:
×
1031
            import hid
×
1032
            ret["hidapi.version"] = hid.__version__  # available starting with 0.12.0.post2
×
1033
        except Exception as e:
×
1034
            from importlib.metadata import version
×
1035
            try:
×
1036
                ret["hidapi.version"] = version("hidapi")
×
1037
            except ImportError:
×
1038
                ret["hidapi.version"] = None
×
1039
        return ret
×
1040

1041
    def trigger_pairings(
            self,
            keystores: Sequence['KeyStore'],
            *,
            allow_user_interaction: bool = True,
            devices: Sequence['Device'] = None,
    ) -> None:
        """Try to pair each hardware keystore in *keystores* with a connected device.

        Pairing is done in passes over all hardware keystores: first a pass
        without user interaction (auto-selectable devices only), then, if
        *allow_user_interaction* is set, a pass where the user may be asked.

        Why not pair keystores one by one? Consider a 2-of-3 multisig wallet
        with hw keystores ks1..ks3 where only devices d2 (for ks2) and d3
        (for ks3) are connected and nothing is paired yet:
        - pairing individually would start with ks1, which cannot be
          auto-paired, so the user would be asked to pick d2 or d3 for it.
          That is confusing and error-prone, especially for devices without
          labels (such as Ledger), which the user cannot tell apart. (see #4199)
        - with this method ks2-d2 and ks3-d3 are auto-paired first, and only
          then is the user told that ks1 could not be paired.

        Non-hardware keystores are ignored. ``UserCancelled`` raised for a
        keystore is swallowed so the remaining keystores are still tried.
        """
        from .keystore import Hardware_KeyStore
        hw_keystores = [ks for ks in keystores if isinstance(ks, Hardware_KeyStore)]
        if not hw_keystores:
            return
        if devices is None:
            devices = self.scan_devices()
        # first pair with all devices that can be auto-selected,
        # then (optionally) do manual selections
        interaction_passes = (False, True) if allow_user_interaction else (False,)
        for interactive in interaction_passes:
            for ks in hw_keystores:
                try:
                    ks.get_client(
                        force_pair=True,
                        allow_user_interaction=interactive,
                        devices=devices,
                    )
                except UserCancelled:
                    pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc