• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5175397198856192

13 Aug 2025 01:48PM UTC coverage: 60.887% (-0.2%) from 61.069%
5175397198856192

Pull #10123

CirrusCI

accumulator
hww: fix crash when disabling keystore for hww
Pull Request #10123: wizard: add script and derivation to keystorewizard flow. fixes #10063

5 of 9 new or added lines in 1 file covered. (55.56%)

57 existing lines in 3 files now uncovered.

22538 of 37016 relevant lines covered (60.89%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.28
/electrum/keystore.py
1
#!/usr/bin/env python2
2
# -*- mode: python -*-
3
#
4
# Electrum - lightweight Bitcoin client
5
# Copyright (C) 2016  The Electrum developers
6
#
7
# Permission is hereby granted, free of charge, to any person
8
# obtaining a copy of this software and associated documentation files
9
# (the "Software"), to deal in the Software without restriction,
10
# including without limitation the rights to use, copy, modify, merge,
11
# publish, distribute, sublicense, and/or sell copies of the Software,
12
# and to permit persons to whom the Software is furnished to do so,
13
# subject to the following conditions:
14
#
15
# The above copyright notice and this permission notice shall be
16
# included in all copies or substantial portions of the Software.
17
#
18
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
22
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
23
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
24
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25
# SOFTWARE.
26

27
from unicodedata import normalize
1✔
28
import hashlib
1✔
29
import re
1✔
30
import copy
1✔
31
from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple, Any, Type
1✔
32
from functools import lru_cache, wraps
1✔
33
from abc import ABC, abstractmethod
1✔
34

35
import electrum_ecc as ecc
1✔
36
from electrum_ecc import string_to_number
1✔
37

38
from . import bitcoin, constants, bip32
1✔
39
from .bitcoin import deserialize_privkey, serialize_privkey, BaseDecodeError
1✔
40
from .transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput, TxInput
1✔
41
from .bip32 import (convert_bip32_strpath_to_intpath, BIP32_PRIME,
1✔
42
                    is_xpub, is_xprv, BIP32Node, normalize_bip32_derivation,
43
                    convert_bip32_intpath_to_strpath, is_xkey_consistent_with_key_origin_info,
44
                    KeyOriginInfo)
45
from .descriptor import PubkeyProvider
1✔
46
from . import crypto
1✔
47
from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST,
1✔
48
                     SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion, hash_160,
49
                     CiphertextFormatError)
50
from .util import (InvalidPassword, WalletFileException,
1✔
51
                   BitcoinException, bfh, inv_dict, is_hex_str)
52
from .mnemonic import Mnemonic, Wordlist, calc_seed_type, is_seed
1✔
53
from .plugin import run_hook
1✔
54
from .logging import Logger
1✔
55

56
if TYPE_CHECKING:
1✔
57
    from .gui.common_qt.util import TaskThread
×
58
    from .hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
59
    from .wallet_db import WalletDB
×
60
    from .plugin import Device
×
61

62

63
class CannotDerivePubkey(Exception): pass
1✔
64
class ScriptTypeNotSupported(Exception): pass
1✔
65

66

67
def also_test_none_password(check_password_fn):
1✔
68
    """Decorator for check_password, simply to give a friendlier exception if
69
    check_password(x) is called on a keystore that does not have a password set.
70
    """
71
    @wraps(check_password_fn)
1✔
72
    def wrapper(self: 'Software_KeyStore', *args):
1✔
73
        password = args[0]
1✔
74
        try:
1✔
75
            return check_password_fn(self, password)
1✔
76
        except (CiphertextFormatError, InvalidPassword) as e:
1✔
77
            if password is not None:
1✔
78
                try:
1✔
79
                    check_password_fn(self, None)
1✔
80
                except Exception:
1✔
81
                    pass
1✔
82
                else:
83
                    raise InvalidPassword("password given but keystore has no password") from e
×
84
            raise
1✔
85
    return wrapper
1✔
86

87

88
class KeyStore(Logger, ABC):
1✔
89
    type: str
1✔
90

91
    def __init__(self):
1✔
92
        Logger.__init__(self)
1✔
93
        self.is_requesting_to_be_rewritten_to_wallet_file = False  # type: bool
1✔
94

95
    def has_seed(self) -> bool:
1✔
96
        return False
×
97

98
    def is_watching_only(self) -> bool:
1✔
99
        return False
1✔
100

101
    def can_import(self) -> bool:
1✔
102
        return False
1✔
103

104
    def get_type_text(self) -> str:
1✔
105
        return f'{self.type}'
×
106

107
    @abstractmethod
1✔
108
    def may_have_password(self) -> bool:
1✔
109
        """Returns whether the keystore can be encrypted with a password."""
110
        pass
×
111

112
    def _get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[bytes, Union[Sequence[int], str]]:
1✔
113
        keypairs = {}
1✔
114
        for txin in tx.inputs():
1✔
115
            keypairs.update(self._get_txin_derivations(txin))
1✔
116
        return keypairs
1✔
117

118
    def _get_txin_derivations(self, txin: 'PartialTxInput') -> Dict[bytes, Union[Sequence[int], str]]:
1✔
119
        if txin.is_complete():
1✔
120
            return {}
1✔
121
        keypairs = {}
1✔
122
        for pubkey in txin.pubkeys:
1✔
123
            if pubkey in txin.sigs_ecdsa:
1✔
124
                # this pubkey already signed
125
                continue
1✔
126
            derivation = self.get_pubkey_derivation(pubkey, txin)
1✔
127
            if not derivation:
1✔
128
                continue
1✔
129
            keypairs[pubkey] = derivation
1✔
130
        return keypairs
1✔
131

132
    def can_sign(self, tx: 'Transaction', *, ignore_watching_only: bool = False) -> bool:
1✔
133
        """Returns whether this keystore could sign *something* in this tx."""
134
        if not ignore_watching_only and self.is_watching_only():
1✔
135
            return False
1✔
136
        if not isinstance(tx, PartialTransaction):
1✔
137
            return False
×
138
        return bool(self._get_tx_derivations(tx))
1✔
139

140
    def can_sign_txin(self, txin: 'TxInput', *, ignore_watching_only: bool = False) -> bool:
1✔
141
        """Returns whether this keystore could sign this txin."""
142
        if not ignore_watching_only and self.is_watching_only():
×
143
            return False
×
144
        if not isinstance(txin, PartialTxInput):
×
145
            return False
×
146
        return bool(self._get_txin_derivations(txin))
×
147

148
    def ready_to_sign(self) -> bool:
1✔
149
        return not self.is_watching_only()
1✔
150

151
    @abstractmethod
1✔
152
    def dump(self) -> dict[str, Any]:
1✔
153
        pass
×
154

155
    @abstractmethod
1✔
156
    def is_deterministic(self) -> bool:
1✔
157
        pass
×
158

159
    @abstractmethod
1✔
160
    def sign_message(
1✔
161
            self,
162
            sequence: 'AddressIndexGeneric',
163
            message: str,
164
            password,
165
            *,
166
            script_type: Optional[str] = None,
167
    ) -> bytes:
168
        pass
×
169

170
    @abstractmethod
1✔
171
    def decrypt_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
1✔
172
        pass
×
173

174
    @abstractmethod
1✔
175
    def sign_transaction(self, tx: 'PartialTransaction', password) -> None:
1✔
176
        pass
×
177

178
    @abstractmethod
1✔
179
    def get_pubkey_derivation(self, pubkey: bytes,
1✔
180
                              txinout: Union['PartialTxInput', 'PartialTxOutput'],
181
                              *, only_der_suffix=True) \
182
            -> Union[Sequence[int], str, None]:
183
        """Returns either a derivation int-list if the pubkey can be HD derived from this keystore,
184
        the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived,
185
        or None if the pubkey is unrelated.
186
        """
187
        pass
×
188

189
    @abstractmethod
1✔
190
    def get_pubkey_provider(self, sequence: 'AddressIndexGeneric') -> Optional[PubkeyProvider]:
1✔
191
        pass
×
192

193
    def find_my_pubkey_in_txinout(
1✔
194
            self, txinout: Union['PartialTxInput', 'PartialTxOutput'],
195
            *, only_der_suffix: bool = False
196
    ) -> Tuple[Optional[bytes], Optional[List[int]]]:
197
        # note: we assume that this cosigner only has one pubkey in this txin/txout
198
        for pubkey in txinout.bip32_paths:
1✔
199
            path = self.get_pubkey_derivation(pubkey, txinout, only_der_suffix=only_der_suffix)
1✔
200
            if path and not isinstance(path, (str, bytes)):
1✔
201
                return pubkey, list(path)
1✔
202
        return None, None
1✔
203

204
    def can_have_deterministic_lightning_xprv(self) -> bool:
1✔
205
        return False
1✔
206

207
    def has_support_for_slip_19_ownership_proofs(self) -> bool:
1✔
208
        return False
×
209

210
    def add_slip_19_ownership_proofs_to_tx(self, tx: 'PartialTransaction', *, password) -> None:
1✔
211
        raise NotImplementedError()
×
212

213

214
class Software_KeyStore(KeyStore):
1✔
215

216
    def __init__(self, d: dict):
1✔
217
        KeyStore.__init__(self)
1✔
218
        self.pw_hash_version = d.get('pw_hash_version', 1)
1✔
219
        if self.pw_hash_version not in SUPPORTED_PW_HASH_VERSIONS:
1✔
220
            raise UnsupportedPasswordHashVersion(self.pw_hash_version)
×
221

222
    def may_have_password(self):
1✔
223
        return not self.is_watching_only()
1✔
224

225
    def sign_message(self, sequence, message, password, *, script_type=None) -> bytes:
1✔
226
        privkey, compressed = self.get_private_key(sequence, password)
×
227
        key = ecc.ECPrivkey(privkey)
×
228
        return bitcoin.ecdsa_sign_usermessage(key, message, is_compressed=compressed)
×
229

230
    def decrypt_message(self, sequence, message, password) -> bytes:
1✔
231
        privkey, compressed = self.get_private_key(sequence, password)
1✔
232
        ec = ecc.ECPrivkey(privkey)
1✔
233
        decrypted = crypto.ecies_decrypt_message(ec, message)
1✔
234
        return decrypted
1✔
235

236
    def sign_transaction(self, tx, password):
1✔
237
        if self.is_watching_only():
1✔
238
            return
×
239
        # Raise if password is not correct.
240
        self.check_password(password)
1✔
241
        # Add private keys
242
        keypairs = {}
1✔
243
        pubkey_to_deriv_map = self._get_tx_derivations(tx)
1✔
244
        for pubkey, deriv in pubkey_to_deriv_map.items():
1✔
245
            privkey, is_compressed = self.get_private_key(deriv, password)
1✔
246
            keypairs[pubkey] = privkey
1✔
247
        # Sign
248
        if keypairs:
1✔
249
            tx.sign(keypairs)
1✔
250

251
    @abstractmethod
1✔
252
    def update_password(self, old_password, new_password) -> None:
1✔
253
        pass
×
254

255
    @abstractmethod
1✔
256
    def check_password(self, password: Optional[str]) -> None:
1✔
257
        """Raises InvalidPassword if password is not correct"""
258
        pass
×
259

260
    @abstractmethod
1✔
261
    def get_private_key(self, sequence: 'AddressIndexGeneric', password) -> Tuple[bytes, bool]:
1✔
262
        """Returns (privkey, is_compressed)"""
263
        pass
×
264

265

266
class Imported_KeyStore(Software_KeyStore):
1✔
267
    # keystore for imported private keys
268

269
    type = 'imported'
1✔
270

271
    def __init__(self, d: dict):
1✔
272
        Software_KeyStore.__init__(self, d)
1✔
273
        self.keypairs = d.get('keypairs', {})  # type: Dict[str, str]
1✔
274

275
    def is_deterministic(self):
1✔
276
        return False
×
277

278
    def dump(self):
1✔
279
        return {
1✔
280
            'type': self.type,
281
            'keypairs': self.keypairs,
282
            'pw_hash_version': self.pw_hash_version,
283
        }
284

285
    def can_import(self):
1✔
286
        return True
×
287

288
    @also_test_none_password
1✔
289
    def check_password(self, password):
1✔
290
        pubkey = list(self.keypairs.keys())[0]
1✔
291
        self.get_private_key(pubkey, password)
1✔
292

293
    def import_privkey(self, sec: str, password) -> Tuple[str, str]:
1✔
294
        txin_type, privkey, compressed = deserialize_privkey(sec)
1✔
295
        pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
1✔
296
        # re-serialize the key so the internal storage format is consistent
297
        serialized_privkey = serialize_privkey(
1✔
298
            privkey, compressed, txin_type, internal_use=True)
299
        # NOTE: if the same pubkey is reused for multiple addresses (script types),
300
        # there will only be one pubkey-privkey pair for it in self.keypairs,
301
        # and the privkey will encode a txin_type but that txin_type cannot be trusted.
302
        # Removing keys complicates this further.
303
        self.keypairs[pubkey] = pw_encode(serialized_privkey, password, version=self.pw_hash_version)
1✔
304
        return txin_type, pubkey
1✔
305

306
    def delete_imported_key(self, key: str) -> None:
1✔
307
        self.keypairs.pop(key)
1✔
308

309
    def get_private_key(self, pubkey: str, password):
1✔
310
        sec = pw_decode(self.keypairs[pubkey], password, version=self.pw_hash_version)
1✔
311
        try:
1✔
312
            txin_type, privkey, compressed = deserialize_privkey(sec)
1✔
313
        except BaseDecodeError as e:
1✔
314
            raise InvalidPassword() from e
1✔
315
        if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed):
1✔
316
            raise InvalidPassword()
×
317
        return privkey, compressed
1✔
318

319
    def get_pubkey_derivation(self, pubkey, txin, *, only_der_suffix=True):
1✔
320
        if pubkey.hex() in self.keypairs:
1✔
321
            return pubkey.hex()
1✔
322
        return None
×
323

324
    def get_pubkey_provider(self, sequence: 'AddressIndexGeneric') -> Optional[PubkeyProvider]:
1✔
325
        if sequence in self.keypairs:
1✔
326
            return PubkeyProvider(
1✔
327
                origin=None,
328
                pubkey=sequence,
329
                deriv_path=None,
330
            )
331
        return None
×
332

333
    def update_password(self, old_password, new_password):
1✔
334
        self.check_password(old_password)
1✔
335
        if new_password == '':
1✔
336
            new_password = None
×
337
        for k, v in self.keypairs.items():
1✔
338
            b = pw_decode(v, old_password, version=self.pw_hash_version)
1✔
339
            c = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
1✔
340
            self.keypairs[k] = c
1✔
341
        self.pw_hash_version = PW_HASH_VERSION_LATEST
1✔
342

343

344
class Deterministic_KeyStore(Software_KeyStore):
1✔
345

346
    def __init__(self, d: dict):
1✔
347
        Software_KeyStore.__init__(self, d)
1✔
348
        self.seed = d.get('seed', '')  # only electrum seeds
1✔
349
        self.passphrase = d.get('passphrase', '')
1✔
350
        self._seed_type = d.get('seed_type', None)  # only electrum seeds
1✔
351

352
    def is_deterministic(self):
1✔
353
        return True
1✔
354

355
    def dump(self):
1✔
356
        d = {
1✔
357
            'type': self.type,
358
            'pw_hash_version': self.pw_hash_version,
359
        }
360
        if self.seed:
1✔
361
            d['seed'] = self.seed
1✔
362
        if self.passphrase:
1✔
363
            d['passphrase'] = self.passphrase
1✔
364
        if self._seed_type:
1✔
365
            d['seed_type'] = self._seed_type
1✔
366
        return d
1✔
367

368
    def has_seed(self):
1✔
369
        return bool(self.seed)
1✔
370

371
    def get_seed_type(self) -> Optional[str]:
1✔
372
        return self._seed_type
1✔
373

374
    def is_watching_only(self):
1✔
375
        return not self.has_seed()
1✔
376

377
    @abstractmethod
1✔
378
    def format_seed(self, seed: str) -> str:
1✔
379
        pass
×
380

381
    def add_seed(self, seed: str) -> None:
1✔
382
        if self.seed:
1✔
383
            raise Exception("a seed exists")
×
384
        self.seed = self.format_seed(seed)
1✔
385
        self._seed_type = calc_seed_type(seed) or None
1✔
386

387
    def get_seed(self, password) -> str:
1✔
388
        if not self.has_seed():
1✔
389
            raise Exception("This wallet has no seed words")
×
390
        return pw_decode(self.seed, password, version=self.pw_hash_version)
1✔
391

392
    def get_passphrase(self, password) -> str:
1✔
393
        if self.passphrase:
1✔
394
            return pw_decode(self.passphrase, password, version=self.pw_hash_version)
1✔
395
        else:
396
            return ''
×
397

398

399
class MasterPublicKeyMixin(ABC):
1✔
400

401
    @abstractmethod
1✔
402
    def get_master_public_key(self) -> str:
1✔
403
        pass
×
404

405
    @abstractmethod
1✔
406
    def get_derivation_prefix(self) -> Optional[str]:
1✔
407
        """Returns to bip32 path from some root node to self.xpub
408
        Note that the return value might be None; if it is unknown.
409
        """
410
        pass
×
411

412
    @abstractmethod
1✔
413
    def get_root_fingerprint(self) -> Optional[str]:
1✔
414
        """Returns the bip32 fingerprint of the top level node.
415
        This top level node is the node at the beginning of the derivation prefix,
416
        i.e. applying the derivation prefix to it will result self.xpub
417
        Note that the return value might be None; if it is unknown.
418
        """
419
        pass
×
420

421
    @abstractmethod
1✔
422
    def get_fp_and_derivation_to_be_used_in_partial_tx(
1✔
423
            self,
424
            der_suffix: Sequence[int],
425
            *,
426
            only_der_suffix: bool,
427
    ) -> Tuple[bytes, Sequence[int]]:
428
        """Returns fingerprint and derivation path corresponding to a derivation suffix.
429
        The fingerprint is either the root fp or the intermediate fp, depending on what is available
430
        and 'only_der_suffix', and the derivation path is adjusted accordingly.
431
        """
432
        pass
×
433

434
    def get_key_origin_info(self) -> Optional[KeyOriginInfo]:
1✔
435
        return None
×
436

437
    @abstractmethod
1✔
438
    def derive_pubkey(self, for_change: int, n: int) -> bytes:
1✔
439
        """Returns pubkey at given path.
440
        May raise CannotDerivePubkey.
441
        """
442
        pass
×
443

444
    def get_pubkey_derivation(
1✔
445
            self,
446
            pubkey: bytes,
447
            txinout: Union['PartialTxInput', 'PartialTxOutput'],
448
            *,
449
            only_der_suffix=True,
450
    ) -> Union[Sequence[int], str, None]:
451
        EXPECTED_DER_SUFFIX_LEN = 2
1✔
452
        def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
1✔
453
            if len(der_suffix) != EXPECTED_DER_SUFFIX_LEN:
1✔
454
                return False
×
455
            try:
1✔
456
                if pubkey != self.derive_pubkey(*der_suffix):
1✔
457
                    return False
1✔
458
            except CannotDerivePubkey:
×
459
                return False
×
460
            return True
1✔
461

462
        if pubkey not in txinout.bip32_paths:
1✔
463
            return None
×
464
        fp_found, path_found = txinout.bip32_paths[pubkey]
1✔
465
        der_suffix = None
1✔
466
        full_path = None
1✔
467
        # 1. try fp against our root
468
        ks_root_fingerprint_hex = self.get_root_fingerprint()
1✔
469
        ks_der_prefix_str = self.get_derivation_prefix()
1✔
470
        ks_der_prefix = convert_bip32_strpath_to_intpath(ks_der_prefix_str) if ks_der_prefix_str else None
1✔
471
        if (ks_root_fingerprint_hex is not None and ks_der_prefix is not None and
1✔
472
                fp_found.hex() == ks_root_fingerprint_hex):
473
            if path_found[:len(ks_der_prefix)] == ks_der_prefix:
1✔
474
                der_suffix = path_found[len(ks_der_prefix):]
1✔
475
                if not test_der_suffix_against_pubkey(der_suffix, pubkey):
1✔
476
                    der_suffix = None
×
477
        # 2. try fp against our intermediate fingerprint
478
        if (der_suffix is None and isinstance(self, Xpub) and
1✔
479
                fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
480
            der_suffix = path_found
1✔
481
            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
1✔
482
                der_suffix = None
×
483
        # 3. hack/bruteforce: ignore fp and check pubkey anyway
484
        #    This is only to resolve the following scenario/problem:
485
        #    problem: if we don't know our root fp, but tx contains root fp and full path,
486
        #             we will miss the pubkey (false negative match). Though it might still work
487
        #             within gap limit due to tx.add_info_from_wallet overwriting the fields.
488
        #             Example: keystore has intermediate xprv without root fp; tx contains root fp and full path.
489
        if der_suffix is None:
1✔
490
            der_suffix = path_found[-EXPECTED_DER_SUFFIX_LEN:]
1✔
491
            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
1✔
492
                der_suffix = None
1✔
493
        # if all attempts/methods failed, we give up now:
494
        if der_suffix is None:
1✔
495
            return None
1✔
496
        if ks_der_prefix is not None:
1✔
497
            full_path = ks_der_prefix + list(der_suffix)
1✔
498
        return der_suffix if only_der_suffix else full_path
1✔
499

500

501
class Xpub(MasterPublicKeyMixin):
1✔
502

503
    def __init__(self, *, derivation_prefix: str = None, root_fingerprint: str = None):
1✔
504
        self.xpub = None
1✔
505
        self.xpub_receive = None
1✔
506
        self.xpub_change = None
1✔
507
        self._xpub_bip32_node = None  # type: Optional[BIP32Node]
1✔
508

509
        # "key origin" info (subclass should persist these):
510
        self._derivation_prefix = derivation_prefix  # type: Optional[str]
1✔
511
        self._root_fingerprint = root_fingerprint  # type: Optional[str]
1✔
512

513
    def get_master_public_key(self):
1✔
514
        return self.xpub
1✔
515

516
    def get_bip32_node_for_xpub(self) -> Optional[BIP32Node]:
1✔
517
        if self._xpub_bip32_node is None:
1✔
518
            if self.xpub is None:
1✔
519
                return None
×
520
            self._xpub_bip32_node = BIP32Node.from_xkey(self.xpub)
1✔
521
        return self._xpub_bip32_node
1✔
522

523
    def get_derivation_prefix(self) -> Optional[str]:
1✔
524
        if self._derivation_prefix is None:
1✔
525
            return None
1✔
526
        return normalize_bip32_derivation(self._derivation_prefix)
1✔
527

528
    def get_root_fingerprint(self) -> Optional[str]:
1✔
529
        return self._root_fingerprint
1✔
530

531
    def get_fp_and_derivation_to_be_used_in_partial_tx(
1✔
532
            self,
533
            der_suffix: Sequence[int],
534
            *,
535
            only_der_suffix: bool,
536
    ) -> Tuple[bytes, Sequence[int]]:
537
        fingerprint_hex = self.get_root_fingerprint()
1✔
538
        der_prefix_str = self.get_derivation_prefix()
1✔
539
        if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None:
1✔
540
            # use root fp, and true full path
541
            fingerprint_bytes = bfh(fingerprint_hex)
1✔
542
            der_prefix_ints = convert_bip32_strpath_to_intpath(der_prefix_str)
1✔
543
        else:
544
            # use intermediate fp, and claim der suffix is the full path
545
            fingerprint_bytes = self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()
1✔
546
            der_prefix_ints = convert_bip32_strpath_to_intpath('m')
1✔
547
        der_full = der_prefix_ints + list(der_suffix)
1✔
548
        return fingerprint_bytes, der_full
1✔
549

550
    def get_xpub_to_be_used_in_partial_tx(self, *, only_der_suffix: bool) -> str:
1✔
551
        assert self.xpub
1✔
552
        fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[],
1✔
553
                                                                                 only_der_suffix=only_der_suffix)
554
        bip32node = self.get_bip32_node_for_xpub()
1✔
555
        depth = len(der_full)
1✔
556
        child_number_int = der_full[-1] if len(der_full) >= 1 else 0
1✔
557
        child_number_bytes = child_number_int.to_bytes(length=4, byteorder="big")
1✔
558
        fingerprint = bytes(4) if depth == 0 else bip32node.fingerprint
1✔
559
        bip32node = bip32node._replace(
1✔
560
            depth=depth,
561
            fingerprint=fingerprint,
562
            child_number=child_number_bytes,
563
            # only put plain xpubs (not ypub/zpub) in PSBTs:
564
            xtype="standard",
565
        )
566
        return bip32node.to_xpub()
1✔
567

568
    def get_key_origin_info(self) -> Optional[KeyOriginInfo]:
1✔
569
        fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(
1✔
570
            der_suffix=[], only_der_suffix=False)
571
        origin = KeyOriginInfo(fingerprint=fp_bytes, path=der_full)
1✔
572
        return origin
1✔
573

574
    def get_pubkey_provider(self, sequence: 'AddressIndexGeneric') -> Optional[PubkeyProvider]:
1✔
575
        strpath = convert_bip32_intpath_to_strpath(sequence)
1✔
576
        strpath = strpath[1:]  # cut leading "m"
1✔
577
        bip32node = self.get_bip32_node_for_xpub()
1✔
578
        return PubkeyProvider(
1✔
579
            origin=self.get_key_origin_info(),
580
            pubkey=bip32node._replace(xtype="standard").to_xkey(),
581
            deriv_path=strpath,
582
        )
583

584
    def add_key_origin_from_root_node(self, *, derivation_prefix: str, root_node: BIP32Node) -> None:
1✔
585
        assert self.xpub
1✔
586
        # try to derive ourselves from what we were given
587
        child_node1 = root_node.subkey_at_private_derivation(derivation_prefix)
1✔
588
        child_pubkey_bytes1 = child_node1.eckey.get_public_key_bytes(compressed=True)
1✔
589
        child_node2 = self.get_bip32_node_for_xpub()
1✔
590
        child_pubkey_bytes2 = child_node2.eckey.get_public_key_bytes(compressed=True)
1✔
591
        if child_pubkey_bytes1 != child_pubkey_bytes2:
1✔
592
            raise Exception("(xpub, derivation_prefix, root_node) inconsistency")
×
593
        self.add_key_origin(derivation_prefix=derivation_prefix,
1✔
594
                            root_fingerprint=root_node.calc_fingerprint_of_this_node().hex().lower())
595

596
    def add_key_origin(self, *, derivation_prefix: str = None, root_fingerprint: str = None) -> None:
1✔
597
        assert self.xpub
1✔
598
        if not (root_fingerprint is None or (is_hex_str(root_fingerprint) and len(root_fingerprint) == 8)):
1✔
599
            raise Exception("root fp must be 8 hex characters")
×
600
        derivation_prefix = normalize_bip32_derivation(derivation_prefix)
1✔
601
        if not is_xkey_consistent_with_key_origin_info(self.xpub,
1✔
602
                                                       derivation_prefix=derivation_prefix,
603
                                                       root_fingerprint=root_fingerprint):
604
            raise Exception("xpub inconsistent with provided key origin info")
×
605
        if root_fingerprint is not None:
1✔
606
            self._root_fingerprint = root_fingerprint
1✔
607
        if derivation_prefix is not None:
1✔
608
            self._derivation_prefix = derivation_prefix
1✔
609
        self.is_requesting_to_be_rewritten_to_wallet_file = True
1✔
610

611
    @lru_cache(maxsize=None)
1✔
612
    def derive_pubkey(self, for_change: int, n: int) -> bytes:
1✔
613
        for_change = int(for_change)
1✔
614
        if for_change not in (0, 1):
1✔
615
            raise CannotDerivePubkey("forbidden path")
×
616
        xpub = self.xpub_change if for_change else self.xpub_receive
1✔
617
        if xpub is None:
1✔
618
            rootnode = self.get_bip32_node_for_xpub()
1✔
619
            xpub = rootnode.subkey_at_public_derivation((for_change,)).to_xpub()
1✔
620
            if for_change:
1✔
621
                self.xpub_change = xpub
1✔
622
            else:
623
                self.xpub_receive = xpub
1✔
624
        return self.get_pubkey_from_xpub(xpub, (n,))
1✔
625

626
    @classmethod
1✔
627
    def get_pubkey_from_xpub(self, xpub: str, sequence) -> bytes:
1✔
628
        node = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(sequence)
1✔
629
        return node.eckey.get_public_key_bytes(compressed=True)
1✔
630

631

632
class BIP32_KeyStore(Xpub, Deterministic_KeyStore):
1✔
633

634
    type = 'bip32'
1✔
635

636
    def __init__(self, d: dict):
1✔
637
        Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
1✔
638
        Deterministic_KeyStore.__init__(self, d)
1✔
639
        self.xpub = d.get('xpub')
1✔
640
        self.xprv = d.get('xprv')
1✔
641

642
    def watching_only_keystore(self):
1✔
NEW
643
        return BIP32_KeyStore({'xpub': self.xpub})
×
644

645
    def format_seed(self, seed):
1✔
646
        return ' '.join(seed.split())
1✔
647

648
    def dump(self):
1✔
649
        d = Deterministic_KeyStore.dump(self)
1✔
650
        d['xpub'] = self.xpub
1✔
651
        d['xprv'] = self.xprv
1✔
652
        d['derivation'] = self.get_derivation_prefix()
1✔
653
        d['root_fingerprint'] = self.get_root_fingerprint()
1✔
654
        return d
1✔
655

656
    def get_master_private_key(self, password) -> str:
1✔
657
        return pw_decode(self.xprv, password, version=self.pw_hash_version)
1✔
658

659
    @also_test_none_password
1✔
660
    def check_password(self, password):
1✔
661
        xprv = pw_decode(self.xprv, password, version=self.pw_hash_version)
1✔
662
        try:
1✔
663
            bip32node = BIP32Node.from_xkey(xprv)
1✔
664
        except BaseDecodeError as e:
1✔
665
            raise InvalidPassword() from e
1✔
666
        if bip32node.chaincode != self.get_bip32_node_for_xpub().chaincode:
1✔
UNCOV
667
            raise InvalidPassword()
×
668

669
    def update_password(self, old_password, new_password):
1✔
670
        self.check_password(old_password)
1✔
671
        if new_password == '':
1✔
UNCOV
672
            new_password = None
×
673
        if self.has_seed():
1✔
674
            decoded = self.get_seed(old_password)
1✔
675
            self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
1✔
676
        if self.passphrase:
1✔
677
            decoded = self.get_passphrase(old_password)
1✔
678
            self.passphrase = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
1✔
679
        if self.xprv is not None:
1✔
680
            b = pw_decode(self.xprv, old_password, version=self.pw_hash_version)
1✔
681
            self.xprv = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
1✔
682
        self.pw_hash_version = PW_HASH_VERSION_LATEST
1✔
683

684
    def is_watching_only(self):
1✔
685
        return self.xprv is None
1✔
686

687
    def add_xpub(self, xpub: str) -> None:
1✔
688
        assert is_xpub(xpub)
1✔
689
        self.xpub = xpub
1✔
690
        root_fingerprint, derivation_prefix = bip32.root_fp_and_der_prefix_from_xkey(xpub)
1✔
691
        self.add_key_origin(derivation_prefix=derivation_prefix, root_fingerprint=root_fingerprint)
1✔
692

693
    def add_xprv(self, xprv: str) -> None:
1✔
694
        assert is_xprv(xprv)
1✔
695
        self.xprv = xprv
1✔
696
        self.add_xpub(bip32.xpub_from_xprv(xprv))
1✔
697

698
    def add_xprv_from_seed(self, bip32_seed: bytes, *, xtype: str, derivation: str) -> None:
1✔
699
        rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype)
1✔
700
        node = rootnode.subkey_at_private_derivation(derivation)
1✔
701
        self.add_xprv(node.to_xprv())
1✔
702
        self.add_key_origin_from_root_node(derivation_prefix=derivation, root_node=rootnode)
1✔
703

704
    def get_private_key(self, sequence: Sequence[int], password):
1✔
705
        xprv = self.get_master_private_key(password)
1✔
706
        node = BIP32Node.from_xkey(xprv).subkey_at_private_derivation(sequence)
1✔
707
        pk = node.eckey.get_secret_bytes()
1✔
708
        return pk, True
1✔
709

710
    def can_have_deterministic_lightning_xprv(self):
1✔
711
        if (self.get_seed_type() == 'segwit'
1✔
712
                and self.get_bip32_node_for_xpub().xtype == 'p2wpkh'):
713
            return True
1✔
714
        return False
1✔
715

716
    def get_lightning_xprv(self, password) -> str:
1✔
717
        assert self.can_have_deterministic_lightning_xprv()
1✔
718
        xprv = self.get_master_private_key(password)
1✔
719
        rootnode = BIP32Node.from_xkey(xprv)
1✔
720
        node = rootnode.subkey_at_private_derivation("m/67'/")
1✔
721
        return node.to_xprv()
1✔
722

723
class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore):
1✔
724

725
    type = 'old'
1✔
726

727
    def __init__(self, d: dict):
1✔
728
        Deterministic_KeyStore.__init__(self, d)
1✔
729
        self.mpk = d.get('mpk')  # type: Optional[str]
1✔
730
        self._root_fingerprint = None
1✔
731

732
    def watching_only_keystore(self):
1✔
UNCOV
733
        return Old_KeyStore({'mpk': self.mpk})
×
734

735
    def _get_hex_seed(self, password) -> str:
1✔
736
        hex_str = pw_decode(self.seed, password, version=self.pw_hash_version)
1✔
737
        assert is_hex_str(hex_str), f"expected hex str, got {type(hex_str)} with {len(hex_str)=}"
1✔
738
        return hex_str
1✔
739

740
    def dump(self):
1✔
741
        d = Deterministic_KeyStore.dump(self)
1✔
742
        d['mpk'] = self.mpk
1✔
743
        return d
1✔
744

745
    def add_seed(self, seed):
1✔
746
        Deterministic_KeyStore.add_seed(self, seed)
1✔
747
        hex_seed = self._get_hex_seed(None)
1✔
748
        self.mpk = self.mpk_from_seed(hex_seed)
1✔
749

750
    def add_master_public_key(self, mpk: str) -> None:
1✔
751
        self.mpk = mpk
1✔
752

753
    def format_seed(self, seed):
1✔
754
        from . import old_mnemonic, mnemonic
1✔
755
        seed = mnemonic.normalize_text(seed)
1✔
756
        # see if seed was entered as hex
757
        if seed:
1✔
758
            try:
1✔
759
                bfh(seed)
1✔
UNCOV
760
                return str(seed)
×
761
            except Exception:
1✔
762
                pass
1✔
763
        words = seed.split()
1✔
764
        seed = old_mnemonic.mn_decode(words)
1✔
765
        if not seed:
1✔
UNCOV
766
            raise Exception("Invalid seed")
×
767
        return seed
1✔
768

769
    def get_seed(self, password):
1✔
770
        from . import old_mnemonic
×
UNCOV
771
        hex_seed = self._get_hex_seed(password)
×
UNCOV
772
        return ' '.join(old_mnemonic.mn_encode(hex_seed))
×
773

774
    @classmethod
1✔
775
    def mpk_from_seed(cls, hex_seed: str) -> str:
1✔
776
        secexp = cls.stretch_key(hex_seed)
1✔
777
        privkey = ecc.ECPrivkey.from_secret_scalar(secexp)
1✔
778
        return privkey.get_public_key_hex(compressed=False)[2:]
1✔
779

780
    @classmethod
1✔
781
    def stretch_key(cls, hex_seed: str) -> int:
1✔
782
        assert is_hex_str(hex_seed), f"expected hex str, got {type(hex_seed)} with {len(hex_seed)=}"
1✔
783
        encoded_hex_seed = hex_seed.encode('ascii')
1✔
784
        x = encoded_hex_seed
1✔
785
        for i in range(100000):
1✔
786
            x = hashlib.sha256(x + encoded_hex_seed).digest()
1✔
787
        return string_to_number(x)
1✔
788

789
    @classmethod
1✔
790
    def get_sequence(cls, mpk: str, for_change: int, n: int) -> int:
1✔
791
        return string_to_number(sha256d(("%d:%d:"%(n, for_change)).encode('ascii') + bfh(mpk)))
1✔
792

793
    @classmethod
1✔
794
    def get_pubkey_from_mpk(cls, mpk: str, for_change: int, n: int) -> bytes:
1✔
795
        z = cls.get_sequence(mpk, for_change, n)
1✔
796
        master_public_key = ecc.ECPubkey(bfh('04'+mpk))
1✔
797
        public_key = master_public_key + z*ecc.GENERATOR
1✔
798
        return public_key.get_public_key_bytes(compressed=False)
1✔
799

800
    @lru_cache(maxsize=None)
1✔
801
    def derive_pubkey(self, for_change, n) -> bytes:
1✔
802
        for_change = int(for_change)
1✔
803
        if for_change not in (0, 1):
1✔
UNCOV
804
            raise CannotDerivePubkey("forbidden path")
×
805
        return self.get_pubkey_from_mpk(self.mpk, for_change, n)
1✔
806

807
    def _get_private_key_from_stretched_exponent(self, for_change: int, n: int, secexp: int) -> bytes:
1✔
808
        secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER
1✔
809
        pk = int.to_bytes(secexp, length=32, byteorder='big', signed=False)
1✔
810
        return pk
1✔
811

812
    def get_private_key(self, sequence: Sequence[int], password):
1✔
813
        hex_seed = self._get_hex_seed(password)
1✔
814
        secexp = self.stretch_key(hex_seed)
1✔
815
        self._check_seed(hex_seed, secexp=secexp)
1✔
816
        for_change, n = sequence
1✔
817
        assert isinstance(for_change, int), type(for_change)
1✔
818
        assert isinstance(n, int), type(n)
1✔
819
        pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp)
1✔
820
        return pk, False
1✔
821

822
    def _check_seed(self, hex_seed: str, *, secexp: int = None) -> None:
1✔
823
        if secexp is None:
1✔
824
            secexp = self.stretch_key(hex_seed)
1✔
825
        master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp)
1✔
826
        master_public_key = master_private_key.get_public_key_bytes(compressed=False)[1:]
1✔
827
        if master_public_key != bfh(self.mpk):
1✔
UNCOV
828
            raise InvalidPassword()
×
829

830
    @also_test_none_password
1✔
831
    def check_password(self, password):
1✔
832
        hex_seed = self._get_hex_seed(password)
1✔
833
        self._check_seed(hex_seed)
1✔
834

835
    def get_master_public_key(self):
1✔
836
        return self.mpk
1✔
837

838
    def get_derivation_prefix(self) -> str:
1✔
839
        return 'm'
1✔
840

841
    def get_root_fingerprint(self) -> str:
1✔
842
        if self._root_fingerprint is None:
1✔
843
            master_public_key = ecc.ECPubkey(bfh('04'+self.mpk))
1✔
844
            xfp = hash_160(master_public_key.get_public_key_bytes(compressed=True))[0:4]
1✔
845
            self._root_fingerprint = xfp.hex().lower()
1✔
846
        return self._root_fingerprint
1✔
847

848
    def get_fp_and_derivation_to_be_used_in_partial_tx(
1✔
849
            self,
850
            der_suffix: Sequence[int],
851
            *,
852
            only_der_suffix: bool,
853
    ) -> Tuple[bytes, Sequence[int]]:
854
        fingerprint_hex = self.get_root_fingerprint()
1✔
855
        der_prefix_str = self.get_derivation_prefix()
1✔
856
        fingerprint_bytes = bfh(fingerprint_hex)
1✔
857
        der_prefix_ints = convert_bip32_strpath_to_intpath(der_prefix_str)
1✔
858
        der_full = der_prefix_ints + list(der_suffix)
1✔
859
        return fingerprint_bytes, der_full
1✔
860

861
    def get_pubkey_provider(self, sequence: 'AddressIndexGeneric') -> Optional[PubkeyProvider]:
1✔
862
        return PubkeyProvider(
1✔
863
            origin=None,
864
            pubkey=self.derive_pubkey(*sequence).hex(),
865
            deriv_path=None,
866
        )
867

868
    def update_password(self, old_password, new_password):
1✔
UNCOV
869
        self.check_password(old_password)
×
UNCOV
870
        if new_password == '':
×
UNCOV
871
            new_password = None
×
UNCOV
872
        if self.has_seed():
×
873
            decoded = pw_decode(self.seed, old_password, version=self.pw_hash_version)
×
874
            self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
×
875
        self.pw_hash_version = PW_HASH_VERSION_LATEST
×
876

877

878
class Hardware_KeyStore(Xpub, KeyStore):
1✔
879
    hw_type: str
1✔
880
    device: str
1✔
881
    plugin: 'HW_PluginBase'
1✔
882
    thread: Optional['TaskThread'] = None
1✔
883

884
    type = 'hardware'
1✔
885

886
    def __init__(self, d):
1✔
887
        Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
1✔
888
        KeyStore.__init__(self)
1✔
889
        # Errors and other user interaction is done through the wallet's
890
        # handler.  The handler is per-window and preserved across
891
        # device reconnects
892
        self.xpub = d.get('xpub')
1✔
893
        self.label = d.get('label')  # type: Optional[str]
1✔
894
        self.soft_device_id = d.get('soft_device_id')  # type: Optional[str]
1✔
895
        self.handler = None  # type: Optional[HardwareHandlerBase]
1✔
896
        run_hook('init_keystore', self)
1✔
897

898
    def watching_only_keystore(self):
1✔
UNCOV
899
        return BIP32_KeyStore({'xpub': self.xpub})
×
900

901
    def set_label(self, label: Optional[str]) -> None:
1✔
NEW
902
        self.label = label
×
903

904
    def may_have_password(self):
1✔
905
        return False
1✔
906

907
    def is_deterministic(self):
1✔
NEW
908
        return True
×
909

910
    def get_type_text(self) -> str:
1✔
UNCOV
911
        return f'hw[{self.hw_type}]'
×
912

913
    def dump(self):
1✔
914
        return {
1✔
915
            'type': self.type,
916
            'hw_type': self.hw_type,
917
            'xpub': self.xpub,
918
            'derivation': self.get_derivation_prefix(),
919
            'root_fingerprint': self.get_root_fingerprint(),
920
            'label':self.label,
921
            'soft_device_id': self.soft_device_id,
922
        }
923

924
    def is_watching_only(self):
1✔
925
        '''The wallet is not watching-only; the user will be prompted for
926
        pin and passphrase as appropriate when needed.'''
UNCOV
927
        assert not self.has_seed()
×
NEW
928
        return False
×
929

930
    def get_client(
1✔
931
            self,
932
            force_pair: bool = True,
933
            *,
934
            devices: Sequence['Device'] = None,
935
            allow_user_interaction: bool = True,
936
    ) -> Optional['HardwareClientBase']:
UNCOV
937
        return self.plugin.get_client(
×
938
            self,
939
            force_pair=force_pair,
940
            devices=devices,
941
            allow_user_interaction=allow_user_interaction,
942
        )
943

944
    def get_password_for_storage_encryption(self) -> str:
1✔
945
        client = self.get_client()
×
UNCOV
946
        return client.get_password_for_storage_encryption()
×
947

948
    def has_usable_connection_with_device(self) -> bool:
1✔
949
        # we try to create a client even if there isn't one already,
950
        # but do not prompt the user if auto-select fails:
UNCOV
951
        client = self.get_client(
×
952
            force_pair=True,
953
            allow_user_interaction=False,
954
        )
UNCOV
955
        if client is None:
×
UNCOV
956
            return False
×
UNCOV
957
        return client.has_usable_connection_with_device()
×
958

959
    def ready_to_sign(self):
1✔
UNCOV
960
        return super().ready_to_sign() and self.has_usable_connection_with_device()
×
961

962
    def opportunistically_fill_in_missing_info_from_device(self, client: 'HardwareClientBase'):
1✔
963
        assert client is not None
×
964
        if self._root_fingerprint is None:
×
965
            self._root_fingerprint = client.request_root_fingerprint_from_device()
×
UNCOV
966
            self.is_requesting_to_be_rewritten_to_wallet_file = True
×
UNCOV
967
        if self.label != client.label():
×
968
            self.label = client.label()
×
UNCOV
969
            self.is_requesting_to_be_rewritten_to_wallet_file = True
×
UNCOV
970
        if self.soft_device_id != client.get_soft_device_id():
×
971
            self.soft_device_id = client.get_soft_device_id()
×
972
            self.is_requesting_to_be_rewritten_to_wallet_file = True
×
973

974
    def pairing_code(self) -> Optional[str]:
1✔
975
        """Used by the DeviceMgr to keep track of paired hw devices."""
976
        if not self.soft_device_id:
×
977
            return None
×
978
        return f"{self.plugin.name}/{self.soft_device_id}"
×
979

980

981
KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin]  # intersection really...
1✔
982
AddressIndexGeneric = Union[Sequence[int], str]  # can be hex pubkey str
1✔
983

984

985
def bip39_normalize_passphrase(passphrase: str):
1✔
986
    return normalize('NFKD', passphrase or '')
1✔
987

988

989
def bip39_to_seed(mnemonic: str, *, passphrase: Optional[str]) -> bytes:
1✔
990
    import hashlib
1✔
991
    passphrase = passphrase or ""
1✔
992
    PBKDF2_ROUNDS = 2048
1✔
993
    mnemonic = normalize('NFKD', ' '.join(mnemonic.split()))
1✔
994
    passphrase = bip39_normalize_passphrase(passphrase)
1✔
995
    return hashlib.pbkdf2_hmac('sha512', mnemonic.encode('utf-8'),
1✔
996
        b'mnemonic' + passphrase.encode('utf-8'), iterations = PBKDF2_ROUNDS)
997

998

999
def bip39_is_checksum_valid(
1✔
1000
        mnemonic: str,
1001
        *,
1002
        wordlist: Wordlist = None,
1003
) -> Tuple[bool, bool]:
1004
    """Test checksum of bip39 mnemonic assuming English wordlist.
1005
    Returns tuple (is_checksum_valid, is_wordlist_valid)
1006
    """
1007
    words = [normalize('NFKD', word) for word in mnemonic.split()]
1✔
1008
    words_len = len(words)
1✔
1009
    if wordlist is None:
1✔
1010
        wordlist = Wordlist.from_file("english.txt")
1✔
1011
    n = len(wordlist)
1✔
1012
    i = 0
1✔
1013
    words.reverse()
1✔
1014
    while words:
1✔
1015
        w = words.pop()
1✔
1016
        try:
1✔
1017
            k = wordlist.index(w)
1✔
UNCOV
1018
        except ValueError:
×
UNCOV
1019
            return False, False
×
1020
        i = i*n + k
1✔
1021
    if words_len not in [12, 15, 18, 21, 24]:
1✔
UNCOV
1022
        return False, True
×
1023
    checksum_length = 11 * words_len // 33  # num bits
1✔
1024
    entropy_length = 32 * checksum_length  # num bits
1✔
1025
    entropy = i >> checksum_length
1✔
1026
    checksum = i % 2**checksum_length
1✔
1027
    entropy_bytes = int.to_bytes(entropy, length=entropy_length//8, byteorder="big")
1✔
1028
    hashed = int.from_bytes(sha256(entropy_bytes), byteorder="big")
1✔
1029
    calculated_checksum = hashed >> (256 - checksum_length)
1✔
1030
    return checksum == calculated_checksum, True
1✔
1031

1032

1033
def from_bip43_rootseed(
1✔
1034
    root_seed: bytes,
1035
    *,
1036
    derivation: str,
1037
    xtype: Optional[str] = None,
1038
):
1039
    k = BIP32_KeyStore({})
1✔
1040
    if xtype is None:
1✔
1041
        xtype = xtype_from_derivation(derivation)
1✔
1042
    k.add_xprv_from_seed(root_seed, xtype=xtype, derivation=derivation)
1✔
1043
    return k
1✔
1044

1045

1046
PURPOSE48_SCRIPT_TYPES = {
1✔
1047
    'p2wsh-p2sh': 1,  # specifically multisig
1048
    'p2wsh': 2,       # specifically multisig
1049
}
1050
PURPOSE48_SCRIPT_TYPES_INV = inv_dict(PURPOSE48_SCRIPT_TYPES)
1✔
1051

1052

1053
def xtype_from_derivation(derivation: str) -> str:
1✔
1054
    """Returns the script type to be used for this derivation."""
1055
    bip32_indices = convert_bip32_strpath_to_intpath(derivation)
1✔
1056
    if len(bip32_indices) >= 1:
1✔
1057
        if bip32_indices[0] == 84 + BIP32_PRIME:
1✔
1058
            return 'p2wpkh'
1✔
1059
        elif bip32_indices[0] == 49 + BIP32_PRIME:
1✔
1060
            return 'p2wpkh-p2sh'
1✔
1061
        elif bip32_indices[0] == 44 + BIP32_PRIME:
1✔
1062
            return 'standard'
1✔
1063
        elif bip32_indices[0] == 45 + BIP32_PRIME:
1✔
1064
            return 'standard'
1✔
1065

1066
    if len(bip32_indices) >= 4:
1✔
1067
        if bip32_indices[0] == 48 + BIP32_PRIME:
1✔
1068
            # m / purpose' / coin_type' / account' / script_type' / change / address_index
1069
            script_type_int = bip32_indices[3] - BIP32_PRIME
1✔
1070
            script_type = PURPOSE48_SCRIPT_TYPES_INV.get(script_type_int)
1✔
1071
            if script_type is not None:
1✔
1072
                return script_type
1✔
UNCOV
1073
    return 'standard'
×
1074

1075

1076
hw_keystores = {}  # type: Dict[str, Type[Hardware_KeyStore]]
1✔
1077

1078
def register_keystore(hw_type: str, constructor: Type[Hardware_KeyStore]) -> None:
1✔
1079
    hw_keystores[hw_type] = constructor
1✔
1080

1081
def hardware_keystore(d) -> Hardware_KeyStore:
1✔
1082
    hw_type = d['hw_type']
1✔
1083
    if hw_type in hw_keystores:
1✔
1084
        constructor = hw_keystores[hw_type]
1✔
1085
        return constructor(d)
1✔
UNCOV
1086
    raise WalletFileException(f'unknown hardware type: {hw_type}. '
×
1087
                              f'hw_keystores: {list(hw_keystores)}')
1088

1089
def load_keystore(db: 'WalletDB', name: str) -> KeyStore:
1✔
1090
    # deepcopy object to avoid keeping a pointer to db.data
1091
    # note: this is needed as type(wallet.db.get("keystore")) != StoredDict
1092
    d = copy.deepcopy(db.get(name, {}))
1✔
1093
    t = d.get('type')
1✔
1094
    if not t:
1✔
UNCOV
1095
        raise WalletFileException(
×
1096
            'Wallet format requires update.\n'
1097
            'Cannot find keystore for name {}'.format(name))
1098
    keystore_constructors = {ks.type: ks for ks in [Old_KeyStore, Imported_KeyStore, BIP32_KeyStore]}
1✔
1099
    keystore_constructors['hardware'] = hardware_keystore
1✔
1100
    try:
1✔
1101
        ks_constructor = keystore_constructors[t]
1✔
UNCOV
1102
    except KeyError:
×
1103
        raise WalletFileException(f'Unknown type {t} for keystore named {name}')
×
1104
    k = ks_constructor(d)
1✔
1105
    return k
1✔
1106

1107

1108
def is_old_mpk(mpk: str) -> bool:
1✔
1109
    try:
1✔
1110
        int(mpk, 16)  # test if hex string
1✔
1111
    except Exception:
1✔
1112
        return False
1✔
1113
    if len(mpk) != 128:
1✔
UNCOV
1114
        return False
×
1115
    try:
1✔
1116
        ecc.ECPubkey(bfh('04' + mpk))
1✔
UNCOV
1117
    except Exception:
×
UNCOV
1118
        return False
×
1119
    return True
1✔
1120

1121

1122
def is_address_list(text: str) -> bool:
1✔
1123
    parts = text.split()
1✔
1124
    return bool(parts) and all(bitcoin.is_address(x) for x in parts)
1✔
1125

1126

1127
def get_private_keys(text: str, *, allow_spaces_inside_key=True, raise_on_error=False) -> Optional[Sequence[str]]:
1✔
1128
    if allow_spaces_inside_key:  # see #1612
1✔
1129
        parts = text.split('\n')
1✔
1130
        parts = map(lambda x: ''.join(x.split()), parts)
1✔
1131
        parts = list(filter(bool, parts))
1✔
1132
    else:
1133
        parts = text.split()
1✔
1134
    if bool(parts) and all(bitcoin.is_private_key(x, raise_on_error=raise_on_error) for x in parts):
1✔
1135
        return parts
1✔
1136
    return None
1✔
1137

1138

1139
def is_private_key_list(text: str, *, allow_spaces_inside_key: bool = True, raise_on_error: bool = False) -> bool:
1✔
1140
    return bool(get_private_keys(text,
1✔
1141
                                 allow_spaces_inside_key=allow_spaces_inside_key,
1142
                                 raise_on_error=raise_on_error))
1143

1144

1145
def is_master_key(x: str) -> bool:
1✔
1146
    return is_old_mpk(x) or is_bip32_key(x)
1✔
1147

1148

1149
def is_bip32_key(x: str) -> bool:
1✔
1150
    return is_xprv(x) or is_xpub(x)
1✔
1151

1152

1153
def bip44_derivation(account_id: int, bip43_purpose: int = 44) -> str:
1✔
1154
    coin = constants.net.BIP44_COIN_TYPE
1✔
1155
    der = "m/%d'/%d'/%d'" % (bip43_purpose, coin, int(account_id))
1✔
1156
    return normalize_bip32_derivation(der)
1✔
1157

1158

1159
def purpose48_derivation(account_id: int, xtype: str) -> str:
1✔
1160
    # m / purpose' / coin_type' / account' / script_type' / change / address_index
UNCOV
1161
    bip43_purpose = 48
×
UNCOV
1162
    coin = constants.net.BIP44_COIN_TYPE
×
UNCOV
1163
    account_id = int(account_id)
×
UNCOV
1164
    script_type_int = PURPOSE48_SCRIPT_TYPES.get(xtype)
×
UNCOV
1165
    if script_type_int is None:
×
UNCOV
1166
        raise Exception('unknown xtype: {}'.format(xtype))
×
UNCOV
1167
    der = "m/%d'/%d'/%d'/%d'" % (bip43_purpose, coin, account_id, script_type_int)
×
UNCOV
1168
    return normalize_bip32_derivation(der)
×
1169

1170

1171
def from_seed(seed: str, *, passphrase: Optional[str], for_multisig: bool = False) -> Union[BIP32_KeyStore, Old_KeyStore]:
1✔
1172
    passphrase = passphrase or ""
1✔
1173
    t = calc_seed_type(seed)
1✔
1174
    if t == 'old':
1✔
1175
        if passphrase:
1✔
1176
            raise Exception("'old'-type electrum seed cannot have passphrase")
1✔
1177
        keystore = Old_KeyStore({})
1✔
1178
        keystore.add_seed(seed)
1✔
1179
    elif t in ['standard', 'segwit']:
1✔
1180
        keystore = BIP32_KeyStore({})
1✔
1181
        keystore.add_seed(seed)
1✔
1182
        keystore.passphrase = passphrase
1✔
1183
        bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase=passphrase)
1✔
1184
        if t == 'standard':
1✔
1185
            der = "m/"
1✔
1186
            xtype = 'standard'
1✔
1187
        else:
1188
            der = "m/1'/" if for_multisig else "m/0'/"
1✔
1189
            xtype = 'p2wsh' if for_multisig else 'p2wpkh'
1✔
1190
        keystore.add_xprv_from_seed(bip32_seed, xtype=xtype, derivation=der)
1✔
1191
    else:
UNCOV
1192
        raise BitcoinException('Unexpected seed type {}'.format(repr(t)))
×
1193
    return keystore
1✔
1194

1195
def from_private_key_list(text: str) -> Imported_KeyStore:
1✔
UNCOV
1196
    keystore = Imported_KeyStore({})
×
UNCOV
1197
    for x in get_private_keys(text):
×
UNCOV
1198
        keystore.import_privkey(x, None)
×
UNCOV
1199
    return keystore
×
1200

1201
def from_old_mpk(mpk: str) -> Old_KeyStore:
1✔
1202
    keystore = Old_KeyStore({})
1✔
1203
    keystore.add_master_public_key(mpk)
1✔
1204
    return keystore
1✔
1205

1206
def from_xpub(xpub: str) -> BIP32_KeyStore:
1✔
1207
    k = BIP32_KeyStore({})
1✔
1208
    k.add_xpub(xpub)
1✔
1209
    return k
1✔
1210

1211
def from_xprv(xprv: str) -> BIP32_KeyStore:
1✔
1212
    k = BIP32_KeyStore({})
1✔
1213
    k.add_xprv(xprv)
1✔
1214
    return k
1✔
1215

1216
def from_master_key(text: str) -> Union[BIP32_KeyStore, Old_KeyStore]:
1✔
1217
    if is_xprv(text):
1✔
1218
        k = from_xprv(text)
1✔
1219
    elif is_old_mpk(text):
1✔
1220
        k = from_old_mpk(text)
1✔
1221
    elif is_xpub(text):
1✔
1222
        k = from_xpub(text)
1✔
1223
    else:
UNCOV
1224
        raise BitcoinException('Invalid master key')
×
1225
    return k
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc