• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5707590002278400

25 Apr 2025 10:46AM UTC coverage: 60.343% (+0.05%) from 60.296%
5707590002278400

Pull #9751

CirrusCI

ecdsa
fixes
Pull Request #9751: Txbatcher without password in memory

42 of 67 new or added lines in 4 files covered. (62.69%)

1196 existing lines in 9 files now uncovered.

21659 of 35893 relevant lines covered (60.34%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.07
/electrum/keystore.py
1
#!/usr/bin/env python2
2
# -*- mode: python -*-
3
#
4
# Electrum - lightweight Bitcoin client
5
# Copyright (C) 2016  The Electrum developers
6
#
7
# Permission is hereby granted, free of charge, to any person
8
# obtaining a copy of this software and associated documentation files
9
# (the "Software"), to deal in the Software without restriction,
10
# including without limitation the rights to use, copy, modify, merge,
11
# publish, distribute, sublicense, and/or sell copies of the Software,
12
# and to permit persons to whom the Software is furnished to do so,
13
# subject to the following conditions:
14
#
15
# The above copyright notice and this permission notice shall be
16
# included in all copies or substantial portions of the Software.
17
#
18
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
22
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
23
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
24
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25
# SOFTWARE.
26

27
from unicodedata import normalize
5✔
28
import hashlib
5✔
29
import re
5✔
30
import copy
5✔
31
from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple
5✔
32
from functools import lru_cache, wraps
5✔
33
from abc import ABC, abstractmethod
5✔
34

35
import electrum_ecc as ecc
5✔
36
from electrum_ecc import string_to_number
5✔
37

38
from . import bitcoin, constants, bip32
5✔
39
from .bitcoin import deserialize_privkey, serialize_privkey, BaseDecodeError
5✔
40
from .transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput, TxInput
5✔
41
from .bip32 import (convert_bip32_strpath_to_intpath, BIP32_PRIME,
5✔
42
                    is_xpub, is_xprv, BIP32Node, normalize_bip32_derivation,
43
                    convert_bip32_intpath_to_strpath, is_xkey_consistent_with_key_origin_info,
44
                    KeyOriginInfo)
45
from .descriptor import PubkeyProvider
5✔
46
from . import crypto
5✔
47
from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST,
5✔
48
                     SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion, hash_160,
49
                     CiphertextFormatError)
50
from .util import (InvalidPassword, WalletFileException,
5✔
51
                   BitcoinException, bfh, inv_dict, is_hex_str)
52
from .mnemonic import Mnemonic, Wordlist, calc_seed_type, is_seed
5✔
53
from .plugin import run_hook
5✔
54
from .logging import Logger
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .gui.qt.util import TaskThread
×
58
    from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
×
59
    from .wallet_db import WalletDB
×
60
    from .plugin import Device
×
61

62

63
class CannotDerivePubkey(Exception): pass
5✔
64

65
class ScriptTypeNotSupported(Exception): pass
5✔
66

67

68
def also_test_none_password(check_password_fn):
5✔
69
    """Decorator for check_password, simply to give a friendlier exception if
70
    check_password(x) is called on a keystore that does not have a password set.
71
    """
72
    @wraps(check_password_fn)
5✔
73
    def wrapper(self: 'Software_KeyStore', *args):
5✔
74
        password = args[0]
5✔
75
        try:
5✔
76
            return check_password_fn(self, password)
5✔
77
        except (CiphertextFormatError, InvalidPassword) as e:
5✔
78
            if password is not None:
5✔
79
                try:
5✔
80
                    check_password_fn(self, None)
5✔
81
                except Exception:
5✔
82
                    pass
5✔
83
                else:
UNCOV
84
                    raise InvalidPassword("password given but keystore has no password") from e
×
85
            raise
5✔
86
    return wrapper
5✔
87

88

89
class KeyStore(Logger, ABC):
5✔
90
    type: str
5✔
91

92
    def __init__(self):
5✔
93
        Logger.__init__(self)
5✔
94
        self.is_requesting_to_be_rewritten_to_wallet_file = False  # type: bool
5✔
95

96
    def has_seed(self) -> bool:
5✔
UNCOV
97
        return False
×
98

99
    def is_watching_only(self) -> bool:
5✔
100
        return False
5✔
101

102
    def can_import(self) -> bool:
5✔
103
        return False
5✔
104

105
    def get_type_text(self) -> str:
5✔
UNCOV
106
        return f'{self.type}'
×
107

108
    @abstractmethod
5✔
109
    def may_have_password(self):
5✔
110
        """Returns whether the keystore can be encrypted with a password."""
UNCOV
111
        pass
×
112

113
    def _get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[bytes, Union[Sequence[int], str]]:
5✔
114
        keypairs = {}
5✔
115
        for txin in tx.inputs():
5✔
116
            keypairs.update(self._get_txin_derivations(txin))
5✔
117
        return keypairs
5✔
118

119
    def _get_txin_derivations(self, txin: 'PartialTxInput') -> Dict[bytes, Union[Sequence[int], str]]:
5✔
120
        if txin.is_complete():
5✔
121
            return {}
5✔
122
        keypairs = {}
5✔
123
        for pubkey in txin.pubkeys:
5✔
124
            if pubkey in txin.sigs_ecdsa:
5✔
125
                # this pubkey already signed
126
                continue
5✔
127
            derivation = self.get_pubkey_derivation(pubkey, txin)
5✔
128
            if not derivation:
5✔
129
                continue
5✔
130
            keypairs[pubkey] = derivation
5✔
131
        return keypairs
5✔
132

133
    def can_sign(self, tx: 'Transaction', *, ignore_watching_only=False) -> bool:
5✔
134
        """Returns whether this keystore could sign *something* in this tx."""
135
        if not ignore_watching_only and self.is_watching_only():
5✔
136
            return False
5✔
137
        if not isinstance(tx, PartialTransaction):
5✔
UNCOV
138
            return False
×
139
        return bool(self._get_tx_derivations(tx))
5✔
140

141
    def can_sign_txin(self, txin: 'TxInput', *, ignore_watching_only=False) -> bool:
5✔
142
        """Returns whether this keystore could sign this txin."""
143
        if not ignore_watching_only and self.is_watching_only():
×
144
            return False
×
145
        if not isinstance(txin, PartialTxInput):
×
146
            return False
×
UNCOV
147
        return bool(self._get_txin_derivations(txin))
×
148

149
    def ready_to_sign(self) -> bool:
5✔
150
        return not self.is_watching_only()
5✔
151

152
    @abstractmethod
5✔
153
    def dump(self) -> dict:
5✔
UNCOV
154
        pass
×
155

156
    @abstractmethod
5✔
157
    def is_deterministic(self) -> bool:
5✔
UNCOV
158
        pass
×
159

160
    @abstractmethod
5✔
161
    def sign_message(
5✔
162
            self,
163
            sequence: 'AddressIndexGeneric',
164
            message: str,
165
            password,
166
            *,
167
            script_type: Optional[str] = None,
168
    ) -> bytes:
UNCOV
169
        pass
×
170

171
    @abstractmethod
5✔
172
    def decrypt_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
5✔
UNCOV
173
        pass
×
174

175
    @abstractmethod
5✔
176
    def sign_transaction(self, tx: 'PartialTransaction', password) -> None:
5✔
UNCOV
177
        pass
×
178

179
    @abstractmethod
5✔
180
    def get_pubkey_derivation(self, pubkey: bytes,
5✔
181
                              txinout: Union['PartialTxInput', 'PartialTxOutput'],
182
                              *, only_der_suffix=True) \
183
            -> Union[Sequence[int], str, None]:
184
        """Returns either a derivation int-list if the pubkey can be HD derived from this keystore,
185
        the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived,
186
        or None if the pubkey is unrelated.
187
        """
UNCOV
188
        pass
×
189

190
    @abstractmethod
5✔
191
    def get_pubkey_provider(self, sequence: 'AddressIndexGeneric') -> Optional[PubkeyProvider]:
5✔
UNCOV
192
        pass
×
193

194
    def find_my_pubkey_in_txinout(
5✔
195
            self, txinout: Union['PartialTxInput', 'PartialTxOutput'],
196
            *, only_der_suffix: bool = False
197
    ) -> Tuple[Optional[bytes], Optional[List[int]]]:
198
        # note: we assume that this cosigner only has one pubkey in this txin/txout
199
        for pubkey in txinout.bip32_paths:
5✔
200
            path = self.get_pubkey_derivation(pubkey, txinout, only_der_suffix=only_der_suffix)
5✔
201
            if path and not isinstance(path, (str, bytes)):
5✔
202
                return pubkey, list(path)
5✔
203
        return None, None
5✔
204

205
    def can_have_deterministic_lightning_xprv(self) -> bool:
5✔
UNCOV
206
        return False
×
207

208
    def has_support_for_slip_19_ownership_proofs(self) -> bool:
5✔
UNCOV
209
        return False
×
210

211
    def add_slip_19_ownership_proofs_to_tx(self, tx: 'PartialTransaction', *, password) -> None:
5✔
UNCOV
212
        raise NotImplementedError()
×
213

214

215
class Software_KeyStore(KeyStore):
5✔
216

217
    def __init__(self, d):
5✔
218
        KeyStore.__init__(self)
5✔
219
        self.pw_hash_version = d.get('pw_hash_version', 1)
5✔
220
        if self.pw_hash_version not in SUPPORTED_PW_HASH_VERSIONS:
5✔
UNCOV
221
            raise UnsupportedPasswordHashVersion(self.pw_hash_version)
×
222

223
    def may_have_password(self):
5✔
224
        return not self.is_watching_only()
5✔
225

226
    def sign_message(self, sequence, message, password, *, script_type=None) -> bytes:
5✔
227
        privkey, compressed = self.get_private_key(sequence, password)
×
228
        key = ecc.ECPrivkey(privkey)
×
UNCOV
229
        return bitcoin.ecdsa_sign_usermessage(key, message, is_compressed=compressed)
×
230

231
    def decrypt_message(self, sequence, message, password) -> bytes:
5✔
232
        privkey, compressed = self.get_private_key(sequence, password)
5✔
233
        ec = ecc.ECPrivkey(privkey)
5✔
234
        decrypted = crypto.ecies_decrypt_message(ec, message)
5✔
235
        return decrypted
5✔
236

237
    def sign_transaction(self, tx, password):
5✔
238
        if self.is_watching_only():
5✔
UNCOV
239
            return
×
240
        # Raise if password is not correct.
241
        self.check_password(password)
5✔
242
        # Add private keys
243
        keypairs = {}
5✔
244
        pubkey_to_deriv_map = self._get_tx_derivations(tx)
5✔
245
        for pubkey, deriv in pubkey_to_deriv_map.items():
5✔
246
            privkey, is_compressed = self.get_private_key(deriv, password)
5✔
247
            keypairs[pubkey] = privkey
5✔
248
        # Sign
249
        if keypairs:
5✔
250
            tx.sign(keypairs)
5✔
251

252
    @abstractmethod
5✔
253
    def update_password(self, old_password, new_password):
5✔
UNCOV
254
        pass
×
255

256
    @abstractmethod
5✔
257
    def check_password(self, password: Optional[str]) -> None:
5✔
258
        """Raises InvalidPassword if password is not correct"""
UNCOV
259
        pass
×
260

261
    @abstractmethod
5✔
262
    def get_private_key(self, sequence: 'AddressIndexGeneric', password) -> Tuple[bytes, bool]:
5✔
263
        """Returns (privkey, is_compressed)"""
UNCOV
264
        pass
×
265

266

267
class Imported_KeyStore(Software_KeyStore):
5✔
268
    # keystore for imported private keys
269

270
    type = 'imported'
5✔
271

272
    def __init__(self, d):
5✔
273
        Software_KeyStore.__init__(self, d)
5✔
274
        self.keypairs = d.get('keypairs', {})  # type: Dict[str, str]
5✔
275

276
    def is_deterministic(self):
5✔
UNCOV
277
        return False
×
278

279
    def dump(self):
5✔
280
        return {
5✔
281
            'type': self.type,
282
            'keypairs': self.keypairs,
283
            'pw_hash_version': self.pw_hash_version,
284
        }
285

286
    def can_import(self):
5✔
UNCOV
287
        return True
×
288

289
    @also_test_none_password
5✔
290
    def check_password(self, password):
5✔
291
        pubkey = list(self.keypairs.keys())[0]
5✔
292
        self.get_private_key(pubkey, password)
5✔
293

294
    def import_privkey(self, sec, password):
5✔
295
        txin_type, privkey, compressed = deserialize_privkey(sec)
5✔
296
        pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
5✔
297
        # re-serialize the key so the internal storage format is consistent
298
        serialized_privkey = serialize_privkey(
5✔
299
            privkey, compressed, txin_type, internal_use=True)
300
        # NOTE: if the same pubkey is reused for multiple addresses (script types),
301
        # there will only be one pubkey-privkey pair for it in self.keypairs,
302
        # and the privkey will encode a txin_type but that txin_type cannot be trusted.
303
        # Removing keys complicates this further.
304
        self.keypairs[pubkey] = pw_encode(serialized_privkey, password, version=self.pw_hash_version)
5✔
305
        return txin_type, pubkey
5✔
306

307
    def delete_imported_key(self, key):
5✔
308
        self.keypairs.pop(key)
5✔
309

310
    def get_private_key(self, pubkey: str, password):
5✔
311
        sec = pw_decode(self.keypairs[pubkey], password, version=self.pw_hash_version)
5✔
312
        try:
5✔
313
            txin_type, privkey, compressed = deserialize_privkey(sec)
5✔
314
        except BaseDecodeError as e:
5✔
315
            raise InvalidPassword() from e
5✔
316
        if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed):
5✔
UNCOV
317
            raise InvalidPassword()
×
318
        return privkey, compressed
5✔
319

320
    def get_pubkey_derivation(self, pubkey, txin, *, only_der_suffix=True):
5✔
321
        if pubkey.hex() in self.keypairs:
5✔
322
            return pubkey.hex()
5✔
UNCOV
323
        return None
×
324

325
    def get_pubkey_provider(self, sequence: 'AddressIndexGeneric') -> Optional[PubkeyProvider]:
5✔
326
        if sequence in self.keypairs:
5✔
327
            return PubkeyProvider(
5✔
328
                origin=None,
329
                pubkey=sequence,
330
                deriv_path=None,
331
            )
UNCOV
332
        return None
×
333

334
    def update_password(self, old_password, new_password):
5✔
335
        self.check_password(old_password)
5✔
336
        if new_password == '':
5✔
UNCOV
337
            new_password = None
×
338
        for k, v in self.keypairs.items():
5✔
339
            b = pw_decode(v, old_password, version=self.pw_hash_version)
5✔
340
            c = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
5✔
341
            self.keypairs[k] = c
5✔
342
        self.pw_hash_version = PW_HASH_VERSION_LATEST
5✔
343

344

345
class Deterministic_KeyStore(Software_KeyStore):
5✔
346

347
    def __init__(self, d):
5✔
348
        Software_KeyStore.__init__(self, d)
5✔
349
        self.seed = d.get('seed', '')  # only electrum seeds
5✔
350
        self.passphrase = d.get('passphrase', '')
5✔
351
        self._seed_type = d.get('seed_type', None)  # only electrum seeds
5✔
352

353
    def is_deterministic(self):
5✔
354
        return True
5✔
355

356
    def dump(self):
5✔
357
        d = {
5✔
358
            'type': self.type,
359
            'pw_hash_version': self.pw_hash_version,
360
        }
361
        if self.seed:
5✔
362
            d['seed'] = self.seed
5✔
363
        if self.passphrase:
5✔
364
            d['passphrase'] = self.passphrase
5✔
365
        if self._seed_type:
5✔
366
            d['seed_type'] = self._seed_type
5✔
367
        return d
5✔
368

369
    def has_seed(self):
5✔
370
        return bool(self.seed)
5✔
371

372
    def get_seed_type(self) -> Optional[str]:
5✔
373
        return self._seed_type
5✔
374

375
    def is_watching_only(self):
5✔
376
        return not self.has_seed()
5✔
377

378
    @abstractmethod
5✔
379
    def format_seed(self, seed: str) -> str:
5✔
UNCOV
380
        pass
×
381

382
    def add_seed(self, seed: str) -> None:
5✔
383
        if self.seed:
5✔
UNCOV
384
            raise Exception("a seed exists")
×
385
        self.seed = self.format_seed(seed)
5✔
386
        self._seed_type = calc_seed_type(seed) or None
5✔
387

388
    def get_seed(self, password):
5✔
389
        if not self.has_seed():
5✔
UNCOV
390
            raise Exception("This wallet has no seed words")
×
391
        return pw_decode(self.seed, password, version=self.pw_hash_version)
5✔
392

393
    def get_passphrase(self, password):
5✔
394
        if self.passphrase:
5✔
395
            return pw_decode(self.passphrase, password, version=self.pw_hash_version)
5✔
396
        else:
UNCOV
397
            return ''
×
398

399

400
class MasterPublicKeyMixin(ABC):
5✔
401

402
    @abstractmethod
5✔
403
    def get_master_public_key(self) -> str:
5✔
UNCOV
404
        pass
×
405

406
    @abstractmethod
5✔
407
    def get_derivation_prefix(self) -> Optional[str]:
5✔
408
        """Returns to bip32 path from some root node to self.xpub
409
        Note that the return value might be None; if it is unknown.
410
        """
UNCOV
411
        pass
×
412

413
    @abstractmethod
5✔
414
    def get_root_fingerprint(self) -> Optional[str]:
5✔
415
        """Returns the bip32 fingerprint of the top level node.
416
        This top level node is the node at the beginning of the derivation prefix,
417
        i.e. applying the derivation prefix to it will result self.xpub
418
        Note that the return value might be None; if it is unknown.
419
        """
UNCOV
420
        pass
×
421

422
    @abstractmethod
5✔
423
    def get_fp_and_derivation_to_be_used_in_partial_tx(
5✔
424
            self,
425
            der_suffix: Sequence[int],
426
            *,
427
            only_der_suffix: bool,
428
    ) -> Tuple[bytes, Sequence[int]]:
429
        """Returns fingerprint and derivation path corresponding to a derivation suffix.
430
        The fingerprint is either the root fp or the intermediate fp, depending on what is available
431
        and 'only_der_suffix', and the derivation path is adjusted accordingly.
432
        """
UNCOV
433
        pass
×
434

435
    def get_key_origin_info(self) -> Optional[KeyOriginInfo]:
5✔
UNCOV
436
        return None
×
437

438
    @abstractmethod
5✔
439
    def derive_pubkey(self, for_change: int, n: int) -> bytes:
5✔
440
        """Returns pubkey at given path.
441
        May raise CannotDerivePubkey.
442
        """
UNCOV
443
        pass
×
444

445
    def get_pubkey_derivation(
5✔
446
            self,
447
            pubkey: bytes,
448
            txinout: Union['PartialTxInput', 'PartialTxOutput'],
449
            *,
450
            only_der_suffix=True,
451
    ) -> Union[Sequence[int], str, None]:
452
        EXPECTED_DER_SUFFIX_LEN = 2
5✔
453
        def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
5✔
454
            if len(der_suffix) != EXPECTED_DER_SUFFIX_LEN:
5✔
UNCOV
455
                return False
×
456
            try:
5✔
457
                if pubkey != self.derive_pubkey(*der_suffix):
5✔
458
                    return False
5✔
459
            except CannotDerivePubkey:
×
UNCOV
460
                return False
×
461
            return True
5✔
462

463
        if pubkey not in txinout.bip32_paths:
5✔
UNCOV
464
            return None
×
465
        fp_found, path_found = txinout.bip32_paths[pubkey]
5✔
466
        der_suffix = None
5✔
467
        full_path = None
5✔
468
        # 1. try fp against our root
469
        ks_root_fingerprint_hex = self.get_root_fingerprint()
5✔
470
        ks_der_prefix_str = self.get_derivation_prefix()
5✔
471
        ks_der_prefix = convert_bip32_strpath_to_intpath(ks_der_prefix_str) if ks_der_prefix_str else None
5✔
472
        if (ks_root_fingerprint_hex is not None and ks_der_prefix is not None and
5✔
473
                fp_found.hex() == ks_root_fingerprint_hex):
474
            if path_found[:len(ks_der_prefix)] == ks_der_prefix:
5✔
475
                der_suffix = path_found[len(ks_der_prefix):]
5✔
476
                if not test_der_suffix_against_pubkey(der_suffix, pubkey):
5✔
UNCOV
477
                    der_suffix = None
×
478
        # 2. try fp against our intermediate fingerprint
479
        if (der_suffix is None and isinstance(self, Xpub) and
5✔
480
                fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
481
            der_suffix = path_found
5✔
482
            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
5✔
UNCOV
483
                der_suffix = None
×
484
        # 3. hack/bruteforce: ignore fp and check pubkey anyway
485
        #    This is only to resolve the following scenario/problem:
486
        #    problem: if we don't know our root fp, but tx contains root fp and full path,
487
        #             we will miss the pubkey (false negative match). Though it might still work
488
        #             within gap limit due to tx.add_info_from_wallet overwriting the fields.
489
        #             Example: keystore has intermediate xprv without root fp; tx contains root fp and full path.
490
        if der_suffix is None:
5✔
491
            der_suffix = path_found[-EXPECTED_DER_SUFFIX_LEN:]
5✔
492
            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
5✔
493
                der_suffix = None
5✔
494
        # if all attempts/methods failed, we give up now:
495
        if der_suffix is None:
5✔
496
            return None
5✔
497
        if ks_der_prefix is not None:
5✔
498
            full_path = ks_der_prefix + list(der_suffix)
5✔
499
        return der_suffix if only_der_suffix else full_path
5✔
500

501

502
class Xpub(MasterPublicKeyMixin):
5✔
503

504
    def __init__(self, *, derivation_prefix: str = None, root_fingerprint: str = None):
5✔
505
        self.xpub = None
5✔
506
        self.xpub_receive = None
5✔
507
        self.xpub_change = None
5✔
508
        self._xpub_bip32_node = None  # type: Optional[BIP32Node]
5✔
509

510
        # "key origin" info (subclass should persist these):
511
        self._derivation_prefix = derivation_prefix  # type: Optional[str]
5✔
512
        self._root_fingerprint = root_fingerprint  # type: Optional[str]
5✔
513

514
    def get_master_public_key(self):
5✔
515
        return self.xpub
5✔
516

517
    def get_bip32_node_for_xpub(self) -> Optional[BIP32Node]:
5✔
518
        if self._xpub_bip32_node is None:
5✔
519
            if self.xpub is None:
5✔
UNCOV
520
                return None
×
521
            self._xpub_bip32_node = BIP32Node.from_xkey(self.xpub)
5✔
522
        return self._xpub_bip32_node
5✔
523

524
    def get_derivation_prefix(self) -> Optional[str]:
5✔
525
        if self._derivation_prefix is None:
5✔
526
            return None
5✔
527
        return normalize_bip32_derivation(self._derivation_prefix)
5✔
528

529
    def get_root_fingerprint(self) -> Optional[str]:
5✔
530
        return self._root_fingerprint
5✔
531

532
    def get_fp_and_derivation_to_be_used_in_partial_tx(
5✔
533
            self,
534
            der_suffix: Sequence[int],
535
            *,
536
            only_der_suffix: bool,
537
    ) -> Tuple[bytes, Sequence[int]]:
538
        fingerprint_hex = self.get_root_fingerprint()
5✔
539
        der_prefix_str = self.get_derivation_prefix()
5✔
540
        if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None:
5✔
541
            # use root fp, and true full path
542
            fingerprint_bytes = bfh(fingerprint_hex)
5✔
543
            der_prefix_ints = convert_bip32_strpath_to_intpath(der_prefix_str)
5✔
544
        else:
545
            # use intermediate fp, and claim der suffix is the full path
546
            fingerprint_bytes = self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()
5✔
547
            der_prefix_ints = convert_bip32_strpath_to_intpath('m')
5✔
548
        der_full = der_prefix_ints + list(der_suffix)
5✔
549
        return fingerprint_bytes, der_full
5✔
550

551
    def get_xpub_to_be_used_in_partial_tx(self, *, only_der_suffix: bool) -> str:
5✔
552
        assert self.xpub
5✔
553
        fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[],
5✔
554
                                                                                 only_der_suffix=only_der_suffix)
555
        bip32node = self.get_bip32_node_for_xpub()
5✔
556
        depth = len(der_full)
5✔
557
        child_number_int = der_full[-1] if len(der_full) >= 1 else 0
5✔
558
        child_number_bytes = child_number_int.to_bytes(length=4, byteorder="big")
5✔
559
        fingerprint = bytes(4) if depth == 0 else bip32node.fingerprint
5✔
560
        bip32node = bip32node._replace(
5✔
561
            depth=depth,
562
            fingerprint=fingerprint,
563
            child_number=child_number_bytes,
564
            # only put plain xpubs (not ypub/zpub) in PSBTs:
565
            xtype="standard",
566
        )
567
        return bip32node.to_xpub()
5✔
568

569
    def get_key_origin_info(self) -> Optional[KeyOriginInfo]:
5✔
570
        fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(
5✔
571
            der_suffix=[], only_der_suffix=False)
572
        origin = KeyOriginInfo(fingerprint=fp_bytes, path=der_full)
5✔
573
        return origin
5✔
574

575
    def get_pubkey_provider(self, sequence: 'AddressIndexGeneric') -> Optional[PubkeyProvider]:
5✔
576
        strpath = convert_bip32_intpath_to_strpath(sequence)
5✔
577
        strpath = strpath[1:]  # cut leading "m"
5✔
578
        bip32node = self.get_bip32_node_for_xpub()
5✔
579
        return PubkeyProvider(
5✔
580
            origin=self.get_key_origin_info(),
581
            pubkey=bip32node._replace(xtype="standard").to_xkey(),
582
            deriv_path=strpath,
583
        )
584

585
    def add_key_origin_from_root_node(self, *, derivation_prefix: str, root_node: BIP32Node) -> None:
5✔
586
        assert self.xpub
5✔
587
        # try to derive ourselves from what we were given
588
        child_node1 = root_node.subkey_at_private_derivation(derivation_prefix)
5✔
589
        child_pubkey_bytes1 = child_node1.eckey.get_public_key_bytes(compressed=True)
5✔
590
        child_node2 = self.get_bip32_node_for_xpub()
5✔
591
        child_pubkey_bytes2 = child_node2.eckey.get_public_key_bytes(compressed=True)
5✔
592
        if child_pubkey_bytes1 != child_pubkey_bytes2:
5✔
UNCOV
593
            raise Exception("(xpub, derivation_prefix, root_node) inconsistency")
×
594
        self.add_key_origin(derivation_prefix=derivation_prefix,
5✔
595
                            root_fingerprint=root_node.calc_fingerprint_of_this_node().hex().lower())
596

597
    def add_key_origin(self, *, derivation_prefix: str = None, root_fingerprint: str = None) -> None:
5✔
598
        assert self.xpub
5✔
599
        if not (root_fingerprint is None or (is_hex_str(root_fingerprint) and len(root_fingerprint) == 8)):
5✔
UNCOV
600
            raise Exception("root fp must be 8 hex characters")
×
601
        derivation_prefix = normalize_bip32_derivation(derivation_prefix)
5✔
602
        if not is_xkey_consistent_with_key_origin_info(self.xpub,
5✔
603
                                                       derivation_prefix=derivation_prefix,
604
                                                       root_fingerprint=root_fingerprint):
UNCOV
605
            raise Exception("xpub inconsistent with provided key origin info")
×
606
        if root_fingerprint is not None:
5✔
607
            self._root_fingerprint = root_fingerprint
5✔
608
        if derivation_prefix is not None:
5✔
609
            self._derivation_prefix = derivation_prefix
5✔
610
        self.is_requesting_to_be_rewritten_to_wallet_file = True
5✔
611

612
    @lru_cache(maxsize=None)
5✔
613
    def derive_pubkey(self, for_change: int, n: int) -> bytes:
5✔
614
        for_change = int(for_change)
5✔
615
        if for_change not in (0, 1):
5✔
UNCOV
616
            raise CannotDerivePubkey("forbidden path")
×
617
        xpub = self.xpub_change if for_change else self.xpub_receive
5✔
618
        if xpub is None:
5✔
619
            rootnode = self.get_bip32_node_for_xpub()
5✔
620
            xpub = rootnode.subkey_at_public_derivation((for_change,)).to_xpub()
5✔
621
            if for_change:
5✔
622
                self.xpub_change = xpub
5✔
623
            else:
624
                self.xpub_receive = xpub
5✔
625
        return self.get_pubkey_from_xpub(xpub, (n,))
5✔
626

627
    @classmethod
5✔
628
    def get_pubkey_from_xpub(self, xpub: str, sequence) -> bytes:
5✔
629
        node = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(sequence)
5✔
630
        return node.eckey.get_public_key_bytes(compressed=True)
5✔
631

632

633
class BIP32_KeyStore(Xpub, Deterministic_KeyStore):
5✔
634

635
    type = 'bip32'
5✔
636

637
    def __init__(self, d):
5✔
638
        Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
5✔
639
        Deterministic_KeyStore.__init__(self, d)
5✔
640
        self.xpub = d.get('xpub')
5✔
641
        self.xprv = d.get('xprv')
5✔
642

643
    def format_seed(self, seed):
5✔
644
        return ' '.join(seed.split())
5✔
645

646
    def dump(self):
5✔
647
        d = Deterministic_KeyStore.dump(self)
5✔
648
        d['xpub'] = self.xpub
5✔
649
        d['xprv'] = self.xprv
5✔
650
        d['derivation'] = self.get_derivation_prefix()
5✔
651
        d['root_fingerprint'] = self.get_root_fingerprint()
5✔
652
        return d
5✔
653

654
    def get_master_private_key(self, password):
5✔
655
        return pw_decode(self.xprv, password, version=self.pw_hash_version)
5✔
656

657
    @also_test_none_password
5✔
658
    def check_password(self, password):
5✔
659
        xprv = pw_decode(self.xprv, password, version=self.pw_hash_version)
5✔
660
        try:
5✔
661
            bip32node = BIP32Node.from_xkey(xprv)
5✔
662
        except BaseDecodeError as e:
5✔
663
            raise InvalidPassword() from e
5✔
664
        if bip32node.chaincode != self.get_bip32_node_for_xpub().chaincode:
5✔
UNCOV
665
            raise InvalidPassword()
×
666

667
    def update_password(self, old_password, new_password):
5✔
668
        self.check_password(old_password)
5✔
669
        if new_password == '':
5✔
UNCOV
670
            new_password = None
×
671
        if self.has_seed():
5✔
672
            decoded = self.get_seed(old_password)
5✔
673
            self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
5✔
674
        if self.passphrase:
5✔
675
            decoded = self.get_passphrase(old_password)
5✔
676
            self.passphrase = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
5✔
677
        if self.xprv is not None:
5✔
678
            b = pw_decode(self.xprv, old_password, version=self.pw_hash_version)
5✔
679
            self.xprv = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
5✔
680
        self.pw_hash_version = PW_HASH_VERSION_LATEST
5✔
681

682
    def is_watching_only(self):
5✔
683
        return self.xprv is None
5✔
684

685
    def add_xpub(self, xpub: str) -> None:
5✔
686
        assert is_xpub(xpub)
5✔
687
        self.xpub = xpub
5✔
688
        root_fingerprint, derivation_prefix = bip32.root_fp_and_der_prefix_from_xkey(xpub)
5✔
689
        self.add_key_origin(derivation_prefix=derivation_prefix, root_fingerprint=root_fingerprint)
5✔
690

691
    def add_xprv(self, xprv: str) -> None:
5✔
692
        assert is_xprv(xprv)
5✔
693
        self.xprv = xprv
5✔
694
        self.add_xpub(bip32.xpub_from_xprv(xprv))
5✔
695

696
    def add_xprv_from_seed(self, bip32_seed: bytes, *, xtype: str, derivation: str) -> None:
5✔
697
        rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype)
5✔
698
        node = rootnode.subkey_at_private_derivation(derivation)
5✔
699
        self.add_xprv(node.to_xprv())
5✔
700
        self.add_key_origin_from_root_node(derivation_prefix=derivation, root_node=rootnode)
5✔
701

702
    def get_private_key(self, sequence: Sequence[int], password):
5✔
703
        xprv = self.get_master_private_key(password)
5✔
704
        node = BIP32Node.from_xkey(xprv).subkey_at_private_derivation(sequence)
5✔
705
        pk = node.eckey.get_secret_bytes()
5✔
706
        return pk, True
5✔
707

708
    def get_keypair(self, sequence, password):
5✔
709
        k, _ = self.get_private_key(sequence, password)
×
710
        cK = ecc.ECPrivkey(k).get_public_key_bytes()
×
UNCOV
711
        return cK, k
×
712

713
    def can_have_deterministic_lightning_xprv(self):
5✔
714
        if (self.get_seed_type() == 'segwit'
5✔
715
                and self.get_bip32_node_for_xpub().xtype == 'p2wpkh'):
716
            return True
5✔
717
        return False
5✔
718

719
    def get_lightning_xprv(self, password) -> str:
5✔
720
        assert self.can_have_deterministic_lightning_xprv()
5✔
721
        xprv = self.get_master_private_key(password)
5✔
722
        rootnode = BIP32Node.from_xkey(xprv)
5✔
723
        node = rootnode.subkey_at_private_derivation("m/67'/")
5✔
724
        return node.to_xprv()
5✔
725

726
class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore):
5✔
727

728
    type = 'old'
5✔
729

730
    def __init__(self, d):
5✔
731
        Deterministic_KeyStore.__init__(self, d)
5✔
732
        self.mpk = d.get('mpk')
5✔
733
        self._root_fingerprint = None
5✔
734

735
    def get_hex_seed(self, password):
5✔
736
        return pw_decode(self.seed, password, version=self.pw_hash_version).encode('utf8')
5✔
737

738
    def dump(self):
5✔
739
        d = Deterministic_KeyStore.dump(self)
5✔
740
        d['mpk'] = self.mpk
5✔
741
        return d
5✔
742

743
    def add_seed(self, seed):
5✔
744
        Deterministic_KeyStore.add_seed(self, seed)
5✔
745
        s = self.get_hex_seed(None)
5✔
746
        self.mpk = self.mpk_from_seed(s)
5✔
747

748
    def add_master_public_key(self, mpk) -> None:
5✔
749
        self.mpk = mpk
5✔
750

751
    def format_seed(self, seed):
5✔
752
        from . import old_mnemonic, mnemonic
5✔
753
        seed = mnemonic.normalize_text(seed)
5✔
754
        # see if seed was entered as hex
755
        if seed:
5✔
756
            try:
5✔
757
                bfh(seed)
5✔
UNCOV
758
                return str(seed)
×
759
            except Exception:
5✔
760
                pass
5✔
761
        words = seed.split()
5✔
762
        seed = old_mnemonic.mn_decode(words)
5✔
763
        if not seed:
5✔
UNCOV
764
            raise Exception("Invalid seed")
×
765
        return seed
5✔
766

767
    def get_seed(self, password):
5✔
768
        from . import old_mnemonic
×
769
        s = self.get_hex_seed(password)
×
UNCOV
770
        return ' '.join(old_mnemonic.mn_encode(s))
×
771

772
    @classmethod
5✔
773
    def mpk_from_seed(klass, seed):
5✔
774
        secexp = klass.stretch_key(seed)
5✔
775
        privkey = ecc.ECPrivkey.from_secret_scalar(secexp)
5✔
776
        return privkey.get_public_key_hex(compressed=False)[2:]
5✔
777

778
    @classmethod
5✔
779
    def stretch_key(self, seed):
5✔
780
        x = seed
5✔
781
        for i in range(100000):
5✔
782
            x = hashlib.sha256(x + seed).digest()
5✔
783
        return string_to_number(x)
5✔
784

785
    @classmethod
5✔
786
    def get_sequence(self, mpk, for_change, n):
5✔
787
        return string_to_number(sha256d(("%d:%d:"%(n, for_change)).encode('ascii') + bfh(mpk)))
5✔
788

789
    @classmethod
5✔
790
    def get_pubkey_from_mpk(cls, mpk, for_change, n) -> bytes:
5✔
791
        z = cls.get_sequence(mpk, for_change, n)
5✔
792
        master_public_key = ecc.ECPubkey(bfh('04'+mpk))
5✔
793
        public_key = master_public_key + z*ecc.GENERATOR
5✔
794
        return public_key.get_public_key_bytes(compressed=False)
5✔
795

796
    @lru_cache(maxsize=None)
5✔
797
    def derive_pubkey(self, for_change, n) -> bytes:
5✔
798
        for_change = int(for_change)
5✔
799
        if for_change not in (0, 1):
5✔
UNCOV
800
            raise CannotDerivePubkey("forbidden path")
×
801
        return self.get_pubkey_from_mpk(self.mpk, for_change, n)
5✔
802

803
    def _get_private_key_from_stretched_exponent(self, for_change, n, secexp):
5✔
804
        secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER
5✔
805
        pk = int.to_bytes(secexp, length=32, byteorder='big', signed=False)
5✔
806
        return pk
5✔
807

808
    def get_private_key(self, sequence: Sequence[int], password):
5✔
809
        seed = self.get_hex_seed(password)
5✔
810
        secexp = self.stretch_key(seed)
5✔
811
        self._check_seed(seed, secexp=secexp)
5✔
812
        for_change, n = sequence
5✔
813
        pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp)
5✔
814
        return pk, False
5✔
815

816
    def _check_seed(self, seed, *, secexp=None):
5✔
817
        if secexp is None:
5✔
818
            secexp = self.stretch_key(seed)
5✔
819
        master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp)
5✔
820
        master_public_key = master_private_key.get_public_key_bytes(compressed=False)[1:]
5✔
821
        if master_public_key != bfh(self.mpk):
5✔
UNCOV
822
            raise InvalidPassword()
×
823

824
    @also_test_none_password
5✔
825
    def check_password(self, password):
5✔
826
        seed = self.get_hex_seed(password)
5✔
827
        self._check_seed(seed)
5✔
828

829
    def get_master_public_key(self):
5✔
UNCOV
830
        return self.mpk
×
831

832
    def get_derivation_prefix(self) -> str:
5✔
833
        return 'm'
5✔
834

835
    def get_root_fingerprint(self) -> str:
5✔
836
        if self._root_fingerprint is None:
5✔
837
            master_public_key = ecc.ECPubkey(bfh('04'+self.mpk))
5✔
838
            xfp = hash_160(master_public_key.get_public_key_bytes(compressed=True))[0:4]
5✔
839
            self._root_fingerprint = xfp.hex().lower()
5✔
840
        return self._root_fingerprint
5✔
841

842
    def get_fp_and_derivation_to_be_used_in_partial_tx(
5✔
843
            self,
844
            der_suffix: Sequence[int],
845
            *,
846
            only_der_suffix: bool,
847
    ) -> Tuple[bytes, Sequence[int]]:
848
        fingerprint_hex = self.get_root_fingerprint()
5✔
849
        der_prefix_str = self.get_derivation_prefix()
5✔
850
        fingerprint_bytes = bfh(fingerprint_hex)
5✔
851
        der_prefix_ints = convert_bip32_strpath_to_intpath(der_prefix_str)
5✔
852
        der_full = der_prefix_ints + list(der_suffix)
5✔
853
        return fingerprint_bytes, der_full
5✔
854

855
    def get_pubkey_provider(self, sequence: 'AddressIndexGeneric') -> Optional[PubkeyProvider]:
5✔
856
        return PubkeyProvider(
5✔
857
            origin=None,
858
            pubkey=self.derive_pubkey(*sequence).hex(),
859
            deriv_path=None,
860
        )
861

862
    def update_password(self, old_password, new_password):
5✔
863
        self.check_password(old_password)
×
864
        if new_password == '':
×
865
            new_password = None
×
866
        if self.has_seed():
×
867
            decoded = pw_decode(self.seed, old_password, version=self.pw_hash_version)
×
868
            self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
×
UNCOV
869
        self.pw_hash_version = PW_HASH_VERSION_LATEST
×
870

871

872
class Hardware_KeyStore(Xpub, KeyStore):
5✔
873
    hw_type: str
5✔
874
    device: str
5✔
875
    plugin: 'HW_PluginBase'
5✔
876
    thread: Optional['TaskThread'] = None
5✔
877

878
    type = 'hardware'
5✔
879

880
    def __init__(self, d):
5✔
881
        Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
5✔
882
        KeyStore.__init__(self)
5✔
883
        # Errors and other user interaction is done through the wallet's
884
        # handler.  The handler is per-window and preserved across
885
        # device reconnects
886
        self.xpub = d.get('xpub')
5✔
887
        self.label = d.get('label')  # type: Optional[str]
5✔
888
        self.soft_device_id = d.get('soft_device_id')  # type: Optional[str]
5✔
889
        self.handler = None  # type: Optional[HardwareHandlerBase]
5✔
890
        run_hook('init_keystore', self)
5✔
891

892
    def set_label(self, label):
5✔
UNCOV
893
        self.label = label
×
894

895
    def may_have_password(self):
5✔
UNCOV
896
        return False
×
897

898
    def is_deterministic(self):
5✔
UNCOV
899
        return True
×
900

901
    def get_type_text(self) -> str:
5✔
UNCOV
902
        return f'hw[{self.hw_type}]'
×
903

904
    def dump(self):
5✔
905
        return {
5✔
906
            'type': self.type,
907
            'hw_type': self.hw_type,
908
            'xpub': self.xpub,
909
            'derivation': self.get_derivation_prefix(),
910
            'root_fingerprint': self.get_root_fingerprint(),
911
            'label':self.label,
912
            'soft_device_id': self.soft_device_id,
913
        }
914

915
    def unpaired(self):
5✔
916
        '''A device paired with the wallet was disconnected.  This can be
917
        called in any thread context.'''
UNCOV
918
        self.logger.info("unpaired")
×
919

920
    def paired(self):
5✔
921
        '''A device paired with the wallet was (re-)connected.  This can be
922
        called in any thread context.'''
UNCOV
923
        self.logger.info("paired")
×
924

925
    def is_watching_only(self):
5✔
926
        '''The wallet is not watching-only; the user will be prompted for
927
        pin and passphrase as appropriate when needed.'''
928
        assert not self.has_seed()
×
UNCOV
929
        return False
×
930

931
    def get_client(
5✔
932
            self,
933
            force_pair: bool = True,
934
            *,
935
            devices: Sequence['Device'] = None,
936
            allow_user_interaction: bool = True,
937
    ) -> Optional['HardwareClientBase']:
UNCOV
938
        return self.plugin.get_client(
×
939
            self,
940
            force_pair=force_pair,
941
            devices=devices,
942
            allow_user_interaction=allow_user_interaction,
943
        )
944

945
    def get_password_for_storage_encryption(self) -> str:
5✔
946
        client = self.get_client()
×
UNCOV
947
        return client.get_password_for_storage_encryption()
×
948

949
    def has_usable_connection_with_device(self) -> bool:
5✔
950
        # we try to create a client even if there isn't one already,
951
        # but do not prompt the user if auto-select fails:
UNCOV
952
        client = self.get_client(
×
953
            force_pair=True,
954
            allow_user_interaction=False,
955
        )
956
        if client is None:
×
957
            return False
×
UNCOV
958
        return client.has_usable_connection_with_device()
×
959

960
    def ready_to_sign(self):
5✔
UNCOV
961
        return super().ready_to_sign() and self.has_usable_connection_with_device()
×
962

963
    def opportunistically_fill_in_missing_info_from_device(self, client: 'HardwareClientBase'):
5✔
964
        assert client is not None
×
965
        if self._root_fingerprint is None:
×
966
            self._root_fingerprint = client.request_root_fingerprint_from_device()
×
967
            self.is_requesting_to_be_rewritten_to_wallet_file = True
×
968
        if self.label != client.label():
×
969
            self.label = client.label()
×
970
            self.is_requesting_to_be_rewritten_to_wallet_file = True
×
971
        if self.soft_device_id != client.get_soft_device_id():
×
972
            self.soft_device_id = client.get_soft_device_id()
×
UNCOV
973
            self.is_requesting_to_be_rewritten_to_wallet_file = True
×
974

975
    def pairing_code(self) -> Optional[str]:
5✔
976
        """Used by the DeviceMgr to keep track of paired hw devices."""
977
        if not self.soft_device_id:
×
978
            return None
×
UNCOV
979
        return f"{self.plugin.name}/{self.soft_device_id}"
×
980

981

982
KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin]  # intersection really...
5✔
983
AddressIndexGeneric = Union[Sequence[int], str]  # can be hex pubkey str
5✔
984

985

986
def bip39_normalize_passphrase(passphrase: str):
5✔
987
    return normalize('NFKD', passphrase or '')
5✔
988

989

990
def bip39_to_seed(mnemonic: str, *, passphrase: Optional[str]) -> bytes:
5✔
991
    import hashlib
5✔
992
    passphrase = passphrase or ""
5✔
993
    PBKDF2_ROUNDS = 2048
5✔
994
    mnemonic = normalize('NFKD', ' '.join(mnemonic.split()))
5✔
995
    passphrase = bip39_normalize_passphrase(passphrase)
5✔
996
    return hashlib.pbkdf2_hmac('sha512', mnemonic.encode('utf-8'),
5✔
997
        b'mnemonic' + passphrase.encode('utf-8'), iterations = PBKDF2_ROUNDS)
998

999

1000
def bip39_is_checksum_valid(
5✔
1001
        mnemonic: str,
1002
        *,
1003
        wordlist: Wordlist = None,
1004
) -> Tuple[bool, bool]:
1005
    """Test checksum of bip39 mnemonic assuming English wordlist.
1006
    Returns tuple (is_checksum_valid, is_wordlist_valid)
1007
    """
1008
    words = [normalize('NFKD', word) for word in mnemonic.split()]
5✔
1009
    words_len = len(words)
5✔
1010
    if wordlist is None:
5✔
1011
        wordlist = Wordlist.from_file("english.txt")
5✔
1012
    n = len(wordlist)
5✔
1013
    i = 0
5✔
1014
    words.reverse()
5✔
1015
    while words:
5✔
1016
        w = words.pop()
5✔
1017
        try:
5✔
1018
            k = wordlist.index(w)
5✔
1019
        except ValueError:
×
UNCOV
1020
            return False, False
×
1021
        i = i*n + k
5✔
1022
    if words_len not in [12, 15, 18, 21, 24]:
5✔
UNCOV
1023
        return False, True
×
1024
    checksum_length = 11 * words_len // 33  # num bits
5✔
1025
    entropy_length = 32 * checksum_length  # num bits
5✔
1026
    entropy = i >> checksum_length
5✔
1027
    checksum = i % 2**checksum_length
5✔
1028
    entropy_bytes = int.to_bytes(entropy, length=entropy_length//8, byteorder="big")
5✔
1029
    hashed = int.from_bytes(sha256(entropy_bytes), byteorder="big")
5✔
1030
    calculated_checksum = hashed >> (256 - checksum_length)
5✔
1031
    return checksum == calculated_checksum, True
5✔
1032

1033

1034
def from_bip43_rootseed(
5✔
1035
    root_seed: bytes,
1036
    *,
1037
    derivation: str,
1038
    xtype: Optional[str] = None,
1039
):
1040
    k = BIP32_KeyStore({})
5✔
1041
    if xtype is None:
5✔
1042
        xtype = xtype_from_derivation(derivation)
5✔
1043
    k.add_xprv_from_seed(root_seed, xtype=xtype, derivation=derivation)
5✔
1044
    return k
5✔
1045

1046

1047
PURPOSE48_SCRIPT_TYPES = {
5✔
1048
    'p2wsh-p2sh': 1,  # specifically multisig
1049
    'p2wsh': 2,       # specifically multisig
1050
}
1051
PURPOSE48_SCRIPT_TYPES_INV = inv_dict(PURPOSE48_SCRIPT_TYPES)
5✔
1052

1053

1054
def xtype_from_derivation(derivation: str) -> str:
5✔
1055
    """Returns the script type to be used for this derivation."""
1056
    bip32_indices = convert_bip32_strpath_to_intpath(derivation)
5✔
1057
    if len(bip32_indices) >= 1:
5✔
1058
        if bip32_indices[0] == 84 + BIP32_PRIME:
5✔
1059
            return 'p2wpkh'
5✔
1060
        elif bip32_indices[0] == 49 + BIP32_PRIME:
5✔
1061
            return 'p2wpkh-p2sh'
5✔
1062
        elif bip32_indices[0] == 44 + BIP32_PRIME:
5✔
1063
            return 'standard'
5✔
1064
        elif bip32_indices[0] == 45 + BIP32_PRIME:
5✔
1065
            return 'standard'
5✔
1066

1067
    if len(bip32_indices) >= 4:
5✔
1068
        if bip32_indices[0] == 48 + BIP32_PRIME:
5✔
1069
            # m / purpose' / coin_type' / account' / script_type' / change / address_index
1070
            script_type_int = bip32_indices[3] - BIP32_PRIME
5✔
1071
            script_type = PURPOSE48_SCRIPT_TYPES_INV.get(script_type_int)
5✔
1072
            if script_type is not None:
5✔
1073
                return script_type
5✔
UNCOV
1074
    return 'standard'
×
1075

1076

1077
hw_keystores = {}
5✔
1078

1079
def register_keystore(hw_type, constructor):
5✔
1080
    hw_keystores[hw_type] = constructor
5✔
1081

1082
def hardware_keystore(d) -> Hardware_KeyStore:
5✔
1083
    hw_type = d['hw_type']
5✔
1084
    if hw_type in hw_keystores:
5✔
1085
        constructor = hw_keystores[hw_type]
5✔
1086
        return constructor(d)
5✔
UNCOV
1087
    raise WalletFileException(f'unknown hardware type: {hw_type}. '
×
1088
                              f'hw_keystores: {list(hw_keystores)}')
1089

1090
def load_keystore(db: 'WalletDB', name: str) -> KeyStore:
5✔
1091
    # deepcopy object to avoid keeping a pointer to db.data
1092
    # note: this is needed as type(wallet.db.get("keystore")) != StoredDict
1093
    d = copy.deepcopy(db.get(name, {}))
5✔
1094
    t = d.get('type')
5✔
1095
    if not t:
5✔
UNCOV
1096
        raise WalletFileException(
×
1097
            'Wallet format requires update.\n'
1098
            'Cannot find keystore for name {}'.format(name))
1099
    keystore_constructors = {ks.type: ks for ks in [Old_KeyStore, Imported_KeyStore, BIP32_KeyStore]}
5✔
1100
    keystore_constructors['hardware'] = hardware_keystore
5✔
1101
    try:
5✔
1102
        ks_constructor = keystore_constructors[t]
5✔
1103
    except KeyError:
×
UNCOV
1104
        raise WalletFileException(f'Unknown type {t} for keystore named {name}')
×
1105
    k = ks_constructor(d)
5✔
1106
    return k
5✔
1107

1108

1109
def is_old_mpk(mpk: str) -> bool:
5✔
1110
    try:
5✔
1111
        int(mpk, 16)  # test if hex string
5✔
1112
    except Exception:
5✔
1113
        return False
5✔
1114
    if len(mpk) != 128:
5✔
UNCOV
1115
        return False
×
1116
    try:
5✔
1117
        ecc.ECPubkey(bfh('04' + mpk))
5✔
1118
    except Exception:
×
UNCOV
1119
        return False
×
1120
    return True
5✔
1121

1122

1123
def is_address_list(text):
5✔
1124
    parts = text.split()
5✔
1125
    return bool(parts) and all(bitcoin.is_address(x) for x in parts)
5✔
1126

1127

1128
def get_private_keys(text, *, allow_spaces_inside_key=True, raise_on_error=False):
5✔
1129
    if allow_spaces_inside_key:  # see #1612
5✔
1130
        parts = text.split('\n')
×
1131
        parts = map(lambda x: ''.join(x.split()), parts)
×
UNCOV
1132
        parts = list(filter(bool, parts))
×
1133
    else:
1134
        parts = text.split()
5✔
1135
    if bool(parts) and all(bitcoin.is_private_key(x, raise_on_error=raise_on_error) for x in parts):
5✔
1136
        return parts
5✔
1137

1138

1139
def is_private_key_list(text, *, allow_spaces_inside_key=True, raise_on_error=False):
5✔
1140
    return bool(get_private_keys(text,
5✔
1141
                                 allow_spaces_inside_key=allow_spaces_inside_key,
1142
                                 raise_on_error=raise_on_error))
1143

1144

1145
def is_master_key(x):
5✔
1146
    return is_old_mpk(x) or is_bip32_key(x)
5✔
1147

1148

1149
def is_bip32_key(x):
5✔
1150
    return is_xprv(x) or is_xpub(x)
5✔
1151

1152

1153
def bip44_derivation(account_id, bip43_purpose=44):
5✔
1154
    coin = constants.net.BIP44_COIN_TYPE
5✔
1155
    der = "m/%d'/%d'/%d'" % (bip43_purpose, coin, int(account_id))
5✔
1156
    return normalize_bip32_derivation(der)
5✔
1157

1158

1159
def purpose48_derivation(account_id: int, xtype: str) -> str:
5✔
1160
    # m / purpose' / coin_type' / account' / script_type' / change / address_index
1161
    bip43_purpose = 48
×
1162
    coin = constants.net.BIP44_COIN_TYPE
×
1163
    account_id = int(account_id)
×
1164
    script_type_int = PURPOSE48_SCRIPT_TYPES.get(xtype)
×
1165
    if script_type_int is None:
×
1166
        raise Exception('unknown xtype: {}'.format(xtype))
×
1167
    der = "m/%d'/%d'/%d'/%d'" % (bip43_purpose, coin, account_id, script_type_int)
×
UNCOV
1168
    return normalize_bip32_derivation(der)
×
1169

1170

1171
def from_seed(seed: str, *, passphrase: Optional[str], for_multisig: bool = False):
5✔
1172
    passphrase = passphrase or ""
5✔
1173
    t = calc_seed_type(seed)
5✔
1174
    if t == 'old':
5✔
1175
        if passphrase:
5✔
UNCOV
1176
            raise Exception("'old'-type electrum seed cannot have passphrase")
×
1177
        keystore = Old_KeyStore({})
5✔
1178
        keystore.add_seed(seed)
5✔
1179
    elif t in ['standard', 'segwit']:
5✔
1180
        keystore = BIP32_KeyStore({})
5✔
1181
        keystore.add_seed(seed)
5✔
1182
        keystore.passphrase = passphrase
5✔
1183
        bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase=passphrase)
5✔
1184
        if t == 'standard':
5✔
1185
            der = "m/"
5✔
1186
            xtype = 'standard'
5✔
1187
        else:
1188
            der = "m/1'/" if for_multisig else "m/0'/"
5✔
1189
            xtype = 'p2wsh' if for_multisig else 'p2wpkh'
5✔
1190
        keystore.add_xprv_from_seed(bip32_seed, xtype=xtype, derivation=der)
5✔
1191
    else:
UNCOV
1192
        raise BitcoinException('Unexpected seed type {}'.format(repr(t)))
×
1193
    return keystore
5✔
1194

1195
def from_private_key_list(text):
5✔
1196
    keystore = Imported_KeyStore({})
×
1197
    for x in get_private_keys(text):
×
1198
        keystore.import_privkey(x, None)
×
UNCOV
1199
    return keystore
×
1200

1201
def from_old_mpk(mpk):
5✔
1202
    keystore = Old_KeyStore({})
5✔
1203
    keystore.add_master_public_key(mpk)
5✔
1204
    return keystore
5✔
1205

1206
def from_xpub(xpub):
5✔
1207
    k = BIP32_KeyStore({})
5✔
1208
    k.add_xpub(xpub)
5✔
1209
    return k
5✔
1210

1211
def from_xprv(xprv):
5✔
1212
    k = BIP32_KeyStore({})
5✔
1213
    k.add_xprv(xprv)
5✔
1214
    return k
5✔
1215

1216
def from_master_key(text):
5✔
1217
    if is_xprv(text):
5✔
1218
        k = from_xprv(text)
5✔
1219
    elif is_old_mpk(text):
5✔
1220
        k = from_old_mpk(text)
5✔
1221
    elif is_xpub(text):
5✔
1222
        k = from_xpub(text)
5✔
1223
    else:
UNCOV
1224
        raise BitcoinException('Invalid master key')
×
1225
    return k
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc