• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5304010765238272

17 Aug 2023 02:17PM UTC coverage: 59.027% (+0.02%) from 59.008%
5304010765238272

Pull #8493

CirrusCI

ecdsa
storage.append: fail if the file length is not what we expect
Pull Request #8493: partial-writes using jsonpatch

165 of 165 new or added lines in 9 files covered. (100.0%)

18653 of 31601 relevant lines covered (59.03%)

2.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/electrum/wizard.py
1
import copy
×
2
import os
×
3

4
from typing import List, TYPE_CHECKING, Tuple, NamedTuple, Any, Dict, Optional, Union
×
5

6
from electrum.logging import get_logger
×
7
from electrum.storage import WalletStorage, StorageEncryptionVersion
×
8
from electrum.wallet_db import WalletDB
×
9
from electrum.bip32 import normalize_bip32_derivation, xpub_type
×
10
from electrum import keystore
×
11
from electrum import bitcoin
×
12
from electrum.mnemonic import is_any_2fa_seed_type
×
13

14

15
class WizardViewState(NamedTuple):
    """Immutable snapshot of a single wizard step.

    Instances are what the wizard keeps as its current state and pushes on
    its navigation stack.
    """
    # string id of the view in the nav map; None when no view is active
    view: Optional[str]
    # the (stacked) wizard data dict containing user input and choices
    wizard_data: Dict[str, Any]
    # transient, view-specific configuration
    params: Dict[str, Any]
×
19

20
class AbstractWizard:
    """UI-agnostic base class holding wizard state and navigation logic.

    Navigation is driven by ``self.navmap``, a dict mapping a view name to a
    dict with these optional keys:

    - ``'next'``: the name of the following view, or a callable that takes
      ``wizard_data`` and returns the next view (see `resolve_next`).
      A view without ``'next'`` finishes the wizard.
    - ``'accept'``: a callable that takes ``wizard_data``; it is invoked by
      `resolve_next` before the next view is resolved.
    - ``'last'``: a bool, or a callable that takes ``(view, wizard_data)``
      (see `is_last_view`).
    """
    # serve as a base for all UIs, so no qt
    # encapsulate wizard state
    # encapsulate navigation decisions, UI agnostic
    # encapsulate stack, go backwards
    # allow extend/override flow in subclasses e.g.
    # - override: replace 'next' value to own fn
    # - extend: add new keys to navmap, wire up flow by override

    _logger = get_logger(__name__)

    def __init__(self):
        self.navmap = {}

        self._current = WizardViewState(None, {}, {})
        self._stack = []  # type: List[WizardViewState]

    def navmap_merge(self, additional_navmap):
        """Merge `additional_navmap` into ``self.navmap``.

        Entries for views that already exist are updated key by key; entries
        for new views are inserted as-is.
        """
        # NOTE: only merges one level deep. Deeper dict levels will overwrite
        for view_name, nav in additional_navmap.items():
            if view_name in self.navmap:
                self.navmap[view_name].update(nav)
            else:
                self.navmap[view_name] = nav

    def resolve_next(self, view, wizard_data):
        """Resolve the view following `view`, push the current state on the
        stack and make the new view current.

        The ``'next'`` entry of the nav map may be a view name, or a handler
        that takes `wizard_data` and returns one of: a view name, a 1-tuple
        ``(view,)``, a 2-tuple ``(view, wizard_data)`` or a 3-tuple
        ``(view, wizard_data, params)``.

        Returns a WizardViewState (view name, wizard_data, view params):
        - view name is the string id of the view in the nav map
        - wizard data is the (stacked) wizard data dict containing user input
          and choices
        - view params are transient, meant for extra configuration of a view
          (e.g. info msg in a generic choice dialog)

        If `view` has no ``'next'`` entry, `finished` is called and a state
        whose view is None is returned; the stack is left untouched.

        An exception raised by a handler propagates before any state is
        changed, so the wizard stays on the current view.

        Raises:
            TypeError: if the next handler returns None, or a tuple that
                cannot be turned into a WizardViewState.
        """
        assert view
        self._logger.debug(f'view={view}')
        assert view in self.navmap

        nav = self.navmap[view]

        if 'accept' in nav:
            # allow python scope to append to wizard_data before
            # adding to stack or finishing
            accept = nav['accept']
            if callable(accept):
                accept(wizard_data)
            else:
                self._logger.error(f'accept handler for view {view} not callable')

        if 'next' not in nav:
            # finished
            self.finished(wizard_data)
            # a WizardViewState is still a tuple, so tuple-unpacking callers
            # keep working, while attribute access (.view) works as well
            return WizardViewState(None, wizard_data, {})

        nexteval = nav['next']
        if isinstance(nexteval, str):
            # simple string based next view
            new_view = WizardViewState(nexteval, wizard_data, {})
        else:
            # handler fn based next view
            nv = nexteval(wizard_data)
            self._logger.debug(repr(nv))

            if nv is None:
                # e.g. a dict-dispatch handler that did not know the choice;
                # fail with a clear message before touching the stack
                raise TypeError(f'next handler for view {view} did not return a view')

            # append wizard_data and params if not returned
            if isinstance(nv, str):
                new_view = WizardViewState(nv, wizard_data, {})
            elif len(nv) == 1:
                new_view = WizardViewState(nv[0], wizard_data, {})
            elif len(nv) == 2:
                new_view = WizardViewState(nv[0], nv[1], {})
            else:
                # normalize plain 3-tuples, so self._current always has
                # the .view/.wizard_data/.params attributes
                new_view = WizardViewState(*nv)

        self._stack.append(copy.deepcopy(self._current))
        self._current = new_view

        self._logger.debug(f'resolve_next view is {self._current.view}')
        self.log_stack(self._stack)

        return new_view

    def resolve_prev(self):
        """Pop the most recent state off the stack, make it current and
        return it.

        Raises:
            IndexError: if the stack is empty.
        """
        prev_view = self._stack.pop()

        self._logger.debug(f'resolve_prev view is {prev_view}')
        self.log_stack(self._stack)

        self._current = prev_view
        return prev_view

    def is_last_view(self, view, wizard_data):
        """Check if `view` is the final view.

        Returns False when the nav map entry has no ``'last'`` key, the value
        itself when it is a bool, or the result of calling it with
        ``(view, wizard_data)`` when it is callable.

        Raises:
            Exception: if ``'last'`` is neither a bool nor callable.
        """
        assert view
        assert view in self.navmap

        nav = self.navmap[view]

        if 'last' not in nav:
            return False

        lastnav = nav['last']
        # bool literal
        if isinstance(lastnav, bool):
            return lastnav
        if callable(lastnav):
            # handler fn based
            is_last = lastnav(view, wizard_data)
            self._logger.debug(f'view "{view}" last: {is_last}')
            return is_last
        raise Exception(f'last handler for view {view} is not callable nor a bool literal')

    def finished(self, wizard_data):
        """Hook called by `resolve_next` when a view has no ``'next'`` entry."""
        self._logger.debug('finished.')

    def reset(self):
        """Clear the stack and the current view state."""
        self._stack = []
        self._current = WizardViewState(None, {}, {})

    def log_stack(self, _stack):
        """Debug-log the wizard data of every stack item, sensitive values
        removed. Works on a deep copy; `_stack` is not modified."""
        logstr = 'wizard stack:'
        for i, item in enumerate(copy.deepcopy(_stack)):
            self.sanitize_stack_item(item.wizard_data)
            logstr += f'\n{i}: {repr(item.wizard_data)}'
        self._logger.debug(logstr)

    def log_state(self, _current):
        """Debug-log the view state `_current`, sensitive values removed.
        Works on a deep copy; `_current` is not modified."""
        current = copy.deepcopy(_current)
        # sanitize_stack_item() expects a dict, not the WizardViewState tuple
        # itself, so clean the contained dicts (in place, on the copy)
        for part in (current.wizard_data, current.params):
            if isinstance(part, dict):
                self.sanitize_stack_item(part)
        self._logger.debug(f'wizard current: {repr(current)}')

    def sanitize_stack_item(self, _stack_item):
        """Replace the values of sensitive keys in the dict `_stack_item`,
        in place. Nested dicts are descended into recursively."""
        sensitive_keys = ['seed', 'seed_extra_words', 'master_key', 'private_key_list', 'password']

        def sanitize(_dict):
            for item in _dict:
                if isinstance(_dict[item], dict):
                    sanitize(_dict[item])
                elif item in sensitive_keys:
                    _dict[item] = '<sensitive value removed>'

        sanitize(_stack_item)
×
162

163

164
class NewWalletWizard(AbstractWizard):
    """Navigation logic of the wallet-creation wizard, and creation of the
    wallet file from the collected wizard data (see `create_storage`).

    Subclasses must implement `is_single_password`.
    """

    _logger = get_logger(__name__)

    def __init__(self, daemon):
        # explicit base-class call (not super()), so that subclasses combining
        # this class with other bases keep control over their init order
        AbstractWizard.__init__(self)
        self.navmap = {
            'wallet_name': {
                'next': 'wallet_type'
            },
            'wallet_type': {
                'next': self.on_wallet_type
            },
            'keystore_type': {
                'next': self.on_keystore_type
            },
            'create_seed': {
                'next': 'confirm_seed'
            },
            'confirm_seed': {
                'next': self.on_have_or_confirm_seed,
                'accept': self.maybe_master_pubkey,
                'last': lambda v,d: self.is_single_password() and not self.is_multisig(d)
            },
            'have_seed': {
                'next': self.on_have_or_confirm_seed,
                'accept': self.maybe_master_pubkey,
                'last': lambda v,d: self.is_single_password() and not self.is_bip39_seed(d) and not self.is_multisig(d)
            },
            'bip39_refine': {
                'next': lambda d: 'wallet_password' if not self.is_multisig(d) else 'multisig_cosigner_keystore',
                'accept': self.maybe_master_pubkey,
                'last': lambda v,d: self.is_single_password() and not self.is_multisig(d)
            },
            'have_master_key': {
                'next': lambda d: 'wallet_password' if not self.is_multisig(d) else 'multisig_cosigner_keystore',
                'accept': self.maybe_master_pubkey,
                'last': lambda v,d: self.is_single_password() and not self.is_multisig(d)
            },
            'multisig': {
                'next': 'keystore_type'
            },
            'multisig_cosigner_keystore': { # this view should set 'multisig_current_cosigner'
                'next': self.on_cosigner_keystore_type
            },
            'multisig_cosigner_key': {
                'next': lambda d: 'wallet_password' if self.has_all_cosigner_data(d) else 'multisig_cosigner_keystore',
                'last': lambda v,d: self.is_single_password() and self.has_all_cosigner_data(d)
            },
            'multisig_cosigner_seed': {
                'next': self.on_have_cosigner_seed,
                'last': lambda v,d: self.is_single_password() and self.has_all_cosigner_data(d)
            },
            'multisig_cosigner_bip39_refine': {
                'next': lambda d: 'wallet_password' if self.has_all_cosigner_data(d) else 'multisig_cosigner_keystore',
                'last': lambda v,d: self.is_single_password() and self.has_all_cosigner_data(d)
            },
            'imported': {
                'next': 'wallet_password',
                'last': lambda v,d: self.is_single_password()
            },
            'wallet_password': {
                'last': True
            }
        }
        self._daemon = daemon

    def start(self, initial_data=None):
        """Reset the wizard and return the initial state ('wallet_name' view).

        `initial_data` becomes the wizard data of the first view; a new empty
        dict is used when it is None.
        """
        if initial_data is None:
            initial_data = {}
        self.reset()
        self._current = WizardViewState('wallet_name', initial_data, {})
        return self._current

    def is_single_password(self):
        """To be implemented by subclasses; used by the 'last' handlers."""
        raise NotImplementedError()

    def is_bip39_seed(self, wizard_data):
        """Return True if wizard_data['seed_variant'] is 'bip39' (False if unset)."""
        return wizard_data.get('seed_variant') == 'bip39'

    def is_multisig(self, wizard_data):
        """Return True if wizard_data['wallet_type'] is 'multisig'."""
        return wizard_data['wallet_type'] == 'multisig'

    def on_wallet_type(self, wizard_data):
        """Map the chosen wallet type to the next view (None if unknown)."""
        t = wizard_data['wallet_type']
        return {
            'standard': 'keystore_type',
            '2fa': 'trustedcoin_start',
            'multisig': 'multisig',
            'imported': 'imported'
        }.get(t)

    def on_keystore_type(self, wizard_data):
        """Map the chosen keystore type to the next view (None if unknown)."""
        t = wizard_data['keystore_type']
        return {
            'createseed': 'create_seed',
            'haveseed': 'have_seed',
            'masterkey': 'have_master_key'
        }.get(t)

    def on_have_or_confirm_seed(self, wizard_data):
        """Return the view following seed entry/confirmation."""
        if self.is_bip39_seed(wizard_data):
            return 'bip39_refine'
        if self.is_multisig(wizard_data):
            return 'multisig_cosigner_keystore'
        return 'wallet_password'

    def maybe_master_pubkey(self, wizard_data):
        """'accept' handler: store the master public key of the keystore
        described by `wizard_data` under 'multisig_master_pubkey'.

        Does nothing for a bip39 seed while 'derivation_path' is not set yet.
        """
        self._logger.info('maybe_master_pubkey')
        if self.is_bip39_seed(wizard_data) and 'derivation_path' not in wizard_data:
            self._logger.info('maybe_master_pubkey2')
            return

        wizard_data['multisig_master_pubkey'] = self.keystore_from_data(wizard_data['wallet_type'], wizard_data).get_master_public_key()

    def on_cosigner_keystore_type(self, wizard_data):
        """Map the chosen cosigner keystore type to the next view (None if unknown)."""
        t = wizard_data['cosigner_keystore_type']
        return {
            'key': 'multisig_cosigner_key',
            'seed': 'multisig_cosigner_seed'
        }.get(t)

    def on_have_cosigner_seed(self, wizard_data):
        """Return the view following entry of a cosigner seed."""
        current_cosigner_data = wizard_data['multisig_cosigner_data'][str(wizard_data['multisig_current_cosigner'])]
        if self.has_all_cosigner_data(wizard_data):
            return 'wallet_password'
        if current_cosigner_data['seed_type'] == 'bip39' and 'derivation_path' not in current_cosigner_data:
            return 'multisig_cosigner_bip39_refine'
        return 'multisig_cosigner_keystore'

    def has_all_cosigner_data(self, wizard_data):
        """Return True if data for all cosigners has been collected."""
        # number of items in multisig_cosigner_data is less than participants?
        if len(wizard_data['multisig_cosigner_data']) < (wizard_data['multisig_participants'] - 1):
            return False

        # if last cosigner uses bip39 seed, we still need derivation path
        current_cosigner_data = wizard_data['multisig_cosigner_data'][str(wizard_data['multisig_current_cosigner'])]
        if current_cosigner_data.get('seed_type') == 'bip39' and 'derivation_path' not in current_cosigner_data:
            return False

        return True

    def _collect_master_pubkeys(self, wizard_data):
        """Return the master public keys of the main keystore and of all
        cosigner keystores in `wizard_data`, main keystore first."""
        wallet_type = wizard_data['wallet_type']
        xpubs = [self.keystore_from_data(wallet_type, wizard_data).get_master_public_key()]
        for cosigner_data in wizard_data['multisig_cosigner_data'].values():
            xpubs.append(self.keystore_from_data(wallet_type, cosigner_data).get_master_public_key())
        return xpubs

    def has_duplicate_masterkeys(self, wizard_data) -> bool:
        """Multisig wallets need distinct master keys. If True, need to prevent wallet-creation."""
        xpubs = self._collect_master_pubkeys(wizard_data)
        return len(xpubs) != len(set(xpubs))

    def has_heterogeneous_masterkeys(self, wizard_data) -> bool:
        """Multisig wallets need homogeneous master keys.
        All master keys need to be bip32, and e.g. Ypub cannot be mixed with Zpub.
        If True, need to prevent wallet-creation.
        """
        xpubs = self._collect_master_pubkeys(wizard_data)
        try:
            xpub_types = {xpub_type(xpub) for xpub in xpubs}
        except Exception:
            return True  # maybe old_mpk?
        return len(xpub_types) > 1

    @staticmethod
    def _keystore_from_bip39_data(wallet_type, data):
        """Create a keystore from the bip39 seed, passphrase, derivation path
        and script type found in `data`."""
        root_seed = keystore.bip39_to_seed(data['seed'], data['seed_extra_words'])
        derivation = normalize_bip32_derivation(data['derivation_path'])
        # the legacy script type of the wallet kind is passed on as 'standard'
        legacy_script_type = 'p2sh' if wallet_type == 'multisig' else 'p2pkh'
        script = data['script_type']
        if script == legacy_script_type:
            script = 'standard'
        return keystore.from_bip43_rootseed(root_seed, derivation, xtype=script)

    def keystore_from_data(self, wallet_type, data):
        """Create a keystore from the 'seed' or 'master_key' found in `data`.

        Raises:
            Exception: if the seed variant is unsupported, or if `data`
                contains neither 'seed' nor 'master_key'.
        """
        # NOTE(review): this method dispatches on data['seed_variant'] while
        # create_storage() and the cosigner handlers read 'seed_type' -
        # confirm both keys are always present where needed.
        if 'seed' in data:
            if data['seed_variant'] == 'electrum':
                # NOTE(review): third argument is always True here, whereas
                # create_storage() passes wallet_type == 'multisig' - confirm
                # this is intended for non-multisig wallets.
                return keystore.from_seed(data['seed'], data['seed_extra_words'], True)
            elif data['seed_variant'] == 'bip39':
                return self._keystore_from_bip39_data(wallet_type, data)
            else:
                raise Exception('Unsupported seed variant %s' % data['seed_variant'])
        elif 'master_key' in data:
            return keystore.from_master_key(data['master_key'])
        else:
            raise Exception('no seed or master_key in data')

    def finished(self, wizard_data):
        self._logger.debug('finished')
        # override

    def create_storage(self, path, data):
        """Create a new wallet file at `path` from the wizard data `data`.

        Builds the keystore(s) described by `data`, applies data['password']
        to the keystore(s) and the storage when data['encrypt'] is set, and
        writes the wallet db to the storage.

        Raises:
            Exception: if a file already exists at `path`, or if `data`
                describes an unsupported seed type, keystore type or key type.
        """
        assert data['wallet_type'] in ['standard', '2fa', 'imported', 'multisig']

        if os.path.exists(path):
            raise Exception('file already exists at path')
        storage = WalletStorage(path)

        # TODO: refactor using self.keystore_from_data
        k = None
        if 'keystore_type' not in data:
            assert data['wallet_type'] == 'imported'
            addresses = {}
            if 'private_key_list' in data:
                k = keystore.Imported_KeyStore({})
                keys = keystore.get_private_keys(data['private_key_list'])
                for pk in keys:
                    assert bitcoin.is_private_key(pk)
                    txin_type, pubkey = k.import_privkey(pk, None)
                    addr = bitcoin.pubkey_to_address(txin_type, pubkey)
                    addresses[addr] = {'type': txin_type, 'pubkey': pubkey}
            elif 'address_list' in data:
                for addr in data['address_list'].split():
                    addresses[addr] = {}
        elif data['keystore_type'] in ['createseed', 'haveseed']:
            if data['seed_type'] in ['old', 'standard', 'segwit']:
                self._logger.debug('creating keystore from electrum seed')
                k = keystore.from_seed(data['seed'], data['seed_extra_words'], data['wallet_type'] == 'multisig')
            elif data['seed_type'] == 'bip39':
                self._logger.debug('creating keystore from bip39 seed')
                k = self._keystore_from_bip39_data(data['wallet_type'], data)
            elif is_any_2fa_seed_type(data['seed_type']):
                self._logger.debug('creating keystore from 2fa seed')
                k = keystore.from_xprv(data['x1/']['xprv'])
            else:
                raise Exception('unsupported/unknown seed_type %s' % data['seed_type'])
        elif data['keystore_type'] == 'masterkey':
            k = keystore.from_master_key(data['master_key'])
            if isinstance(k, keystore.Xpub):  # has xpub
                t1 = xpub_type(k.xpub)
                if data['wallet_type'] == 'multisig':
                    if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:
                        raise Exception('wrong key type %s' % t1)
                else:
                    if t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:
                        raise Exception('wrong key type %s' % t1)
            elif isinstance(k, keystore.Old_KeyStore):
                pass
            else:
                # report the type of the created keystore object `k`
                # (not of the imported `keystore` module)
                raise Exception(f"unexpected keystore type: {type(k)}")
        else:
            raise Exception('unsupported/unknown keystore_type %s' % data['keystore_type'])

        if data['encrypt']:
            if k and k.may_have_password():
                k.update_password(None, data['password'])
            storage.set_password(data['password'], enc_version=StorageEncryptionVersion.USER_PASSWORD)

        db = WalletDB('', storage=storage, manual_upgrades=False)
        db.set_keystore_encryption(bool(data['password']) and data['encrypt'])

        db.put('wallet_type', data['wallet_type'])
        if 'seed_type' in data:
            db.put('seed_type', data['seed_type'])

        if data['wallet_type'] == 'standard':
            db.put('keystore', k.dump())
        elif data['wallet_type'] == '2fa':
            db.put('x1/', k.dump())
            if data['trustedcoin_keepordisable'] == 'disable':
                k2 = keystore.from_xprv(data['x2/']['xprv'])
                if data['encrypt'] and k2.may_have_password():
                    k2.update_password(None, data['password'])
                db.put('x2/', k2.dump())
            else:
                db.put('x2/', data['x2/'])
            db.put('x3/', data['x3/'])
            db.put('use_trustedcoin', True)
        elif data['wallet_type'] == 'multisig':
            if not isinstance(k, keystore.Xpub):
                raise Exception(f"unexpected keystore(main) type={type(k)} in multisig. not bip32.")
            k_xpub_type = xpub_type(k.xpub)
            # overwrites the generic 'multisig' wallet_type stored above
            db.put('wallet_type', '%dof%d' % (data['multisig_signatures'],data['multisig_participants']))
            db.put('x1/', k.dump())
            for cosigner in data['multisig_cosigner_data']:
                cosigner_keystore = self.keystore_from_data('multisig', data['multisig_cosigner_data'][cosigner])
                if not isinstance(cosigner_keystore, keystore.Xpub):
                    raise Exception(f"unexpected keystore(cosigner) type={type(cosigner_keystore)} in multisig. not bip32.")
                if k_xpub_type != xpub_type(cosigner_keystore.xpub):
                    raise Exception("multisig wallet needs to have homogeneous xpub types")
                if data['encrypt'] and cosigner_keystore.may_have_password():
                    cosigner_keystore.update_password(None, data['password'])
                db.put(f'x{cosigner}/', cosigner_keystore.dump())
        elif data['wallet_type'] == 'imported':
            if k:
                db.put('keystore', k.dump())
            db.put('addresses', addresses)

        if k and k.can_have_deterministic_lightning_xprv():
            db.put('lightning_xprv', k.get_lightning_xprv(data['password'] if data['encrypt'] else None))

        db.load_plugins()
        db.write(storage)
×
473

474
class ServerConnectWizard(AbstractWizard):
    """Navigation logic of the wizard asking for proxy, auto-connect and
    server settings. The flow starts at the 'proxy_ask' view."""

    _logger = get_logger(__name__)

    def __init__(self, daemon):
        AbstractWizard.__init__(self)
        self._daemon = daemon
        self.navmap = {
            # final view when the user opted for auto-connect
            'autoconnect': {
                'next': 'server_config',
                'last': lambda _view, data: data['autoconnect'],
            },
            # configure a proxy only if the user asked for one
            'proxy_ask': {
                'next': lambda data: 'proxy_config' if data['want_proxy'] else 'autoconnect',
            },
            'proxy_config': {
                'next': 'autoconnect',
            },
            'server_config': {
                'last': True,
            },
        }

    def start(self, initial_data=None):
        """Reset the wizard and return the initial state ('proxy_ask' view).

        `initial_data` becomes the wizard data of the first view; a new empty
        dict is used when it is None.
        """
        wizard_data = {} if initial_data is None else initial_data
        self.reset()
        self._current = WizardViewState('proxy_ask', wizard_data, {})
        return self._current
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc