• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5304010765238272

17 Aug 2023 02:17PM UTC coverage: 59.027% (+0.02%) from 59.008%
5304010765238272

Pull #8493

CirrusCI

ecdsa
storage.append: fail if the file length is not what we expect
Pull Request #8493: partial-writes using jsonpatch

165 of 165 new or added lines in 9 files covered. (100.0%)

18653 of 31601 relevant lines covered (59.03%)

2.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.26
/electrum/base_wizard.py
1
# -*- coding: utf-8 -*-
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2016 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import os
5✔
27
import sys
5✔
28
import copy
5✔
29
import traceback
5✔
30
from functools import partial
5✔
31
from typing import List, TYPE_CHECKING, Tuple, NamedTuple, Any, Dict, Optional, Union
5✔
32

33
from . import bitcoin
5✔
34
from . import keystore
5✔
35
from . import mnemonic
5✔
36
from .bip32 import is_bip32_derivation, xpub_type, normalize_bip32_derivation, BIP32Node
5✔
37
from .keystore import bip44_derivation, purpose48_derivation, Hardware_KeyStore, KeyStore, bip39_to_seed
5✔
38
from .wallet import (Imported_Wallet, Standard_Wallet, Multisig_Wallet,
5✔
39
                     wallet_types, Wallet, Abstract_Wallet)
40
from .storage import WalletStorage, StorageEncryptionVersion
5✔
41
from .wallet_db import WalletDB
5✔
42
from .i18n import _
5✔
43
from .util import UserCancelled, InvalidPassword, WalletFileException, UserFacingException
5✔
44
from .simple_config import SimpleConfig
5✔
45
from .plugin import Plugins, HardwarePluginLibraryUnavailable
5✔
46
from .logging import Logger
5✔
47
from .plugins.hw_wallet.plugin import OutdatedHwFirmwareException, HW_PluginBase
5✔
48

49
if TYPE_CHECKING:
5✔
50
    from .plugin import DeviceInfo, BasePlugin
×
51

52

53
# hardware device setup purpose
54
HWD_SETUP_NEW_WALLET, HWD_SETUP_DECRYPT_WALLET = range(0, 2)
5✔
55

56

57
class ScriptTypeNotSupported(Exception): pass
5✔
58

59

60
class GoBack(Exception): pass
5✔
61

62

63
class ReRunDialog(Exception): pass
5✔
64

65

66
class ChooseHwDeviceAgain(Exception): pass
5✔
67

68

69
class WizardStackItem(NamedTuple):
5✔
70
    action: Any
5✔
71
    args: Any
5✔
72
    kwargs: Dict[str, Any]
5✔
73
    db_data: dict
5✔
74

75

76
class WizardWalletPasswordSetting(NamedTuple):
5✔
77
    password: Optional[str]
5✔
78
    encrypt_storage: bool
5✔
79
    storage_enc_version: StorageEncryptionVersion
5✔
80
    encrypt_keystore: bool
5✔
81

82

83
class BaseWizard(Logger):
5✔
84

85
    def __init__(self, config: SimpleConfig, plugins: Plugins):
5✔
86
        super(BaseWizard, self).__init__()
×
87
        Logger.__init__(self)
×
88
        self.config = config
×
89
        self.plugins = plugins
×
90
        self.data = {}
×
91
        self.pw_args = None  # type: Optional[WizardWalletPasswordSetting]
×
92
        self._stack = []  # type: List[WizardStackItem]
×
93
        self.plugin = None  # type: Optional[BasePlugin]
×
94
        self.keystores = []  # type: List[KeyStore]
×
95
        self.is_kivy = config.GUI_NAME == 'kivy'
×
96
        self.seed_type = None
×
97

98
    def set_icon(self, icon):
5✔
99
        pass
×
100

101
    def run(self, *args, **kwargs):
5✔
102
        action = args[0]
×
103
        args = args[1:]
×
104
        db_data = copy.deepcopy(self.data)
×
105
        self._stack.append(WizardStackItem(action, args, kwargs, db_data))
×
106
        if not action:
×
107
            return
×
108
        if type(action) is tuple:
×
109
            self.plugin, action = action
×
110
        if self.plugin and hasattr(self.plugin, action):
×
111
            f = getattr(self.plugin, action)
×
112
            f(self, *args, **kwargs)
×
113
        elif hasattr(self, action):
×
114
            f = getattr(self, action)
×
115
            f(*args, **kwargs)
×
116
        else:
117
            raise Exception("unknown action", action)
×
118

119
    def can_go_back(self):
5✔
120
        return len(self._stack) > 1
×
121

122
    def go_back(self, *, rerun_previous: bool = True) -> None:
5✔
123
        if not self.can_go_back():
×
124
            return
×
125
        # pop 'current' frame
126
        self._stack.pop()
×
127
        prev_frame = self._stack[-1]
×
128
        # try to undo side effects since we last entered 'previous' frame
129
        # FIXME only self.data is properly restored
130
        self.data = copy.deepcopy(prev_frame.db_data)
×
131

132
        if rerun_previous:
×
133
            # pop 'previous' frame
134
            self._stack.pop()
×
135
            # rerun 'previous' frame
136
            self.run(prev_frame.action, *prev_frame.args, **prev_frame.kwargs)
×
137

138
    def reset_stack(self):
5✔
139
        self._stack = []
×
140

141
    def new(self):
5✔
142
        title = _("Create new wallet")
×
143
        message = '\n'.join([
×
144
            _("What kind of wallet do you want to create?")
145
        ])
146
        wallet_kinds = [
×
147
            ('standard',  _("Standard wallet")),
148
            ('2fa', _("Wallet with two-factor authentication")),
149
            ('multisig',  _("Multi-signature wallet")),
150
            ('imported',  _("Import Bitcoin addresses or private keys")),
151
        ]
152
        choices = [pair for pair in wallet_kinds if pair[0] in wallet_types]
×
153
        self.choice_dialog(title=title, message=message, choices=choices, run_next=self.on_wallet_type)
×
154

155
    def upgrade_db(self, storage, db):
5✔
156
        exc = None  # type: Optional[Exception]
×
157
        def on_finished():
×
158
            if exc is None:
×
159
                self.terminate(storage=storage, db=db)
×
160
            else:
161
                raise exc
×
162
        def do_upgrade():
×
163
            nonlocal exc
164
            try:
×
165
                db.upgrade()
×
166
            except Exception as e:
×
167
                exc = e
×
168
        self.waiting_dialog(do_upgrade, _('Upgrading wallet format...'), on_finished=on_finished)
×
169

170
    def run_task_without_blocking_gui(self, task, *, msg: str = None) -> Any:
5✔
171
        """Perform a task in a thread without blocking the GUI.
172
        Returns the result of 'task', or raises the same exception.
173
        This method blocks until 'task' is finished.
174
        """
175
        raise NotImplementedError()
×
176

177
    def load_2fa(self):
5✔
178
        self.data['wallet_type'] = '2fa'
×
179
        self.data['use_trustedcoin'] = True
×
180
        self.plugin = self.plugins.load_plugin('trustedcoin')
×
181

182
    def on_wallet_type(self, choice):
5✔
183
        self.data['wallet_type'] = self.wallet_type = choice
×
184
        if choice == 'standard':
×
185
            action = 'choose_keystore'
×
186
        elif choice == 'multisig':
×
187
            action = 'choose_multisig'
×
188
        elif choice == '2fa':
×
189
            self.load_2fa()
×
190
            action = self.plugin.get_action(self.data)
×
191
        elif choice == 'imported':
×
192
            action = 'import_addresses_or_keys'
×
193
        self.run(action)
×
194

195
    def choose_multisig(self):
5✔
196
        def on_multisig(m, n):
×
197
            multisig_type = "%dof%d" % (m, n)
×
198
            self.data['wallet_type'] = multisig_type
×
199
            self.n = n
×
200
            self.run('choose_keystore')
×
201
        self.multisig_dialog(run_next=on_multisig)
×
202

203
    def choose_keystore(self):
5✔
204
        assert self.wallet_type in ['standard', 'multisig']
×
205
        i = len(self.keystores)
×
206
        title = _('Add cosigner') + ' (%d of %d)'%(i+1, self.n) if self.wallet_type=='multisig' else _('Keystore')
×
207
        if self.wallet_type =='standard' or i==0:
×
208
            message = _('Do you want to create a new seed, or to restore a wallet using an existing seed?')
×
209
            choices = [
×
210
                ('choose_seed_type', _('Create a new seed')),
211
                ('restore_from_seed', _('I already have a seed')),
212
                ('restore_from_key', _('Use a master key')),
213
            ]
214
            if not self.is_kivy:
×
215
                choices.append(('choose_hw_device',  _('Use a hardware device')))
×
216
        else:
217
            message = _('Add a cosigner to your multi-sig wallet')
×
218
            choices = [
×
219
                ('restore_from_key', _('Enter cosigner key')),
220
                ('restore_from_seed', _('Enter cosigner seed')),
221
            ]
222
            if not self.is_kivy:
×
223
                choices.append(('choose_hw_device',  _('Cosign with hardware device')))
×
224

225
        self.choice_dialog(title=title, message=message, choices=choices, run_next=self.run)
×
226

227
    def import_addresses_or_keys(self):
5✔
228
        v = lambda x: keystore.is_address_list(x) or keystore.is_private_key_list(x, raise_on_error=True)
×
229
        title = _("Import Bitcoin Addresses")
×
230
        message = _("Enter a list of Bitcoin addresses (this will create a watching-only wallet), or a list of private keys.")
×
231
        self.add_xpub_dialog(title=title, message=message, run_next=self.on_import,
×
232
                             is_valid=v, allow_multi=True, show_wif_help=True)
233

234
    def on_import(self, text):
5✔
235
        # text is already sanitized by is_address_list and is_private_keys_list
236
        if keystore.is_address_list(text):
×
237
            self.data['addresses'] = {}
×
238
            for addr in text.split():
×
239
                assert bitcoin.is_address(addr)
×
240
                self.data['addresses'][addr] = {}
×
241
        elif keystore.is_private_key_list(text):
×
242
            self.data['addresses'] = {}
×
243
            k = keystore.Imported_KeyStore({})
×
244
            keys = keystore.get_private_keys(text)
×
245
            for pk in keys:
×
246
                assert bitcoin.is_private_key(pk)
×
247
                txin_type, pubkey = k.import_privkey(pk, None)
×
248
                addr = bitcoin.pubkey_to_address(txin_type, pubkey)
×
249
                self.data['addresses'][addr] = {'type':txin_type, 'pubkey':pubkey}
×
250
            self.keystores.append(k)
×
251
        else:
252
            return self.terminate(aborted=True)
×
253
        return self.run('create_wallet')
×
254

255
    def restore_from_key(self):
5✔
256
        if self.wallet_type == 'standard':
×
257
            v = keystore.is_master_key
×
258
            title = _("Create keystore from a master key")
×
259
            message = ' '.join([
×
260
                _("To create a watching-only wallet, please enter your master public key (xpub/ypub/zpub)."),
261
                _("To create a spending wallet, please enter a master private key (xprv/yprv/zprv).")
262
            ])
263
            self.add_xpub_dialog(title=title, message=message, run_next=self.on_restore_from_key, is_valid=v)
×
264
        else:
265
            i = len(self.keystores) + 1
×
266
            self.add_cosigner_dialog(index=i, run_next=self.on_restore_from_key, is_valid=keystore.is_bip32_key)
×
267

268
    def on_restore_from_key(self, text):
5✔
269
        k = keystore.from_master_key(text)
×
270
        self.on_keystore(k)
×
271

272
    def choose_hw_device(self, purpose=HWD_SETUP_NEW_WALLET, *, storage: WalletStorage = None):
5✔
273
        while True:
274
            try:
×
275
                self._choose_hw_device(purpose=purpose, storage=storage)
×
276
            except ChooseHwDeviceAgain:
×
277
                pass
×
278
            else:
279
                break
×
280

281
    def _choose_hw_device(self, *, purpose, storage: WalletStorage = None):
5✔
282
        title = _('Hardware Keystore')
×
283
        # check available plugins
284
        supported_plugins = self.plugins.get_hardware_support()
×
285
        devices = []  # type: List[Tuple[str, DeviceInfo]]
×
286
        devmgr = self.plugins.device_manager
×
287
        debug_msg = ''
×
288

289
        def failed_getting_device_infos(name, e):
×
290
            nonlocal debug_msg
291
            err_str_oneline = ' // '.join(str(e).splitlines())
×
292
            self.logger.warning(f'error getting device infos for {name}: {err_str_oneline}')
×
293
            indented_error_msg = '    '.join([''] + str(e).splitlines(keepends=True))
×
294
            debug_msg += f'  {name}: (error getting device infos)\n{indented_error_msg}\n'
×
295

296
        # scan devices
297
        try:
×
298
            scanned_devices = self.run_task_without_blocking_gui(task=devmgr.scan_devices,
×
299
                                                                 msg=_("Scanning devices..."))
300
        except BaseException as e:
×
301
            self.logger.info('error scanning devices: {}'.format(repr(e)))
×
302
            debug_msg = '  {}:\n    {}'.format(_('Error scanning devices'), e)
×
303
        else:
304
            for splugin in supported_plugins:
×
305
                name, plugin = splugin.name, splugin.plugin
×
306
                # plugin init errored?
307
                if not plugin:
×
308
                    e = splugin.exception
×
309
                    indented_error_msg = '    '.join([''] + str(e).splitlines(keepends=True))
×
310
                    debug_msg += f'  {name}: (error during plugin init)\n'
×
311
                    debug_msg += '    {}\n'.format(_('You might have an incompatible library.'))
×
312
                    debug_msg += f'{indented_error_msg}\n'
×
313
                    continue
×
314
                # see if plugin recognizes 'scanned_devices'
315
                try:
×
316
                    # FIXME: side-effect: this sets client.handler
317
                    device_infos = devmgr.list_pairable_device_infos(
×
318
                        handler=None, plugin=plugin, devices=scanned_devices, include_failing_clients=True)
319
                except HardwarePluginLibraryUnavailable as e:
×
320
                    failed_getting_device_infos(name, e)
×
321
                    continue
×
322
                except BaseException as e:
×
323
                    self.logger.exception('')
×
324
                    failed_getting_device_infos(name, e)
×
325
                    continue
×
326
                device_infos_failing = list(filter(lambda di: di.exception is not None, device_infos))
×
327
                for di in device_infos_failing:
×
328
                    failed_getting_device_infos(name, di.exception)
×
329
                device_infos_working = list(filter(lambda di: di.exception is None, device_infos))
×
330
                devices += list(map(lambda x: (name, x), device_infos_working))
×
331
        if not debug_msg:
×
332
            debug_msg = '  {}'.format(_('No exceptions encountered.'))
×
333
        if not devices:
×
334
            msg = (_('No hardware device detected.') + '\n' +
×
335
                   _('To trigger a rescan, press \'Next\'.') + '\n\n')
336
            if sys.platform == 'win32':
×
337
                msg += _('If your device is not detected on Windows, go to "Settings", "Devices", "Connected devices", '
×
338
                         'and do "Remove device". Then, plug your device again.') + '\n'
339
                msg += _('While this is less than ideal, it might help if you run Electrum as Administrator.') + '\n'
×
340
            else:
341
                msg += _('On Linux, you might have to add a new permission to your udev rules.') + '\n'
×
342
            msg += '\n\n'
×
343
            msg += _('Debug message') + '\n' + debug_msg
×
344
            self.confirm_dialog(title=title, message=msg,
×
345
                                run_next=lambda x: None)
346
            raise ChooseHwDeviceAgain()
×
347
        # select device
348
        self.devices = devices
×
349
        choices = []
×
350
        for name, info in devices:
×
351
            state = _("initialized") if info.initialized else _("wiped")
×
352
            label = info.label or _("An unnamed {}").format(name)
×
353
            try: transport_str = info.device.transport_ui_string[:20]
×
354
            except Exception: transport_str = 'unknown transport'
×
355
            descr = f"{label} [{info.model_name or name}, {state}, {transport_str}]"
×
356
            choices.append(((name, info), descr))
×
357
        msg = _('Select a device') + ':'
×
358
        self.choice_dialog(title=title, message=msg, choices=choices,
×
359
                           run_next=lambda *args: self.on_device(*args, purpose=purpose, storage=storage))
360

361
    def on_device(self, name, device_info: 'DeviceInfo', *, purpose, storage: WalletStorage = None):
5✔
362
        self.plugin = self.plugins.get_plugin(name)
×
363
        assert isinstance(self.plugin, HW_PluginBase)
×
364
        devmgr = self.plugins.device_manager
×
365
        try:
×
366
            client = self.plugin.setup_device(device_info, self, purpose)
×
367
        except OSError as e:
×
368
            self.show_error(_('We encountered an error while connecting to your device:')
×
369
                            + '\n' + str(e) + '\n'
370
                            + _('To try to fix this, we will now re-pair with your device.') + '\n'
371
                            + _('Please try again.'))
372
            devmgr.unpair_id(device_info.device.id_)
×
373
            raise ChooseHwDeviceAgain()
×
374
        except OutdatedHwFirmwareException as e:
×
375
            if self.question(e.text_ignore_old_fw_and_continue(), title=_("Outdated device firmware")):
×
376
                self.plugin.set_ignore_outdated_fw()
×
377
                # will need to re-pair
378
                devmgr.unpair_id(device_info.device.id_)
×
379
            raise ChooseHwDeviceAgain()
×
380
        except GoBack:
×
381
            raise ChooseHwDeviceAgain()
×
382
        except (UserCancelled, ReRunDialog):
×
383
            raise
×
384
        except UserFacingException as e:
×
385
            self.show_error(str(e))
×
386
            raise ChooseHwDeviceAgain()
×
387
        except BaseException as e:
×
388
            self.logger.exception('')
×
389
            self.show_error(str(e))
×
390
            raise ChooseHwDeviceAgain()
×
391

392
        if purpose == HWD_SETUP_NEW_WALLET:
×
393
            def f(derivation, script_type):
×
394
                derivation = normalize_bip32_derivation(derivation)
×
395
                self.run('on_hw_derivation', name, device_info, derivation, script_type)
×
396
            self.derivation_and_script_type_dialog(f)
×
397
        elif purpose == HWD_SETUP_DECRYPT_WALLET:
×
398
            password = client.get_password_for_storage_encryption()
×
399
            try:
×
400
                storage.decrypt(password)
×
401
            except InvalidPassword:
×
402
                # try to clear session so that user can type another passphrase
403
                if hasattr(client, 'clear_session'):  # FIXME not all hw wallet plugins have this
×
404
                    client.clear_session()
×
405
                raise
×
406
        else:
407
            raise Exception('unknown purpose: %s' % purpose)
×
408

409
    def derivation_and_script_type_dialog(self, f, *, get_account_xpub=None):
5✔
410
        message1 = _('Choose the type of addresses in your wallet.')
×
411
        message2 = ' '.join([
×
412
            _('You can override the suggested derivation path.'),
413
            _('If you are not sure what this is, leave this field unchanged.')
414
        ])
415
        hide_choices = False
×
416
        if self.wallet_type == 'multisig':
×
417
            # There is no general standard for HD multisig.
418
            # For legacy, this is partially compatible with BIP45; assumes index=0
419
            # For segwit, a custom path is used, as there is no standard at all.
420
            default_choice_idx = 2
×
421
            choices = [
×
422
                ('standard',   'legacy multisig (p2sh)',            normalize_bip32_derivation("m/45'/0")),
423
                ('p2wsh-p2sh', 'p2sh-segwit multisig (p2wsh-p2sh)', purpose48_derivation(0, xtype='p2wsh-p2sh')),
424
                ('p2wsh',      'native segwit multisig (p2wsh)',    purpose48_derivation(0, xtype='p2wsh')),
425
            ]
426
            # if this is not the first cosigner, pre-select the expected script type,
427
            # and hide the choices
428
            script_type = self.get_script_type_of_wallet()
×
429
            if script_type is not None:
×
430
                script_types = [*zip(*choices)][0]
×
431
                chosen_idx = script_types.index(script_type)
×
432
                default_choice_idx = chosen_idx
×
433
                hide_choices = True
×
434
        else:
435
            default_choice_idx = 2
×
436
            choices = [
×
437
                ('standard',    'legacy (p2pkh)',            bip44_derivation(0, bip43_purpose=44)),
438
                ('p2wpkh-p2sh', 'p2sh-segwit (p2wpkh-p2sh)', bip44_derivation(0, bip43_purpose=49)),
439
                ('p2wpkh',      'native segwit (p2wpkh)',    bip44_derivation(0, bip43_purpose=84)),
440
            ]
441
        while True:
442
            try:
×
443
                self.derivation_and_script_type_gui_specific_dialog(
×
444
                    run_next=f,
445
                    title=_('Script type and Derivation path'),
446
                    message1=message1,
447
                    message2=message2,
448
                    choices=choices,
449
                    test_text=is_bip32_derivation,
450
                    default_choice_idx=default_choice_idx,
451
                    get_account_xpub=get_account_xpub,
452
                    hide_choices=hide_choices,
453
                )
454
                return
×
455
            except ScriptTypeNotSupported as e:
×
456
                self.show_error(e)
×
457
                # let the user choose again
458

459
    def on_hw_derivation(self, name, device_info: 'DeviceInfo', derivation, xtype):
5✔
460
        from .keystore import hardware_keystore
×
461
        devmgr = self.plugins.device_manager
×
462
        assert isinstance(self.plugin, HW_PluginBase)
×
463
        try:
×
464
            xpub = self.plugin.get_xpub(device_info.device.id_, derivation, xtype, self)
×
465
            client = devmgr.client_by_id(device_info.device.id_, scan_now=False)
×
466
            if not client: raise Exception("failed to find client for device id")
×
467
            root_fingerprint = client.request_root_fingerprint_from_device()
×
468
            label = client.label()  # use this as device_info.label might be outdated!
×
469
            soft_device_id = client.get_soft_device_id()  # use this as device_info.device_id might be outdated!
×
470
        except ScriptTypeNotSupported:
×
471
            raise  # this is handled in derivation_dialog
×
472
        except BaseException as e:
×
473
            self.logger.exception('')
×
474
            self.show_error(e)
×
475
            raise ChooseHwDeviceAgain()
×
476
        d = {
×
477
            'type': 'hardware',
478
            'hw_type': name,
479
            'derivation': derivation,
480
            'root_fingerprint': root_fingerprint,
481
            'xpub': xpub,
482
            'label': label,
483
            'soft_device_id': soft_device_id,
484
        }
485
        try:
×
486
            client.manipulate_keystore_dict_during_wizard_setup(d)
×
487
        except Exception as e:
×
488
            self.logger.exception('')
×
489
            self.show_error(e)
×
490
            raise ChooseHwDeviceAgain()
×
491
        k = hardware_keystore(d)
×
492
        self.on_keystore(k)
×
493

494
    def passphrase_dialog(self, run_next, is_restoring=False):
5✔
495
        title = _('Seed extension')
×
496
        message = '\n'.join([
×
497
            _('You may extend your seed with custom words.'),
498
            _('Your seed extension must be saved together with your seed.'),
499
        ])
500
        warning = '\n'.join([
×
501
            _('Note that this is NOT your encryption password.'),
502
            _('If you do not know what this is, leave this field empty.'),
503
        ])
504
        warn_issue4566 = is_restoring and self.seed_type == 'bip39'
×
505
        self.line_dialog(title=title, message=message, warning=warning,
×
506
                         default='', test=lambda x:True, run_next=run_next,
507
                         warn_issue4566=warn_issue4566)
508

509
    def restore_from_seed(self):
5✔
510
        self.opt_bip39 = True
×
511
        self.opt_slip39 = True
×
512
        self.opt_ext = True
×
513
        is_cosigning_seed = lambda x: mnemonic.seed_type(x) in ['standard', 'segwit']
×
514
        test = mnemonic.is_seed if self.wallet_type == 'standard' else is_cosigning_seed
×
515
        f = lambda *args: self.run('on_restore_seed', *args)
×
516
        self.restore_seed_dialog(run_next=f, test=test)
×
517

518
    def on_restore_seed(self, seed, seed_type, is_ext):
5✔
519
        self.seed_type = seed_type if seed_type != 'electrum' else mnemonic.seed_type(seed)
×
520
        if self.seed_type == 'bip39':
×
521
            def f(passphrase):
×
522
                root_seed = bip39_to_seed(seed, passphrase)
×
523
                self.on_restore_bip43(root_seed)
×
524
            self.passphrase_dialog(run_next=f, is_restoring=True) if is_ext else f('')
×
525
        elif self.seed_type == 'slip39':
×
526
            def f(passphrase):
×
527
                root_seed = seed.decrypt(passphrase)
×
528
                self.on_restore_bip43(root_seed)
×
529
            self.passphrase_dialog(run_next=f, is_restoring=True) if is_ext else f('')
×
530
        elif self.seed_type in ['standard', 'segwit']:
×
531
            f = lambda passphrase: self.run('create_keystore', seed, passphrase)
×
532
            self.passphrase_dialog(run_next=f, is_restoring=True) if is_ext else f('')
×
533
        elif self.seed_type == 'old':
×
534
            self.run('create_keystore', seed, '')
×
535
        elif mnemonic.is_any_2fa_seed_type(self.seed_type):
×
536
            self.load_2fa()
×
537
            self.run('on_restore_seed', seed, is_ext)
×
538
        else:
539
            raise Exception('Unknown seed type', self.seed_type)
×
540

541
    def on_restore_bip43(self, root_seed):
5✔
542
        def f(derivation, script_type):
×
543
            derivation = normalize_bip32_derivation(derivation)
×
544
            self.run('on_bip43', root_seed, derivation, script_type)
×
545
        if self.wallet_type == 'standard':
×
546
            def get_account_xpub(account_path):
×
547
                root_node = BIP32Node.from_rootseed(root_seed, xtype="standard")
×
548
                account_node = root_node.subkey_at_private_derivation(account_path)
×
549
                account_xpub = account_node.to_xpub()
×
550
                return account_xpub
×
551
        else:
552
            get_account_xpub = None
×
553
        self.derivation_and_script_type_dialog(f, get_account_xpub=get_account_xpub)
×
554

555
    def create_keystore(self, seed, passphrase):
5✔
556
        k = keystore.from_seed(seed, passphrase, self.wallet_type == 'multisig')
×
557
        if k.can_have_deterministic_lightning_xprv():
×
558
            self.data['lightning_xprv'] = k.get_lightning_xprv(None)
×
559
        self.on_keystore(k)
×
560

561
    def on_bip43(self, root_seed, derivation, script_type):
5✔
562
        k = keystore.from_bip43_rootseed(root_seed, derivation, xtype=script_type)
×
563
        self.on_keystore(k)
×
564

565
    def get_script_type_of_wallet(self) -> Optional[str]:
5✔
566
        if len(self.keystores) > 0:
×
567
            ks = self.keystores[0]
×
568
            if isinstance(ks, keystore.Xpub):
×
569
                return xpub_type(ks.xpub)
×
570
        return None
×
571

572
    def on_keystore(self, k: KeyStore):
5✔
573
        has_xpub = isinstance(k, keystore.Xpub)
×
574
        if has_xpub:
×
575
            t1 = xpub_type(k.xpub)
×
576
        if self.wallet_type == 'standard':
×
577
            if has_xpub and t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:
×
578
                self.show_error(_('Wrong key type') + ' %s'%t1)
×
579
                self.run('choose_keystore')
×
580
                return
×
581
            self.keystores.append(k)
×
582
            self.run('create_wallet')
×
583
        elif self.wallet_type == 'multisig':
×
584
            assert has_xpub
×
585
            if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:
×
586
                self.show_error(_('Wrong key type') + ' %s'%t1)
×
587
                self.run('choose_keystore')
×
588
                return
×
589
            if k.xpub in map(lambda x: x.xpub, self.keystores):
×
590
                self.show_error(_('Error: duplicate master public key'))
×
591
                self.run('choose_keystore')
×
592
                return
×
593
            if len(self.keystores)>0:
×
594
                t2 = xpub_type(self.keystores[0].xpub)
×
595
                if t1 != t2:
×
596
                    self.show_error(_('Cannot add this cosigner:') + '\n' + "Their key type is '%s', we are '%s'"%(t1, t2))
×
597
                    self.run('choose_keystore')
×
598
                    return
×
599
            if len(self.keystores) == 0:
×
600
                xpub = k.get_master_public_key()
×
601
                self.reset_stack()
×
602
                self.keystores.append(k)
×
603
                self.run('show_xpub_and_add_cosigners', xpub)
×
604
                return
×
605
            self.reset_stack()
×
606
            self.keystores.append(k)
×
607
            if len(self.keystores) < self.n:
×
608
                self.run('choose_keystore')
×
609
            else:
610
                self.run('create_wallet')
×
611

612
    def create_wallet(self):
5✔
613
        encrypt_keystore = any(k.may_have_password() for k in self.keystores)
×
614
        # note: the following condition ("if") is duplicated logic from
615
        # wallet.get_available_storage_encryption_version()
616
        if self.wallet_type == 'standard' and isinstance(self.keystores[0], Hardware_KeyStore):
×
617
            # offer encrypting with a pw derived from the hw device
618
            k = self.keystores[0]  # type: Hardware_KeyStore
×
619
            assert isinstance(self.plugin, HW_PluginBase)
×
620
            try:
×
621
                k.handler = self.plugin.create_handler(self)
×
622
                password = k.get_password_for_storage_encryption()
×
623
            except UserCancelled:
×
624
                devmgr = self.plugins.device_manager
×
625
                devmgr.unpair_pairing_code(k.pairing_code())
×
626
                raise ChooseHwDeviceAgain()
×
627
            except BaseException as e:
×
628
                self.logger.exception('')
×
629
                self.show_error(str(e))
×
630
                raise ChooseHwDeviceAgain()
×
631
            self.request_storage_encryption(
×
632
                run_next=lambda encrypt_storage: self.on_password(
633
                    password,
634
                    encrypt_storage=encrypt_storage,
635
                    storage_enc_version=StorageEncryptionVersion.XPUB_PASSWORD,
636
                    encrypt_keystore=False))
637
        else:
638
            # reset stack to disable 'back' button in password dialog
639
            self.reset_stack()
×
640
            # prompt the user to set an arbitrary password
641
            self.request_password(
×
642
                run_next=lambda password, encrypt_storage: self.on_password(
643
                    password,
644
                    encrypt_storage=encrypt_storage,
645
                    storage_enc_version=StorageEncryptionVersion.USER_PASSWORD,
646
                    encrypt_keystore=encrypt_keystore),
647
                force_disable_encrypt_cb=not encrypt_keystore)
648

649
    def on_password(self, password, *, encrypt_storage: bool,
5✔
650
                    storage_enc_version=StorageEncryptionVersion.USER_PASSWORD,
651
                    encrypt_keystore: bool):
652
        for k in self.keystores:
×
653
            if k.may_have_password():
×
654
                k.update_password(None, password)
×
655
        if self.wallet_type == 'standard':
×
656
            self.data['seed_type'] = self.seed_type
×
657
            keys = self.keystores[0].dump()
×
658
            self.data['keystore'] = keys
×
659
        elif self.wallet_type == 'multisig':
×
660
            for i, k in enumerate(self.keystores):
×
661
                self.data['x%d/'%(i+1)] = k.dump()
×
662
        elif self.wallet_type == 'imported':
×
663
            if len(self.keystores) > 0:
×
664
                keys = self.keystores[0].dump()
×
665
                self.data['keystore'] = keys
×
666
        else:
667
            raise Exception('Unknown wallet type')
×
668
        self.pw_args = WizardWalletPasswordSetting(password=password,
×
669
                                                   encrypt_storage=encrypt_storage,
670
                                                   storage_enc_version=storage_enc_version,
671
                                                   encrypt_keystore=encrypt_keystore)
672
        self.terminate()
×
673

674
    def create_storage(self, path) -> Tuple[WalletStorage, WalletDB]:
5✔
675
        if os.path.exists(path):
×
676
            raise Exception('file already exists at path')
×
677
        assert self.pw_args, f"pw_args not set?!"
×
678
        pw_args = self.pw_args
×
679
        self.pw_args = None  # clean-up so that it can get GC-ed
×
680
        storage = WalletStorage(path)
×
681
        if pw_args.encrypt_storage:
×
682
            storage.set_password(pw_args.password, enc_version=pw_args.storage_enc_version)
×
683
        db = WalletDB('', storage=storage, manual_upgrades=False)
×
684
        db.set_keystore_encryption(bool(pw_args.password) and pw_args.encrypt_keystore)
×
685
        for key, value in self.data.items():
×
686
            db.put(key, value)
×
687
        db.load_plugins()
×
688
        db.write()
×
689
        return storage, db
×
690

691
    def terminate(self, *, storage: WalletStorage = None,
5✔
692
                  db: WalletDB = None,
693
                  aborted: bool = False) -> None:
694
        raise NotImplementedError()  # implemented by subclasses
×
695

696
    def show_xpub_and_add_cosigners(self, xpub):
5✔
697
        self.show_xpub_dialog(xpub=xpub, run_next=lambda x: self.run('choose_keystore'))
×
698

699
    def choose_seed_type(self):
5✔
700
        seed_type = 'standard' if self.config.WIZARD_DONT_CREATE_SEGWIT else 'segwit'
×
701
        self.create_seed(seed_type)
×
702

703
    def create_seed(self, seed_type):
5✔
704
        from . import mnemonic
×
705
        self.seed_type = seed_type
×
706
        seed = mnemonic.Mnemonic('en').make_seed(seed_type=self.seed_type)
×
707
        self.opt_bip39 = False
×
708
        self.opt_ext = True
×
709
        self.opt_slip39 = False
×
710
        f = lambda x: self.request_passphrase(seed, x)
×
711
        self.show_seed_dialog(run_next=f, seed_text=seed)
×
712

713
    def request_passphrase(self, seed, opt_passphrase):
5✔
714
        if opt_passphrase:
×
715
            f = lambda x: self.confirm_seed(seed, x)
×
716
            self.passphrase_dialog(run_next=f)
×
717
        else:
718
            self.run('confirm_seed', seed, '')
×
719

720
    def confirm_seed(self, seed, passphrase):
5✔
721
        f = lambda x: self.confirm_passphrase(seed, passphrase)
×
722
        self.confirm_seed_dialog(
×
723
            run_next=f,
724
            seed=seed if self.config.get('debug_seed') else '',
725
            test=lambda x: mnemonic.is_matching_seed(seed=seed, seed_again=x),
726
        )
727

728
    def confirm_passphrase(self, seed, passphrase):
5✔
729
        f = lambda x: self.run('create_keystore', seed, x)
×
730
        if passphrase:
×
731
            title = _('Confirm Seed Extension')
×
732
            message = '\n'.join([
×
733
                _('Your seed extension must be saved together with your seed.'),
734
                _('Please type it here.'),
735
            ])
736
            self.line_dialog(run_next=f, title=title, message=message, default='', test=lambda x: x==passphrase)
×
737
        else:
738
            f('')
×
739

740
    def show_error(self, msg: Union[str, BaseException]) -> None:
5✔
741
        raise NotImplementedError()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc