• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5766584628674560

20 Aug 2025 05:57PM UTC coverage: 61.507% (-0.03%) from 61.535%
5766584628674560

Pull #10159

CirrusCI

SomberNight
logging: add config.LOGS_MAX_TOTAL_SIZE_BYTES: to limit size on disk
Pull Request #10159: logging: add config.LOGS_MAX_TOTAL_SIZE_BYTES: to limit size on disk

7 of 31 new or added lines in 2 files covered. (22.58%)

125 existing lines in 47 files now uncovered.

22810 of 37085 relevant lines covered (61.51%)

3.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.0
/electrum/wizard.py
1
import copy
5✔
2
import os
5✔
3

4
from typing import List, NamedTuple, Any, Dict, Optional, Tuple, TYPE_CHECKING
5✔
5

6
from electrum.gui.messages import TERMS_OF_USE_LATEST_VERSION
5✔
7

8
from electrum.i18n import _
5✔
9
from electrum.interface import ServerAddr
5✔
10
from electrum.keystore import hardware_keystore
5✔
11
from electrum.logging import get_logger
5✔
12
from electrum.network import ProxySettings
5✔
13
from electrum.plugin import run_hook
5✔
14
from electrum.slip39 import EncryptedSeed
5✔
15
from electrum.storage import WalletStorage, StorageEncryptionVersion
5✔
16
from electrum.wallet_db import WalletDB
5✔
17
from electrum.bip32 import normalize_bip32_derivation, xpub_type
5✔
18
from electrum import keystore, mnemonic, bitcoin
5✔
19
from electrum.mnemonic import is_any_2fa_seed_type, can_seed_have_passphrase
5✔
20
from electrum.util import multisig_type
5✔
21

22
if TYPE_CHECKING:
2✔
UNCOV
23
    from electrum.daemon import Daemon
UNCOV
24
    from electrum.plugin import Plugins
UNCOV
25
    from electrum.keystore import Hardware_KeyStore
UNCOV
26
    from electrum.simple_config import SimpleConfig
27

28

29
class WizardViewState(NamedTuple):
5✔
30
    view: Optional[str]
5✔
31
    wizard_data: Dict[str, Any]
5✔
32
    params: Dict[str, Any]
5✔
33

34

35
class AbstractWizard:
5✔
36
    # serve as a base for all UIs, so no qt
37
    # encapsulate wizard state
38
    # encapsulate navigation decisions, UI agnostic
39
    # encapsulate stack, go backwards
40
    # allow extend/override flow in subclasses e.g.
41
    # - override: replace 'next' value to own fn
42
    # - extend: add new keys to navmap, wire up flow by override
43

44
    _logger = get_logger(__name__)
5✔
45

46
    def __init__(self):
5✔
47
        self.navmap = {}
5✔
48

49
        self._current = WizardViewState(None, {}, {})
5✔
50
        self._stack = []  # type: List[WizardViewState]
5✔
51

52
    def navmap_merge(self, additional_navmap: dict):
5✔
53
        # NOTE: only merges one level deep. Deeper dict levels will overwrite
54
        for k, v in additional_navmap.items():
5✔
55
            if k in self.navmap:
5✔
56
                self.navmap[k].update(v)
5✔
57
            else:
58
                self.navmap[k] = v
5✔
59

60
    # from current view and wizard_data, resolve the new view
61
    # returns WizardViewState tuple (view name, wizard_data, view params)
62
    # view name is the string id of the view in the nav map
63
    # wizard data is the (stacked) wizard data dict containing user input and choices
64
    # view params are transient, meant for extra configuration of a view (e.g. info
65
    #   msg in a generic choice dialog)
66
    # exception: stay on this view
67
    def resolve_next(self, view: str, wizard_data: dict) -> WizardViewState:
5✔
68
        assert view, f'view not defined: {repr(self.sanitize_stack_item(wizard_data))}'
5✔
69
        self._logger.debug(f'view={view}')
5✔
70
        assert view in self.navmap
5✔
71

72
        nav = self.navmap[view]
5✔
73

74
        if 'accept' in nav:
5✔
75
            # allow python scope to append to wizard_data before
76
            # adding to stack or finishing
77
            view_accept = nav['accept']
5✔
78
            if callable(view_accept):
5✔
79
                view_accept(wizard_data)
5✔
80
            else:
81
                raise Exception(f'accept handler for view {view} is not callable')
×
82

83
        # make a clone for next view
84
        wizard_data = copy.deepcopy(wizard_data)
5✔
85

86
        if 'next' not in nav:
5✔
87
            new_view = WizardViewState(None, wizard_data, {})
5✔
88
        else:
89
            view_next = nav['next']
5✔
90
            if isinstance(view_next, str):
5✔
91
                # string literal
92
                new_view = WizardViewState(view_next, wizard_data, {})
5✔
93
            elif callable(view_next):
5✔
94
                # handler fn based
95
                nv = view_next(wizard_data)
5✔
96
                self._logger.debug(repr(nv))
5✔
97

98
                # append wizard_data and params if not returned
99
                if isinstance(nv, str):
5✔
100
                    new_view = WizardViewState(nv, wizard_data, {})
5✔
101
                elif len(nv) == 1:
×
102
                    new_view = WizardViewState(nv[0], wizard_data, {})
×
103
                elif len(nv) == 2:
×
104
                    new_view = WizardViewState(nv[0], nv[1], {})
×
105
                else:
106
                    new_view = nv
×
107
            else:
108
                raise Exception(f'next handler for view {view} is not callable nor a string literal')
×
109

110
            if 'params' in self.navmap[new_view.view]:
5✔
111
                params = self.navmap[new_view.view]['params']
×
112
                assert isinstance(params, dict), 'params is not a dict'
×
113
                new_view.params.update(params)
×
114

115
            self._logger.debug(f'resolve_next view is {new_view.view}')
5✔
116

117
        self._stack.append(copy.deepcopy(self._current))
5✔
118
        self._current = new_view
5✔
119

120
        self.log_stack()
5✔
121

122
        return new_view
5✔
123

124
    def resolve_prev(self):
5✔
125
        self._current = self._stack.pop()
×
126

127
        self._logger.debug(f'resolve_prev view is "{self._current.view}"')
×
128
        self.log_stack()
×
129

130
        return self._current
×
131

132
    # check if this view is the final view
133
    def is_last_view(self, view: str, wizard_data: dict) -> bool:
5✔
134
        assert view, f'view not defined: {repr(self.sanitize_stack_item(wizard_data))}'
5✔
135
        assert view in self.navmap
5✔
136

137
        nav = self.navmap[view]
5✔
138

139
        if 'last' not in nav:
5✔
140
            return False
5✔
141

142
        view_last = nav['last']
5✔
143
        if isinstance(view_last, bool):
5✔
144
            # bool literal
145
            self._logger.debug(f'view "{view}" last: {view_last}')
5✔
146
            return view_last
5✔
147
        elif callable(view_last):
5✔
148
            # handler fn based
149
            is_last = view_last(wizard_data)
5✔
150
            self._logger.debug(f'view "{view}" last: {is_last}')
5✔
151
            return is_last
5✔
152
        else:
153
            raise Exception(f'last handler for view {view} is not callable nor a bool literal')
×
154

155
    def reset(self):
5✔
156
        self._stack = []
5✔
157
        self._current = WizardViewState(None, {}, {})
5✔
158

159
    def log_stack(self):
5✔
160
        logstr = 'wizard stack:'
5✔
161
        i = 0
5✔
162
        for item in self._stack:
5✔
163
            ssi = self.sanitize_stack_item(item.wizard_data)
5✔
164
            logstr += f'\n{i}: {hex(id(item.wizard_data))} - {repr(ssi)}'
5✔
165
            i += 1
5✔
166
        sci = self.sanitize_stack_item(self._current.wizard_data)
5✔
167
        logstr += f'\nc: {hex(id(self._current.wizard_data))} - {repr(sci)}'
5✔
168
        self._logger.debug(logstr)
5✔
169

170
    def sanitize_stack_item(self, _stack_item) -> dict:
5✔
171
        whitelist = [
5✔
172
            "wallet_name", "wallet_exists", "wallet_is_open", "wallet_needs_hw_unlock",
173
            "wallet_type", "keystore_type", "seed_variant", "seed_type", "seed_extend",
174
            "script_type", "derivation_path", "encrypt",
175
            # hardware devices:
176
            "hardware_device", "hw_type", "label", "soft_device_id", "xpub_encrypt",
177
            # inside keystore:
178
            "type", "pw_hash_version", "derivation", "root_fingerprint",
179
            # multisig:
180
            "multisig_participants", "multisig_signatures", "multisig_current_cosigner", "cosigner_keystore_type",
181
            # trustedcoin:
182
            "trustedcoin_keepordisable", "trustedcoin_go_online",
183
        ]
184

185
        def sanitize(_dict):
5✔
186
            result = {}
5✔
187
            for item in _dict:
5✔
188
                if isinstance(_dict[item], dict):
5✔
189
                    result[item] = sanitize(_dict[item])
5✔
190
                else:
191
                    if item in whitelist:
5✔
192
                        result[item] = _dict[item]
5✔
193
                    else:
194
                        result[item] = '<redacted>'
5✔
195
            return result
5✔
196
        return sanitize(_stack_item)
5✔
197

198
    def get_wizard_data(self) -> dict:
5✔
199
        return copy.deepcopy(self._current.wizard_data)
×
200

201

202
class KeystoreWizard(AbstractWizard):
5✔
203

204
    _logger = get_logger(__name__)
5✔
205

206
    def __init__(self, plugins: 'Plugins'):
5✔
207
        AbstractWizard.__init__(self)
5✔
208
        self.plugins = plugins
5✔
209
        self.navmap = {
5✔
210
            'keystore_type': {
211
                'next': self.on_keystore_type
212
            },
213
            'enter_seed': {
214
                'next': lambda d: 'enter_ext' if self.wants_ext(d) else 'script_and_derivation',
215
                'accept': lambda d: None if (self.wants_ext(d) or self.needs_derivation_path(d)) else self.update_keystore(d),
216
                'last': lambda d: not self.wants_ext(d) and not self.needs_derivation_path(d),
217
            },
218
            'enter_ext': {
219
                'next': 'script_and_derivation',
220
                'accept': lambda d: None if self.needs_derivation_path(d) else self.update_keystore(d),
221
                'last': lambda d: not self.needs_derivation_path(d)
222
            },
223
            'script_and_derivation': {
224
                'accept': self.update_keystore,
225
                'last': True
226
            },
227
            'choose_hardware_device': {
228
                'next': self.on_hardware_device,
229
            },
230
            'wallet_password': {
231
                'last': True
232
            },
233
            'wallet_password_hardware': {
234
                'last': True
235
            },
236
        }
237

238
    def maybe_master_pubkey(self, wizard_data):
5✔
239
        self.update_keystore(wizard_data)
5✔
240

241
    def check_multisig_constraints(self, wizard_data: dict) -> Tuple[bool, str]:
5✔
242
        # called by GUI. overloaded in NewWalletWizard
243
        return True, ''
×
244

245
    def update_keystore(self, wizard_data):
5✔
246
        wallet_type = wizard_data['wallet_type']
5✔
247
        keystore = self.keystore_from_data(wallet_type, wizard_data)
5✔
248
        self._result = keystore, (wizard_data['keystore_type'] == 'hardware')
5✔
249

250
    def on_keystore_type(self, wizard_data: dict) -> str:
5✔
251
        t = wizard_data['keystore_type']
5✔
252
        return {
5✔
253
            'haveseed': 'enter_seed',
254
            'hardware': 'choose_hardware_device'
255
        }.get(t)
256

257
    def last_cosigner(self, wizard_data: dict) -> bool:
5✔
258
        # one at a time
259
        return True
5✔
260

261
    def _convert_wallet_type(self, wizard_data: dict) -> None:
5✔
262
        assert 'wallet_type' in wizard_data
5✔
263
        if multisig_type(wizard_data['wallet_type']):
5✔
264
            wizard_data['wallet_type'] = 'multisig'  # convert from e.g. "2of2" to "multisig"
5✔
265
            wizard_data['multisig_participants'] = 2
5✔
266
            wizard_data['multisig_signatures'] = 2
5✔
267
            wizard_data['multisig_cosigner_data'] = {}
5✔
268

269
    def start(self, *, start_viewstate: WizardViewState = None) -> WizardViewState:
5✔
270
        self.reset()
5✔
271
        if start_viewstate is None:
5✔
272
            start_view = 'keystore_type'
×
273
            params = self.navmap[start_view].get('params', {})
×
274
            self._current = WizardViewState(start_view, {}, params)
×
275
        else:
276
            self._current = start_viewstate
5✔
277
        self._convert_wallet_type(self._current.wizard_data)  # mutating in-place
5✔
278
        return self._current
5✔
279

280
    # returns (sub)dict of current cosigner (or root if first)
281
    def current_cosigner(self, wizard_data: dict) -> dict:
5✔
282
        wdata = wizard_data
5✔
283
        if wizard_data.get('wallet_type') == 'multisig' and 'multisig_current_cosigner' in wizard_data:
5✔
284
            cosigner = wizard_data['multisig_current_cosigner']
5✔
285
            wdata = wizard_data['multisig_cosigner_data'][str(cosigner)]
5✔
286
        return wdata
5✔
287

288
    def needs_derivation_path(self, wizard_data: dict) -> bool:
5✔
289
        wdata = self.current_cosigner(wizard_data)
5✔
290
        return 'seed_variant' in wdata and wdata['seed_variant'] in ['bip39', 'slip39']
5✔
291

292
    def wants_ext(self, wizard_data: dict) -> bool:
5✔
293
        wdata = self.current_cosigner(wizard_data)
5✔
294
        return 'seed_variant' in wdata and wdata['seed_extend']
5✔
295

296
    def is_multisig(self, wizard_data: dict) -> bool:
5✔
297
        return wizard_data['wallet_type'] == 'multisig'
5✔
298

299
    def is_hardware(self, wizard_data: dict) -> bool:
5✔
300
        return wizard_data['keystore_type'] == 'hardware'
5✔
301

302
    def wallet_password_view(self, wizard_data: dict) -> str:
5✔
303
        if self.is_hardware(wizard_data) and wizard_data['wallet_type'] == 'standard':
5✔
304
            return 'wallet_password_hardware'
5✔
305
        return 'wallet_password'
5✔
306

307
    def on_hardware_device(self, wizard_data: dict, new_wallet=True) -> str:
5✔
308
        current_cosigner = self.current_cosigner(wizard_data)
5✔
309
        _type, _info = current_cosigner['hardware_device']
5✔
310
        plugin = self.plugins.get_plugin(_type)
5✔
311
        run_hook('init_wallet_wizard', self)  # TODO: currently only used for hww, hook name might be confusing
5✔
312
        return plugin.wizard_entry_for_device(_info, new_wallet=new_wallet)
5✔
313

314
    def validate_seed(self, seed: str, seed_variant: str, wallet_type: str) -> Tuple[bool, str, str, bool]:
5✔
315
        seed_type = ''
×
316
        seed_valid = False
×
317
        validation_message = ''
×
318
        can_passphrase = True
×
319

320
        if seed_variant == 'electrum':
×
321
            seed_type = mnemonic.calc_seed_type(seed)
×
322
            if seed_type != '':
×
323
                seed_valid = True
×
324
                can_passphrase = can_seed_have_passphrase(seed)
×
325
        elif seed_variant == 'bip39':
×
326
            is_checksum, is_wordlist = keystore.bip39_is_checksum_valid(seed)
×
327
            validation_message = ('' if is_checksum else _('BIP39 checksum failed')) if is_wordlist else _('Unknown BIP39 wordlist')
×
328
            if not bool(seed):
×
329
                validation_message = ''
×
330
            seed_type = 'bip39'
×
331
            # bip39 always valid, even if checksum failed, see #8720
332
            # however, reject empty string
333
            seed_valid = bool(seed)
×
334
        elif seed_variant == 'slip39':
×
335
            # seed shares should be already validated by wizard page, we have a combined encrypted seed
336
            if seed and isinstance(seed, EncryptedSeed):
×
337
                seed_valid = True
×
338
                seed_type = 'slip39'
×
339
            else:
340
                seed_valid = False
×
341
        else:
342
            raise Exception(f'unknown seed variant {seed_variant}')
×
343

344
        # check if seed matches wallet type
345
        if wallet_type == '2fa' and not is_any_2fa_seed_type(seed_type):
×
346
            seed_valid = False
×
347
        elif wallet_type == 'standard' and seed_type not in ['old', 'standard', 'segwit', 'bip39', 'slip39']:
×
348
            seed_valid = False
×
349
        elif wallet_type == 'multisig' and seed_type not in ['standard', 'segwit', 'bip39', 'slip39']:
×
350
            seed_valid = False
×
351

352
        self._logger.debug(f'seed verified: {seed_valid}, type={seed_type!r}, validation_message={validation_message}')
×
353

354
        return seed_valid, seed_type, validation_message, can_passphrase
×
355

356
    def keystore_from_data(self, wallet_type: str, data: dict):
5✔
357
        if data['keystore_type'] in ['createseed', 'haveseed'] and 'seed' in data:
5✔
358
            seed_extension = data.get('seed_extra_words', '')
5✔
359
            if data['seed_variant'] == 'electrum':
5✔
360
                for_multisig = wallet_type in ['multisig']
5✔
361
                return keystore.from_seed(data['seed'], passphrase=seed_extension, for_multisig=for_multisig)
5✔
362
            elif data['seed_variant'] == 'bip39':
5✔
363
                root_seed = keystore.bip39_to_seed(data['seed'], passphrase=seed_extension)
5✔
364
                derivation = normalize_bip32_derivation(data['derivation_path'])
5✔
365
                if wallet_type == 'multisig':
5✔
366
                    script = data['script_type'] if data['script_type'] != 'p2sh' else 'standard'
5✔
367
                else:
368
                    script = data['script_type'] if data['script_type'] != 'p2pkh' else 'standard'
5✔
369
                return keystore.from_bip43_rootseed(root_seed, derivation=derivation, xtype=script)
5✔
370
            elif data['seed_variant'] == 'slip39':
5✔
371
                root_seed = data['seed'].decrypt(seed_extension)
5✔
372
                derivation = normalize_bip32_derivation(data['derivation_path'])
5✔
373
                if wallet_type == 'multisig':
5✔
374
                    script = data['script_type'] if data['script_type'] != 'p2sh' else 'standard'
×
375
                else:
376
                    script = data['script_type'] if data['script_type'] != 'p2pkh' else 'standard'
5✔
377
                return keystore.from_bip43_rootseed(root_seed, derivation=derivation, xtype=script)
5✔
378
            else:
379
                raise Exception('Unsupported seed variant %s' % data['seed_variant'])
×
380
        elif data['keystore_type'] == 'masterkey' and 'master_key' in data:
5✔
381
            return keystore.from_master_key(data['master_key'])
5✔
382
        elif data['keystore_type'] == 'hardware':
5✔
383
            return self.hw_keystore(data)
5✔
384
        else:
385
            raise Exception('no seed or master_key in data')
×
386

387
    def hw_keystore(self, data: dict) -> 'Hardware_KeyStore':
5✔
388
        return hardware_keystore({
5✔
389
            'type': 'hardware',
390
            'hw_type': data['hw_type'],
391
            'derivation': data['derivation_path'],
392
            'root_fingerprint': data['root_fingerprint'],
393
            'xpub': data['master_key'],
394
            'label': data['label'],
395
            'soft_device_id': data['soft_device_id']
396
        })
397

398

399
class NewWalletWizard(KeystoreWizard):
5✔
400

401
    _logger = get_logger(__name__)
5✔
402

403
    def __init__(self, daemon: 'Daemon', plugins: 'Plugins'):
5✔
404
        KeystoreWizard.__init__(self, plugins)
5✔
405
        self.navmap = {
5✔
406
            'wallet_name': {
407
                'next': lambda d: 'hw_unlock' if d.get('wallet_needs_hw_unlock') else 'wallet_type',
408
            },
409
            'hw_unlock': {
410
                'next': lambda d: self.on_hardware_device(d, new_wallet=False),
411
            },
412
            'wallet_type': {
413
                'next': self.on_wallet_type
414
            },
415
            'keystore_type': {
416
                'next': self.on_keystore_type
417
            },
418
            'create_seed': {
419
                'next': lambda d: 'create_ext' if self.wants_ext(d) else 'confirm_seed',
420
            },
421
            'create_ext': {
422
                'next': 'confirm_seed',
423
            },
424
            'confirm_seed': {
425
                'next': lambda d: 'confirm_ext' if self.wants_ext(d) else self.on_have_or_confirm_seed(d),
426
                'accept': lambda d: None if self.wants_ext(d) else self.maybe_master_pubkey(d),
427
                'last': lambda d: self.is_single_password() and not self.is_multisig(d) and not self.wants_ext(d),
428
            },
429
            'confirm_ext': {
430
                'next': self.on_have_or_confirm_seed,
431
                'accept': self.maybe_master_pubkey,
432
                'last': lambda d: self.is_single_password() and not self.is_multisig(d)
433
            },
434
            'have_seed': {
435
                'next': lambda d: 'have_ext' if self.wants_ext(d) else self.on_have_or_confirm_seed(d),
436
                'accept': lambda d: None if self.wants_ext(d) else self.maybe_master_pubkey(d),
437
                'last': lambda d: self.is_single_password() and not
438
                                    (self.needs_derivation_path(d) or self.is_multisig(d) or self.wants_ext(d)),
439
            },
440
            'have_ext': {
441
                'next': self.on_have_or_confirm_seed,
442
                'accept': self.maybe_master_pubkey,
443
                'last': lambda d: self.is_single_password() and not
444
                                  (self.needs_derivation_path(d) or self.is_multisig(d))
445
            },
446
            'choose_hardware_device': {
447
                'next': self.on_hardware_device,
448
            },
449
            'script_and_derivation': {
450
                'next': lambda d: self.wallet_password_view(d) if not self.is_multisig(d) else 'multisig_cosigner_keystore',
451
                'accept': self.maybe_master_pubkey,
452
                'last': lambda d: self.is_single_password() and not self.is_multisig(d)
453
            },
454
            'have_master_key': {
455
                'next': lambda d: self.wallet_password_view(d) if not self.is_multisig(d) else 'multisig_cosigner_keystore',
456
                'accept': self.maybe_master_pubkey,
457
                'last': lambda d: self.is_single_password() and not self.is_multisig(d)
458
            },
459
            'multisig': {
460
                'next': 'keystore_type'
461
            },
462
            'multisig_cosigner_keystore': {  # this view should set 'multisig_current_cosigner'
463
                'next': self.on_cosigner_keystore_type
464
            },
465
            'multisig_cosigner_key': {
466
                'next': lambda d: self.wallet_password_view(d) if self.last_cosigner(d) else 'multisig_cosigner_keystore',
467
                'last': lambda d: self.is_single_password() and self.last_cosigner(d)
468
            },
469
            'multisig_cosigner_seed': {
470
                'next': lambda d: 'multisig_cosigner_have_ext' if self.wants_ext(d) else self.on_have_cosigner_seed(d),
471
                'last': lambda d: self.is_single_password() and self.last_cosigner(d) and not
472
                                  (self.needs_derivation_path(d) or self.wants_ext(d)),
473
            },
474
            'multisig_cosigner_have_ext': {
475
                'next': self.on_have_cosigner_seed,
476
                'last': lambda d: self.is_single_password() and self.last_cosigner(d) and not self.needs_derivation_path(d)
477
            },
478
            'multisig_cosigner_hardware': {
479
                'next': self.on_hardware_device,
480
            },
481
            'multisig_cosigner_script_and_derivation': {
482
                'next': lambda d: self.wallet_password_view(d) if self.last_cosigner(d) else 'multisig_cosigner_keystore',
483
                'last': lambda d: self.is_single_password() and self.last_cosigner(d)
484
            },
485
            'imported': {
486
                'next': 'wallet_password',
487
                'last': lambda d: self.is_single_password()
488
            },
489
            'wallet_password': {
490
                'last': True
491
            },
492
            'wallet_password_hardware': {
493
                'last': True
494
            }
495
        }
496
        self._daemon = daemon
5✔
497
        self.plugins = plugins
5✔
498
        # todo: load only if needed, like hw plugins
499
        self.plugins.load_plugin_by_name('trustedcoin')
5✔
500

501
    def start(self, *, start_viewstate: WizardViewState = None) -> WizardViewState:
5✔
502
        self.reset()
5✔
503
        if start_viewstate is None:
5✔
504
            start_view = 'wallet_name'
5✔
505
            params = self.navmap[start_view].get('params', {})
5✔
506
            self._current = WizardViewState(start_view, {}, params)
5✔
507
        else:
508
            self._current = start_viewstate
×
509
        return self._current
5✔
510

511
    def is_single_password(self) -> bool:
5✔
512
        raise NotImplementedError()
×
513

514
    def on_wallet_type(self, wizard_data: dict) -> str:
5✔
515
        t = wizard_data['wallet_type']
5✔
516
        return {
5✔
517
            'standard': 'keystore_type',
518
            '2fa': 'trustedcoin_start',
519
            'multisig': 'multisig',
520
            'imported': 'imported'
521
        }.get(t)
522

523
    def on_keystore_type(self, wizard_data: dict) -> str:
5✔
524
        t = wizard_data['keystore_type']
5✔
525
        return {
5✔
526
            'createseed': 'create_seed',
527
            'haveseed': 'have_seed',
528
            'masterkey': 'have_master_key',
529
            'hardware': 'choose_hardware_device'
530
        }.get(t)
531

532
    def on_have_or_confirm_seed(self, wizard_data: dict) -> str:
5✔
533
        if self.needs_derivation_path(wizard_data):
5✔
534
            return 'script_and_derivation'
5✔
535
        elif self.is_multisig(wizard_data):
5✔
536
            return 'multisig_cosigner_keystore'
5✔
537
        else:
538
            return 'wallet_password'
5✔
539

540
    def maybe_master_pubkey(self, wizard_data: dict):
5✔
541
        self._logger.debug('maybe_master_pubkey')
5✔
542
        if self.needs_derivation_path(wizard_data) and 'derivation_path' not in wizard_data:
5✔
543
            self._logger.debug('deferred, missing derivation_path')
5✔
544
            return
5✔
545

546
        wizard_data['multisig_master_pubkey'] = self.keystore_from_data(wizard_data['wallet_type'], wizard_data).get_master_public_key()
5✔
547

548
    def on_cosigner_keystore_type(self, wizard_data: dict) -> str:
5✔
549
        t = wizard_data['cosigner_keystore_type']
5✔
550
        return {
5✔
551
            'masterkey': 'multisig_cosigner_key',
552
            'haveseed': 'multisig_cosigner_seed',
553
            'hardware': 'multisig_cosigner_hardware'
554
        }.get(t)
555

556
    def on_have_cosigner_seed(self, wizard_data: dict) -> str:
5✔
557
        current_cosigner = self.current_cosigner(wizard_data)
5✔
558
        if self.needs_derivation_path(wizard_data) and 'derivation_path' not in current_cosigner:
5✔
559
            return 'multisig_cosigner_script_and_derivation'
5✔
560
        elif self.last_cosigner(wizard_data):
5✔
561
            return 'wallet_password'
×
562
        else:
563
            return 'multisig_cosigner_keystore'
5✔
564

565
    def last_cosigner(self, wizard_data: dict) -> bool:
5✔
566
        # check if we have the final number of cosigners. Doesn't check if cosigner data itself is complete
567
        # (should be validated by wizardcomponents)
568
        if not self.is_multisig(wizard_data):
5✔
569
            return True
5✔
570

571
        if len(wizard_data['multisig_cosigner_data']) < (wizard_data['multisig_participants'] - 1):
5✔
572
            return False
5✔
573

574
        return True
5✔
575

576
    def has_duplicate_masterkeys(self, wizard_data: dict) -> bool:
5✔
577
        """Multisig wallets need distinct master keys. If True, need to prevent wallet-creation."""
578
        xpubs = [self.keystore_from_data(wizard_data['wallet_type'], wizard_data).get_master_public_key()]
×
579
        for cosigner in wizard_data['multisig_cosigner_data']:
×
580
            data = wizard_data['multisig_cosigner_data'][cosigner]
×
581
            xpubs.append(self.keystore_from_data(wizard_data['wallet_type'], data).get_master_public_key())
×
582
        assert xpubs
×
583
        return len(xpubs) != len(set(xpubs))
×
584

585
    def has_heterogeneous_masterkeys(self, wizard_data: dict) -> bool:
5✔
586
        """Multisig wallets need homogeneous master keys.
587
        All master keys need to be bip32, and e.g. Ypub cannot be mixed with Zpub.
588
        If True, need to prevent wallet-creation.
589
        """
590
        xpubs = [self.keystore_from_data(wizard_data['wallet_type'], wizard_data).get_master_public_key()]
×
591
        for cosigner in wizard_data['multisig_cosigner_data']:
×
592
            data = wizard_data['multisig_cosigner_data'][cosigner]
×
593
            xpubs.append(self.keystore_from_data(wizard_data['wallet_type'], data).get_master_public_key())
×
594
        assert xpubs
×
595
        try:
×
596
            k_xpub_type = xpub_type(xpubs[0])
×
597
        except Exception:
×
598
            return True  # maybe old_mpk?
×
599
        for xpub in xpubs:
×
600
            try:
×
601
                my_xpub_type = xpub_type(xpub)
×
602
            except Exception:
×
603
                return True  # maybe old_mpk?
×
604
            if my_xpub_type != k_xpub_type:
×
605
                return True
×
606
        return False
×
607

608
    def is_current_cosigner_hardware(self, wizard_data: dict) -> bool:
5✔
609
        cosigner_data = self.current_cosigner(wizard_data)
×
610
        cosigner_is_hardware = cosigner_data == wizard_data and wizard_data['keystore_type'] == 'hardware'
×
611
        if 'cosigner_keystore_type' in wizard_data and wizard_data['cosigner_keystore_type'] == 'hardware':
×
612
            cosigner_is_hardware = True
×
613
        return cosigner_is_hardware
×
614

615
    def check_multisig_constraints(self, wizard_data: dict) -> Tuple[bool, str]:
5✔
616
        if not self.is_multisig(wizard_data):
×
617
            return True, ''
×
618

619
        # current cosigner might be incomplete. In that case, return valid
620
        cosigner_data = self.current_cosigner(wizard_data)
×
621
        if self.needs_derivation_path(wizard_data):
×
622
            if 'derivation_path' not in cosigner_data:
×
623
                self._logger.debug('defer multisig check: missing derivation_path')
×
624
                return True, ''
×
625
        if self.wants_ext(wizard_data):
×
626
            if 'seed_extra_words' not in cosigner_data:
×
627
                self._logger.debug('defer multisig check: missing extra words')
×
628
                return True, ''
×
629
        if self.is_current_cosigner_hardware(wizard_data):
×
630
            if 'master_key' not in cosigner_data:
×
631
                self._logger.debug('defer multisig check: missing master_key')
×
632
                return True, ''
×
633

634
        user_info = ''
×
635

636
        if self.has_duplicate_masterkeys(wizard_data):
×
637
            self._logger.debug('Duplicate master keys!')
×
638
            user_info = _('Duplicate master keys')
×
639
            multisig_keys_valid = False
×
640
        elif self.has_heterogeneous_masterkeys(wizard_data):
×
641
            self._logger.debug('Heterogenous master keys!')
×
642
            user_info = _('Heterogenous master keys')
×
643
            multisig_keys_valid = False
×
644
        else:
645
            multisig_keys_valid = True
×
646

647
        return multisig_keys_valid, user_info
×
648

649
    def validate_master_key(self, key: str, wallet_type: str):
5✔
650
        # TODO: deduplicate with master key check in create_storage()
651
        validation_message = ''
×
652
        key_valid = False
×
653

654
        if not keystore.is_master_key(key):
×
655
            validation_message = _('Not a master key')
×
656
        else:
657
            k = keystore.from_master_key(key)
×
658
            if wallet_type == 'standard':
×
659
                if isinstance(k, keystore.Xpub):  # has bip32 xpub
×
660
                    t1 = xpub_type(k.xpub)
×
661
                    if t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:  # disallow Ypub/Zpub
×
662
                        validation_message = '%s: %s' % (_('Wrong key type'), t1)
×
663
                    else:
664
                        key_valid = True
×
665
                elif isinstance(k, keystore.Old_KeyStore):
×
666
                    key_valid = True
×
667
                else:
668
                    self._logger.error(f"unexpected keystore type: {type(keystore)}")
×
669
            elif wallet_type == 'multisig':
×
670
                if not isinstance(k, keystore.Xpub):  # old mpk?
×
671
                    validation_message = '%s: %s' % (_('Wrong key type'), "not bip32")
×
672
                t1 = xpub_type(k.xpub)
×
673
                if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:  # disallow ypub/zpub
×
674
                    validation_message = '%s: %s' % (_('Wrong key type'), t1)
×
675
                else:
676
                    key_valid = True
×
677
            else:
678
                validation_message = '%s: %s' % (_('Unsupported wallet type'), wallet_type)
×
679
                self._logger.error(f'Unsupported wallet type: {wallet_type}')
×
680

681
        return key_valid, validation_message
×
682

683
    def create_storage(self, path: str, data: dict):
5✔
684
        assert data['wallet_type'] in ['standard', '2fa', 'imported', 'multisig']
5✔
685

686
        if os.path.exists(path):
5✔
687
            raise Exception('file already exists at path')
×
688
        storage = WalletStorage(path)
5✔
689

690
        # TODO: refactor using self.keystore_from_data
691
        k = None
5✔
692
        if 'keystore_type' not in data:
5✔
693
            assert data['wallet_type'] == 'imported'
5✔
694
            addresses = {}
5✔
695
            if 'private_key_list' in data:
5✔
696
                k = keystore.Imported_KeyStore({})
5✔
697
                keys = keystore.get_private_keys(data['private_key_list'])
5✔
698
                for pk in keys:
5✔
699
                    assert bitcoin.is_private_key(pk)
5✔
700
                    txin_type, pubkey = k.import_privkey(pk, None)
5✔
701
                    addr = bitcoin.pubkey_to_address(txin_type, pubkey)
5✔
702
                    addresses[addr] = {'type': txin_type, 'pubkey': pubkey}
5✔
703
            elif 'address_list' in data:
5✔
704
                for addr in data['address_list'].split():
5✔
705
                    assert isinstance(addr, str)
5✔
706
                    assert bitcoin.is_address(addr), f"expected bitcoin addr. got {addr[:5] + '..' + addr[-2:]}"
5✔
707
                    # note: we do not normalize addresses. :/
708
                    #       In particular, bech32 addresses can be either all-lowercase or all-uppercase.
709
                    #       TODO we should normalize them, but it only makes sense if we also do a walletDB-upgrade.
710
                    addresses[addr] = {}
5✔
711
        elif data['keystore_type'] in ['createseed', 'haveseed']:
5✔
712
            seed_extension = data['seed_extra_words'] if data['seed_extend'] else ''
5✔
713
            if data['seed_type'] in ['old', 'standard', 'segwit']:
5✔
714
                self._logger.debug('creating keystore from electrum seed')
5✔
715
                k = keystore.from_seed(data['seed'], passphrase=seed_extension, for_multisig=data['wallet_type'] == 'multisig')
5✔
716
            elif data['seed_type'] in ['bip39', 'slip39']:
5✔
717
                self._logger.debug('creating keystore from %s seed' % data['seed_type'])
5✔
718
                if data['seed_type'] == 'bip39':
5✔
719
                    root_seed = keystore.bip39_to_seed(data['seed'], passphrase=seed_extension)
5✔
720
                else:
721
                    root_seed = data['seed'].decrypt(seed_extension)
5✔
722
                derivation = normalize_bip32_derivation(data['derivation_path'])
5✔
723
                if data['wallet_type'] == 'multisig':
5✔
724
                    script = data['script_type'] if data['script_type'] != 'p2sh' else 'standard'
×
725
                else:
726
                    script = data['script_type'] if data['script_type'] != 'p2pkh' else 'standard'
5✔
727
                k = keystore.from_bip43_rootseed(root_seed, derivation=derivation, xtype=script)
5✔
728
            elif is_any_2fa_seed_type(data['seed_type']):
5✔
729
                self._logger.debug('creating keystore from 2fa seed')
5✔
730
                k = keystore.from_xprv(data['x1']['xprv'])
5✔
731
            else:
732
                raise Exception('unsupported/unknown seed_type %s' % data['seed_type'])
×
733
        elif data['keystore_type'] == 'masterkey':
5✔
734
            k = keystore.from_master_key(data['master_key'])
5✔
735
            if isinstance(k, keystore.Xpub):  # has xpub
5✔
736
                t1 = xpub_type(k.xpub)
5✔
737
                if data['wallet_type'] == 'multisig':
5✔
738
                    if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:
×
739
                        raise Exception('wrong key type %s' % t1)
×
740
                else:
741
                    if t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:
5✔
742
                        raise Exception('wrong key type %s' % t1)
×
743
            elif isinstance(k, keystore.Old_KeyStore):
5✔
744
                pass
5✔
745
            else:
746
                raise Exception(f'unexpected keystore type: {type(k)}')
×
747
        elif data['keystore_type'] == 'hardware':
5✔
748
            k = self.hw_keystore(data)
5✔
749
            if isinstance(k, keystore.Xpub):  # has xpub
5✔
750
                t1 = xpub_type(k.xpub)
5✔
751
                if data['wallet_type'] == 'multisig':
5✔
752
                    if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:
×
753
                        raise Exception('wrong key type %s' % t1)
×
754
                else:
755
                    if t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:
5✔
756
                        raise Exception('wrong key type %s' % t1)
×
757
            else:
758
                raise Exception(f'unexpected keystore type: {type(k)}')
×
759
        else:
760
            raise Exception('unsupported/unknown keystore_type %s' % data['keystore_type'])
×
761

762
        if data['password']:
5✔
763
            if k and k.may_have_password():
5✔
764
                k.update_password(None, data['password'])
5✔
765

766
        if data['encrypt']:
5✔
767
            if data.get('xpub_encrypt'):
5✔
768
                assert data.get('keystore_type') == 'hardware' and data['wallet_type'] == 'standard'
5✔
769
                enc_version = StorageEncryptionVersion.XPUB_PASSWORD
5✔
770
            else:
771
                enc_version = StorageEncryptionVersion.USER_PASSWORD
5✔
772
            storage.set_password(data['password'], enc_version=enc_version)
5✔
773

774
        db = WalletDB('', storage=storage, upgrade=True)
5✔
775
        db.set_keystore_encryption(bool(data['password']))
5✔
776

777
        db.put('wallet_type', data['wallet_type'])
5✔
778

779
        if data['wallet_type'] == 'standard':
5✔
780
            db.put('keystore', k.dump())
5✔
781
        elif data['wallet_type'] == '2fa':
5✔
782
            db.put('x1', k.dump())
5✔
783
            if 'trustedcoin_keepordisable' in data and data['trustedcoin_keepordisable'] == 'disable':
5✔
784
                k2 = keystore.from_xprv(data['x2']['xprv'])
5✔
785
                if data['encrypt'] and k2.may_have_password():
5✔
786
                    k2.update_password(None, data['password'])
×
787
                db.put('x2', k2.dump())
5✔
788
            else:
789
                db.put('x2', data['x2'])
5✔
790
            if 'x3' in data:
5✔
791
                db.put('x3', data['x3'])
5✔
792
            db.put('use_trustedcoin', True)
5✔
793
        elif data['wallet_type'] == 'multisig':
5✔
794
            if not isinstance(k, keystore.Xpub):
5✔
795
                raise Exception(f'unexpected keystore(main) type={type(k)} in multisig. not bip32.')
×
796
            k_xpub_type = xpub_type(k.xpub)
5✔
797
            db.put('wallet_type', '%dof%d' % (data['multisig_signatures'], data['multisig_participants']))
5✔
798
            db.put('x1', k.dump())
5✔
799
            for cosigner in data['multisig_cosigner_data']:
5✔
800
                cosigner_keystore = self.keystore_from_data('multisig', data['multisig_cosigner_data'][cosigner])
5✔
801
                if not isinstance(cosigner_keystore, keystore.Xpub):
5✔
802
                    raise Exception(f'unexpected keystore(cosigner) type={type(cosigner_keystore)} in multisig. not bip32.')
×
803
                if k_xpub_type != xpub_type(cosigner_keystore.xpub):
5✔
804
                    raise Exception('multisig wallet needs to have homogeneous xpub types')
×
805
                if data['encrypt'] and cosigner_keystore.may_have_password():
5✔
806
                    cosigner_keystore.update_password(None, data['password'])
×
807
                db.put(f'x{cosigner}', cosigner_keystore.dump())
5✔
808
        elif data['wallet_type'] == 'imported':
5✔
809
            if k:
5✔
810
                db.put('keystore', k.dump())
5✔
811
            db.put('addresses', addresses)
5✔
812

813
        if k and k.can_have_deterministic_lightning_xprv():
5✔
814
            db.put('lightning_xprv', k.get_lightning_xprv(data['password']))
5✔
815

816
        db.load_plugins()
5✔
817
        db.write()
5✔
818

819

820
class ServerConnectWizard(AbstractWizard):
5✔
821

822
    _logger = get_logger(__name__)
5✔
823

824
    def __init__(self, daemon: 'Daemon'):
5✔
825
        AbstractWizard.__init__(self)
5✔
826
        self.navmap = {
5✔
827
            'welcome': {
828
                'next': lambda d: 'proxy_config' if d['want_proxy'] else 'server_config',
829
                'accept': self.do_configure_autoconnect,
830
                'last': lambda d: bool(d['autoconnect'] and not d['want_proxy'])
831
            },
832
            'proxy_config': {
833
                'next': 'server_config',
834
                'accept': self.do_configure_proxy,
835
                'last': lambda d: bool(d['autoconnect'])
836
            },
837
            'server_config': {
838
                'accept': self.do_configure_server,
839
                'last': True
840
            }
841
        }
842
        self._daemon = daemon
5✔
843

844
    def do_configure_proxy(self, wizard_data: dict):
5✔
845
        proxy_settings = wizard_data['proxy']
5✔
846
        if not self._daemon.network:
5✔
847
            self._logger.debug('not configuring proxy, electrum config wants offline mode')
×
848
            return
×
849
        self._logger.debug(f'configuring proxy: {proxy_settings!r}')
5✔
850
        net_params = self._daemon.network.get_parameters()
5✔
851
        proxy = ProxySettings.from_dict(proxy_settings)
5✔
852
        net_params = net_params._replace(proxy=proxy, auto_connect=bool(wizard_data['autoconnect']))
5✔
853
        self._daemon.network.run_from_another_thread(self._daemon.network.set_parameters(net_params))
5✔
854

855
    def do_configure_server(self, wizard_data: dict):
5✔
856
        self._logger.debug(f'configuring server: {wizard_data!r}')
5✔
857
        net_params = self._daemon.network.get_parameters()
5✔
858
        server = ''
5✔
859
        oneserver = wizard_data.get('one_server', False)
5✔
860
        if not wizard_data['autoconnect']:
5✔
861
            try:
5✔
862
                server = ServerAddr.from_str_with_inference(wizard_data['server'])
5✔
863
                if not server:
5✔
864
                    raise Exception('failed to parse server %s' % wizard_data['server'])
×
865
            except Exception:
×
866
                return
×
867
        net_params = net_params._replace(server=server, auto_connect=wizard_data['autoconnect'], oneserver=oneserver)
5✔
868
        self._daemon.network.run_from_another_thread(self._daemon.network.set_parameters(net_params))
5✔
869

870
    def do_configure_autoconnect(self, wizard_data: dict):
5✔
871
        self._logger.debug(f'configuring autoconnect: {wizard_data!r}')
5✔
872
        if self._daemon.config.cv.NETWORK_AUTO_CONNECT.is_modifiable():
5✔
873
            if wizard_data.get('autoconnect') is not None:
5✔
874
                self._daemon.config.NETWORK_AUTO_CONNECT = wizard_data.get('autoconnect')
5✔
875

876
    def start(self, *, start_viewstate: WizardViewState = None) -> WizardViewState:
5✔
877
        self.reset()
5✔
878
        if start_viewstate is None:
5✔
879
            start_view = 'welcome'
5✔
880
            params = self.navmap[start_view].get('params', {})
5✔
881
            self._current = WizardViewState(start_view, {}, params)
5✔
882
        else:
883
            self._current = start_viewstate
×
884
        return self._current
5✔
885

886

887
class TermsOfUseWizard(AbstractWizard):
5✔
888

889
    _logger = get_logger(__name__)
5✔
890

891
    def __init__(self, config: 'SimpleConfig'):
5✔
892
        AbstractWizard.__init__(self)
×
893
        self._config = config
×
894
        self.navmap = {
×
895
            'terms_of_use': {
896
                'accept': self.accept_terms_of_use,
897
                'last': True,
898
            },
899
        }
900

901
    def accept_terms_of_use(self, _):
5✔
902
        self._config.TERMS_OF_USE_ACCEPTED = TERMS_OF_USE_LATEST_VERSION
×
903

904
    def start(self, *, start_viewstate: WizardViewState = None) -> WizardViewState:
5✔
905
        self.reset()
×
906
        if start_viewstate is None:
×
907
            start_view = 'terms_of_use'
×
908
            params = self.navmap[start_view].get('params', {})
×
909
            self._current = WizardViewState(start_view, {}, params)
×
910
        else:
911
            self._current = start_viewstate
×
912
        return self._current
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc