• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4774572429410304

21 May 2025 02:53PM UTC coverage: 59.539% (+0.009%) from 59.53%
4774572429410304

Pull #9847

CirrusCI

f321x
implement release channels

implements release channels so the user can select either a stable or
beta release channel in the SettingsDialog for update notifications.
adapts the release_www.sh and make_download scripts to handle alpha
and beta versions which are detected by an 'a' or 'b' in the version name.
the version fields are added into the extradata field of the version file as
'version_alpha' and 'version_beta' and the extradata field is now being signed.
The update checker is supposed to return the newest version of those
contained in the version file which is
allowed according to the user configured option.
Pull Request #9847: implement release channels

2 of 2 new or added lines in 2 files covered. (100.0%)

234 existing lines in 4 files now uncovered.

21596 of 36272 relevant lines covered (59.54%)

2.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.7
/electrum/wizard.py
1
import copy
4✔
2
import os
4✔
3

4
from typing import List, NamedTuple, Any, Dict, Optional, Tuple, TYPE_CHECKING
4✔
5

6
from electrum.gui.messages import TERMS_OF_USE_LATEST_VERSION
4✔
7

8
from electrum.i18n import _
4✔
9
from electrum.interface import ServerAddr
4✔
10
from electrum.keystore import hardware_keystore
4✔
11
from electrum.logging import get_logger
4✔
12
from electrum.network import ProxySettings
4✔
13
from electrum.plugin import run_hook
4✔
14
from electrum.slip39 import EncryptedSeed
4✔
15
from electrum.storage import WalletStorage, StorageEncryptionVersion
4✔
16
from electrum.wallet_db import WalletDB
4✔
17
from electrum.bip32 import normalize_bip32_derivation, xpub_type
4✔
18
from electrum import keystore, mnemonic, bitcoin
4✔
19
from electrum.mnemonic import is_any_2fa_seed_type, can_seed_have_passphrase
4✔
20

21
if TYPE_CHECKING:
4✔
22
    from electrum.daemon import Daemon
×
23
    from electrum.plugin import Plugins
×
24
    from electrum.keystore import Hardware_KeyStore
×
25
    from electrum.simple_config import SimpleConfig
×
26

27

28
class WizardViewState(NamedTuple):
4✔
29
    view: Optional[str]
4✔
30
    wizard_data: Dict[str, Any]
4✔
31
    params: Dict[str, Any]
4✔
32

33

34
class AbstractWizard:
4✔
35
    # serve as a base for all UIs, so no qt
36
    # encapsulate wizard state
37
    # encapsulate navigation decisions, UI agnostic
38
    # encapsulate stack, go backwards
39
    # allow extend/override flow in subclasses e.g.
40
    # - override: replace 'next' value to own fn
41
    # - extend: add new keys to navmap, wire up flow by override
42

43
    _logger = get_logger(__name__)
4✔
44

45
    def __init__(self):
4✔
46
        self.navmap = {}
4✔
47

48
        self._current = WizardViewState(None, {}, {})
4✔
49
        self._stack = []  # type: List[WizardViewState]
4✔
50

51
    def navmap_merge(self, additional_navmap: dict):
4✔
52
        # NOTE: only merges one level deep. Deeper dict levels will overwrite
53
        for k, v in additional_navmap.items():
4✔
54
            if k in self.navmap:
4✔
55
                self.navmap[k].update(v)
×
56
            else:
57
                self.navmap[k] = v
4✔
58

59
    # from current view and wizard_data, resolve the new view
60
    # returns WizardViewState tuple (view name, wizard_data, view params)
61
    # view name is the string id of the view in the nav map
62
    # wizard data is the (stacked) wizard data dict containing user input and choices
63
    # view params are transient, meant for extra configuration of a view (e.g. info
64
    #   msg in a generic choice dialog)
65
    # exception: stay on this view
66
    def resolve_next(self, view: str, wizard_data: dict) -> WizardViewState:
4✔
67
        assert view, f'view not defined: {repr(self.sanitize_stack_item(wizard_data))}'
4✔
68
        self._logger.debug(f'view={view}')
4✔
69
        assert view in self.navmap
4✔
70

71
        nav = self.navmap[view]
4✔
72

73
        if 'accept' in nav:
4✔
74
            # allow python scope to append to wizard_data before
75
            # adding to stack or finishing
76
            view_accept = nav['accept']
4✔
77
            if callable(view_accept):
4✔
78
                view_accept(wizard_data)
4✔
79
            else:
80
                raise Exception(f'accept handler for view {view} is not callable')
×
81

82
        # make a clone for next view
83
        wizard_data = copy.deepcopy(wizard_data)
4✔
84

85
        if 'next' not in nav:
4✔
86
            new_view = WizardViewState(None, wizard_data, {})
4✔
87
        else:
88
            view_next = nav['next']
4✔
89
            if isinstance(view_next, str):
4✔
90
                # string literal
91
                new_view = WizardViewState(view_next, wizard_data, {})
4✔
92
            elif callable(view_next):
4✔
93
                # handler fn based
94
                nv = view_next(wizard_data)
4✔
95
                self._logger.debug(repr(nv))
4✔
96

97
                # append wizard_data and params if not returned
98
                if isinstance(nv, str):
4✔
99
                    new_view = WizardViewState(nv, wizard_data, {})
4✔
100
                elif len(nv) == 1:
×
101
                    new_view = WizardViewState(nv[0], wizard_data, {})
×
102
                elif len(nv) == 2:
×
103
                    new_view = WizardViewState(nv[0], nv[1], {})
×
104
                else:
105
                    new_view = nv
×
106
            else:
107
                raise Exception(f'next handler for view {view} is not callable nor a string literal')
×
108

109
            if 'params' in self.navmap[new_view.view]:
4✔
110
                params = self.navmap[new_view.view]['params']
×
111
                assert isinstance(params, dict), 'params is not a dict'
×
112
                new_view.params.update(params)
×
113

114
            self._logger.debug(f'resolve_next view is {new_view.view}')
4✔
115

116
        self._stack.append(copy.deepcopy(self._current))
4✔
117
        self._current = new_view
4✔
118

119
        self.log_stack()
4✔
120

121
        return new_view
4✔
122

123
    def resolve_prev(self):
4✔
124
        self._current = self._stack.pop()
×
125

126
        self._logger.debug(f'resolve_prev view is "{self._current.view}"')
×
127
        self.log_stack()
×
128

129
        return self._current
×
130

131
    # check if this view is the final view
132
    def is_last_view(self, view: str, wizard_data: dict) -> bool:
4✔
133
        assert view, f'view not defined: {repr(self.sanitize_stack_item(wizard_data))}'
4✔
134
        assert view in self.navmap
4✔
135

136
        nav = self.navmap[view]
4✔
137

138
        if 'last' not in nav:
4✔
139
            return False
4✔
140

141
        view_last = nav['last']
4✔
142
        if isinstance(view_last, bool):
4✔
143
            # bool literal
144
            self._logger.debug(f'view "{view}" last: {view_last}')
4✔
145
            return view_last
4✔
146
        elif callable(view_last):
4✔
147
            # handler fn based
148
            is_last = view_last(wizard_data)
4✔
149
            self._logger.debug(f'view "{view}" last: {is_last}')
4✔
150
            return is_last
4✔
151
        else:
152
            raise Exception(f'last handler for view {view} is not callable nor a bool literal')
×
153

154
    def reset(self):
4✔
155
        self._stack = []
4✔
156
        self._current = WizardViewState(None, {}, {})
4✔
157

158
    def log_stack(self):
4✔
159
        logstr = 'wizard stack:'
4✔
160
        i = 0
4✔
161
        for item in self._stack:
4✔
162
            ssi = self.sanitize_stack_item(item.wizard_data)
4✔
163
            logstr += f'\n{i}: {hex(id(item.wizard_data))} - {repr(ssi)}'
4✔
164
            i += 1
4✔
165
        sci = self.sanitize_stack_item(self._current.wizard_data)
4✔
166
        logstr += f'\nc: {hex(id(self._current.wizard_data))} - {repr(sci)}'
4✔
167
        self._logger.debug(logstr)
4✔
168

169
    def sanitize_stack_item(self, _stack_item) -> dict:
4✔
170
        whitelist = [
4✔
171
            "wallet_name", "wallet_exists", "wallet_is_open", "wallet_needs_hw_unlock",
172
            "wallet_type", "keystore_type", "seed_variant", "seed_type", "seed_extend",
173
            "script_type", "derivation_path", "encrypt",
174
            # hardware devices:
175
            "hardware_device", "hw_type", "label", "soft_device_id",
176
            # inside keystore:
177
            "type", "pw_hash_version", "derivation", "root_fingerprint",
178
            # multisig:
179
            "multisig_participants", "multisig_signatures", "multisig_current_cosigner", "cosigner_keystore_type",
180
            # trustedcoin:
181
            "trustedcoin_keepordisable", "trustedcoin_go_online",
182
        ]
183

184
        def sanitize(_dict):
4✔
185
            result = {}
4✔
186
            for item in _dict:
4✔
187
                if isinstance(_dict[item], dict):
4✔
188
                    result[item] = sanitize(_dict[item])
4✔
189
                else:
190
                    if item in whitelist:
4✔
191
                        result[item] = _dict[item]
4✔
192
                    else:
193
                        result[item] = '<redacted>'
4✔
194
            return result
4✔
195
        return sanitize(_stack_item)
4✔
196

197
    def get_wizard_data(self) -> dict:
4✔
198
        return copy.deepcopy(self._current.wizard_data)
×
199

200

201
class NewWalletWizard(AbstractWizard):
4✔
202

203
    _logger = get_logger(__name__)
4✔
204

205
    def __init__(self, daemon: 'Daemon', plugins: 'Plugins'):
4✔
206
        AbstractWizard.__init__(self)
4✔
207
        self.navmap = {
4✔
208
            'wallet_name': {
209
                'next': 'wallet_type'
210
            },
211
            'wallet_type': {
212
                'next': self.on_wallet_type
213
            },
214
            'keystore_type': {
215
                'next': self.on_keystore_type
216
            },
217
            'create_seed': {
218
                'next': 'confirm_seed'
219
            },
220
            'confirm_seed': {
221
                'next': self.on_have_or_confirm_seed,
222
                'accept': self.maybe_master_pubkey,
223
                'last': lambda d: self.is_single_password() and not self.is_multisig(d)
224
            },
225
            'have_seed': {
226
                'next': self.on_have_or_confirm_seed,
227
                'accept': self.maybe_master_pubkey,
228
                'last': lambda d: self.is_single_password() and not
229
                                  (self.needs_derivation_path(d) or self.is_multisig(d))
230
            },
231
            'choose_hardware_device': {
232
                'next': self.on_hardware_device,
233
            },
234
            'script_and_derivation': {
235
                'next': lambda d: self.wallet_password_view(d) if not self.is_multisig(d) else 'multisig_cosigner_keystore',
236
                'accept': self.maybe_master_pubkey,
237
                'last': lambda d: self.is_single_password() and not self.is_multisig(d)
238
            },
239
            'have_master_key': {
240
                'next': lambda d: self.wallet_password_view(d) if not self.is_multisig(d) else 'multisig_cosigner_keystore',
241
                'accept': self.maybe_master_pubkey,
242
                'last': lambda d: self.is_single_password() and not self.is_multisig(d)
243
            },
244
            'multisig': {
245
                'next': 'keystore_type'
246
            },
247
            'multisig_cosigner_keystore': {  # this view should set 'multisig_current_cosigner'
248
                'next': self.on_cosigner_keystore_type
249
            },
250
            'multisig_cosigner_key': {
251
                'next': lambda d: self.wallet_password_view(d) if self.last_cosigner(d) else 'multisig_cosigner_keystore',
252
                'last': lambda d: self.is_single_password() and self.last_cosigner(d)
253
            },
254
            'multisig_cosigner_seed': {
255
                'next': self.on_have_cosigner_seed,
256
                'last': lambda d: self.is_single_password() and self.last_cosigner(d) and not self.needs_derivation_path(d)
257
            },
258
            'multisig_cosigner_hardware': {
259
                'next': self.on_hardware_device,
260
            },
261
            'multisig_cosigner_script_and_derivation': {
262
                'next': lambda d: self.wallet_password_view(d) if self.last_cosigner(d) else 'multisig_cosigner_keystore',
263
                'last': lambda d: self.is_single_password() and self.last_cosigner(d)
264
            },
265
            'imported': {
266
                'next': 'wallet_password',
267
                'last': lambda d: self.is_single_password()
268
            },
269
            'wallet_password': {
270
                'last': True
271
            },
272
            'wallet_password_hardware': {
273
                'last': True
274
            }
275
        }
276
        self._daemon = daemon
4✔
277
        self.plugins = plugins
4✔
278
        # todo: load only if needed, like hw plugins
279
        self.plugins.load_plugin_by_name('trustedcoin')
4✔
280

281
    def start(self, initial_data: dict = None) -> WizardViewState:
4✔
282
        if initial_data is None:
4✔
283
            initial_data = {}
4✔
284
        self.reset()
4✔
285
        start_view = 'wallet_name'
4✔
286
        params = self.navmap[start_view].get('params', {})
4✔
287
        self._current = WizardViewState(start_view, initial_data, params)
4✔
288
        return self._current
4✔
289

290
    def is_single_password(self) -> bool:
4✔
291
        raise NotImplementedError()
×
292

293
    # returns (sub)dict of current cosigner (or root if first)
294
    def current_cosigner(self, wizard_data: dict) -> dict:
4✔
295
        wdata = wizard_data
4✔
296
        if wizard_data.get('wallet_type') == 'multisig' and 'multisig_current_cosigner' in wizard_data:
4✔
297
            cosigner = wizard_data['multisig_current_cosigner']
×
298
            wdata = wizard_data['multisig_cosigner_data'][str(cosigner)]
×
299
        return wdata
4✔
300

301
    def needs_derivation_path(self, wizard_data: dict) -> bool:
4✔
302
        wdata = self.current_cosigner(wizard_data)
4✔
303
        return 'seed_variant' in wdata and wdata['seed_variant'] in ['bip39', 'slip39']
4✔
304

305
    def wants_ext(self, wizard_data: dict) -> bool:
4✔
306
        wdata = self.current_cosigner(wizard_data)
×
307
        return 'seed_variant' in wdata and wdata['seed_extend']
×
308

309
    def is_multisig(self, wizard_data: dict) -> bool:
4✔
310
        return wizard_data['wallet_type'] == 'multisig'
4✔
311

312
    def on_wallet_type(self, wizard_data: dict) -> str:
4✔
313
        t = wizard_data['wallet_type']
4✔
314
        return {
4✔
315
            'standard': 'keystore_type',
316
            '2fa': 'trustedcoin_start',
317
            'multisig': 'multisig',
318
            'imported': 'imported'
319
        }.get(t)
320

321
    def on_keystore_type(self, wizard_data: dict) -> str:
4✔
322
        t = wizard_data['keystore_type']
4✔
323
        return {
4✔
324
            'createseed': 'create_seed',
325
            'haveseed': 'have_seed',
326
            'masterkey': 'have_master_key',
327
            'hardware': 'choose_hardware_device'
328
        }.get(t)
329

330
    def is_hardware(self, wizard_data: dict) -> bool:
4✔
331
        return wizard_data['keystore_type'] == 'hardware'
4✔
332

333
    def wallet_password_view(self, wizard_data: dict) -> str:
4✔
334
        if self.is_hardware(wizard_data) and wizard_data['wallet_type'] == 'standard':
4✔
335
            return 'wallet_password_hardware'
×
336
        return 'wallet_password'
4✔
337

338
    def on_hardware_device(self, wizard_data: dict, new_wallet=True) -> str:
4✔
339
        current_cosigner = self.current_cosigner(wizard_data)
×
340
        _type, _info = current_cosigner['hardware_device']
×
341
        run_hook('init_wallet_wizard', self)  # TODO: currently only used for hww, hook name might be confusing
×
342
        plugin = self.plugins.get_plugin(_type)
×
343
        return plugin.wizard_entry_for_device(_info, new_wallet=new_wallet)
×
344

345
    def on_have_or_confirm_seed(self, wizard_data: dict) -> str:
4✔
346
        if self.needs_derivation_path(wizard_data):
4✔
347
            return 'script_and_derivation'
4✔
348
        elif self.is_multisig(wizard_data):
4✔
349
            return 'multisig_cosigner_keystore'
×
350
        else:
351
            return 'wallet_password'
4✔
352

353
    def maybe_master_pubkey(self, wizard_data: dict):
4✔
354
        self._logger.debug('maybe_master_pubkey')
4✔
355
        if self.needs_derivation_path(wizard_data) and 'derivation_path' not in wizard_data:
4✔
356
            self._logger.debug('deferred, missing derivation_path')
4✔
357
            return
4✔
358

359
        wizard_data['multisig_master_pubkey'] = self.keystore_from_data(wizard_data['wallet_type'], wizard_data).get_master_public_key()
4✔
360

361
    def on_cosigner_keystore_type(self, wizard_data: dict) -> str:
4✔
362
        t = wizard_data['cosigner_keystore_type']
×
363
        return {
×
364
            'masterkey': 'multisig_cosigner_key',
365
            'haveseed': 'multisig_cosigner_seed',
366
            'hardware': 'multisig_cosigner_hardware'
367
        }.get(t)
368

369
    def on_have_cosigner_seed(self, wizard_data: dict) -> str:
4✔
370
        current_cosigner = self.current_cosigner(wizard_data)
×
371
        if self.needs_derivation_path(wizard_data) and 'derivation_path' not in current_cosigner:
×
372
            return 'multisig_cosigner_script_and_derivation'
×
373
        elif self.last_cosigner(wizard_data):
×
374
            return 'wallet_password'
×
375
        else:
376
            return 'multisig_cosigner_keystore'
×
377

378
    def last_cosigner(self, wizard_data: dict) -> bool:
4✔
379
        # check if we have the final number of cosigners. Doesn't check if cosigner data itself is complete
380
        # (should be validated by wizardcomponents)
381
        if not self.is_multisig(wizard_data):
×
382
            return True
×
383

384
        if len(wizard_data['multisig_cosigner_data']) < (wizard_data['multisig_participants'] - 1):
×
385
            return False
×
386

387
        return True
×
388

389
    def has_duplicate_masterkeys(self, wizard_data: dict) -> bool:
4✔
390
        """Multisig wallets need distinct master keys. If True, need to prevent wallet-creation."""
391
        xpubs = [self.keystore_from_data(wizard_data['wallet_type'], wizard_data).get_master_public_key()]
×
392
        for cosigner in wizard_data['multisig_cosigner_data']:
×
393
            data = wizard_data['multisig_cosigner_data'][cosigner]
×
394
            xpubs.append(self.keystore_from_data(wizard_data['wallet_type'], data).get_master_public_key())
×
395
        assert xpubs
×
396
        return len(xpubs) != len(set(xpubs))
×
397

398
    def has_heterogeneous_masterkeys(self, wizard_data: dict) -> bool:
4✔
399
        """Multisig wallets need homogeneous master keys.
400
        All master keys need to be bip32, and e.g. Ypub cannot be mixed with Zpub.
401
        If True, need to prevent wallet-creation.
402
        """
403
        xpubs = [self.keystore_from_data(wizard_data['wallet_type'], wizard_data).get_master_public_key()]
×
404
        for cosigner in wizard_data['multisig_cosigner_data']:
×
405
            data = wizard_data['multisig_cosigner_data'][cosigner]
×
406
            xpubs.append(self.keystore_from_data(wizard_data['wallet_type'], data).get_master_public_key())
×
407
        assert xpubs
×
408
        try:
×
409
            k_xpub_type = xpub_type(xpubs[0])
×
410
        except Exception:
×
411
            return True  # maybe old_mpk?
×
412
        for xpub in xpubs:
×
413
            try:
×
414
                my_xpub_type = xpub_type(xpub)
×
415
            except Exception:
×
416
                return True  # maybe old_mpk?
×
417
            if my_xpub_type != k_xpub_type:
×
418
                return True
×
419
        return False
×
420

421
    def keystore_from_data(self, wallet_type: str, data: dict):
4✔
422
        if data['keystore_type'] in ['createseed', 'haveseed'] and 'seed' in data:
4✔
423
            seed_extension = data['seed_extra_words'] if data['seed_extend'] else ''
4✔
424
            if data['seed_variant'] == 'electrum':
4✔
425
                return keystore.from_seed(data['seed'], passphrase=seed_extension, for_multisig=True)
4✔
426
            elif data['seed_variant'] == 'bip39':
4✔
427
                root_seed = keystore.bip39_to_seed(data['seed'], passphrase=seed_extension)
4✔
428
                derivation = normalize_bip32_derivation(data['derivation_path'])
4✔
429
                if wallet_type == 'multisig':
4✔
430
                    script = data['script_type'] if data['script_type'] != 'p2sh' else 'standard'
×
431
                else:
432
                    script = data['script_type'] if data['script_type'] != 'p2pkh' else 'standard'
4✔
433
                return keystore.from_bip43_rootseed(root_seed, derivation=derivation, xtype=script)
4✔
434
            elif data['seed_variant'] == 'slip39':
×
435
                root_seed = data['seed'].decrypt(seed_extension)
×
436
                derivation = normalize_bip32_derivation(data['derivation_path'])
×
437
                if wallet_type == 'multisig':
×
438
                    script = data['script_type'] if data['script_type'] != 'p2sh' else 'standard'
×
439
                else:
440
                    script = data['script_type'] if data['script_type'] != 'p2pkh' else 'standard'
×
441
                return keystore.from_bip43_rootseed(root_seed, derivation=derivation, xtype=script)
×
442
            else:
443
                raise Exception('Unsupported seed variant %s' % data['seed_variant'])
×
444
        elif data['keystore_type'] == 'masterkey' and 'master_key' in data:
×
445
            return keystore.from_master_key(data['master_key'])
×
446
        elif data['keystore_type'] == 'hardware':
×
447
            return self.hw_keystore(data)
×
448
        else:
449
            raise Exception('no seed or master_key in data')
×
450

451
    def is_current_cosigner_hardware(self, wizard_data: dict) -> bool:
4✔
452
        cosigner_data = self.current_cosigner(wizard_data)
×
453
        cosigner_is_hardware = cosigner_data == wizard_data and wizard_data['keystore_type'] == 'hardware'
×
454
        if 'cosigner_keystore_type' in wizard_data and wizard_data['cosigner_keystore_type'] == 'hardware':
×
455
            cosigner_is_hardware = True
×
456
        return cosigner_is_hardware
×
457

458
    def check_multisig_constraints(self, wizard_data: dict) -> Tuple[bool, str]:
4✔
459
        if not self.is_multisig(wizard_data):
×
460
            return True, ''
×
461

462
        # current cosigner might be incomplete. In that case, return valid
463
        cosigner_data = self.current_cosigner(wizard_data)
×
464
        if self.needs_derivation_path(wizard_data):
×
465
            if 'derivation_path' not in cosigner_data:
×
466
                self._logger.debug('defer multisig check: missing derivation_path')
×
467
                return True, ''
×
468
        if self.wants_ext(wizard_data):
×
469
            if 'seed_extra_words' not in cosigner_data:
×
470
                self._logger.debug('defer multisig check: missing extra words')
×
471
                return True, ''
×
472
        if self.is_current_cosigner_hardware(wizard_data):
×
473
            if 'master_key' not in cosigner_data:
×
474
                self._logger.debug('defer multisig check: missing master_key')
×
475
                return True, ''
×
476

477
        user_info = ''
×
478

479
        if self.has_duplicate_masterkeys(wizard_data):
×
480
            self._logger.debug('Duplicate master keys!')
×
481
            user_info = _('Duplicate master keys')
×
482
            multisig_keys_valid = False
×
483
        elif self.has_heterogeneous_masterkeys(wizard_data):
×
484
            self._logger.debug('Heterogenous master keys!')
×
485
            user_info = _('Heterogenous master keys')
×
486
            multisig_keys_valid = False
×
487
        else:
488
            multisig_keys_valid = True
×
489

490
        return multisig_keys_valid, user_info
×
491

492
    def validate_seed(self, seed: str, seed_variant: str, wallet_type: str):
4✔
493
        seed_type = ''
×
494
        seed_valid = False
×
495
        validation_message = ''
×
496
        can_passphrase = True
×
497

498
        if seed_variant == 'electrum':
×
499
            seed_type = mnemonic.calc_seed_type(seed)
×
500
            if seed_type != '':
×
501
                seed_valid = True
×
502
                can_passphrase = can_seed_have_passphrase(seed)
×
503
        elif seed_variant == 'bip39':
×
504
            is_checksum, is_wordlist = keystore.bip39_is_checksum_valid(seed)
×
505
            validation_message = ('' if is_checksum else _('BIP39 checksum failed')) if is_wordlist else _('Unknown BIP39 wordlist')
×
506
            if not bool(seed):
×
507
                validation_message = ''
×
508
            seed_type = 'bip39'
×
509
            # bip39 always valid, even if checksum failed, see #8720
510
            # however, reject empty string
511
            seed_valid = bool(seed)
×
512
        elif seed_variant == 'slip39':
×
513
            # seed shares should be already validated by wizard page, we have a combined encrypted seed
514
            if seed and isinstance(seed, EncryptedSeed):
×
515
                seed_valid = True
×
516
                seed_type = 'slip39'
×
517
            else:
518
                seed_valid = False
×
519
        else:
520
            raise Exception(f'unknown seed variant {seed_variant}')
×
521

522
        # check if seed matches wallet type
523
        if wallet_type == '2fa' and not is_any_2fa_seed_type(seed_type):
×
524
            seed_valid = False
×
525
        elif wallet_type == 'standard' and seed_type not in ['old', 'standard', 'segwit', 'bip39', 'slip39']:
×
526
            seed_valid = False
×
527
        elif wallet_type == 'multisig' and seed_type not in ['standard', 'segwit', 'bip39', 'slip39']:
×
528
            seed_valid = False
×
529

530
        self._logger.debug(f'seed verified: {seed_valid}, type={seed_type!r}, validation_message={validation_message}')
×
531

532
        return seed_valid, seed_type, validation_message, can_passphrase
×
533

534
    def validate_master_key(self, key: str, wallet_type: str):
4✔
535
        # TODO: deduplicate with master key check in create_storage()
536
        validation_message = ''
×
537
        key_valid = False
×
538

539
        if not keystore.is_master_key(key):
×
540
            validation_message = _('Not a master key')
×
541
        else:
542
            k = keystore.from_master_key(key)
×
543
            if wallet_type == 'standard':
×
544
                if isinstance(k, keystore.Xpub):  # has bip32 xpub
×
545
                    t1 = xpub_type(k.xpub)
×
546
                    if t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:  # disallow Ypub/Zpub
×
547
                        validation_message = '%s: %s' % (_('Wrong key type'), t1)
×
548
                    else:
549
                        key_valid = True
×
550
                elif isinstance(k, keystore.Old_KeyStore):
×
551
                    key_valid = True
×
552
                else:
553
                    self._logger.error(f"unexpected keystore type: {type(keystore)}")
×
554
            elif wallet_type == 'multisig':
×
555
                if not isinstance(k, keystore.Xpub):  # old mpk?
×
556
                    validation_message = '%s: %s' % (_('Wrong key type'), "not bip32")
×
557
                t1 = xpub_type(k.xpub)
×
558
                if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:  # disallow ypub/zpub
×
559
                    validation_message = '%s: %s' % (_('Wrong key type'), t1)
×
560
                else:
561
                    key_valid = True
×
562
            else:
563
                validation_message = '%s: %s' % (_('Unsupported wallet type'), wallet_type)
×
564
                self._logger.error(f'Unsupported wallet type: {wallet_type}')
×
565

566
        return key_valid, validation_message
×
567

568
    def create_storage(self, path: str, data: dict):
4✔
569
        assert data['wallet_type'] in ['standard', '2fa', 'imported', 'multisig']
4✔
570

571
        if os.path.exists(path):
4✔
572
            raise Exception('file already exists at path')
×
573
        storage = WalletStorage(path)
4✔
574

575
        # TODO: refactor using self.keystore_from_data
576
        k = None
4✔
577
        if 'keystore_type' not in data:
4✔
578
            assert data['wallet_type'] == 'imported'
×
579
            addresses = {}
×
580
            if 'private_key_list' in data:
×
581
                k = keystore.Imported_KeyStore({})
×
582
                keys = keystore.get_private_keys(data['private_key_list'])
×
583
                for pk in keys:
×
584
                    assert bitcoin.is_private_key(pk)
×
585
                    txin_type, pubkey = k.import_privkey(pk, None)
×
586
                    addr = bitcoin.pubkey_to_address(txin_type, pubkey)
×
587
                    addresses[addr] = {'type': txin_type, 'pubkey': pubkey}
×
588
            elif 'address_list' in data:
×
589
                for addr in data['address_list'].split():
×
590
                    addresses[addr] = {}
×
591
        elif data['keystore_type'] in ['createseed', 'haveseed']:
4✔
592
            seed_extension = data['seed_extra_words'] if data['seed_extend'] else ''
4✔
593
            if data['seed_type'] in ['old', 'standard', 'segwit']:
4✔
594
                self._logger.debug('creating keystore from electrum seed')
4✔
595
                k = keystore.from_seed(data['seed'], passphrase=seed_extension, for_multisig=data['wallet_type'] == 'multisig')
4✔
596
            elif data['seed_type'] in ['bip39', 'slip39']:
4✔
597
                self._logger.debug('creating keystore from %s seed' % data['seed_type'])
4✔
598
                if data['seed_type'] == 'bip39':
4✔
599
                    root_seed = keystore.bip39_to_seed(data['seed'], passphrase=seed_extension)
4✔
600
                else:
601
                    root_seed = data['seed'].decrypt(seed_extension)
×
602
                derivation = normalize_bip32_derivation(data['derivation_path'])
4✔
603
                if data['wallet_type'] == 'multisig':
4✔
604
                    script = data['script_type'] if data['script_type'] != 'p2sh' else 'standard'
×
605
                else:
606
                    script = data['script_type'] if data['script_type'] != 'p2pkh' else 'standard'
4✔
607
                k = keystore.from_bip43_rootseed(root_seed, derivation=derivation, xtype=script)
4✔
608
            elif is_any_2fa_seed_type(data['seed_type']):
4✔
609
                self._logger.debug('creating keystore from 2fa seed')
4✔
610
                k = keystore.from_xprv(data['x1']['xprv'])
4✔
611
            else:
612
                raise Exception('unsupported/unknown seed_type %s' % data['seed_type'])
×
613
        elif data['keystore_type'] == 'masterkey':
×
614
            k = keystore.from_master_key(data['master_key'])
×
615
            if isinstance(k, keystore.Xpub):  # has xpub
×
616
                t1 = xpub_type(k.xpub)
×
617
                if data['wallet_type'] == 'multisig':
×
618
                    if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:
×
619
                        raise Exception('wrong key type %s' % t1)
×
620
                else:
621
                    if t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:
×
622
                        raise Exception('wrong key type %s' % t1)
×
623
            elif isinstance(k, keystore.Old_KeyStore):
×
624
                pass
×
625
            else:
626
                raise Exception(f'unexpected keystore type: {type(k)}')
×
627
        elif data['keystore_type'] == 'hardware':
×
628
            k = self.hw_keystore(data)
×
629
            if isinstance(k, keystore.Xpub):  # has xpub
×
630
                t1 = xpub_type(k.xpub)
×
631
                if data['wallet_type'] == 'multisig':
×
632
                    if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:
×
633
                        raise Exception('wrong key type %s' % t1)
×
634
                else:
635
                    if t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:
×
636
                        raise Exception('wrong key type %s' % t1)
×
637
            else:
638
                raise Exception(f'unexpected keystore type: {type(k)}')
×
639
        else:
640
            raise Exception('unsupported/unknown keystore_type %s' % data['keystore_type'])
×
641

642
        if data['password']:
4✔
643
            if k and k.may_have_password():
×
644
                k.update_password(None, data['password'])
×
645

646
        if data['encrypt']:
4✔
647
            enc_version = StorageEncryptionVersion.USER_PASSWORD
×
648
            if data.get('keystore_type') == 'hardware' and data['wallet_type'] == 'standard':
×
649
                enc_version = StorageEncryptionVersion.XPUB_PASSWORD
×
650
            storage.set_password(data['password'], enc_version=enc_version)
×
651

652
        db = WalletDB('', storage=storage, upgrade=True)
4✔
653
        db.set_keystore_encryption(bool(data['password']))
4✔
654

655
        db.put('wallet_type', data['wallet_type'])
4✔
656

657
        if data['wallet_type'] == 'standard':
4✔
658
            db.put('keystore', k.dump())
4✔
659
        elif data['wallet_type'] == '2fa':
4✔
660
            db.put('x1', k.dump())
4✔
661
            if 'trustedcoin_keepordisable' in data and data['trustedcoin_keepordisable'] == 'disable':
4✔
662
                k2 = keystore.from_xprv(data['x2']['xprv'])
×
663
                if data['encrypt'] and k2.may_have_password():
×
664
                    k2.update_password(None, data['password'])
×
665
                db.put('x2', k2.dump())
×
666
            else:
667
                db.put('x2', data['x2'])
4✔
668
            if 'x3' in data:
4✔
669
                db.put('x3', data['x3'])
4✔
670
            db.put('use_trustedcoin', True)
4✔
671
        elif data['wallet_type'] == 'multisig':
×
672
            if not isinstance(k, keystore.Xpub):
×
673
                raise Exception(f'unexpected keystore(main) type={type(k)} in multisig. not bip32.')
×
674
            k_xpub_type = xpub_type(k.xpub)
×
675
            db.put('wallet_type', '%dof%d' % (data['multisig_signatures'], data['multisig_participants']))
×
676
            db.put('x1', k.dump())
×
677
            for cosigner in data['multisig_cosigner_data']:
×
678
                cosigner_keystore = self.keystore_from_data('multisig', data['multisig_cosigner_data'][cosigner])
×
679
                if not isinstance(cosigner_keystore, keystore.Xpub):
×
680
                    raise Exception(f'unexpected keystore(cosigner) type={type(cosigner_keystore)} in multisig. not bip32.')
×
681
                if k_xpub_type != xpub_type(cosigner_keystore.xpub):
×
682
                    raise Exception('multisig wallet needs to have homogeneous xpub types')
×
683
                if data['encrypt'] and cosigner_keystore.may_have_password():
×
684
                    cosigner_keystore.update_password(None, data['password'])
×
685
                db.put(f'x{cosigner}', cosigner_keystore.dump())
×
686
        elif data['wallet_type'] == 'imported':
×
687
            if k:
×
688
                db.put('keystore', k.dump())
×
689
            db.put('addresses', addresses)
×
690

691
        if k and k.can_have_deterministic_lightning_xprv():
4✔
692
            db.put('lightning_xprv', k.get_lightning_xprv(data['password']))
4✔
693

694
        db.load_plugins()
4✔
695
        db.write()
4✔
696

697
    def hw_keystore(self, data: dict) -> 'Hardware_KeyStore':
4✔
698
        return hardware_keystore({
×
699
            'type': 'hardware',
700
            'hw_type': data['hw_type'],
701
            'derivation': data['derivation_path'],
702
            'root_fingerprint': data['root_fingerprint'],
703
            'xpub': data['master_key'],
704
            'label': data['label'],
705
            'soft_device_id': data['soft_device_id']
706
        })
707

708

709
class ServerConnectWizard(AbstractWizard):
4✔
710

711
    _logger = get_logger(__name__)
4✔
712

713
    def __init__(self, daemon: 'Daemon'):
4✔
714
        AbstractWizard.__init__(self)
4✔
715
        self.navmap = {
4✔
716
            'welcome': {
717
                'next': lambda d: 'proxy_config' if d['want_proxy'] else 'server_config',
718
                'accept': self.do_configure_autoconnect,
719
                'last': lambda d: bool(d['autoconnect'] and not d['want_proxy'])
720
            },
721
            'proxy_config': {
722
                'next': 'server_config',
723
                'accept': self.do_configure_proxy,
724
                'last': lambda d: bool(d['autoconnect'])
725
            },
726
            'server_config': {
727
                'accept': self.do_configure_server,
728
                'last': True
729
            }
730
        }
731
        self._daemon = daemon
4✔
732

733
    def do_configure_proxy(self, wizard_data: dict):
4✔
734
        proxy_settings = wizard_data['proxy']
4✔
735
        if not self._daemon.network:
4✔
736
            self._logger.debug('not configuring proxy, electrum config wants offline mode')
×
737
            return
×
738
        self._logger.debug(f'configuring proxy: {proxy_settings!r}')
4✔
739
        net_params = self._daemon.network.get_parameters()
4✔
740
        proxy = ProxySettings.from_dict(proxy_settings)
4✔
741
        net_params = net_params._replace(proxy=proxy, auto_connect=bool(wizard_data['autoconnect']))
4✔
742
        self._daemon.network.run_from_another_thread(self._daemon.network.set_parameters(net_params))
4✔
743

744
    def do_configure_server(self, wizard_data: dict):
4✔
745
        self._logger.debug(f'configuring server: {wizard_data!r}')
4✔
746
        net_params = self._daemon.network.get_parameters()
4✔
747
        server = ''
4✔
748
        if not wizard_data['autoconnect']:
4✔
749
            try:
4✔
750
                server = ServerAddr.from_str_with_inference(wizard_data['server'])
4✔
751
                if not server:
4✔
UNCOV
752
                    raise Exception('failed to parse server %s' % wizard_data['server'])
×
753
            except Exception:
×
754
                return
×
755
        net_params = net_params._replace(server=server, auto_connect=wizard_data['autoconnect'])
4✔
756
        self._daemon.network.run_from_another_thread(self._daemon.network.set_parameters(net_params))
4✔
757

758
    def do_configure_autoconnect(self, wizard_data: dict):
4✔
759
        self._logger.debug(f'configuring autoconnect: {wizard_data!r}')
4✔
760
        if self._daemon.config.cv.NETWORK_AUTO_CONNECT.is_modifiable():
4✔
761
            if wizard_data.get('autoconnect') is not None:
4✔
762
                self._daemon.config.NETWORK_AUTO_CONNECT = wizard_data.get('autoconnect')
4✔
763

764
    def start(self, initial_data: dict = None) -> WizardViewState:
4✔
765
        if initial_data is None:
4✔
766
            initial_data = {}
4✔
767
        self.reset()
4✔
768
        start_view = 'welcome'
4✔
769
        params = self.navmap[start_view].get('params', {})
4✔
770
        self._current = WizardViewState(start_view, initial_data, params)
4✔
771
        return self._current
4✔
772

773

774
class TermsOfUseWizard(AbstractWizard):
4✔
775

776
    _logger = get_logger(__name__)
4✔
777

778
    def __init__(self, config: 'SimpleConfig'):
4✔
UNCOV
779
        AbstractWizard.__init__(self)
×
780
        self._config = config
×
781
        self.navmap = {
×
782
            'terms_of_use': {
783
                'accept': self.accept_terms_of_use,
784
                'last': True,
785
            },
786
        }
787

788
    def accept_terms_of_use(self, _):
4✔
UNCOV
789
        self._config.TERMS_OF_USE_ACCEPTED = TERMS_OF_USE_LATEST_VERSION
×
790

791
    def start(self, initial_data: dict = None) -> WizardViewState:
4✔
UNCOV
792
        if initial_data is None:
×
793
            initial_data = {}
×
794
        self.reset()
×
795
        start_view = 'terms_of_use'
×
796
        params = self.navmap[start_view].get('params', {})
×
797
        self._current = WizardViewState(start_view, initial_data, params)
×
798
        return self._current
×
799

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc