• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 4878529344569344

04 Mar 2025 10:05AM UTC coverage: 60.716% (-0.02%) from 60.731%
4878529344569344

Pull #9587

CirrusCI

f321x
disable mpp flags in invoice creation if jit channel is required, check against available liquidity if we need a jit channel
Pull Request #9587: Disable mpp flags in invoice creation if jit channel is required and consider available liquidity

5 of 15 new or added lines in 2 files covered. (33.33%)

847 existing lines in 6 files now uncovered.

20678 of 34057 relevant lines covered (60.72%)

3.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.92
/electrum/transaction.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26

27

28
# Note: The deserialization code originally comes from ABE.
29

30
import struct
5✔
31
import traceback
5✔
32
import sys
5✔
33
import io
5✔
34
import base64
5✔
35
from typing import (Sequence, Union, NamedTuple, Tuple, Optional, Iterable,
5✔
36
                    Callable, List, Dict, Set, TYPE_CHECKING, Mapping)
37
from collections import defaultdict
5✔
38
from enum import IntEnum
5✔
39
import itertools
5✔
40
import binascii
5✔
41
import copy
5✔
42

43
import electrum_ecc as ecc
5✔
44

45
from . import bitcoin, constants, segwit_addr, bip32
5✔
46
from .bip32 import BIP32Node
5✔
47
from .i18n import _
5✔
48
from .util import profiler, to_bytes, bfh, chunks, is_hex_str, parse_max_spend
5✔
49
from .bitcoin import (TYPE_ADDRESS, TYPE_SCRIPT, hash_160,
5✔
50
                      hash160_to_p2sh, hash160_to_p2pkh, hash_to_segwit_addr,
51
                      var_int, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC, COIN,
52
                      opcodes, base_decode,
53
                      base_encode, construct_witness, construct_script,
54
                      taproot_tweak_seckey)
55
from .crypto import sha256d, sha256
5✔
56
from .logging import get_logger
5✔
57
from .util import ShortID, OldTaskGroup
5✔
58
from .bitcoin import DummyAddress
5✔
59
from .descriptor import Descriptor, MissingSolutionPiece, create_dummy_descriptor_from_address
5✔
60
from .json_db import stored_in
5✔
61

62
if TYPE_CHECKING:
5✔
UNCOV
63
    from .wallet import Abstract_Wallet
×
64
    from .network import Network
×
65
    from .simple_config import SimpleConfig
×
66

67

68
# Module-level logger, named after this module.
_logger = get_logger(__name__)
# Debug switch for PSBT parsing; it is not referenced in this part of the file.
DEBUG_PSBT_PARSING = False


# Marker stored in the cached-address slots of TxOutput / TxInput meaning
# "stale, recompute on next access". None cannot be used for that purpose
# because None is itself a legitimate cached result (script without an address).
_NEEDS_RECALC = ...  # sentinel value
5✔
73

74

75
class SerializationError(Exception):
    """ Thrown when there's a problem deserializing or serializing """


# NOTE(review): no raise site is visible in this part of the file; judging by
# the name it signals a transaction input of unrecognised type - confirm.
class UnknownTxinType(Exception):
    pass


# NOTE(review): no raise site is visible in this part of the file; presumably
# raised when serialized data does not start with the expected magic - confirm.
class BadHeaderMagic(SerializationError):
    pass


# NOTE(review): no raise site is visible in this part of the file; presumably
# raised when a parser runs out of data mid-structure - confirm.
class UnexpectedEndOfStream(SerializationError):
    pass


# Raised by TxInput.validate_data() when the attached previous transaction
# does not have the txid named by the input's prevout.
class PSBTInputConsistencyFailure(SerializationError):
    pass


# Raised by script_GetOp() when a push opcode's length prefix is truncated or
# announces more data than the script contains.
class MalformedBitcoinScript(Exception):
    pass


# NOTE(review): no raise site is visible in this part of the file; presumably
# raised when the value of a spent output is needed but unknown - confirm.
class MissingTxInputAmount(Exception):
    pass
5✔
101

102

103
class TxinDataFetchProgress(NamedTuple):
    """Immutable progress snapshot made of two task counters and two status flags.

    NOTE(review): the producer of these tuples is outside this part of the
    file; going by the name it reports progress of fetching data for
    transaction inputs - confirm against the caller.
    """
    num_tasks_done: int  # count of tasks completed so far
    num_tasks_total: int  # count of tasks overall
    has_errored: bool  # presumably True once any task failed - confirm
    has_finished: bool  # presumably True once the whole operation is over - confirm
5✔
108

109

110
class Sighash(IntEnum):
    """Signature-hash type constants, plus helpers to validate and encode them.

    This is intentionally a plain IntEnum and not an IntFlag: the base types
    are not independent bits (ALL|NONE would not mean SINGLE).
    """

    DEFAULT = 0  # taproot only (bip-0341)
    ALL = 1
    NONE = 2
    SINGLE = 3
    ANYONECANPAY = 0x80

    @classmethod
    def is_valid(cls, sighash: int, *, is_taproot: bool = False) -> bool:
        """Return whether `sighash` is an accepted sighash value.

        Accepted are the three base types (0x01..0x03), each optionally
        combined with the 0x80 bit. The value 0x00 is accepted only when
        `is_taproot` is set.
        """
        base_types = (0x01, 0x02, 0x03)
        accepted = set(base_types)
        accepted.update(base | 0x80 for base in base_types)
        if is_taproot:
            accepted.add(0x00)
        return sighash in accepted

    @classmethod
    def to_sigbytes(cls, sighash: int) -> bytes:
        """Return the byte appended to a signature for `sighash`.

        DEFAULT is encoded as the empty string, everything else as one byte.
        """
        return b"" if sighash == Sighash.DEFAULT else sighash.to_bytes(length=1, byteorder="big")
5✔
134

135

136
class TxOutput:
    """A transaction output: a scriptPubKey plus the amount it locks.

    `value` is either an int (satoshis) or a spend-max-like string accepted
    by parse_max_spend(). The address is derived lazily from the script and
    cached; assigning a new script invalidates that cache.
    """
    scriptpubkey: bytes
    value: Union[int, str]

    def __init__(self, *, scriptpubkey: bytes, value: Union[int, str]):
        self.scriptpubkey = scriptpubkey
        value_is_acceptable = isinstance(value, int) or parse_max_spend(value) is not None
        if not value_is_acceptable:
            raise ValueError(f"bad txout value: {value!r}")
        # int in satoshis; or spend-max-like str
        self.value = value

    @classmethod
    def from_address_and_value(cls, address: str, value: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']:
        """Alternate constructor: build the output from an address string."""
        script = bitcoin.address_to_script(address)
        return cls(scriptpubkey=script, value=value)

    def serialize_to_network(self) -> bytes:
        """Return amount (8 bytes, little-endian) + script length + script."""
        amount_bytes = int.to_bytes(self.value, 8, byteorder="little", signed=False)
        spk = self.scriptpubkey
        return amount_bytes + var_int(len(spk)) + spk

    @classmethod
    def from_network_bytes(cls, raw: bytes) -> 'TxOutput':
        """Parse exactly one serialized output; trailing bytes are an error."""
        stream = BCDataStream()
        stream.write(raw)
        parsed = parse_output(stream)
        if stream.can_read_more():
            raise SerializationError('extra junk at the end of TxOutput bytes')
        return parsed

    def to_legacy_tuple(self) -> Tuple[int, str, Union[int, str]]:
        """Return (type, address-or-script-hex, value) in the legacy layout."""
        addr = self.address
        if not addr:
            return TYPE_SCRIPT, self.scriptpubkey.hex(), self.value
        return TYPE_ADDRESS, addr, self.value

    @classmethod
    def from_legacy_tuple(cls, _type: int, addr: str, val: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']:
        """Inverse of to_legacy_tuple(); raises Exception on an unknown type."""
        if _type == TYPE_ADDRESS:
            return cls.from_address_and_value(addr, val)
        elif _type == TYPE_SCRIPT:
            return cls(scriptpubkey=bfh(addr), value=val)
        else:
            raise Exception(f"unexpected legacy address type: {_type}")

    @property
    def scriptpubkey(self) -> bytes:
        return self._scriptpubkey

    @scriptpubkey.setter
    def scriptpubkey(self, scriptpubkey: bytes):
        # a new script makes any previously derived address stale
        self._address = _NEEDS_RECALC
        self._scriptpubkey = scriptpubkey

    @property
    def address(self) -> Optional[str]:
        """Address for the script, or None if the script has none (cached)."""
        if self._address is _NEEDS_RECALC:
            self._address = get_address_from_output_script(self._scriptpubkey)
        return self._address

    def get_ui_address_str(self) -> str:
        """Return the address if there is one, else "SCRIPT <hex>"."""
        addr = self.address
        return addr if addr is not None else f"SCRIPT {self.scriptpubkey.hex()}"

    def __repr__(self):
        return f"<TxOutput script={self.scriptpubkey.hex()} address={self.address} value={self.value}>"

    def __eq__(self, other):
        # equal iff the other object is a TxOutput with the same script and value
        if isinstance(other, TxOutput):
            return self.scriptpubkey == other.scriptpubkey and self.value == other.value
        return False

    def __ne__(self, other):
        return not (self == other)

    def to_json(self):
        """Return a dict with the script (hex), the address and the value."""
        return {
            'scriptpubkey': self.scriptpubkey.hex(),
            'address': self.address,
            'value_sats': self.value,
        }
5✔
219

220

221
class BIP143SharedTxDigestFields(NamedTuple):  # witness v0
    """The three double-SHA256 digests that are computed over the whole tx
    (all prevouts, all sequence numbers, all outputs) for witness v0."""
    hashPrevouts: bytes
    hashSequence: bytes
    hashOutputs: bytes

    @classmethod
    def from_tx(cls, tx: 'PartialTransaction') -> 'BIP143SharedTxDigestFields':
        """Compute the digests from the inputs and outputs of `tx`."""
        txins = tx.inputs()
        txouts = tx.outputs()
        # every field is sha256d over the concatenation of the per-item serializations
        prevouts_blob = b''.join([txin.prevout.serialize_to_network() for txin in txins])
        hashPrevouts = sha256d(prevouts_blob)
        sequences_blob = b''.join([
            int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
            for txin in txins
        ])
        hashSequence = sha256d(sequences_blob)
        outputs_blob = b''.join([txout.serialize_to_network() for txout in txouts])
        hashOutputs = sha256d(outputs_blob)
        return BIP143SharedTxDigestFields(
            hashPrevouts=hashPrevouts,
            hashSequence=hashSequence,
            hashOutputs=hashOutputs,
        )
240

241

242
class BIP341SharedTxDigestFields(NamedTuple):  # witness v1
    """The five single-SHA256 digests that are computed over the whole tx
    (prevouts, spent amounts, spent scriptPubKeys, sequences, outputs)
    for witness v1."""
    sha_prevouts: bytes
    sha_amounts: bytes
    sha_scriptpubkeys: bytes
    sha_sequences: bytes
    sha_outputs: bytes

    @classmethod
    def from_tx(cls, tx: 'PartialTransaction') -> 'BIP341SharedTxDigestFields':
        """Compute the digests from the inputs and outputs of `tx`.

        The amount and scriptPubKey of every spent output are read from the
        inputs (txin.value_sats() / txin.scriptpubkey).
        """
        txins = tx.inputs()
        txouts = tx.outputs()
        prevouts_blob = b''.join([txin.prevout.serialize_to_network() for txin in txins])
        sha_prevouts = sha256(prevouts_blob)
        amounts_blob = b''.join([
            int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
            for txin in txins
        ])
        sha_amounts = sha256(amounts_blob)
        # each script is prefixed with its own length
        scripts_blob = b''.join([
            var_int(len(txin.scriptpubkey)) + txin.scriptpubkey
            for txin in txins
        ])
        sha_scriptpubkeys = sha256(scripts_blob)
        sequences_blob = b''.join([
            int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
            for txin in txins
        ])
        sha_sequences = sha256(sequences_blob)
        outputs_blob = b''.join([txout.serialize_to_network() for txout in txouts])
        sha_outputs = sha256(outputs_blob)
        return BIP341SharedTxDigestFields(
            sha_prevouts=sha_prevouts,
            sha_amounts=sha_amounts,
            sha_scriptpubkeys=sha_scriptpubkeys,
            sha_sequences=sha_sequences,
            sha_outputs=sha_outputs,
        )
271

272

273
class SighashCache:
    """Memoizes the tx-wide digest fields so they are computed at most once.

    Note that the cache is not keyed on the transaction: whatever was
    computed on the first call is returned for every later call, regardless
    of the `tx` argument passed then.
    """

    def __init__(self):
        self._witver0 = None  # type: Optional[BIP143SharedTxDigestFields]
        self._witver1 = None  # type: Optional[BIP341SharedTxDigestFields]

    def get_witver0_data_for_tx(self, tx: 'PartialTransaction') -> BIP143SharedTxDigestFields:
        """Return the witness-v0 digest fields, computing them on first use."""
        cached = self._witver0
        if cached is None:
            cached = self._witver0 = BIP143SharedTxDigestFields.from_tx(tx)
        return cached

    def get_witver1_data_for_tx(self, tx: 'PartialTransaction') -> BIP341SharedTxDigestFields:
        """Return the witness-v1 digest fields, computing them on first use."""
        cached = self._witver1
        if cached is None:
            cached = self._witver1 = BIP341SharedTxDigestFields.from_tx(tx)
        return cached
5✔
288

289

290
class TxOutpoint(NamedTuple):
    """Reference to one output of a transaction: (txid, output index)."""
    txid: bytes  # endianness same as hex string displayed; reverse of tx serialization order
    out_idx: int

    @classmethod
    def from_str(cls, s: str) -> 'TxOutpoint':
        """Parse the "<64 hex chars>:<index>" form produced by to_str()."""
        txid_hex, index_text = s.split(':')
        assert len(txid_hex) == 64, f"{txid_hex} should be a sha256 hash"
        txid = bfh(txid_hex)
        return TxOutpoint(txid=txid, out_idx=int(index_text))

    def __str__(self) -> str:
        return f"""TxOutpoint("{self.to_str()}")"""

    def __repr__(self):
        return f"<{str(self)}>"

    def to_str(self) -> str:
        """Return "<txid hex>:<index>"."""
        return f"{self.txid.hex()}:{self.out_idx}"

    def to_json(self):
        """Return [txid hex, index]."""
        return [self.txid.hex(), self.out_idx]

    def serialize_to_network(self) -> bytes:
        """Return the byte-reversed txid followed by the index as 4 bytes little-endian."""
        reversed_txid = self.txid[::-1]
        index_bytes = int.to_bytes(self.out_idx, length=4, byteorder="little", signed=False)
        return reversed_txid + index_bytes

    def is_coinbase(self) -> bool:
        """Return whether the txid consists of 32 zero bytes."""
        all_zero_txid = bytes(32)
        return self.txid == all_zero_txid

    def short_name(self):
        """Return the first 10 hex chars of the txid, a colon, and the index."""
        return f"{self.txid.hex()[0:10]}:{self.out_idx}"
×
321

322

323
class TxInput:
    """A transaction input: the outpoint being spent plus scriptSig, sequence
    number and witness.

    If the previous transaction is attached via the `utxo` property, the
    spent output's scriptPubKey, address and value become available too.
    """
    prevout: TxOutpoint
    script_sig: Optional[bytes]
    nsequence: int
    witness: Optional[bytes]
    _is_coinbase_output: bool

    def __init__(self, *,
                 prevout: TxOutpoint,
                 script_sig: bytes = None,
                 nsequence: int = 0xffffffff - 1,
                 witness: bytes = None,
                 is_coinbase_output: bool = False):
        self.prevout = prevout
        self.script_sig = script_sig
        self.nsequence = nsequence
        self.witness = witness
        self._is_coinbase_output = is_coinbase_output
        # --- blockchain fields ---
        # height at which the TXO is mined; None means unknown. not SPV-ed.
        self.block_height = None  # type: Optional[int]
        # position of tx in block, if TXO is mined; otherwise None or -1
        self.block_txpos = None  # type: Optional[int]
        # height at which the TXO got spent
        self.spent_height = None  # type: Optional[int]
        # txid of the spender
        self.spent_txid = None  # type: Optional[str]
        # --- previous tx, and the fields derived from it by the utxo setter ---
        self._utxo = None  # type: Optional[Transaction]
        self.__scriptpubkey = None  # type: Optional[bytes]
        self.__address = None  # type: Optional[str]
        self.__value_sats = None  # type: Optional[int]

    @property
    def short_id(self):
        """ShortID built from block position if that is known (txpos >= 0),
        otherwise the abbreviated name of the prevout."""
        txpos = self.block_txpos
        if txpos is None or txpos < 0:
            return self.prevout.short_name()
        return ShortID.from_components(self.block_height, txpos, self.prevout.out_idx)

    @property
    def utxo(self):
        return self._utxo

    @utxo.setter
    def utxo(self, tx: Optional['Transaction']):
        # Assigning None, or a tx that is not complete, is silently ignored.
        if tx is None:
            return
        # note that tx might be a PartialTransaction
        # round-trip through the string form now. this might e.g. convert a complete PartialTx to a Tx
        tx = tx_from_any(str(tx))
        # 'utxo' field should not be a PSBT:
        if not tx.is_complete():
            return
        self.validate_data(utxo=tx)
        self._utxo = tx
        # refresh the fields derived from the spent output
        spent_output = self._utxo.outputs()[self.prevout.out_idx]
        self.__scriptpubkey = spent_output.scriptpubkey
        self.__address = _NEEDS_RECALC
        self.__value_sats = spent_output.value

    def validate_data(self, *, utxo: Optional['Transaction'] = None, **kwargs) -> None:
        """Raise PSBTInputConsistencyFailure if the previous tx (given, or
        else the one already attached) does not have the txid of the prevout."""
        utxo = utxo or self.utxo
        if utxo and self.prevout.txid.hex() != utxo.txid():
            raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                              f"If a non-witness UTXO is provided, its hash must match the hash specified in the prevout")

    def is_coinbase_input(self) -> bool:
        """Whether this is the input of a coinbase tx."""
        return self.prevout.is_coinbase()

    def is_coinbase_output(self) -> bool:
        """Whether the coin being spent is an output of a coinbase tx.
        This matters for coin maturity (and pretty much only for that!).
        """
        return self._is_coinbase_output

    def value_sats(self) -> Optional[int]:
        """Value of the spent output; None until a previous tx is attached."""
        return self.__value_sats

    @property
    def address(self) -> Optional[str]:
        """Address of the spent output, derived lazily from its script."""
        if self.__address is _NEEDS_RECALC:
            self.__address = get_address_from_output_script(self.__scriptpubkey)
        return self.__address

    @property
    def scriptpubkey(self) -> Optional[bytes]:
        return self.__scriptpubkey

    def to_json(self):
        """Return a dict describing the input; scriptSig and witness are
        included only when they are set."""
        obj = {
            'prevout_hash': self.prevout.txid.hex(),
            'prevout_n': self.prevout.out_idx,
            'coinbase': self.is_coinbase_output(),
            'nsequence': self.nsequence,
        }
        if self.script_sig is not None:
            obj['scriptSig'] = self.script_sig.hex()
        if self.witness is not None:
            obj['witness'] = [item.hex() for item in self.witness_elements()]
        return obj

    def serialize_to_network(self, *, script_sig: bytes = None) -> bytes:
        """Return prevout + scriptSig length + scriptSig + sequence (4 bytes LE).

        `script_sig` overrides the stored scriptSig when given.
        """
        effective_sig = self.script_sig if script_sig is None else script_sig
        return (
            self.prevout.serialize_to_network()  # Prev hash and index
            + var_int(len(effective_sig))  # Script length
            + effective_sig
            + int.to_bytes(self.nsequence, length=4, byteorder="little", signed=False)
        )

    def witness_elements(self) -> Sequence[bytes]:
        """Split the serialized witness into its stack items."""
        if not self.witness:
            return []
        stream = BCDataStream()
        stream.write(self.witness)
        # layout: item count, then each item prefixed by its length
        item_count = stream.read_compact_size()
        return [stream.read_bytes(stream.read_compact_size()) for _ in range(item_count)]

    def is_segwit(self, *, guess_for_address=False) -> bool:
        # a missing, empty, or single-zero-byte witness does not count
        return self.witness not in (b'\x00', b'', None)

    async def add_info_from_network(
            self,
            network: Optional['Network'],
            *,
            ignore_network_issues: bool = True,
            timeout=None,
    ) -> bool:
        """Returns True iff successful."""
        from .network import NetworkException

        async def fetch_from_network(txid) -> Optional[Transaction]:
            fetched = None
            if network and network.has_internet_connection():
                try:
                    raw_tx = await network.get_transaction(txid, timeout=timeout)
                except NetworkException as e:
                    _logger.info(f'got network error getting input txn. err: {repr(e)}. txid: {txid}. '
                                 f'if you are intentionally offline, consider using the --offline flag')
                    if not ignore_network_issues:
                        raise e
                else:
                    fetched = Transaction(raw_tx)
            if not fetched and not ignore_network_issues:
                raise NetworkException('failed to get prev tx from network')
            return fetched

        # only go to the network if no previous tx is attached yet
        if self.utxo is None:
            self.utxo = await fetch_from_network(txid=self.prevout.txid.hex())
        return self.utxo is not None
5✔
475

476

477
class BCDataStream(object):
    """Workalike python implementation of Bitcoin's CDataStream class.

    Data is appended with write() / write_*() and consumed front to back
    with the read_*() methods, each of which advances `read_cursor`.
    Reading before anything was written, or past the end of the buffer,
    raises SerializationError.
    """

    def __init__(self):
        self.input = None  # type: Optional[bytearray]  # stays None until the first write()
        self.read_cursor = 0  # offset of the next unread byte in self.input

    def clear(self):
        """Discard the buffer and rewind the read cursor."""
        self.input = None
        self.read_cursor = 0

    def write(self, _bytes: Union[bytes, bytearray]):  # Initialize with string of _bytes
        """Append `_bytes` to the buffer, creating the buffer on first use."""
        assert isinstance(_bytes, (bytes, bytearray))
        if self.input is None:
            self.input = bytearray(_bytes)
        else:
            self.input += bytearray(_bytes)

    def read_string(self, encoding='ascii'):
        """Read a compact-size length followed by that many bytes, decoded with `encoding`."""
        # Strings are encoded depending on length:
        # 0 to 252 :  1-byte-length followed by bytes (if any)
        # 253 to 65,535 : byte'253' 2-byte-length followed by bytes
        # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes
        # ... and the Bitcoin client is coded to understand:
        # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string
        # ... but I don't think it actually handles any strings that big.
        if self.input is None:
            raise SerializationError("call write(bytes) before trying to deserialize")

        length = self.read_compact_size()

        return self.read_bytes(length).decode(encoding)

    def write_string(self, string, encoding='ascii'):
        """Write `string` as a compact-size length followed by its encoded bytes."""
        string = to_bytes(string, encoding)
        # Length-encoded as with read-string
        self.write_compact_size(len(string))
        self.write(string)

    def read_bytes(self, length: int) -> bytes:
        """Consume and return exactly `length` bytes.

        Raises SerializationError if nothing was written yet or if fewer
        than `length` bytes remain; the cursor is not moved in that case.
        """
        if self.input is None:
            raise SerializationError("call write(bytes) before trying to deserialize")
        assert length >= 0
        input_len = len(self.input)
        read_begin = self.read_cursor
        read_end = read_begin + length
        if 0 <= read_begin <= read_end <= input_len:
            result = self.input[read_begin:read_end]  # type: bytearray
            self.read_cursor += length
            return bytes(result)
        else:
            raise SerializationError('attempt to read past end of buffer')

    def write_bytes(self, _bytes: Union[bytes, bytearray], length: int):
        """Append `_bytes`, asserting that it is exactly `length` bytes long."""
        assert len(_bytes) == length, len(_bytes)
        self.write(_bytes)

    def can_read_more(self) -> bool:
        """Return whether unread bytes remain in the buffer."""
        if not self.input:
            return False
        return self.read_cursor < len(self.input)

    # Fixed-width readers; all little-endian (struct format prefix '<').
    def read_boolean(self) -> bool: return self.read_bytes(1) != b'\x00'
    def read_int16(self): return self._read_num('<h')
    def read_uint16(self): return self._read_num('<H')
    def read_int32(self): return self._read_num('<i')
    def read_uint32(self): return self._read_num('<I')
    def read_int64(self): return self._read_num('<q')
    def read_uint64(self): return self._read_num('<Q')

    # Fixed-width writers; all little-endian (struct format prefix '<').
    def write_boolean(self, val): return self.write(b'\x01' if val else b'\x00')
    def write_int16(self, val): return self._write_num('<h', val)
    def write_uint16(self, val): return self._write_num('<H', val)
    def write_int32(self, val): return self._write_num('<i', val)
    def write_uint32(self, val): return self._write_num('<I', val)
    def write_int64(self, val): return self._write_num('<q', val)
    def write_uint64(self, val): return self._write_num('<Q', val)

    def read_compact_size(self):
        """Read a variable-length unsigned integer and return it.

        A first byte below 253 is the value itself; 253, 254 and 255 announce
        a 2-, 4- or 8-byte little-endian value respectively.

        Raises SerializationError if nothing was written yet or if the buffer
        ends prematurely.
        """
        # Same guard (and message) as read_bytes()/read_string(): without it,
        # indexing None below would leak a raw TypeError to the caller.
        if self.input is None:
            raise SerializationError("call write(bytes) before trying to deserialize")
        try:
            size = self.input[self.read_cursor]
            self.read_cursor += 1
            if size == 253:
                size = self._read_num('<H')
            elif size == 254:
                size = self._read_num('<I')
            elif size == 255:
                size = self._read_num('<Q')
            return size
        except IndexError as e:
            raise SerializationError("attempt to read past end of buffer") from e

    def write_compact_size(self, size):
        """Write `size` in the variable-length encoding read by read_compact_size().

        Raises SerializationError for a negative size and Exception for a
        size that does not fit in 64 bits.
        """
        if size < 0:
            raise SerializationError("attempt to write size < 0")
        elif size < 253:
            self.write(bytes([size]))
        elif size < 2**16:
            self.write(b'\xfd')
            self._write_num('<H', size)
        elif size < 2**32:
            self.write(b'\xfe')
            self._write_num('<I', size)
        elif size < 2**64:
            self.write(b'\xff')
            self._write_num('<Q', size)
        else:
            raise Exception(f"size {size} too large for compact_size")

    def _read_num(self, format):
        """Unpack one number with struct format `format` at the cursor and advance past it.

        Any failure (short buffer, no buffer at all, bad format) is re-raised
        as SerializationError.
        """
        try:
            (i,) = struct.unpack_from(format, self.input, self.read_cursor)
            self.read_cursor += struct.calcsize(format)
        except Exception as e:
            raise SerializationError(e) from e
        return i

    def _write_num(self, format, num):
        """Pack `num` with struct format `format` and append the result."""
        s = struct.pack(format, num)
        self.write(s)
5✔
597

598

599
def script_GetOp(_bytes : bytes):
    """Iterate over the operations of a serialized script.

    Yields one (opcode, pushed_data, next_offset) tuple per operation.
    `pushed_data` is None for opcodes above OP_PUSHDATA4, and the pushed
    bytes otherwise. `next_offset` is the offset just past the operation.

    Raises MalformedBitcoinScript if a PUSHDATA length prefix is truncated
    or a push announces more bytes than remain in the script.
    """
    pos = 0
    while pos < len(_bytes):
        opcode = _bytes[pos]
        pos += 1
        pushed = None

        if opcode <= opcodes.OP_PUSHDATA4:
            # for direct pushes the opcode itself is the number of bytes to push
            push_len = opcode
            if opcode == opcodes.OP_PUSHDATA1:
                # 1-byte length prefix
                try:
                    push_len = _bytes[pos]
                except IndexError:
                    raise MalformedBitcoinScript()
                pos += 1
            elif opcode == opcodes.OP_PUSHDATA2:
                # 2-byte little-endian length prefix
                try:
                    (push_len,) = struct.unpack_from('<H', _bytes, pos)
                except struct.error:
                    raise MalformedBitcoinScript()
                pos += 2
            elif opcode == opcodes.OP_PUSHDATA4:
                # 4-byte little-endian length prefix
                try:
                    (push_len,) = struct.unpack_from('<I', _bytes, pos)
                except struct.error:
                    raise MalformedBitcoinScript()
                pos += 4
            if pos + push_len > len(_bytes):
                raise MalformedBitcoinScript(
                    f"Push of data element that is larger than remaining data: {push_len} vs {len(_bytes) - pos}")
            pushed = _bytes[pos:pos + push_len]
            pos += push_len

        yield opcode, pushed, pos
5✔
627

628

629
class OPPushDataGeneric:
    """Script-template placeholder that stands for "some data push".

    Either the class itself or an instance can be put into a template. An
    instance may carry its own length predicate, which then replaces the
    default check_data_len() for that instance.
    """

    def __init__(self, pushlen: Callable=None):
        # shadow the classmethod with the custom predicate, if one was given
        if pushlen is not None:
            self.check_data_len = pushlen

    @classmethod
    def check_data_len(cls, datalen: int) -> bool:
        # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent.
        return 0 <= datalen <= opcodes.OP_PUSHDATA4

    @classmethod
    def is_instance(cls, item):
        """Return True for instances of this class and for this class or its
        subclasses themselves."""
        if isinstance(item, cls):
            return True
        return isinstance(item, type) and issubclass(item, cls)
645

646

647
class OPGeneric:
    """Script-template placeholder that accepts any opcode satisfying a
    caller-supplied predicate."""

    def __init__(self, matcher: Callable=None):
        # the attribute is only created when a predicate was actually given
        if matcher is not None:
            self.matcher = matcher

    def match(self, op) -> bool:
        """Return the result of applying the predicate to `op`."""
        predicate = self.matcher
        return predicate(op)

    @classmethod
    def is_instance(cls, item):
        """Return True for instances of this class and for this class or its
        subclasses themselves."""
        if isinstance(item, cls):
            return True
        return isinstance(item, type) and issubclass(item, cls)
661

662
# Template building blocks and the scriptPubKey templates built from them.
# A template is a list whose items are literal opcodes, OPPushDataGeneric
# placeholders (a data push whose length satisfies the predicate) or OPGeneric
# placeholders (an opcode that satisfies the predicate).

# push of 33 or 65 bytes
OPPushDataPubkey = OPPushDataGeneric(lambda x: x in (33, 65))
# any of OP_1 .. OP_16. Membership is tested on the range object directly, so
# no throw-away list is built on every call.
OP_ANYSEGWIT_VERSION = OPGeneric(lambda x: x in range(opcodes.OP_1, opcodes.OP_16 + 1))

SCRIPTPUBKEY_TEMPLATE_P2PKH = [opcodes.OP_DUP, opcodes.OP_HASH160,
                               OPPushDataGeneric(lambda x: x == 20),
                               opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG]
SCRIPTPUBKEY_TEMPLATE_P2SH = [opcodes.OP_HASH160, OPPushDataGeneric(lambda x: x == 20), opcodes.OP_EQUAL]
SCRIPTPUBKEY_TEMPLATE_WITNESS_V0 = [opcodes.OP_0, OPPushDataGeneric(lambda x: x in (20, 32))]
SCRIPTPUBKEY_TEMPLATE_P2WPKH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 20)]
SCRIPTPUBKEY_TEMPLATE_P2WSH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 32)]
# version opcode OP_1..OP_16 followed by a push of 2 to 40 bytes
SCRIPTPUBKEY_TEMPLATE_ANYSEGWIT = [OP_ANYSEGWIT_VERSION, OPPushDataGeneric(lambda x: x in range(2, 40 + 1))]
5✔
673

674

675
def check_scriptpubkey_template_and_dust(scriptpubkey, amount: Optional[int]):
    """Check that `scriptpubkey` has a known form and `amount` is not dust.

    The script is tried against the P2PKH, P2SH, P2WSH, P2WPKH and
    any-segwit templates, in that order; the first match selects the dust
    limit that `amount` is compared against.

    Raises Exception if no template matches, or if `amount` is below the
    selected dust limit.

    NOTE(review): `amount` is annotated Optional, but None would fail with
    TypeError at the comparison - confirm what callers are expected to pass.
    """
    def conforms_to(template) -> bool:
        return match_script_against_template(scriptpubkey, template)

    if conforms_to(SCRIPTPUBKEY_TEMPLATE_P2PKH):
        dust_limit = bitcoin.DUST_LIMIT_P2PKH
    elif conforms_to(SCRIPTPUBKEY_TEMPLATE_P2SH):
        dust_limit = bitcoin.DUST_LIMIT_P2SH
    elif conforms_to(SCRIPTPUBKEY_TEMPLATE_P2WSH):
        dust_limit = bitcoin.DUST_LIMIT_P2WSH
    elif conforms_to(SCRIPTPUBKEY_TEMPLATE_P2WPKH):
        dust_limit = bitcoin.DUST_LIMIT_P2WPKH
    elif conforms_to(SCRIPTPUBKEY_TEMPLATE_ANYSEGWIT):
        dust_limit = bitcoin.DUST_LIMIT_UNKNOWN_SEGWIT
    else:
        raise Exception(f'scriptpubkey does not conform to any template: {scriptpubkey.hex()}')

    if not (amount >= dust_limit):
        raise Exception(f'amount ({amount}) is below dust limit for scriptpubkey type ({dust_limit})')
×
690

691
def merge_duplicate_tx_outputs(outputs: Iterable['PartialTxOutput']) -> List['PartialTxOutput']:
    """Combine outputs that share a scriptPubKey into a single output.

    The result has one entry per distinct script, in order of first
    appearance, whose value is the sum of the values of all inputs with that
    script. Entries are shallow copies; the objects passed in are not
    modified. All values must be ints.
    """
    merged_by_script = {}
    for txout in outputs:
        assert isinstance(txout.value, int), "tx outputs with spend-max-like str cannot be merged"
        script = txout.scriptpubkey
        already_seen = merged_by_script.get(script)
        if already_seen is None:
            # first occurrence: keep a private copy that later duplicates add to
            merged_by_script[script] = copy.copy(txout)
        else:
            already_seen.value += txout.value
    return [*merged_by_script.values()]
5✔
701

702
def match_script_against_template(script, template, debug=False) -> bool:
    """Returns whether 'script' matches 'template'.

    `script` may be raw bytes (decoded here with script_GetOp) or an already
    decoded sequence of (opcode, data, offset) tuples. None and malformed
    scripts never match. With `debug` set, the reason for a mismatch is
    logged.
    """
    if script is None:
        return False
    # optionally decode script now:
    if isinstance(script, (bytes, bytearray)):
        try:
            script = list(script_GetOp(script))
        except MalformedBitcoinScript:
            if debug:
                _logger.debug("malformed script")
            return False
    if debug:
        _logger.debug(f"match script against template: {script}")
    if len(script) != len(template):
        if debug:
            _logger.debug(f"length mismatch {len(script)} != {len(template)}")
        return False
    # lengths are equal here, so walking both in lockstep covers every position
    for position, (template_item, script_item) in enumerate(zip(template, script)):
        opcode = script_item[0]
        if OPPushDataGeneric.is_instance(template_item) and template_item.check_data_len(opcode):
            continue
        if OPGeneric.is_instance(template_item) and template_item.match(opcode):
            continue
        if template_item == opcode:
            continue
        if debug:
            _logger.debug(f"item mismatch at position {position}: {template_item} != {opcode}")
        return False
    return True
5✔
732

733
def get_script_type_from_output_script(_bytes: bytes) -> Optional[str]:
    """Classify an output script as 'p2pkh', 'p2sh', 'p2wpkh' or 'p2wsh'.

    Returns None if `_bytes` is None, if it cannot be decoded, or if it
    matches none of those templates.
    """
    if _bytes is None:
        return None
    try:
        ops = list(script_GetOp(_bytes))
    except MalformedBitcoinScript:
        return None
    # checked in this order; the first matching template wins
    known_templates = (
        (SCRIPTPUBKEY_TEMPLATE_P2PKH, 'p2pkh'),
        (SCRIPTPUBKEY_TEMPLATE_P2SH, 'p2sh'),
        (SCRIPTPUBKEY_TEMPLATE_P2WPKH, 'p2wpkh'),
        (SCRIPTPUBKEY_TEMPLATE_P2WSH, 'p2wsh'),
    )
    for template, script_type in known_templates:
        if match_script_against_template(ops, template):
            return script_type
    return None
749

750
def get_address_from_output_script(_bytes: bytes, *, net=None) -> Optional[str]:
    """Derive the address that an output script pays to.

    Handles p2pkh, p2sh, witness v0 and witness v1-16 scripts; `net` is
    passed through to the address encoders. Returns None if the script
    cannot be decoded or matches none of these forms.
    """
    try:
        ops = list(script_GetOp(_bytes))
    except MalformedBitcoinScript:
        return None

    # p2pkh
    if match_script_against_template(ops, SCRIPTPUBKEY_TEMPLATE_P2PKH):
        return hash160_to_p2pkh(ops[2][1], net=net)

    # p2sh
    if match_script_against_template(ops, SCRIPTPUBKEY_TEMPLATE_P2SH):
        return hash160_to_p2sh(ops[1][1], net=net)

    # segwit address (version 0)
    if match_script_against_template(ops, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
        return hash_to_segwit_addr(ops[1][1], witver=0, net=net)

    # segwit address (version 1-16): OP_1..OP_16 followed by one push of 2..40 bytes
    witness_version_opcodes = range(opcodes.OP_1, opcodes.OP_16 + 1)
    for witver, opcode in enumerate(witness_version_opcodes, start=1):
        template = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)]
        if match_script_against_template(ops, template):
            return hash_to_segwit_addr(ops[1][1], witver=witver, net=net)

    return None
776

777

778
def parse_input(vds: BCDataStream) -> TxInput:
    """Read one transaction input from `vds` and return it as a TxInput.

    Fields are consumed in stream order: 32-byte previous tx hash (stored
    byte-reversed), uint32 output index, length-prefixed scriptSig, uint32
    sequence number.
    """
    txid = vds.read_bytes(32)[::-1]
    out_idx = vds.read_uint32()
    outpoint = TxOutpoint(txid=txid, out_idx=out_idx)
    script_len = vds.read_compact_size()
    script_sig = vds.read_bytes(script_len)
    sequence = vds.read_uint32()
    return TxInput(prevout=outpoint, script_sig=script_sig, nsequence=sequence)
785

786

787
def parse_witness(vds: BCDataStream, txin: TxInput) -> None:
    """Read one witness stack from `vds` and store it on `txin.witness`.

    The stack is an element count followed by that many length-prefixed
    byte strings; the elements are re-packed with construct_witness().
    """
    element_count = vds.read_compact_size()
    elements = []
    for _ in range(element_count):
        element_len = vds.read_compact_size()
        elements.append(vds.read_bytes(element_len))
    txin.witness = construct_witness(elements)
791

792

793
def parse_output(vds: BCDataStream) -> TxOutput:
    """Read one transaction output (int64 value, length-prefixed script) from `vds`.

    Raises SerializationError if the value is negative or exceeds the
    total coin supply limit.
    """
    amount = vds.read_int64()
    max_amount = TOTAL_COIN_SUPPLY_LIMIT_IN_BTC * COIN
    if amount > max_amount:
        raise SerializationError('invalid output amount (too large)')
    if amount < 0:
        raise SerializationError('invalid output amount (negative)')
    script_len = vds.read_compact_size()
    script = vds.read_bytes(script_len)
    return TxOutput(value=amount, scriptpubkey=script)
801

802

803
# pay & redeem scripts
804

805
def multisig_script(public_keys: Sequence[str], m: int) -> bytes:
    """Build the m-of-n script: m, every public key, n, then OP_CHECKMULTISIG.

    Asserts 1 <= m <= n <= 15, where n is the number of public keys.
    """
    n = len(public_keys)
    assert 1 <= m <= n <= 15, f'm {m}, n {n}'
    script_items = [m]
    script_items.extend(public_keys)
    script_items.append(n)
    script_items.append(opcodes.OP_CHECKMULTISIG)
    return construct_script(script_items)
809

810

811
class Transaction:
5✔
812
    _cached_network_ser: Optional[str]
5✔
813

814
    def __str__(self):
        """Return the serialization produced by serialize()."""
        return self.serialize()
816

817
    def __init__(self, raw):
5✔
818
        if raw is None:
5✔
819
            self._cached_network_ser = None
5✔
820
        elif isinstance(raw, str):
5✔
821
            self._cached_network_ser = raw.strip() if raw else None
5✔
822
            assert is_hex_str(self._cached_network_ser)
5✔
823
        elif isinstance(raw, (bytes, bytearray)):
5✔
824
            self._cached_network_ser = raw.hex()
5✔
825
        else:
UNCOV
826
            raise Exception(f"cannot initialize transaction from {raw}")
×
827
        self._inputs = None  # type: List[TxInput]
5✔
828
        self._outputs = None  # type: List[TxOutput]
5✔
829
        self._locktime = 0
5✔
830
        self._version = 2
5✔
831

832
        self._cached_txid = None  # type: Optional[str]
5✔
833

834
    @property
    def locktime(self):
        """The transaction locktime; triggers deserialize() first."""
        self.deserialize()
        return self._locktime

    @locktime.setter
    def locktime(self, value: int):
        """Set the locktime (must be an int) and drop the cached serialization and txid.

        NOTE(review): invalidate_ser_cache() discards the raw serialization, so
        setting this before the raw tx was deserialized loses the raw data.
        """
        assert isinstance(value, int), f"locktime must be int, not {value!r}"
        self._locktime = value
        self.invalidate_ser_cache()
844

845
    @property
    def version(self):
        """The transaction version; triggers deserialize() first."""
        self.deserialize()
        return self._version

    @version.setter
    def version(self, value: int):
        """Set the version and drop the cached serialization and txid.

        Unlike the locktime setter, the value's type is not checked here.
        """
        self._version = value
        self.invalidate_ser_cache()
854

855
    def to_json(self) -> dict:
5✔
856
        d = {
5✔
857
            'version': self.version,
858
            'locktime': self.locktime,
859
            'inputs': [txin.to_json() for txin in self.inputs()],
860
            'outputs': [txout.to_json() for txout in self.outputs()],
861
        }
862
        return d
5✔
863

864
    def inputs(self) -> Sequence[TxInput]:
        """Return the inputs, deserializing the raw transaction on first access.

        note: still None afterwards if there is no cached serialization to parse.
        """
        if self._inputs is None:
            self.deserialize()
        return self._inputs
868

869
    def outputs(self) -> Sequence[TxOutput]:
        """Return the outputs, deserializing the raw transaction on first access.

        note: still None afterwards if there is no cached serialization to parse.
        """
        if self._outputs is None:
            self.deserialize()
        return self._outputs
873

874
    def deserialize(self) -> None:
        """Parse the cached raw transaction into version, inputs, outputs and locktime.

        Does nothing if there is no cached serialization, or if the inputs
        have already been populated.

        Raises SerializationError on an invalid marker byte, on zero inputs
        or zero outputs, and on trailing bytes after the locktime.
        """
        if self._cached_network_ser is None:
            return
        if self._inputs is not None:
            return

        raw_bytes = bfh(self._cached_network_ser)
        vds = BCDataStream()
        vds.write(raw_bytes)
        self._version = vds.read_int32()
        n_vin = vds.read_compact_size()
        # a zero where the input count is expected signals the segwit serialization;
        # the real input count then follows one more byte (which must be 0x01)
        is_segwit = (n_vin == 0)
        if is_segwit:
            marker = vds.read_bytes(1)
            if marker != b'\x01':
                raise SerializationError('invalid txn marker byte: {}'.format(marker))
            n_vin = vds.read_compact_size()
        if n_vin < 1:
            raise SerializationError('tx needs to have at least 1 input')
        txins = [parse_input(vds) for i in range(n_vin)]
        n_vout = vds.read_compact_size()
        if n_vout < 1:
            raise SerializationError('tx needs to have at least 1 output')
        self._outputs = [parse_output(vds) for i in range(n_vout)]
        if is_segwit:
            # one witness stack per input, in input order
            for txin in txins:
                parse_witness(vds, txin)
        self._inputs = txins  # only expose field after witness is parsed, for sanity
        self._locktime = vds.read_uint32()
        if vds.can_read_more():
            raise SerializationError('extra junk at the end')
905

906
    @classmethod
    def serialize_witness(cls, txin: TxInput, *, estimate_size=False) -> bytes:
        """Return the serialized witness for `txin`.

        An already-set `txin.witness` is returned as is; coinbase inputs give
        b"" and non-segwit inputs an empty witness. Otherwise the witness is
        built from the input's script descriptor.
        With `estimate_size`, the result only needs to have the right length:
        dummy signatures are allowed and a size hint may be used instead.

        Raises UnknownTxinType if no descriptor is available.
        """
        if txin.witness is not None:
            return txin.witness
        if txin.is_coinbase_input():
            return b""
        assert isinstance(txin, PartialTxInput)

        if not txin.is_segwit():
            return construct_witness([])

        # inputs providing their own make_witness(): measure it with a dummy signature
        if estimate_size and hasattr(txin, 'make_witness'):
            sig_dummy = b'\x00' * 71  # DER-encoded ECDSA sig, with low S and low R
            txin.witness_sizehint = len(txin.make_witness(sig_dummy))

        if estimate_size and txin.witness_sizehint is not None:
            # bytes(n) is n zero bytes: a placeholder of the hinted length
            return bytes(txin.witness_sizehint)

        dummy_desc = None
        if estimate_size:
            dummy_desc = create_dummy_descriptor_from_address(txin.address)
        # the real descriptor takes precedence over the address-derived dummy
        if desc := (txin.script_descriptor or dummy_desc):
            sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.sigs_ecdsa)
            if sol.witness is not None:
                return sol.witness
            return construct_witness([])
        raise UnknownTxinType("cannot construct witness")
933

934
    @classmethod
5✔
935
    def input_script(self, txin: TxInput, *, estimate_size=False) -> bytes:
5✔
936
        if txin.script_sig is not None:
5✔
937
            return txin.script_sig
5✔
938
        if txin.is_coinbase_input():
5✔
UNCOV
939
            return b""
×
940
        assert isinstance(txin, PartialTxInput)
5✔
941

942
        if txin.is_p2sh_segwit() and txin.redeem_script:
5✔
943
            return construct_script([txin.redeem_script])
5✔
944
        if txin.is_native_segwit():
5✔
945
            return b""
5✔
946

947
        dummy_desc = None
5✔
948
        if estimate_size:
5✔
949
            dummy_desc = create_dummy_descriptor_from_address(txin.address)
5✔
950
        if desc := (txin.script_descriptor or dummy_desc):
5✔
951
            if desc.is_segwit():
5✔
952
                if redeem_script := desc.expand().redeem_script:
5✔
953
                    return construct_script([redeem_script])
5✔
954
                return b""
5✔
955
            sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.sigs_ecdsa)
5✔
956
            if sol.script_sig is not None:
5✔
957
                return sol.script_sig
5✔
UNCOV
958
            return b""
×
959
        raise UnknownTxinType("cannot construct scriptSig")
×
960

961
    @classmethod
5✔
962
    def get_preimage_script(cls, txin: 'PartialTxInput') -> bytes:
5✔
963
        if txin.witness_script:
5✔
964
            if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.witness_script)]:
5✔
UNCOV
965
                raise Exception('OP_CODESEPARATOR black magic is not supported')
×
966
            return txin.witness_script
5✔
967
        if not txin.is_segwit() and txin.redeem_script:
5✔
968
            if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.redeem_script)]:
5✔
UNCOV
969
                raise Exception('OP_CODESEPARATOR black magic is not supported')
×
970
            return txin.redeem_script
5✔
971

972
        if desc := txin.script_descriptor:
5✔
973
            sc = desc.expand()
5✔
974
            if script := sc.scriptcode_for_sighash:
5✔
975
                return script
5✔
UNCOV
976
            raise Exception(f"don't know scriptcode for descriptor: {desc.to_string()}")
×
977
        raise UnknownTxinType(f'cannot construct preimage_script')
×
978

979
    def is_segwit(self, *, guess_for_address=False):
        """Return whether at least one input is segwit.

        `guess_for_address` is forwarded unchanged to each input's is_segwit().
        """
        return any(txin.is_segwit(guess_for_address=guess_for_address)
                   for txin in self.inputs())
982

983
    def invalidate_ser_cache(self):
        """Forget the cached network serialization and the cached txid."""
        self._cached_network_ser = None
        self._cached_txid = None
986

987
    def serialize(self) -> str:
        """Return the network serialization as hex, computing and caching it if needed."""
        # note: truthiness check, so an empty cached string is recomputed as well
        if not self._cached_network_ser:
            self._cached_network_ser = self.serialize_to_network(estimate_size=False, include_sigs=True)
        return self._cached_network_ser
991

992
    def serialize_as_bytes(self) -> bytes:
        """Return serialize() converted from hex to bytes."""
        return bfh(self.serialize())
994

995
    def serialize_to_network(self, *, estimate_size=False, include_sigs=True, force_legacy=False) -> str:
        """Serialize the transaction as used on the Bitcoin network, into hex.
        `estimate_size` is forwarded to the scriptSig/witness construction, and
        makes segwit detection guess from addresses.
        `include_sigs` signals whether to include scriptSigs and witnesses.
        `force_legacy` signals to use the pre-segwit format
        note: (not include_sigs) implies force_legacy
        """
        self.deserialize()
        # version is a signed and locktime an unsigned 4-byte little-endian integer
        nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True).hex()
        nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False).hex()
        inputs = self.inputs()
        outputs = self.outputs()

        def create_script_sig(txin: TxInput) -> bytes:
            # without sigs, every input is serialized with an empty scriptSig
            if include_sigs:
                script_sig = self.input_script(txin, estimate_size=estimate_size)
                return script_sig
            return b""
        txins = var_int(len(inputs)).hex() + ''.join(
            txin.serialize_to_network(script_sig=create_script_sig(txin)).hex()
            for txin in inputs)
        txouts = var_int(len(outputs)).hex() + ''.join(o.serialize_to_network().hex() for o in outputs)

        # when estimating, segwit-ness of inputs may be guessed from their address
        use_segwit_ser_for_estimate_size = estimate_size and self.is_segwit(guess_for_address=True)
        use_segwit_ser_for_actual_use = not estimate_size and self.is_segwit()
        use_segwit_ser = use_segwit_ser_for_estimate_size or use_segwit_ser_for_actual_use
        if include_sigs and not force_legacy and use_segwit_ser:
            # segwit format: marker and flag bytes after the version, witnesses before the locktime
            marker = '00'
            flag = '01'
            witness = ''.join(self.serialize_witness(x, estimate_size=estimate_size).hex() for x in inputs)
            return nVersion + marker + flag + txins + txouts + witness + nLocktime
        else:
            return nVersion + txins + txouts + nLocktime
1027

1028
    def to_qr_data(self) -> Tuple[str, bool]:
        """Returns (serialized_tx, is_complete). The tx is serialized to be put inside a QR code. No side-effects.
        As space in a QR code is limited, some data might have to be omitted. This is signalled via is_complete=False.
        The serialization is base43-encoded.
        """
        is_complete = True
        tx = copy.deepcopy(self)  # make copy as we mutate tx
        if isinstance(tx, PartialTransaction):
            # this makes QR codes a lot smaller (or just possible in the first place!)
            # note: will not apply if all inputs are taproot, due to new sighash.
            tx.convert_all_utxos_to_witness_utxos()
            is_complete = False
        tx_bytes = tx.serialize_as_bytes()
        return base_encode(tx_bytes, base=43), is_complete
1041

1042
    def txid(self) -> Optional[str]:
        """Return the txid as hex, or None if it cannot be determined.

        The txid is sha256d of the legacy-format serialization, byte-reversed.
        None is returned when the tx is incomplete while not all inputs are
        segwit, or when a scriptSig cannot be constructed.
        A computed txid is cached until invalidate_ser_cache() is called.
        """
        if self._cached_txid is None:
            self.deserialize()
            all_segwit = all(txin.is_segwit() for txin in self.inputs())
            if not all_segwit and not self.is_complete():
                return None
            try:
                ser = self.serialize_to_network(force_legacy=True)
            except UnknownTxinType:
                # we might not know how to construct scriptSig for some scripts
                return None
            self._cached_txid = sha256d(bfh(ser))[::-1].hex()
        return self._cached_txid
1055

1056
    def wtxid(self) -> Optional[str]:
        """Return the wtxid as hex, or None if it cannot be determined.

        The wtxid is sha256d of the full network serialization, byte-reversed.
        None is returned when the tx is incomplete, or when a scriptSig or
        witness cannot be constructed. Unlike txid(), the result is not cached.
        """
        self.deserialize()
        if not self.is_complete():
            return None
        try:
            ser = self.serialize_to_network()
        except UnknownTxinType:
            # we might not know how to construct scriptSig/witness for some scripts
            return None
        return sha256d(bfh(ser))[::-1].hex()
1066

1067
    def add_info_from_wallet(self, wallet: 'Abstract_Wallet', **kwargs) -> None:
        """Let `wallet` add what it knows to every input, via wallet.add_input_info().

        note: `kwargs` is accepted but not used here.
        """
        # populate prev_txs
        for txin in self.inputs():
            wallet.add_input_info(txin)
1071

1072
    async def add_info_from_network(
        self,
        network: Optional['Network'],
        *,
        ignore_network_issues: bool = True,
        progress_cb: Optional[Callable[[TxinDataFetchProgress], None]] = None,
        timeout=None,
    ) -> None:
        """Fetch missing data for every input that has no utxo, one concurrent task per input.

        `progress_cb`, if given, is called with a TxinDataFetchProgress before and
        after each input's fetch, and once more at the very end.
        Exceptions raised by the task group are logged, not propagated.

        note: it is recommended to call add_info_from_wallet first, as this can save some network requests
        """
        if not self.is_missing_info_from_network():
            return
        if progress_cb is None:
            # no-op callback, so the code below can call it unconditionally
            progress_cb = lambda *args, **kwargs: None
        num_tasks_done = 0
        num_tasks_total = 0
        has_errored = False
        has_finished = False
        async def add_info_to_txin(txin: TxInput):
            # the counters live in the enclosing function and are shared by all tasks
            nonlocal num_tasks_done, has_errored
            progress_cb(TxinDataFetchProgress(num_tasks_done, num_tasks_total, has_errored, has_finished))
            success = await txin.add_info_from_network(
                network=network,
                ignore_network_issues=ignore_network_issues,
                timeout=timeout,
            )
            if success:
                num_tasks_done += 1
            else:
                has_errored = True
            progress_cb(TxinDataFetchProgress(num_tasks_done, num_tasks_total, has_errored, has_finished))
        # schedule a network task for each txin
        try:
            async with OldTaskGroup() as group:
                for txin in self.inputs():
                    if txin.utxo is None:
                        num_tasks_total += 1
                        await group.spawn(add_info_to_txin(txin=txin))
        except Exception as e:
            has_errored = True
            _logger.error(f"tx.add_info_from_network() got exc: {e!r}")
        finally:
            # final progress report, sent on success and on failure alike
            has_finished = True
            progress_cb(TxinDataFetchProgress(num_tasks_done, num_tasks_total, has_errored, has_finished))
1115

1116
    def is_missing_info_from_network(self) -> bool:
        """Return whether any input still has no utxo set."""
        return any(txin.utxo is None for txin in self.inputs())
1118

1119
    def add_info_from_wallet_and_network(
        self, *, wallet: 'Abstract_Wallet', show_error: Callable[[str], None],
    ) -> bool:
        """Returns whether successful.
        Adds info from the wallet first, then fetches what is still missing from
        the network. A NetworkException is reported via `show_error` and gives False.
        note: This is sort of a legacy hack... doing network requests in non-async code.
              Relatedly, this should *not* be called from the network thread.
        """
        # note side-effect: tx is being mutated
        from .network import NetworkException, Network
        self.add_info_from_wallet(wallet)
        try:
            if self.is_missing_info_from_network():
                # network issues are not ignored here, so that they surface as NetworkException
                Network.run_from_another_thread(
                    self.add_info_from_network(wallet.network, ignore_network_issues=False))
        except NetworkException as e:
            show_error(repr(e))
            return False
        return True
1137

1138
    def is_rbf_enabled(self) -> bool:
5✔
1139
        """Whether the tx explicitly signals BIP-0125 replace-by-fee."""
1140
        return any([txin.nsequence < 0xffffffff - 1 for txin in self.inputs()])
5✔
1141

1142
    def estimated_size(self) -> int:
        """Return an estimated virtual tx size in vbytes.
        BIP-0141 defines 'Virtual transaction size' to be weight/4 rounded up.
        This definition is only for humans, and has little meaning otherwise.
        If we wanted sub-byte precision, fee calculation should use transaction
        weights, but for simplicity we approximate that with (virtual_size)x4
        """
        # estimated weight, converted with the rounding-up division by 4
        weight = self.estimated_weight()
        return self.virtual_size_from_weight(weight)
1151

1152
    @classmethod
    def estimated_input_weight(cls, txin: TxInput, is_segwit_tx: bool) -> int:
        """Return an estimate of serialized input weight in weight units.

        The non-witness part of the input counts four times, the witness once.
        """
        script_sig = cls.input_script(txin, estimate_size=True)
        input_size = len(txin.serialize_to_network(script_sig=script_sig))

        if txin.is_segwit(guess_for_address=True):
            witness_size = len(cls.serialize_witness(txin, estimate_size=True))
        else:
            # a non-segwit input inside a segwit tx still accounts for one witness byte
            witness_size = 1 if is_segwit_tx else 0

        return 4 * input_size + witness_size
1164

1165
    @classmethod
    def estimated_output_size_for_address(cls, address: str) -> int:
        """Return an estimate of serialized output size in bytes.

        The address is converted to its script; see estimated_output_size_for_script.
        """
        script = bitcoin.address_to_script(address)
        return cls.estimated_output_size_for_script(script)
1170

1171
    @classmethod
    def estimated_output_size_for_script(cls, script: bytes) -> int:
        """Return an estimate of serialized output size in bytes.

        An output consists of an 8 byte value, the var_int-encoded script
        length, and the script itself.
        """
        value_size = 8
        script_size = len(script)
        length_prefix_size = len(var_int(script_size))
        return value_size + length_prefix_size + script_size
1178

1179
    @classmethod
5✔
1180
    def virtual_size_from_weight(cls, weight: int) -> int:
5✔
1181
        return weight // 4 + (weight % 4 > 0)
5✔
1182

1183
    @classmethod
    def satperbyte_from_satperkw(cls, feerate_kw):
        """Converts feerate from sat/kw to sat/vbyte.

        note: uses true division, so the result is a float for int input.
        """
        return feerate_kw * 4 / 1000
1187

1188
    def estimated_total_size(self):
5✔
1189
        """Return an estimated total transaction size in bytes."""
1190
        if not self.is_complete() or self._cached_network_ser is None:
5✔
1191
            return len(self.serialize_to_network(estimate_size=True)) // 2
5✔
1192
        else:
1193
            return len(self._cached_network_ser) // 2  # ASCII hex string
5✔
1194

1195
    def estimated_witness_size(self):
        """Return an estimate of witness size in bytes.

        This is 0 for a tx without segwit inputs; otherwise the serialized
        witnesses of all inputs plus the two marker/flag bytes.
        """
        # only estimate (dummy sigs, address-based guessing) while the tx is incomplete
        estimate = not self.is_complete()
        if not self.is_segwit(guess_for_address=estimate):
            return 0
        inputs = self.inputs()
        witness = b"".join(self.serialize_witness(x, estimate_size=estimate) for x in inputs)
        witness_size = len(witness) + 2  # include marker and flag
        return witness_size
1204

1205
    def estimated_base_size(self):
        """Return an estimated base transaction size in bytes.

        Computed as the estimated total size minus the estimated witness size.
        """
        return self.estimated_total_size() - self.estimated_witness_size()
1208

1209
    def estimated_weight(self):
        """Return an estimate of transaction weight.

        Computed as 3 * base size + total size.
        """
        total_tx_size = self.estimated_total_size()
        base_tx_size = self.estimated_base_size()
        return 3 * base_tx_size + total_tx_size
1214

1215
    def is_complete(self) -> bool:
        """Return True unconditionally for this class."""
        return True
1217

1218
    def get_output_idxs_from_scriptpubkey(self, script: bytes) -> Set[int]:
5✔
1219
        """Returns the set indices of outputs with given script."""
1220
        assert isinstance(script, bytes)
5✔
1221
        # build cache if there isn't one yet
1222
        # note: can become stale and return incorrect data
1223
        #       if the tx is modified later; that's out of scope.
1224
        if not hasattr(self, '_script_to_output_idx'):
5✔
1225
            d = defaultdict(set)
5✔
1226
            for output_idx, o in enumerate(self.outputs()):
5✔
1227
                o_script = o.scriptpubkey
5✔
1228
                d[o_script].add(output_idx)
5✔
1229
            self._script_to_output_idx = d
5✔
1230
        return set(self._script_to_output_idx[script])  # copy
5✔
1231

1232
    def get_output_idxs_from_address(self, addr: str) -> Set[int]:
        """Return the set of indices of outputs whose script is the one `addr` converts to."""
        script = bitcoin.address_to_script(addr)
        return self.get_output_idxs_from_scriptpubkey(script)
1235

1236
    def replace_output_address(self, old_address: str, new_address: str) -> None:
        """Replace the single output paying `old_address` with one paying `new_address`, keeping the value.

        Asserts that exactly one output pays `old_address`.
        """
        idx = list(self.get_output_idxs_from_address(old_address))
        assert len(idx) == 1
        amount = self._outputs[idx[0]].value
        funding_output = PartialTxOutput.from_address_and_value(new_address, amount)
        old_output = PartialTxOutput.from_address_and_value(old_address, amount)
        # removal relies on the existing output comparing equal to this freshly built one
        self._outputs.remove(old_output)
        # NOTE(review): add_outputs() is not defined in the visible part of this class —
        # presumably provided by a subclass; confirm
        self.add_outputs([funding_output])
        # the script -> index cache built by the lookup above is stale now; drop it
        delattr(self, '_script_to_output_idx')
1245

1246
    def get_change_outputs(self):
5✔
UNCOV
1247
        return  [o for o in self._outputs if o.is_change]
×
1248

1249
    def has_dummy_output(self, dummy_addr: str) -> bool:
        """Return whether exactly one output pays to `dummy_addr`."""
        return len(self.get_output_idxs_from_address(dummy_addr)) == 1
1251

1252
    def output_value_for_address(self, addr):
5✔
1253
        # assumes exactly one output has that address
1254
        for o in self.outputs():
5✔
1255
            if o.address == addr:
5✔
1256
                return o.value
5✔
1257
        else:
UNCOV
1258
            raise Exception('output not found', addr)
×
1259

1260
    def input_value(self) -> int:
5✔
1261
        input_values = [txin.value_sats() for txin in self.inputs()]
5✔
1262
        if any([val is None for val in input_values]):
5✔
UNCOV
1263
            raise MissingTxInputAmount()
×
1264
        return sum(input_values)
5✔
1265

1266
    def output_value(self) -> int:
        """Return the sum of the values of all outputs."""
        return sum(o.value for o in self.outputs())
1268

1269
    def get_fee(self) -> Optional[int]:
        """Return the fee (input value minus output value), or None if an input amount is unknown."""
        try:
            return self.input_value() - self.output_value()
        except MissingTxInputAmount:
            return None
1274

1275
    def get_input_idx_that_spent_prevout(self, prevout: TxOutpoint) -> Optional[int]:
5✔
1276
        # build cache if there isn't one yet
1277
        # note: can become stale and return incorrect data
1278
        #       if the tx is modified later; that's out of scope.
1279
        if not hasattr(self, '_prevout_to_input_idx'):
×
UNCOV
1280
            d = {}  # type: Dict[TxOutpoint, int]
×
UNCOV
1281
            for i, txin in enumerate(self.inputs()):
×
UNCOV
1282
                d[txin.prevout] = i
×
UNCOV
1283
            self._prevout_to_input_idx = d
×
UNCOV
1284
        idx = self._prevout_to_input_idx.get(prevout)
×
1285
        if idx is not None:
×
1286
            assert self.inputs()[idx].prevout == prevout
×
1287
        return idx
×
1288

1289

1290
def convert_raw_tx_to_hex(raw: Union[str, bytes]) -> str:
    """Sanitizes tx-describing input (hex/base43/base64) into
    raw tx hex string.

    The encodings are tried in that order on the whitespace-stripped input;
    base64 only if the data starts with the base64 PSBT prefix. As a last
    resort, bytes input is taken to be the raw tx itself.
    Raises ValueError for empty input or if nothing fits.
    """
    if not raw:
        raise ValueError("empty string")
    stripped = raw.strip()
    # try hex, then base43
    decoders = (
        binascii.unhexlify,
        lambda data: base_decode(data, base=43),
    )
    for decode in decoders:
        try:
            return decode(stripped).hex()
        except Exception:
            continue
    # try base64
    if stripped[0:6] in ('cHNidP', b'cHNidP'):  # base64 psbt
        try:
            return base64.b64decode(stripped).hex()
        except Exception:
            pass
    # raw bytes (do not strip whitespaces in this case)
    if isinstance(raw, bytes):
        return raw.hex()
    raise ValueError(f"failed to recognize transaction encoding for txt: {stripped[:30]}...")
1317

1318

1319
def tx_from_any(raw: Union[str, bytes], *,
                deserialize: bool = True) -> Union['PartialTransaction', 'Transaction']:
    """Create a transaction object from `raw`, in any encoding convert_raw_tx_to_hex() accepts.

    The data is first tried as a PSBT; if it lacks the PSBT magic, it is
    parsed as a plain network transaction (immediately, unless `deserialize`
    is False).

    Raises SerializationError for the partial-tx format of old Electrum
    versions, and when the data cannot be parsed as a transaction.
    """
    if isinstance(raw, bytearray):
        raw = bytes(raw)
    raw = convert_raw_tx_to_hex(raw)
    try:
        return PartialTransaction.from_raw_psbt(raw)
    except BadHeaderMagic:
        # not a PSBT: either the legacy partial-tx format, or fall through to a plain tx
        if raw[:10] == b'EPTF\xff'.hex():
            raise SerializationError("Partial transactions generated with old Electrum versions "
                                     "(< 4.0) are no longer supported. Please upgrade Electrum on "
                                     "the other machine where this transaction was created.")
    try:
        tx = Transaction(raw)
        if deserialize:
            tx.deserialize()
        return tx
    except Exception as e:
        raise SerializationError(f"Failed to recognise tx encoding, or to parse transaction. "
                                 f"raw: {raw[:30]}...") from e
1339

1340

1341
class PSBTGlobalType(IntEnum):
    """Numeric key types for PSBT global records (presumably the BIP-174 values — confirm)."""
    UNSIGNED_TX = 0
    XPUB = 1
    VERSION = 0xFB
1345

1346

1347
class PSBTInputType(IntEnum):
    """Numeric key types for PSBT per-input records (presumably the BIP-174 values — confirm)."""
    NON_WITNESS_UTXO = 0
    WITNESS_UTXO = 1
    PARTIAL_SIG = 2
    SIGHASH_TYPE = 3
    REDEEM_SCRIPT = 4
    WITNESS_SCRIPT = 5
    BIP32_DERIVATION = 6
    FINAL_SCRIPTSIG = 7
    FINAL_SCRIPTWITNESS = 8
    # note: the values are not contiguous from here on
    TAP_KEY_SIG = 0x13
    TAP_MERKLE_ROOT = 0x18
    SLIP19_OWNERSHIP_PROOF = 0x19
1360

1361

1362
class PSBTOutputType(IntEnum):
    """Numeric key types for PSBT per-output records (presumably the BIP-174 values — confirm)."""
    REDEEM_SCRIPT = 0
    WITNESS_SCRIPT = 1
    BIP32_DERIVATION = 2
1366

1367

1368
# Serialization/deserialization tools
1369
def deser_compact_size(f) -> Optional[int]:
    """Read one compact-size integer from the binary stream `f`.

    A first byte below 253 is the value itself; 253, 254 and 255 announce a
    little-endian unsigned integer of 2, 4 and 8 bytes respectively.
    Returns None if the stream is already at its end.
    note: ~inverse of bitcoin.var_int
    """
    try:
        prefix = f.read(1)[0]
    except IndexError:
        return None     # end of file

    wide_format = {253: "<H", 254: "<I", 255: "<Q"}.get(prefix)
    if wide_format is None:
        return prefix
    payload = f.read(struct.calcsize(wide_format))
    return struct.unpack(wide_format, payload)[0]
1383

1384

1385
class PSBTSection:
    """Key-value (de)serialization shared by the sections of a PSBT.

    A section is a sequence of records, each a length-prefixed full key
    (key type followed by key data) and a length-prefixed value, terminated
    by a zero byte. Subclasses implement parse_psbt_section_kv() and
    serialize_psbt_section_kvs().
    """

    def _populate_psbt_fields_from_fd(self, fd=None):
        """Read records from `fd` until the section ends, passing each to parse_psbt_section_kv()."""
        if not fd:
            return

        while True:
            try:
                record = self.get_next_kv_from_fd(fd)
            except StopIteration:
                # the section separator was reached
                break
            self.parse_psbt_section_kv(*record)

    @classmethod
    def get_next_kv_from_fd(cls, fd) -> Tuple[int, bytes, bytes]:
        """Read one record from `fd` and return it as (key_type, key, val).

        Raises StopIteration at the section separator (key size 0) and
        UnexpectedEndOfStream if the stream ends where a size is expected.
        """
        key_size = deser_compact_size(fd)
        if key_size is None:
            raise UnexpectedEndOfStream()
        if key_size == 0:
            raise StopIteration()

        key_type, key = cls.get_keytype_and_key_from_fullkey(fd.read(key_size))

        val_size = deser_compact_size(fd)
        if val_size is None:
            raise UnexpectedEndOfStream()
        return key_type, key, fd.read(val_size)

    @classmethod
    def create_psbt_writer(cls, fd):
        """Return a function wr(key_type, val, key=b'') that writes one record to `fd`."""
        def wr(key_type: int, val: bytes, key: bytes = b''):
            full_key = cls.get_fullkey_from_keytype_and_key(key_type, key)
            # key_size, key
            fd.write(var_int(len(full_key)))
            fd.write(full_key)
            # val_size, val
            fd.write(var_int(len(val)))
            fd.write(val)
        return wr

    @classmethod
    def get_keytype_and_key_from_fullkey(cls, full_key: bytes) -> Tuple[int, bytes]:
        """Split a full key into its compact-size key type and the remaining key data.

        Raises UnexpectedEndOfStream if `full_key` is empty.
        """
        with io.BytesIO(full_key) as key_stream:
            key_type = deser_compact_size(key_stream)
            if key_type is None:
                raise UnexpectedEndOfStream()
            return key_type, key_stream.read()

    @classmethod
    def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes:
        """Join a key type (var_int-encoded) and key data into a full key."""
        return var_int(key_type) + key

    def _serialize_psbt_section(self, fd):
        """Write all records of this section to `fd`, followed by the section separator."""
        writer = self.create_psbt_writer(fd)
        self.serialize_psbt_section_kvs(writer)
        fd.write(b'\x00')  # section-separator

    def parse_psbt_section_kv(self, kt: int, key: bytes, val: bytes) -> None:
        """Consume one parsed record; implemented by subclasses."""
        raise NotImplementedError()  # implemented by subclasses

    def serialize_psbt_section_kvs(self, wr) -> None:
        """Emit this section's records through the writer `wr`; implemented by subclasses."""
        raise NotImplementedError()  # implemented by subclasses
1447

1448

1449
class PartialTxInput(TxInput, PSBTSection):
    """Transaction input augmented with the per-input data of a PSBT.

    On top of the plain ``TxInput`` fields this keeps the PSBT input-map
    entries (UTXOs, partial signatures, scripts, bip32 paths, unknown fields)
    plus a few wallet-side hints that are not written to the serialized PSBT.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``TxInput`` and start with empty PSBT fields."""
        TxInput.__init__(self, *args, **kwargs)
        # --- fields that are (de)serialized as part of the PSBT input map ---
        self._witness_utxo = None  # type: Optional[TxOutput]
        self.sigs_ecdsa = {}  # type: Dict[bytes, bytes]  # pubkey -> sig
        self.tap_key_sig = None  # type: Optional[bytes]  # sig for taproot key-path-spending
        self.sighash = None  # type: Optional[int]
        self.bip32_paths = {}  # type: Dict[bytes, Tuple[bytes, Sequence[int]]]  # pubkey -> (xpub_fingerprint, path)
        self.redeem_script = None  # type: Optional[bytes]
        self.witness_script = None  # type: Optional[bytes]
        self.tap_merkle_root = None  # type: Optional[bytes]
        self.slip_19_ownership_proof = None  # type: Optional[bytes]
        self._unknown = {}  # type: Dict[bytes, bytes]

        # --- local-only state (not serialized) ---
        self._script_descriptor = None  # type: Optional[Descriptor]
        self.is_mine = False  # type: bool  # whether the wallet considers the input to be ismine
        self._trusted_value_sats = None  # type: Optional[int]
        self._trusted_address = None  # type: Optional[str]
        self._is_p2sh_segwit = None  # type: Optional[bool]  # None means unknown
        self._is_native_segwit = None  # type: Optional[bool]  # None means unknown
        self._is_taproot = None  # type: Optional[bool]  # None means unknown
        self.witness_sizehint = None  # type: Optional[int]  # byte size of serialized complete witness, for tx size est

    @property
    def witness_utxo(self):
        """The witness UTXO (a ``TxOutput``) for this input, or None."""
        return self._witness_utxo

    @witness_utxo.setter
    def witness_utxo(self, value: Optional[TxOutput]):
        # validate against the provisional value before storing it
        self.validate_data(witness_utxo=value)
        self._witness_utxo = value

    @property
    def pubkeys(self) -> Set[bytes]:
        """All pubkeys of the script descriptor; empty set if there is no descriptor."""
        if desc := self.script_descriptor:
            return desc.get_all_pubkeys()
        return set()

    @property
    def script_descriptor(self):
        """The descriptor set for this input, or None."""
        return self._script_descriptor

    @script_descriptor.setter
    def script_descriptor(self, desc: Optional[Descriptor]):
        """Store *desc* and fill in redeem/witness script from it if still unset."""
        self._script_descriptor = desc
        if not desc:
            return
        if self.redeem_script is None or self.witness_script is None:
            # expand the descriptor once and reuse the result for both scripts
            expanded = desc.expand()
            if self.redeem_script is None:
                self.redeem_script = expanded.redeem_script
            if self.witness_script is None:
                self.witness_script = expanded.witness_script

    def to_json(self):
        """Return the parent's JSON dict extended with the PSBT-specific fields."""
        d = super().to_json()
        d.update({
            'height': self.block_height,
            'value_sats': self.value_sats(),
            'address': self.address,
            'desc': self.script_descriptor.to_string() if self.script_descriptor else None,
            'utxo': str(self.utxo) if self.utxo else None,
            'witness_utxo': self.witness_utxo.serialize_to_network().hex() if self.witness_utxo else None,
            'sighash': self.sighash,
            'redeem_script': self.redeem_script.hex() if self.redeem_script else None,
            'witness_script': self.witness_script.hex() if self.witness_script else None,
            'sigs_ecdsa': {pubkey.hex(): sig.hex() for pubkey, sig in self.sigs_ecdsa.items()},
            'tap_key_sig': self.tap_key_sig.hex() if self.tap_key_sig else None,
            'tap_merkle_root': self.tap_merkle_root.hex() if self.tap_merkle_root else None,
            'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
                            for pubkey, (xfp, path) in self.bip32_paths.items()},
            'slip_19_ownership_proof': self.slip_19_ownership_proof.hex() if self.slip_19_ownership_proof else None,
            'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()},
        })
        return d

    @classmethod
    def from_txin(cls, txin: TxInput, *, strip_witness: bool = True) -> 'PartialTxInput':
        """Create a PartialTxInput from a plain TxInput, copying prevout, nsequence and utxo.

        With ``strip_witness=True`` (the default) script_sig and witness are
        not copied.
        """
        # FIXME: if strip_witness is True, res.is_segwit() will return False,
        # and res.estimated_size() will return an incorrect value. These methods
        # will return the correct values after we call add_input_info(). (see dscancel and bump_fee)
        # This is very fragile: the value returned by estimate_size() depends on the calling order.
        res = PartialTxInput(prevout=txin.prevout,
                             script_sig=None if strip_witness else txin.script_sig,
                             nsequence=txin.nsequence,
                             witness=None if strip_witness else txin.witness,
                             is_coinbase_output=txin.is_coinbase_output())
        res.utxo = txin.utxo
        return res

    def validate_data(
        self,
        *,
        for_signing=False,
        # allow passing provisional fields for 'self', before setting them:
        utxo: Optional[Transaction] = None,
        witness_utxo: Optional[TxOutput] = None,
    ) -> None:
        """Check the PSBT fields of this input for mutual consistency.

        *utxo* / *witness_utxo* may be given to validate a value before it is
        stored on the object; when omitted (falsy) the stored values are used.

        Raises PSBTInputConsistencyFailure when:
        - the non-witness UTXO's txid does not match the prevout txid;
        - both UTXO forms are present but disagree on the spent output;
        - redeem_script does not hash to this input's address;
        - witness_script does not match redeem_script (nested) or the address.
        """
        utxo = utxo or self.utxo
        witness_utxo = witness_utxo or self.witness_utxo
        if utxo:
            if self.prevout.txid.hex() != utxo.txid():
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                  f"If a non-witness UTXO is provided, its hash must match the hash specified in the prevout")
            if witness_utxo:
                if utxo.outputs()[self.prevout.out_idx] != witness_utxo:
                    raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                      f"If both non-witness UTXO and witness UTXO are provided, they must be consistent")
        # The following test is disabled, so we are willing to sign non-segwit inputs
        # without verifying the input amount. This means, given a maliciously modified PSBT,
        # for non-segwit inputs, we might end up burning coins as miner fees.
        if for_signing and False:
            if not self.is_segwit() and witness_utxo:
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                  f"If a witness UTXO is provided, no non-witness signature may be created")
        if self.redeem_script and self.address:
            addr = hash160_to_p2sh(hash_160(self.redeem_script))
            if self.address != addr:
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                  f"If a redeemScript is provided, the scriptPubKey must be for that redeemScript")
        if self.witness_script:
            if self.redeem_script:
                if self.redeem_script != bitcoin.p2wsh_nested_script(self.witness_script):
                    raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                      f"If a witnessScript is provided, the redeemScript must be for that witnessScript")
            elif self.address:
                if self.address != bitcoin.script_to_p2wsh(self.witness_script):
                    raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                      f"If a witnessScript is provided, the scriptPubKey must be for that witnessScript")

    def parse_psbt_section_kv(self, kt, key, val):
        """Store one parsed PSBT input record on this object.

        Known key types are validated (duplicates, key/value lengths, keys
        that must be empty) and raise SerializationError on violation.
        Unrecognised key types are kept verbatim in ``self._unknown``.
        """
        try:
            kt = PSBTInputType(kt)
        except ValueError:
            pass  # unknown type
        if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
        if kt == PSBTInputType.NON_WITNESS_UTXO:
            if self.utxo is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.utxo = Transaction(val)
            self.utxo.deserialize()
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.WITNESS_UTXO:
            if self.witness_utxo is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness_utxo = TxOutput.from_network_bytes(val)
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.PARTIAL_SIG:
            if key in self.sigs_ecdsa:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(key) not in (33, 65):
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
            self.sigs_ecdsa[key] = val
        elif kt == PSBTInputType.TAP_KEY_SIG:
            if self.tap_key_sig is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(val) not in (64, 65):
                raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}")
            self.tap_key_sig = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.TAP_MERKLE_ROOT:
            if self.tap_merkle_root is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(val) != 32:
                raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}")
            self.tap_merkle_root = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.SIGHASH_TYPE:
            if self.sighash is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(val) != 4:
                raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}")
            self.sighash = struct.unpack("<I", val)[0]
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.BIP32_DERIVATION:
            if key in self.bip32_paths:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(key) not in (33, 65):
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
            self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val)
        elif kt == PSBTInputType.REDEEM_SCRIPT:
            if self.redeem_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.redeem_script = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.WITNESS_SCRIPT:
            if self.witness_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness_script = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.FINAL_SCRIPTSIG:
            if self.script_sig is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.script_sig = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.FINAL_SCRIPTWITNESS:
            if self.witness is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.SLIP19_OWNERSHIP_PROOF:
            if self.slip_19_ownership_proof is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.slip_19_ownership_proof = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        else:
            # unrecognised type: remember it under its full key so it can be written back
            full_key = self.get_fullkey_from_keytype_and_key(kt, key)
            if full_key in self._unknown:
                raise SerializationError(f'duplicate key. PSBT input key for unknown type: {full_key}')
            self._unknown[full_key] = val

    def serialize_psbt_section_kvs(self, wr):
        """Write every set PSBT input field through *wr*; dict-backed fields go out in sorted order."""
        if self.witness_utxo:
            wr(PSBTInputType.WITNESS_UTXO, self.witness_utxo.serialize_to_network())
        if self.utxo:
            wr(PSBTInputType.NON_WITNESS_UTXO, bfh(self.utxo.serialize_to_network(include_sigs=True)))
        for pk, val in sorted(self.sigs_ecdsa.items()):
            wr(PSBTInputType.PARTIAL_SIG, val, pk)
        if self.tap_key_sig is not None:
            wr(PSBTInputType.TAP_KEY_SIG, self.tap_key_sig)
        if self.tap_merkle_root is not None:
            wr(PSBTInputType.TAP_MERKLE_ROOT, self.tap_merkle_root)
        if self.sighash is not None:
            wr(PSBTInputType.SIGHASH_TYPE, struct.pack('<I', self.sighash))
        if self.redeem_script is not None:
            wr(PSBTInputType.REDEEM_SCRIPT, self.redeem_script)
        if self.witness_script is not None:
            wr(PSBTInputType.WITNESS_SCRIPT, self.witness_script)
        for k in sorted(self.bip32_paths):
            packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k])
            wr(PSBTInputType.BIP32_DERIVATION, packed_path, k)
        if self.script_sig is not None:
            wr(PSBTInputType.FINAL_SCRIPTSIG, self.script_sig)
        if self.witness is not None:
            wr(PSBTInputType.FINAL_SCRIPTWITNESS, self.witness)
        if self.slip_19_ownership_proof:
            wr(PSBTInputType.SLIP19_OWNERSHIP_PROOF, self.slip_19_ownership_proof)
        for full_key, val in sorted(self._unknown.items()):
            key_type, key = self.get_keytype_and_key_from_fullkey(full_key)
            wr(key_type, val, key=key)

    def value_sats(self) -> Optional[int]:
        """Input value: parent's answer first, then the trusted value, then the witness UTXO."""
        if (val := super().value_sats()) is not None:
            return val
        if self._trusted_value_sats is not None:
            return self._trusted_value_sats
        if self.witness_utxo:
            return self.witness_utxo.value
        return None

    @property
    def address(self) -> Optional[str]:
        """Input address: parent's answer first, then the trusted address, then the witness UTXO."""
        if (addr := super().address) is not None:
            return addr
        if self._trusted_address is not None:
            return self._trusted_address
        if self.witness_utxo:
            return self.witness_utxo.address
        return None

    @property
    def scriptpubkey(self) -> Optional[bytes]:
        """scriptPubKey: parent's answer first, then derived from the trusted address, then the witness UTXO."""
        if (spk := super().scriptpubkey) is not None:
            return spk
        if self._trusted_address is not None:
            return bitcoin.address_to_script(self._trusted_address)
        if self.witness_utxo:
            return self.witness_utxo.scriptpubkey
        return None

    def is_complete(self) -> bool:
        """Whether this input is finalized or has everything needed to be finalized."""
        if self.script_sig is not None and self.witness is not None:
            return True
        if self.is_coinbase_input():
            return True
        if self.script_sig is not None and not self.is_segwit():
            return True
        if desc := self.script_descriptor:
            # complete iff the descriptor can be satisfied with real (non-dummy) signatures
            try:
                desc.satisfy(allow_dummy=False, sigdata=self.sigs_ecdsa)
            except MissingSolutionPiece:
                pass
            else:
                return True
        return False

    def get_satisfaction_progress(self) -> Tuple[int, int]:
        """Delegate to the descriptor's satisfaction progress; ``(0, 0)`` without a descriptor."""
        if desc := self.script_descriptor:
            return desc.get_satisfaction_progress(sigdata=self.sigs_ecdsa)
        return 0, 0

    def finalize(self) -> None:
        """Build script_sig and witness if the input is complete, then drop signing data.

        If both script_sig and witness are already set, only the clearing step
        runs. If the input is not complete, nothing changes.
        """
        def clear_fields_when_finalized():
            # BIP-174: "All other data except the UTXO and unknown fields in the
            #           input key-value map should be cleared from the PSBT"
            self.sigs_ecdsa = {}
            self.tap_key_sig = None
            self.tap_merkle_root = None
            self.sighash = None
            self.bip32_paths = {}
            self.redeem_script = None
            # FIXME: side effect interferes with make_witness
            # self.witness_script = None

        if self.script_sig is not None and self.witness is not None:
            clear_fields_when_finalized()
            return  # already finalized
        if self.is_complete():
            self.script_sig = Transaction.input_script(self)
            self.witness = Transaction.serialize_witness(self)
            clear_fields_when_finalized()

    def combine_with_other_txin(self, other_txin: 'TxInput') -> None:
        """Merge the data of *other_txin* (same prevout) into this input.

        Fields set on *other_txin* overwrite ours; dict-valued fields are
        merged with ``update``. Afterwards the result is validated and
        finalization is attempted.
        """
        assert self.prevout == other_txin.prevout
        if other_txin.script_sig is not None:
            self.script_sig = other_txin.script_sig
        if other_txin.witness is not None:
            self.witness = other_txin.witness
        if isinstance(other_txin, PartialTxInput):
            if other_txin.witness_utxo:
                self.witness_utxo = other_txin.witness_utxo
            if other_txin.utxo:
                self.utxo = other_txin.utxo
            self.sigs_ecdsa.update(other_txin.sigs_ecdsa)
            if other_txin.sighash is not None:
                self.sighash = other_txin.sighash
            if other_txin.tap_key_sig is not None:
                self.tap_key_sig = other_txin.tap_key_sig
            if other_txin.tap_merkle_root is not None:
                self.tap_merkle_root = other_txin.tap_merkle_root
            self.bip32_paths.update(other_txin.bip32_paths)
            if other_txin.redeem_script is not None:
                self.redeem_script = other_txin.redeem_script
            if other_txin.witness_script is not None:
                self.witness_script = other_txin.witness_script
            self._unknown.update(other_txin._unknown)
        self.validate_data()
        # try to finalize now
        self.finalize()

    def convert_utxo_to_witness_utxo(self) -> None:
        """Replace the full previous tx (if any) by just the output being spent."""
        if self.utxo:
            # write the private fields directly, bypassing the property setters
            self._witness_utxo = self.utxo.outputs()[self.prevout.out_idx]
            self._utxo = None  # type: Optional[Transaction]

    def is_native_segwit(self) -> Optional[bool]:
        """Whether this input is native segwit (any witness version). None means inconclusive."""
        if self._is_native_segwit is None:
            if self.address:
                self._is_native_segwit = bitcoin.is_segwit_address(self.address)
        return self._is_native_segwit

    def is_p2sh_segwit(self) -> Optional[bool]:
        """Whether this input is p2sh-embedded-segwit. None means inconclusive."""
        if self._is_p2sh_segwit is None:
            def calc_if_p2sh_segwit_now():
                if not (self.address and self.redeem_script):
                    return None
                if self.address != bitcoin.hash160_to_p2sh(hash_160(self.redeem_script)):
                    # not p2sh address
                    return False
                try:
                    decoded = list(script_GetOp(self.redeem_script))
                except MalformedBitcoinScript:
                    decoded = None
                # witness version 0
                if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
                    return True
                # witness version 1-16
                future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1))
                for opcode in future_witness_versions:
                    match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)]
                    if match_script_against_template(decoded, match):
                        return True
                return False

            self._is_p2sh_segwit = calc_if_p2sh_segwit_now()
        return self._is_p2sh_segwit

    def is_segwit(self, *, guess_for_address=False) -> bool:
        """Whether this input is segwit (any witness version)."""
        if super().is_segwit():
            return True
        if self.is_native_segwit() or self.is_p2sh_segwit():
            return True
        if self.is_native_segwit() is False and self.is_p2sh_segwit() is False:
            return False
        if self.witness_script:
            return True
        if desc := self.script_descriptor:
            return desc.is_segwit()
        if guess_for_address:
            dummy_desc = create_dummy_descriptor_from_address(self.address)
            return dummy_desc.is_segwit()
        return False  # can be false-negative

    def is_taproot(self) -> Optional[bool]:
        """Whether this input is taproot. None means inconclusive (no descriptor and no address).

        A script descriptor, when present, takes precedence over the
        address-based answer.
        """
        if self._is_taproot is None:
            if self.address:
                self._is_taproot = bitcoin.is_taproot_address(self.address)
        if desc := self.script_descriptor:
            return desc.is_taproot()
        return self._is_taproot

    def already_has_some_signatures(self) -> bool:
        """Returns whether progress has been made towards completing this input."""
        # bool(): do not leak the internal sigs dict to callers
        return bool(self.sigs_ecdsa
                    or self.tap_key_sig is not None
                    or self.script_sig is not None
                    or self.witness is not None)
1856

1857

1858
class PartialTxOutput(TxOutput, PSBTSection):
    """Transaction output augmented with the per-output data of a PSBT.

    Keeps the PSBT output-map entries (scripts, bip32 paths, unknown fields)
    plus wallet-side flags that are not written to the serialized PSBT.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``TxOutput`` and start with empty PSBT fields."""
        TxOutput.__init__(self, *args, **kwargs)
        # --- fields that are (de)serialized as part of the PSBT output map ---
        self.redeem_script = None  # type: Optional[bytes]
        self.witness_script = None  # type: Optional[bytes]
        self.bip32_paths = {}  # type: Dict[bytes, Tuple[bytes, Sequence[int]]]  # pubkey -> (xpub_fingerprint, path)
        self._unknown = {}  # type: Dict[bytes, bytes]

        # --- local-only state (not serialized) ---
        self._script_descriptor = None  # type: Optional[Descriptor]
        self.is_mine = False  # type: bool  # whether the wallet considers the output to be ismine
        self.is_change = False  # type: bool  # whether the wallet considers the output to be change

    @property
    def pubkeys(self) -> Set[bytes]:
        """All pubkeys of the script descriptor; empty set if there is no descriptor."""
        if desc := self.script_descriptor:
            return desc.get_all_pubkeys()
        return set()

    @property
    def script_descriptor(self):
        """The descriptor set for this output, or None."""
        return self._script_descriptor

    @script_descriptor.setter
    def script_descriptor(self, desc: Optional[Descriptor]):
        """Store *desc* and fill in redeem/witness script from it if still unset."""
        self._script_descriptor = desc
        if not desc:
            return
        if self.redeem_script is None or self.witness_script is None:
            # expand the descriptor once and reuse the result for both scripts
            expanded = desc.expand()
            if self.redeem_script is None:
                self.redeem_script = expanded.redeem_script
            if self.witness_script is None:
                self.witness_script = expanded.witness_script

    def to_json(self):
        """Return the parent's JSON dict extended with the PSBT-specific fields."""
        d = super().to_json()
        d.update({
            'desc': self.script_descriptor.to_string() if self.script_descriptor else None,
            'redeem_script': self.redeem_script.hex() if self.redeem_script else None,
            'witness_script': self.witness_script.hex() if self.witness_script else None,
            'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
                            for pubkey, (xfp, path) in self.bip32_paths.items()},
            'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()},
        })
        return d

    @classmethod
    def from_txout(cls, txout: TxOutput) -> 'PartialTxOutput':
        """Create a PartialTxOutput carrying the scriptpubkey and value of *txout*."""
        res = PartialTxOutput(scriptpubkey=txout.scriptpubkey,
                              value=txout.value)
        return res

    def parse_psbt_section_kv(self, kt, key, val):
        """Store one parsed PSBT output record on this object.

        Known key types are validated (duplicates, key lengths, keys that must
        be empty) and raise SerializationError on violation. Unrecognised key
        types are kept verbatim in ``self._unknown``.
        """
        try:
            kt = PSBTOutputType(kt)
        except ValueError:
            pass  # unknown type
        if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
        if kt == PSBTOutputType.REDEEM_SCRIPT:
            if self.redeem_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.redeem_script = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTOutputType.WITNESS_SCRIPT:
            if self.witness_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness_script = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTOutputType.BIP32_DERIVATION:
            if key in self.bip32_paths:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(key) not in (33, 65):
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
            self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val)
        else:
            # unrecognised type: remember it under its full key so it can be written back
            full_key = self.get_fullkey_from_keytype_and_key(kt, key)
            if full_key in self._unknown:
                raise SerializationError(f'duplicate key. PSBT output key for unknown type: {full_key}')
            self._unknown[full_key] = val

    def serialize_psbt_section_kvs(self, wr):
        """Write every set PSBT output field through *wr*; dict-backed fields go out in sorted order."""
        if self.redeem_script is not None:
            wr(PSBTOutputType.REDEEM_SCRIPT, self.redeem_script)
        if self.witness_script is not None:
            wr(PSBTOutputType.WITNESS_SCRIPT, self.witness_script)
        for k in sorted(self.bip32_paths):
            packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k])
            wr(PSBTOutputType.BIP32_DERIVATION, packed_path, k)
        for full_key, val in sorted(self._unknown.items()):
            key_type, key = self.get_keytype_and_key_from_fullkey(full_key)
            wr(key_type, val, key=key)

    def combine_with_other_txout(self, other_txout: 'TxOutput') -> None:
        """Merge the PSBT data of *other_txout* (same scriptpubkey) into this output.

        Does nothing beyond the scriptpubkey assertion if *other_txout* is not
        a PartialTxOutput. Scripts set on the other output overwrite ours;
        dict-valued fields are merged with ``update``.
        """
        assert self.scriptpubkey == other_txout.scriptpubkey
        if not isinstance(other_txout, PartialTxOutput):
            return
        if other_txout.redeem_script is not None:
            self.redeem_script = other_txout.redeem_script
        if other_txout.witness_script is not None:
            self.witness_script = other_txout.witness_script
        self.bip32_paths.update(other_txout.bip32_paths)
        self._unknown.update(other_txout._unknown)
5✔
1957

1958

1959
class PartialTransaction(Transaction):
5✔
1960

1961
    def __init__(self):
        """Create an empty partial transaction (no inputs, outputs or PSBT metadata)."""
        super().__init__(None)
        # PSBT global xpubs: intermediate bip32node -> (xfp, der_prefix)
        self.xpubs = dict()  # type: Dict[BIP32Node, Tuple[bytes, Sequence[int]]]
        self._inputs = list()  # type: List[PartialTxInput]
        self._outputs = list()  # type: List[PartialTxOutput]
        # PSBT global key-value pairs of unknown type: full key -> value
        self._unknown = dict()  # type: Dict[bytes, bytes]
        self.rbf_merge_txid = None
5✔
1968

1969
    def to_json(self) -> dict:
        """Return the parent class's JSON dict extended with the PSBT global fields.

        Adds ``'xpubs'`` (xpub string -> (fingerprint hex, derivation path
        string)) and ``'unknown_psbt_fields'`` (hex key -> hex value).
        """
        d = super().to_json()
        xpubs_json = {}
        for bip32node, (xfp, path) in self.xpubs.items():
            str_path = bip32.convert_bip32_intpath_to_strpath(path)
            xpubs_json[bip32node.to_xpub()] = (xfp.hex(), str_path)
        unknown_json = {}
        for full_key, val in self._unknown.items():
            unknown_json[full_key.hex()] = val.hex()
        d.update({
            'xpubs': xpubs_json,
            'unknown_psbt_fields': unknown_json,
        })
        return d
5✔
1977

1978
    @classmethod
    def from_tx(cls, tx: Transaction) -> 'PartialTransaction':
        """Build a partial transaction mirroring ``tx``.

        Inputs are converted with their witness stripped; version and locktime
        are copied over.
        """
        assert tx
        partial_tx = cls()
        converted_inputs = []
        for txin in tx.inputs():
            converted_inputs.append(PartialTxInput.from_txin(txin, strip_witness=True))
        partial_tx._inputs = converted_inputs
        converted_outputs = []
        for txout in tx.outputs():
            converted_outputs.append(PartialTxOutput.from_txout(txout))
        partial_tx._outputs = converted_outputs
        partial_tx.version = tx.version
        partial_tx.locktime = tx.locktime
        return partial_tx
5✔
1988

1989
    @classmethod
    def from_raw_psbt(cls, raw) -> 'PartialTransaction':
        """Parse a serialized PSBT into a PartialTransaction.

        ``raw`` may be the binary PSBT, or its hex or base64 encoding, given
        either as ``str`` or as ``bytes``/``bytearray``.

        Raises BadHeaderMagic if the decoded data does not start with the PSBT
        magic, SerializationError for malformed or unsupported content, and
        UnexpectedEndOfStream if fewer input/output maps are present than the
        unsigned tx requires.
        """
        # auto-detect and decode Base64 and Hex.
        if raw[0:10].lower() in (b'70736274ff', '70736274ff'):  # hex
            if isinstance(raw, (bytes, bytearray)):
                # bytes.fromhex() only accepts str on older Pythons; hex text is ASCII.
                # (UnicodeDecodeError is a ValueError, same family as fromhex's own errors.)
                raw = bytes(raw).decode('ascii')
            raw = bytes.fromhex(raw)
        elif raw[0:6] in (b'cHNidP', 'cHNidP'):  # base64
            raw = base64.b64decode(raw)
        if not isinstance(raw, (bytes, bytearray)) or raw[0:5] != b'psbt\xff':
            raise BadHeaderMagic("bad magic")

        tx = None  # type: Optional[PartialTransaction]

        # We parse the raw stream twice. The first pass is used to find the
        # PSBT_GLOBAL_UNSIGNED_TX key in the global section and set 'tx'.
        # The second pass does everything else.
        with io.BytesIO(raw[5:]) as fd:  # parsing "first pass"
            while True:
                try:
                    kt, key, val = PSBTSection.get_next_kv_from_fd(fd)
                except StopIteration:
                    break
                try:
                    kt = PSBTGlobalType(kt)
                except ValueError:
                    pass  # unknown type
                if kt == PSBTGlobalType.UNSIGNED_TX:
                    if tx is not None:
                        raise SerializationError(f"duplicate key: {repr(kt)}")
                    if key: raise SerializationError(f"key for {repr(kt)} must be empty")
                    unsigned_tx = Transaction(val.hex())
                    for txin in unsigned_tx.inputs():
                        if txin.script_sig or txin.witness:
                            raise SerializationError(f"PSBT {repr(kt)} must have empty scriptSigs and witnesses")
                    tx = PartialTransaction.from_tx(unsigned_tx)

        if tx is None:
            raise SerializationError(f"PSBT missing required global section PSBT_GLOBAL_UNSIGNED_TX")

        with io.BytesIO(raw[5:]) as fd:  # parsing "second pass"
            # global section
            while True:
                try:
                    kt, key, val = PSBTSection.get_next_kv_from_fd(fd)
                except StopIteration:
                    break
                try:
                    kt = PSBTGlobalType(kt)
                except ValueError:
                    pass  # unknown type
                if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
                if kt == PSBTGlobalType.UNSIGNED_TX:
                    pass  # already handled during "first" parsing pass
                elif kt == PSBTGlobalType.XPUB:
                    bip32node = BIP32Node.from_bytes(key)
                    if bip32node in tx.xpubs:
                        raise SerializationError(f"duplicate key: {repr(kt)}")
                    xfp, path = unpack_bip32_root_fingerprint_and_int_path(val)
                    if bip32node.depth != len(path):
                        raise SerializationError(f"PSBT global xpub has mismatching depth ({bip32node.depth}) "
                                                 f"and derivation prefix len ({len(path)})")
                    child_number_of_xpub = int.from_bytes(bip32node.child_number, 'big')
                    # the xpub's own child number must agree with the last element of its path
                    if not ((bip32node.depth == 0 and child_number_of_xpub == 0)
                            or (bip32node.depth != 0 and child_number_of_xpub == path[-1])):
                        raise SerializationError(f"PSBT global xpub has inconsistent child_number and derivation prefix")
                    tx.xpubs[bip32node] = xfp, path
                elif kt == PSBTGlobalType.VERSION:
                    if len(val) > 4:
                        raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)} > 4")
                    psbt_version = int.from_bytes(val, byteorder='little', signed=False)
                    if psbt_version > 0:
                        raise SerializationError(f"Only PSBTs with version 0 are supported. Found version: {psbt_version}")
                    if key: raise SerializationError(f"key for {repr(kt)} must be empty")
                else:
                    full_key = PSBTSection.get_fullkey_from_keytype_and_key(kt, key)
                    if full_key in tx._unknown:
                        raise SerializationError(f'duplicate key. PSBT global key for unknown type: {full_key}')
                    tx._unknown[full_key] = val
            try:
                # inputs sections
                for txin in tx.inputs():
                    if DEBUG_PSBT_PARSING: print("-> new input starts")
                    txin._populate_psbt_fields_from_fd(fd)
                # outputs sections
                for txout in tx.outputs():
                    if DEBUG_PSBT_PARSING: print("-> new output starts")
                    txout._populate_psbt_fields_from_fd(fd)
            except UnexpectedEndOfStream:
                raise UnexpectedEndOfStream('Unexpected end of stream. Num input and output maps provided does not match unsigned tx.') from None

            if fd.read(1) != b'':
                raise SerializationError("extra junk at the end of PSBT")

        for txin in tx.inputs():
            txin.validate_data()

        return tx
5✔
2085

2086
    @classmethod
    def from_io(cls, inputs: Sequence[PartialTxInput], outputs: Sequence[PartialTxOutput], *,
                locktime: int = None, version: int = None, BIP69_sort: bool = True):
        """Build a partial transaction from the given inputs and outputs.

        ``locktime`` and ``version`` are only assigned when not None. Unless
        ``BIP69_sort`` is false, inputs and outputs are sorted via BIP69_sort().
        """
        new_tx = cls()
        new_tx._inputs = list(inputs)
        new_tx._outputs = list(outputs)
        if locktime is not None:
            new_tx.locktime = locktime
        if version is not None:
            new_tx.version = version
        if BIP69_sort:
            new_tx.BIP69_sort()
        return new_tx
5✔
2099

2100
    def _serialize_psbt(self, fd) -> None:
        """Write this transaction to ``fd`` in PSBT format.

        Layout: magic bytes, the global map (unsigned tx, xpubs, unknown
        fields) terminated by a separator byte, then one section per input
        followed by one section per output.
        """
        wr = PSBTSection.create_psbt_writer(fd)
        fd.write(b'psbt\xff')
        # global section
        unsigned_tx_bytes = bfh(self.serialize_to_network(include_sigs=False))
        wr(PSBTGlobalType.UNSIGNED_TX, unsigned_tx_bytes)
        for bip32node, (xfp, path) in sorted(self.xpubs.items()):
            wr(PSBTGlobalType.XPUB,
               pack_bip32_root_fingerprint_and_int_path(xfp, path),
               key=bip32node.to_bytes())
        for full_key, val in sorted(self._unknown.items()):
            psbt_type, psbt_key = PSBTSection.get_keytype_and_key_from_fullkey(full_key)
            wr(psbt_type, val, key=psbt_key)
        fd.write(b'\x00')  # section-separator
        # input sections first, then output sections
        for section in (*self._inputs, *self._outputs):
            section._serialize_psbt_section(fd)
5✔
2118

2119
    def finalize_psbt(self) -> None:
        """Call ``finalize()`` on every input, in order."""
        for each_input in self.inputs():
            each_input.finalize()
5✔
2122

2123
    def combine_with_other_psbt(self, other_tx: 'Transaction') -> None:
        """Pull in all data from other_tx that we don't yet have (e.g. signatures).

        other_tx must describe the same unsigned tx; otherwise an Exception
        is raised.
        """
        own_unsigned = self.serialize_to_network(include_sigs=False)
        other_unsigned = other_tx.serialize_to_network(include_sigs=False)
        if own_unsigned != other_unsigned:
            raise Exception('A Combiner must not combine two different PSBTs.')
        # BIP-174: "The resulting PSBT must contain all of the key-value pairs from each of the PSBTs.
        #           The Combiner must remove any duplicate key-value pairs, in accordance with the specification."
        # global section (only a PartialTransaction carries one)
        if isinstance(other_tx, PartialTransaction):
            self.xpubs.update(other_tx.xpubs)
            self._unknown.update(other_tx._unknown)
        # input sections
        for own_txin, their_txin in zip(self.inputs(), other_tx.inputs()):
            own_txin.combine_with_other_txin(their_txin)
        # output sections
        for own_txout, their_txout in zip(self.outputs(), other_tx.outputs()):
            own_txout.combine_with_other_txout(their_txout)
        self.invalidate_ser_cache()
5✔
2142

2143
    def join_with_other_psbt(self, other_tx: 'PartialTransaction', *, config: 'SimpleConfig') -> None:
        """Add the inputs and outputs of other_tx to this transaction.

        Raises if other_tx is not a PartialTransaction, or if the two
        transactions (taken together) spend any prevout more than once.
        All existing signatures are removed afterwards.
        """
        if not isinstance(other_tx, PartialTransaction):
            raise Exception('Can only join partial transactions.')
        # make sure there are no duplicate prevouts
        all_prevouts = [txin.prevout.to_str()
                        for txin in itertools.chain(self.inputs(), other_tx.inputs())]
        if len(set(all_prevouts)) != len(all_prevouts):
            raise Exception("Duplicate inputs! "
                            "Transactions that spend the same prevout cannot be joined.")
        # copy global PSBT section
        self.xpubs.update(other_tx.xpubs)
        self._unknown.update(other_tx._unknown)
        # copy and add inputs and outputs
        self.add_inputs(list(other_tx.inputs()))
        self.add_outputs(
            list(other_tx.outputs()),
            merge_duplicates=config.WALLET_MERGE_DUPLICATE_OUTPUTS,
        )
        self.remove_signatures()
        self.invalidate_ser_cache()
5✔
2163

2164
    def inputs(self) -> Sequence[PartialTxInput]:
        """Return the list of inputs (the internal list itself, not a copy)."""
        return self._inputs
5✔
2166

2167
    def outputs(self) -> Sequence[PartialTxOutput]:
        """Return the list of outputs (the internal list itself, not a copy)."""
        return self._outputs
5✔
2169

2170
    def add_inputs(self, inputs: List[PartialTxInput], BIP69_sort=True) -> None:
        """Append ``inputs``, optionally re-sort the inputs, and drop the serialization cache."""
        self._inputs += inputs
        if BIP69_sort:
            self.BIP69_sort(outputs=False)
        self.invalidate_ser_cache()
5✔
2175

2176
    def add_outputs(self, outputs: List[PartialTxOutput], *, merge_duplicates: bool = False, BIP69_sort: bool = True) -> None:
        """Append ``outputs`` and drop the serialization cache.

        With ``merge_duplicates`` the output list is passed through
        merge_duplicate_tx_outputs(); with ``BIP69_sort`` the outputs are
        re-sorted afterwards.
        """
        self._outputs += outputs
        if merge_duplicates:
            merged_outputs = merge_duplicate_tx_outputs(self._outputs)
            self._outputs = merged_outputs
        if BIP69_sort:
            self.BIP69_sort(inputs=False)
        self.invalidate_ser_cache()
5✔
2183

2184
    def set_rbf(self, rbf: bool) -> None:
        """Set nSequence on every input: 0xfffffffd if ``rbf`` is truthy, else 0xfffffffe."""
        if rbf:
            sequence_value = 0xffffffff - 2
        else:
            sequence_value = 0xffffffff - 1
        for each_input in self.inputs():
            each_input.nsequence = sequence_value
        self.invalidate_ser_cache()
5✔
2189

2190
    def BIP69_sort(self, inputs=True, outputs=True):
        """Sort inputs by (prevout txid, prevout index) and outputs by (value, scriptpubkey).

        Either half can be skipped via the ``inputs``/``outputs`` flags. The
        serialization cache is invalidated in all cases.
        """
        # NOTE: other parts of the code rely on these sorts being *stable* sorts

        def input_sort_key(txin):
            prevout = txin.prevout
            return prevout.txid, prevout.out_idx

        def output_sort_key(txout):
            return txout.value, txout.scriptpubkey

        if inputs:
            self._inputs.sort(key=input_sort_key)
        if outputs:
            self._outputs.sort(key=output_sort_key)
        self.invalidate_ser_cache()
5✔
2197

2198
    def serialize_preimage(
        self,
        txin_index: int,
        *,
        sighash_cache: SighashCache = None,
    ) -> bytes:
        """Return the bytes that get hashed and signed for the input at ``txin_index``.

        Three layouts are produced, chosen by the input's type:
        taproot (witness v1), segwit witness v0, and legacy (pre-segwit).
        If the input has no sighash set, Sighash.DEFAULT is used for taproot
        inputs and Sighash.ALL otherwise.

        ``sighash_cache`` lets callers share per-transaction hash data across
        several inputs; a fresh SighashCache is created when it is None.

        Raises Exception for an unsupported sighash flag, for a legacy input
        with a sighash other than ALL, and for a taproot SIGHASH_SINGLE input
        without an output at the same index.
        """
        nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True)
        nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False)
        inputs = self.inputs()
        outputs = self.outputs()
        txin = inputs[txin_index]
        sighash = txin.sighash
        if sighash is None:
            sighash = Sighash.DEFAULT if txin.is_taproot() else Sighash.ALL
        if not Sighash.is_valid(sighash, is_taproot=txin.is_taproot()):
            raise Exception(f"SIGHASH_FLAG ({sighash}) not supported!")
        if sighash_cache is None:
            sighash_cache = SighashCache()
        if txin.is_segwit():
            if txin.is_taproot():
                scache = sighash_cache.get_witver1_data_for_tx(self)
                sighash_epoch = b"\x00"
                hash_type = int.to_bytes(sighash, length=1, byteorder="little", signed=False)
                # txdata
                preimage_txdata = bytearray()
                preimage_txdata += nVersion
                preimage_txdata += nLocktime
                # note: '&' binds tighter than comparisons, i.e. (sighash & 0x80) != ...
                # the hashes over all inputs are only committed to without ANYONECANPAY
                if sighash & 0x80 != Sighash.ANYONECANPAY:
                    preimage_txdata += scache.sha_prevouts
                    preimage_txdata += scache.sha_amounts
                    preimage_txdata += scache.sha_scriptpubkeys
                    preimage_txdata += scache.sha_sequences
                # the hash over all outputs is left out for NONE and SINGLE
                if sighash & 3 not in (Sighash.NONE, Sighash.SINGLE):
                    preimage_txdata += scache.sha_outputs
                # inputdata
                preimage_inputdata = bytearray()
                spend_type = bytes([0])  # (ext_flag * 2) + annex_present
                preimage_inputdata += spend_type
                if sighash & 0x80 == Sighash.ANYONECANPAY:
                    # with ANYONECANPAY this input's own data is serialized in full
                    preimage_inputdata += txin.prevout.serialize_to_network()
                    preimage_inputdata += int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
                    preimage_inputdata += var_int(len(txin.scriptpubkey)) + txin.scriptpubkey
                    preimage_inputdata += int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
                else:
                    # otherwise only the input's index is included
                    preimage_inputdata += int.to_bytes(txin_index, length=4, byteorder="little", signed=False)
                # TODO sha_annex
                # outputdata
                preimage_outputdata = bytearray()
                if sighash & 3 == Sighash.SINGLE:
                    # SINGLE commits only to the output at the same index as this input
                    try:
                        txout = outputs[txin_index]
                    except IndexError:
                        raise Exception("Using SIGHASH_SINGLE without a corresponding output") from None
                    preimage_outputdata += sha256(txout.serialize_to_network())
                return bytes(sighash_epoch + hash_type + preimage_txdata + preimage_inputdata + preimage_outputdata)
            else:  # segwit (witness v0)
                scache = sighash_cache.get_witver0_data_for_tx(self)
                # each of the three hashes below is replaced by 32 zero bytes
                # when the sighash flags exclude it
                if not (sighash & Sighash.ANYONECANPAY):
                    hashPrevouts = scache.hashPrevouts
                else:
                    hashPrevouts = bytes(32)
                if not (sighash & Sighash.ANYONECANPAY) and (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE:
                    hashSequence = scache.hashSequence
                else:
                    hashSequence = bytes(32)
                if (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE:
                    hashOutputs = scache.hashOutputs
                elif (sighash & 0x1f) == Sighash.SINGLE and txin_index < len(outputs):
                    hashOutputs = sha256d(outputs[txin_index].serialize_to_network())
                else:
                    # NONE, or SINGLE without an output at this index
                    hashOutputs = bytes(32)
                outpoint = txin.prevout.serialize_to_network()
                preimage_script = self.get_preimage_script(txin)
                scriptCode = var_int(len(preimage_script)) + preimage_script
                amount = int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
                nSequence = int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
                nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False)
                preimage = nVersion + hashPrevouts + hashSequence + outpoint + scriptCode + amount + nSequence + hashOutputs + nLocktime + nHashType
                return preimage
        else:  # legacy sighash (pre-segwit)
            if sighash != Sighash.ALL:
                raise Exception(f"SIGHASH_FLAG ({sighash}) not supported! (for legacy sighash)")
            preimage_script = self.get_preimage_script(txin)
            # only the input being signed carries a script; all others get an empty one
            txins = var_int(len(inputs)) + b"".join(
                txin.serialize_to_network(script_sig=preimage_script if txin_index==k else b"")
                for k, txin in enumerate(inputs))
            txouts = var_int(len(outputs)) + b"".join(o.serialize_to_network() for o in outputs)
            nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False)
            preimage = nVersion + txins + txouts + nLocktime + nHashType
            return preimage
        raise Exception("should not reach this")
2289

2290
    def sign(self, keypairs: Mapping[bytes, bytes]) -> None:
        """Sign every input for which ``keypairs`` holds a matching key.

        ``keypairs`` maps pubkey_bytes -> secret_bytes. For each input, its
        pubkeys are tried in order until the input is complete; pubkeys
        without a secret in ``keypairs`` are skipped.
        """
        shared_cache = SighashCache()
        for txin_idx, txin in enumerate(self.inputs()):
            for pubkey in txin.pubkeys:
                if txin.is_complete():
                    break
                if pubkey in keypairs:
                    _logger.info(f"adding signature for {pubkey.hex()}. spending utxo {txin.prevout.to_str()}")
                    secret = keypairs[pubkey]
                    signature = self.sign_txin(txin_idx, secret, sighash_cache=shared_cache)
                    self.add_signature_to_txin(txin_idx=txin_idx, signing_pubkey=pubkey, sig=signature)

        _logger.debug(f"tx.sign() finished. is_complete={self.is_complete()}")
        self.invalidate_ser_cache()
5✔
2306

2307
    def sign_txin(
        self,
        txin_index: int,
        privkey_bytes: bytes,
        *,
        sighash_cache: SighashCache = None,
    ) -> bytes:
        """Sign the input at ``txin_index`` and return signature plus sighash bytes.

        Taproot inputs get a schnorr signature made with the tweaked key
        (``privkey_bytes`` is the internal key); all other inputs get a
        DER-encoded ECDSA signature. The input's own sighash is used when set,
        otherwise DEFAULT (taproot) or ALL.
        """
        txin = self.inputs()[txin_index]
        txin.validate_data(for_signing=True)
        preimage = self.serialize_preimage(txin_index, sighash_cache=sighash_cache)
        if not txin.is_taproot():
            signing_key = ecc.ECPrivkey(privkey_bytes)
            digest = sha256d(preimage)
            sig = signing_key.ecdsa_sign(digest, sigencode=ecc.ecdsa_der_sig_from_r_and_s)
            fallback_sighash = Sighash.ALL
        else:
            # note: privkey_bytes is the internal key
            tweaked_key_bytes = taproot_tweak_seckey(privkey_bytes, txin.tap_merkle_root or bytes())
            signing_key = ecc.ECPrivkey(tweaked_key_bytes)
            digest = bitcoin.bip340_tagged_hash(b"TapSighash", preimage)
            sig = signing_key.schnorr_sign(digest)
            fallback_sighash = Sighash.DEFAULT
        sighash = fallback_sighash if txin.sighash is None else txin.sighash
        return sig + Sighash.to_sigbytes(sighash)
5✔
2331

2332
    def is_complete(self) -> bool:
        """Return True iff every input reports itself complete (True when there are no inputs)."""
        # generator instead of a list: stops at the first incomplete input
        return all(txin.is_complete() for txin in self.inputs())
5✔
2334

2335
    def signature_count(self) -> Tuple[int, int]:
        """Return (have, required), summed over every input's satisfaction progress."""
        progress_pairs = [txin.get_satisfaction_progress() for txin in self.inputs()]
        total_have = sum(have for have, _required in progress_pairs)
        total_required = sum(required for _have, required in progress_pairs)
        return total_have, total_required
5✔
2342

2343
    def serialize(self) -> str:
        """Returns PSBT as base64 text, or raw hex of network tx (if complete)."""
        self.finalize_psbt()
        if not self.is_complete():
            return self._serialize_as_base64()
        return Transaction.serialize(self)
5✔
2349

2350
    def serialize_as_bytes(self, *, force_psbt: bool = False) -> bytes:
        """Returns PSBT as raw bytes, or raw bytes of network tx (if complete).

        With ``force_psbt`` the PSBT form is returned even for a complete tx.
        """
        self.finalize_psbt()
        if not force_psbt and self.is_complete():
            return Transaction.serialize_as_bytes(self)
        with io.BytesIO() as buf:
            self._serialize_psbt(buf)
            return buf.getvalue()
5✔
2359

2360
    def _serialize_as_base64(self) -> str:
5✔
2361
        raw_bytes = self.serialize_as_bytes()
5✔
2362
        return base64.b64encode(raw_bytes).decode('ascii')
5✔
2363

2364
    def update_signatures(self, signatures: Sequence[Union[bytes, None]]) -> None:
        """Add new signatures to a transaction

        `signatures` is expected to be a list of sigs with signatures[i]
        intended for self._inputs[i].
        This is used by the Trezor, KeepKey and Safe-T plugins.

        A None entry, or a signature the input already holds, is skipped.
        The signing pubkey is recovered from the signature and only accepted
        if it is one of the input's pubkeys and the signature verifies.
        Raises if the number of signatures differs from the number of inputs.
        """
        if self.is_complete():
            return
        num_inputs, num_sigs = len(self.inputs()), len(signatures)
        if num_inputs != num_sigs:
            raise Exception(f'expected {num_inputs} signatures; got {num_sigs}')
        for txin_idx, txin in enumerate(self.inputs()):
            sig = signatures[txin_idx]
            if sig is None or sig in txin.sigs_ecdsa.values():
                continue
            msg_hash = sha256d(self.serialize_preimage(txin_idx))
            # the last byte of sig is dropped before DER decoding
            sig64 = ecc.ecdsa_sig64_from_der_sig(sig[:-1])
            for recid in range(4):
                try:
                    candidate = ecc.ECPubkey.from_ecdsa_sig64(sig64, recid, msg_hash)
                except ecc.InvalidECPointException:
                    # the point might not be on the curve for some recid values
                    continue
                candidate_bytes = candidate.get_public_key_bytes(compressed=True)
                if candidate_bytes not in txin.pubkeys:
                    continue
                if not candidate.ecdsa_verify(sig64, msg_hash):
                    continue
                _logger.info(f"adding sig: txin_idx={txin_idx}, signing_pubkey={candidate_bytes.hex()}, sig={sig.hex()}")
                self.add_signature_to_txin(txin_idx=txin_idx, signing_pubkey=candidate_bytes, sig=sig)
                break
        # redo raw
        self.invalidate_ser_cache()
5✔
2398

2399
    def add_signature_to_txin(self, *, txin_idx: int, signing_pubkey: bytes, sig: bytes) -> None:
        """Store ``sig`` under ``signing_pubkey`` in the ECDSA signatures of input ``txin_idx``.

        The input's script_sig and witness are reset to None and the
        serialization cache is dropped, so both get rebuilt on next use.
        """
        target_input = self._inputs[txin_idx]
        target_input.sigs_ecdsa[signing_pubkey] = sig
        # force re-serialization
        target_input.script_sig = target_input.witness = None
        self.invalidate_ser_cache()
5✔
2406

2407
    def add_info_from_wallet(
            self,
            wallet: 'Abstract_Wallet',
            *,
            include_xpubs: bool = False,
    ) -> None:
        """Let ``wallet`` fill in what it knows about our inputs and outputs.

        Does nothing if the tx is already complete. With ``include_xpubs``,
        and only for a multisig wallet, the xpubs of its Xpub keystores are
        added to the PSBT global section as well.
        """
        if self.is_complete():
            return
        # only include xpubs for multisig wallets; currently only they need it in practice
        # note: coldcard fw have a limitation that if they are included then all
        #       inputs are assumed to be multisig... https://github.com/spesmilo/electrum/pull/5440#issuecomment-549504761
        # note: trezor plugin needs xpubs included, if there are multisig inputs/change_outputs
        from .wallet import Multisig_Wallet
        if include_xpubs and isinstance(wallet, Multisig_Wallet):
            from .keystore import Xpub
            for keystore in wallet.get_keystores():
                if not isinstance(keystore, Xpub):
                    continue
                fingerprint, full_derivation = keystore.get_fp_and_derivation_to_be_used_in_partial_tx(
                    der_suffix=[], only_der_suffix=False)
                xpub_str = keystore.get_xpub_to_be_used_in_partial_tx(only_der_suffix=False)
                self.xpubs[BIP32Node.from_xkey(xpub_str)] = (fingerprint, full_derivation)
        for txin in self.inputs():
            wallet.add_input_info(txin, only_der_suffix=False)
        for txout in self.outputs():
            wallet.add_output_info(txout, only_der_suffix=False)
2439

2440
    def remove_xpubs_and_bip32_paths(self) -> None:
        """Clear the global xpubs and the bip32 paths of every input and output."""
        self.xpubs.clear()
        for section in (*self.inputs(), *self.outputs()):
            section.bip32_paths.clear()
5✔
2446

2447
    def prepare_for_export_for_coinjoin(self) -> None:
        """Removes all sensitive details.

        Clears the global xpubs and unknown fields, the bip32 paths of all
        inputs, and for every output its scripts, bip32 paths and unknown
        fields.
        """
        # globals
        for global_map in (self.xpubs, self._unknown):
            global_map.clear()
        # inputs
        for txin in self.inputs():
            txin.bip32_paths.clear()
        # outputs
        for txout in self.outputs():
            txout.redeem_script = None
            txout.witness_script = None
            for output_map in (txout.bip32_paths, txout._unknown):
                output_map.clear()
5✔
2461

2462
    async def prepare_for_export_for_hardware_device(self, wallet: 'Abstract_Wallet') -> None:
        """Fill in wallet info (including xpubs) and network info before export.

        Logs a warning if any Xpub keystore of the wallet lacks its root
        fingerprint or derivation prefix.
        """
        self.add_info_from_wallet(wallet, include_xpubs=True)
        await self.add_info_from_network(wallet.network)
        # log warning if PSBT_*_BIP32_DERIVATION fields cannot be filled with full path due to missing info
        from .keystore import Xpub

        def is_ks_missing_info(ks):
            if not isinstance(ks, Xpub):
                return False
            return ks.get_root_fingerprint() is None or ks.get_derivation_prefix() is None

        missing_flags = [is_ks_missing_info(ks) for ks in wallet.get_keystores()]
        if any(missing_flags):
            _logger.warning('PSBT was requested to be filled with full bip32 paths but '
                            'some keystores lacked either the derivation prefix or the root fingerprint')
2473

2474
    def convert_all_utxos_to_witness_utxos(self) -> None:
        """Replaces all NON-WITNESS-UTXOs with WITNESS-UTXOs.

        This will likely make an exported PSBT invalid spec-wise,
        but it makes e.g. QR codes significantly smaller.
        """
        for each_input in self.inputs():
            each_input.convert_utxo_to_witness_utxo()
5✔
2481

2482
    def remove_signatures(self):
        """Drop all signatures and finalized scripts from every input.

        Asserts that the transaction is incomplete afterwards, then
        invalidates the serialization cache.
        """
        for each_input in self.inputs():
            each_input.sigs_ecdsa = {}
            each_input.tap_key_sig = None
            each_input.script_sig = None
            each_input.witness = None
        assert not self.is_complete()
        self.invalidate_ser_cache()
5✔
2490

2491

2492
def pack_bip32_root_fingerprint_and_int_path(xfp: bytes, path: Sequence[int]) -> bytes:
    """Concatenate a 4-byte root fingerprint with a derivation path.

    Each path element is encoded as a 4-byte little-endian unsigned integer.
    Raises Exception if ``xfp`` is not exactly 4 bytes long.
    """
    if len(xfp) != 4:
        raise Exception(f'unexpected xfp length. xfp={xfp}')
    encoded_indices = [
        child_index.to_bytes(4, byteorder='little', signed=False)
        for child_index in path
    ]
    return xfp + b''.join(encoded_indices)
5✔
2496

2497

2498
def unpack_bip32_root_fingerprint_and_int_path(path: bytes) -> Tuple[bytes, Sequence[int]]:
    """Split a packed (fingerprint + derivation path) value into its parts.

    The first 4 bytes are the root fingerprint; every following group of
    4 bytes is decoded as a little-endian unsigned integer path element.

    Raises Exception if ``path`` is empty (there is no fingerprint to read)
    or if its length is not a multiple of 4.
    """
    # an empty value used to pass the modulo check and yield an empty
    # fingerprint, which pack_bip32_root_fingerprint_and_int_path() rejects
    if not path or len(path) % 4 != 0:
        raise Exception(f'unexpected packed path length. path={path.hex()}')
    xfp = path[0:4]
    int_path = [int.from_bytes(path[offset:offset + 4], byteorder='little', signed=False)
                for offset in range(4, len(path), 4)]
    return xfp, int_path
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc