• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6373379927703552

10 Apr 2025 01:10PM UTC coverage: 60.334%. Remained the same
6373379927703552

push

CirrusCI

SomberNight
tests: util.custom_task_factory: fix res warn "coro was never awaited"

follow-up https://github.com/spesmilo/electrum/commit/70d1e1170

```
=============================== warnings summary ===============================
tests/test_util.py::TestUtil::test_custom_task_factory
  /tmp/cirrus-ci-build/tests/test_util.py:504: RuntimeWarning: coroutine 'TestUtil.test_custom_task_factory.<locals>.foo' was never awaited
    self.assertEqual(foo().__qualname__, task.get_coro().__qualname__)
  Enable tracemalloc to get traceback where the object was allocated.
  See https://docs.pytest.org/en/stable/how-to/capture-warnings.html#resource-warnings for more info.

```

21537 of 35696 relevant lines covered (60.33%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.04
/electrum/transaction.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 Thomas Voegtlin
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26

27

28
# Note: The deserialization code originally comes from ABE.
29

30
import struct
5✔
31
import traceback
5✔
32
import sys
5✔
33
import io
5✔
34
import base64
5✔
35
from typing import (Sequence, Union, NamedTuple, Tuple, Optional, Iterable,
5✔
36
                    Callable, List, Dict, Set, TYPE_CHECKING, Mapping)
37
from collections import defaultdict
5✔
38
from enum import IntEnum
5✔
39
import itertools
5✔
40
import binascii
5✔
41
import copy
5✔
42

43
import electrum_ecc as ecc
5✔
44
from electrum_ecc.util import bip340_tagged_hash
5✔
45

46
from . import bitcoin, constants, segwit_addr, bip32
5✔
47
from .bip32 import BIP32Node
5✔
48
from .i18n import _
5✔
49
from .util import profiler, to_bytes, bfh, chunks, is_hex_str, parse_max_spend
5✔
50
from .bitcoin import (TYPE_ADDRESS, TYPE_SCRIPT, hash_160,
5✔
51
                      hash160_to_p2sh, hash160_to_p2pkh, hash_to_segwit_addr,
52
                      var_int, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC, COIN,
53
                      opcodes, base_decode,
54
                      base_encode, construct_witness, construct_script,
55
                      taproot_tweak_seckey)
56
from .crypto import sha256d, sha256
5✔
57
from .logging import get_logger
5✔
58
from .util import ShortID, OldTaskGroup
5✔
59
from .bitcoin import DummyAddress
5✔
60
from .descriptor import Descriptor, MissingSolutionPiece, create_dummy_descriptor_from_address
5✔
61
from .json_db import stored_in
5✔
62

63
if TYPE_CHECKING:
5✔
64
    from .wallet import Abstract_Wallet
×
65
    from .network import Network
×
66
    from .simple_config import SimpleConfig
×
67

68

69
_logger = get_logger(__name__)
5✔
70
DEBUG_PSBT_PARSING = False
5✔
71

72

73
_NEEDS_RECALC = ...  # sentinel value
5✔
74

75

76
class SerializationError(Exception):
5✔
77
    """ Thrown when there's a problem deserializing or serializing """
78

79

80
class UnknownTxinType(Exception):
5✔
81
    pass
5✔
82

83

84
class BadHeaderMagic(SerializationError):
5✔
85
    pass
5✔
86

87

88
class UnexpectedEndOfStream(SerializationError):
5✔
89
    pass
5✔
90

91

92
class PSBTInputConsistencyFailure(SerializationError):
5✔
93
    pass
5✔
94

95

96
class MalformedBitcoinScript(Exception):
5✔
97
    pass
5✔
98

99

100
class MissingTxInputAmount(Exception):
5✔
101
    pass
5✔
102

103

104
class TxinDataFetchProgress(NamedTuple):
5✔
105
    num_tasks_done: int
5✔
106
    num_tasks_total: int
5✔
107
    has_errored: bool
5✔
108
    has_finished: bool
5✔
109

110

111
class Sighash(IntEnum):
5✔
112
    # note: this is not an IntFlag, as ALL|NONE != SINGLE
113

114
    DEFAULT = 0  # taproot only (bip-0341)
5✔
115
    ALL = 1
5✔
116
    NONE = 2
5✔
117
    SINGLE = 3
5✔
118
    ANYONECANPAY = 0x80
5✔
119

120
    @classmethod
5✔
121
    def is_valid(cls, sighash: int, *, is_taproot: bool = False) -> bool:
5✔
122
        valid_flags = {
5✔
123
            0x01, 0x02, 0x03,
124
            0x81, 0x82, 0x83,
125
        }
126
        if is_taproot:
5✔
127
            valid_flags.add(0x00)
5✔
128
        return sighash in valid_flags
5✔
129

130
    @classmethod
5✔
131
    def to_sigbytes(cls, sighash: int) -> bytes:
5✔
132
        if sighash == Sighash.DEFAULT:
5✔
133
            return b""
5✔
134
        return sighash.to_bytes(length=1, byteorder="big")
5✔
135

136

137
class TxOutput:
5✔
138
    scriptpubkey: bytes
5✔
139
    value: Union[int, str]
5✔
140

141
    def __init__(self, *, scriptpubkey: bytes, value: Union[int, str]):
5✔
142
        self.scriptpubkey = scriptpubkey
5✔
143
        if not (isinstance(value, int) or parse_max_spend(value) is not None):
5✔
144
            raise ValueError(f"bad txout value: {value!r}")
×
145
        self.value = value  # int in satoshis; or spend-max-like str
5✔
146

147
    @classmethod
5✔
148
    def from_address_and_value(cls, address: str, value: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']:
5✔
149
        return cls(scriptpubkey=bitcoin.address_to_script(address),
5✔
150
                   value=value)
151

152
    def serialize_to_network(self) -> bytes:
5✔
153
        buf = int.to_bytes(self.value, 8, byteorder="little", signed=False)
5✔
154
        script = self.scriptpubkey
5✔
155
        buf += var_int(len(script))
5✔
156
        buf += script
5✔
157
        return buf
5✔
158

159
    @classmethod
5✔
160
    def from_network_bytes(cls, raw: bytes) -> 'TxOutput':
5✔
161
        vds = BCDataStream()
5✔
162
        vds.write(raw)
5✔
163
        txout = parse_output(vds)
5✔
164
        if vds.can_read_more():
5✔
165
            raise SerializationError('extra junk at the end of TxOutput bytes')
×
166
        return txout
5✔
167

168
    def to_legacy_tuple(self) -> Tuple[int, str, Union[int, str]]:
5✔
169
        if self.address:
5✔
170
            return TYPE_ADDRESS, self.address, self.value
5✔
171
        return TYPE_SCRIPT, self.scriptpubkey.hex(), self.value
×
172

173
    @classmethod
5✔
174
    def from_legacy_tuple(cls, _type: int, addr: str, val: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']:
5✔
175
        if _type == TYPE_ADDRESS:
5✔
176
            return cls.from_address_and_value(addr, val)
5✔
177
        if _type == TYPE_SCRIPT:
5✔
178
            return cls(scriptpubkey=bfh(addr), value=val)
5✔
179
        raise Exception(f"unexpected legacy address type: {_type}")
×
180

181
    @property
5✔
182
    def scriptpubkey(self) -> bytes:
5✔
183
        return self._scriptpubkey
5✔
184

185
    @scriptpubkey.setter
5✔
186
    def scriptpubkey(self, scriptpubkey: bytes):
5✔
187
        self._scriptpubkey = scriptpubkey
5✔
188
        self._address = _NEEDS_RECALC
5✔
189

190
    @property
5✔
191
    def address(self) -> Optional[str]:
5✔
192
        if self._address is _NEEDS_RECALC:
5✔
193
            self._address = get_address_from_output_script(self._scriptpubkey)
5✔
194
        return self._address
5✔
195

196
    def get_ui_address_str(self) -> str:
5✔
197
        addr = self.address
×
198
        if addr is not None:
×
199
            return addr
×
200
        return f"SCRIPT {self.scriptpubkey.hex()}"
×
201

202
    def __repr__(self):
5✔
203
        return f"<TxOutput script={self.scriptpubkey.hex()} address={self.address} value={self.value}>"
5✔
204

205
    def __eq__(self, other):
5✔
206
        if not isinstance(other, TxOutput):
5✔
207
            return False
×
208
        return self.scriptpubkey == other.scriptpubkey and self.value == other.value
5✔
209

210
    def __ne__(self, other):
5✔
211
        return not (self == other)
5✔
212

213
    def to_json(self):
5✔
214
        d = {
5✔
215
            'scriptpubkey': self.scriptpubkey.hex(),
216
            'address': self.address,
217
            'value_sats': self.value,
218
        }
219
        return d
5✔
220

221

222
class BIP143SharedTxDigestFields(NamedTuple):  # witness v0
5✔
223
    hashPrevouts: bytes
5✔
224
    hashSequence: bytes
5✔
225
    hashOutputs: bytes
5✔
226

227
    @classmethod
5✔
228
    def from_tx(cls, tx: 'PartialTransaction') -> 'BIP143SharedTxDigestFields':
5✔
229
        inputs = tx.inputs()
5✔
230
        outputs = tx.outputs()
5✔
231
        hashPrevouts = sha256d(b''.join(txin.prevout.serialize_to_network() for txin in inputs))
5✔
232
        hashSequence = sha256d(b''.join(
5✔
233
            int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
234
            for txin in inputs))
235
        hashOutputs = sha256d(b''.join(o.serialize_to_network() for o in outputs))
5✔
236
        return BIP143SharedTxDigestFields(
5✔
237
            hashPrevouts=hashPrevouts,
238
            hashSequence=hashSequence,
239
            hashOutputs=hashOutputs,
240
        )
241

242

243
class BIP341SharedTxDigestFields(NamedTuple):  # witness v1
5✔
244
    sha_prevouts: bytes
5✔
245
    sha_amounts: bytes
5✔
246
    sha_scriptpubkeys: bytes
5✔
247
    sha_sequences: bytes
5✔
248
    sha_outputs: bytes
5✔
249

250
    @classmethod
5✔
251
    def from_tx(cls, tx: 'PartialTransaction') -> 'BIP341SharedTxDigestFields':
5✔
252
        inputs = tx.inputs()
5✔
253
        outputs = tx.outputs()
5✔
254
        sha_prevouts = sha256(b''.join(txin.prevout.serialize_to_network() for txin in inputs))
5✔
255
        sha_amounts = sha256(b''.join(
5✔
256
            int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
257
            for txin in inputs))
258
        sha_scriptpubkeys = sha256(b''.join(
5✔
259
            var_int(len(txin.scriptpubkey)) + txin.scriptpubkey
260
            for txin in inputs))
261
        sha_sequences = sha256(b''.join(
5✔
262
            int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
263
            for txin in inputs))
264
        sha_outputs = sha256(b''.join(o.serialize_to_network() for o in outputs))
5✔
265
        return BIP341SharedTxDigestFields(
5✔
266
            sha_prevouts=sha_prevouts,
267
            sha_amounts=sha_amounts,
268
            sha_scriptpubkeys=sha_scriptpubkeys,
269
            sha_sequences=sha_sequences,
270
            sha_outputs=sha_outputs,
271
        )
272

273

274
class SighashCache:
5✔
275

276
    def __init__(self):
5✔
277
        self._witver0 = None  # type: Optional[BIP143SharedTxDigestFields]
5✔
278
        self._witver1 = None  # type: Optional[BIP341SharedTxDigestFields]
5✔
279

280
    def get_witver0_data_for_tx(self, tx: 'PartialTransaction') -> BIP143SharedTxDigestFields:
5✔
281
        if self._witver0 is None:
5✔
282
            self._witver0 = BIP143SharedTxDigestFields.from_tx(tx)
5✔
283
        return self._witver0
5✔
284

285
    def get_witver1_data_for_tx(self, tx: 'PartialTransaction') -> BIP341SharedTxDigestFields:
5✔
286
        if self._witver1 is None:
5✔
287
            self._witver1 = BIP341SharedTxDigestFields.from_tx(tx)
5✔
288
        return self._witver1
5✔
289

290

291
class TxOutpoint(NamedTuple):
5✔
292
    txid: bytes  # endianness same as hex string displayed; reverse of tx serialization order
5✔
293
    out_idx: int
5✔
294

295
    @classmethod
5✔
296
    def from_str(cls, s: str) -> 'TxOutpoint':
5✔
297
        hash_str, idx_str = s.split(':')
5✔
298
        assert len(hash_str) == 64, f"{hash_str} should be a sha256 hash"
5✔
299
        return TxOutpoint(txid=bfh(hash_str),
5✔
300
                          out_idx=int(idx_str))
301

302
    def __str__(self) -> str:
5✔
303
        return f"""TxOutpoint("{self.to_str()}")"""
5✔
304

305
    def __repr__(self):
5✔
306
        return f"<{str(self)}>"
5✔
307

308
    def to_str(self) -> str:
5✔
309
        return f"{self.txid.hex()}:{self.out_idx}"
5✔
310

311
    def to_json(self):
5✔
312
        return [self.txid.hex(), self.out_idx]
×
313

314
    def serialize_to_network(self) -> bytes:
5✔
315
        return self.txid[::-1] + int.to_bytes(self.out_idx, length=4, byteorder="little", signed=False)
5✔
316

317
    def is_coinbase(self) -> bool:
5✔
318
        return self.txid == bytes(32)
5✔
319

320
    def short_name(self):
5✔
321
        return f"{self.txid.hex()[0:10]}:{self.out_idx}"
×
322

323

324
class TxInput:
5✔
325
    prevout: TxOutpoint
5✔
326
    script_sig: Optional[bytes]
5✔
327
    nsequence: int
5✔
328
    witness: Optional[bytes]
5✔
329
    _is_coinbase_output: bool
5✔
330

331
    def __init__(self, *,
5✔
332
                 prevout: TxOutpoint,
333
                 script_sig: bytes = None,
334
                 nsequence: int = 0xffffffff - 1,
335
                 witness: bytes = None,
336
                 is_coinbase_output: bool = False):
337
        self.prevout = prevout
5✔
338
        self.script_sig = script_sig
5✔
339
        self.nsequence = nsequence
5✔
340
        self.witness = witness
5✔
341
        self._is_coinbase_output = is_coinbase_output
5✔
342
        # blockchain fields
343
        self.block_height = None  # type: Optional[int]  # height at which the TXO is mined; None means unknown. not SPV-ed.
5✔
344
        self.block_txpos = None  # type: Optional[int]  # position of tx in block, if TXO is mined; otherwise None or -1
5✔
345
        self.spent_height = None  # type: Optional[int]  # height at which the TXO got spent
5✔
346
        self.spent_txid = None  # type: Optional[str]  # txid of the spender
5✔
347
        self._utxo = None  # type: Optional[Transaction]
5✔
348
        self.__scriptpubkey = None  # type: Optional[bytes]
5✔
349
        self.__address = None  # type: Optional[str]
5✔
350
        self.__value_sats = None  # type: Optional[int]
5✔
351

352
    @property
5✔
353
    def short_id(self):
5✔
354
        if self.block_txpos is not None and self.block_txpos >= 0:
×
355
            return ShortID.from_components(self.block_height, self.block_txpos, self.prevout.out_idx)
×
356
        else:
357
            return self.prevout.short_name()
×
358

359
    @property
5✔
360
    def utxo(self):
5✔
361
        return self._utxo
5✔
362

363
    @utxo.setter
5✔
364
    def utxo(self, tx: Optional['Transaction']):
5✔
365
        if tx is None:
5✔
366
            return
5✔
367
        # note that tx might be a PartialTransaction
368
        # serialize and de-serialize tx now. this might e.g. convert a complete PartialTx to a Tx
369
        tx = tx_from_any(str(tx))
5✔
370
        # 'utxo' field should not be a PSBT:
371
        if not tx.is_complete():
5✔
372
            return
5✔
373
        self.validate_data(utxo=tx)
5✔
374
        self._utxo = tx
5✔
375
        # update derived fields
376
        out_idx = self.prevout.out_idx
5✔
377
        self.__scriptpubkey = self._utxo.outputs()[out_idx].scriptpubkey
5✔
378
        self.__address = _NEEDS_RECALC
5✔
379
        self.__value_sats = self._utxo.outputs()[out_idx].value
5✔
380

381
    def validate_data(self, *, utxo: Optional['Transaction'] = None, **kwargs) -> None:
5✔
382
        utxo = utxo or self.utxo
5✔
383
        if utxo:
5✔
384
            if self.prevout.txid.hex() != utxo.txid():
5✔
385
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
×
386
                                                  f"If a non-witness UTXO is provided, its hash must match the hash specified in the prevout")
387

388
    def is_coinbase_input(self) -> bool:
5✔
389
        """Whether this is the input of a coinbase tx."""
390
        return self.prevout.is_coinbase()
5✔
391

392
    def is_coinbase_output(self) -> bool:
5✔
393
        """Whether the coin being spent is an output of a coinbase tx.
394
        This matters for coin maturity (and pretty much only for that!).
395
        """
396
        return self._is_coinbase_output
5✔
397

398
    def value_sats(self) -> Optional[int]:
5✔
399
        return self.__value_sats
5✔
400

401
    @property
5✔
402
    def address(self) -> Optional[str]:
5✔
403
        if self.__address is _NEEDS_RECALC:
5✔
404
            self.__address = get_address_from_output_script(self.__scriptpubkey)
5✔
405
        return self.__address
5✔
406

407
    @property
5✔
408
    def scriptpubkey(self) -> Optional[bytes]:
5✔
409
        return self.__scriptpubkey
5✔
410

411
    def to_json(self):
5✔
412
        d = {
5✔
413
            'prevout_hash': self.prevout.txid.hex(),
414
            'prevout_n': self.prevout.out_idx,
415
            'coinbase': self.is_coinbase_output(),
416
            'nsequence': self.nsequence,
417
        }
418
        if self.script_sig is not None:
5✔
419
            d['scriptSig'] = self.script_sig.hex()
×
420
        if self.witness is not None:
5✔
421
            d['witness'] = [x.hex() for x in self.witness_elements()]
×
422
        return d
5✔
423

424
    def serialize_to_network(self, *, script_sig: bytes = None) -> bytes:
5✔
425
        if script_sig is None:
5✔
426
            script_sig = self.script_sig
×
427
        # Prev hash and index
428
        s = self.prevout.serialize_to_network()
5✔
429
        # Script length, script, sequence
430
        s += var_int(len(script_sig))
5✔
431
        s += script_sig
5✔
432
        s += int.to_bytes(self.nsequence, length=4, byteorder="little", signed=False)
5✔
433
        return s
5✔
434

435
    def witness_elements(self) -> Sequence[bytes]:
5✔
436
        if not self.witness:
×
437
            return []
×
438
        vds = BCDataStream()
×
439
        vds.write(self.witness)
×
440
        n = vds.read_compact_size()
×
441
        return list(vds.read_bytes(vds.read_compact_size()) for i in range(n))
×
442

443
    def is_segwit(self, *, guess_for_address=False) -> bool:
5✔
444
        if self.witness not in (b'\x00', b'', None):
5✔
445
            return True
5✔
446
        return False
5✔
447

448
    async def add_info_from_network(
5✔
449
            self,
450
            network: Optional['Network'],
451
            *,
452
            ignore_network_issues: bool = True,
453
            timeout=None,
454
    ) -> bool:
455
        """Returns True iff successful."""
456
        from .network import NetworkException
5✔
457
        async def fetch_from_network(txid) -> Optional[Transaction]:
5✔
458
            tx = None
5✔
459
            if network and network.has_internet_connection():
5✔
460
                try:
5✔
461
                    raw_tx = await network.get_transaction(txid, timeout=timeout)
5✔
462
                except NetworkException as e:
×
463
                    _logger.info(f'got network error getting input txn. err: {repr(e)}. txid: {txid}. '
×
464
                                 f'if you are intentionally offline, consider using the --offline flag')
465
                    if not ignore_network_issues:
×
466
                        raise e
×
467
                else:
468
                    tx = Transaction(raw_tx)
5✔
469
            if not tx and not ignore_network_issues:
5✔
470
                raise NetworkException('failed to get prev tx from network')
×
471
            return tx
5✔
472

473
        if self.utxo is None:
5✔
474
            self.utxo = await fetch_from_network(txid=self.prevout.txid.hex())
5✔
475
        return self.utxo is not None
5✔
476

477

478
class BCDataStream(object):
5✔
479
    """Workalike python implementation of Bitcoin's CDataStream class."""
480

481
    def __init__(self):
5✔
482
        self.input = None  # type: Optional[bytearray]
5✔
483
        self.read_cursor = 0
5✔
484

485
    def clear(self):
5✔
486
        self.input = None
×
487
        self.read_cursor = 0
×
488

489
    def write(self, _bytes: Union[bytes, bytearray]):  # Initialize with string of _bytes
5✔
490
        assert isinstance(_bytes, (bytes, bytearray))
5✔
491
        if self.input is None:
5✔
492
            self.input = bytearray(_bytes)
5✔
493
        else:
494
            self.input += bytearray(_bytes)
5✔
495

496
    def read_string(self, encoding='ascii'):
5✔
497
        # Strings are encoded depending on length:
498
        # 0 to 252 :  1-byte-length followed by bytes (if any)
499
        # 253 to 65,535 : byte'253' 2-byte-length followed by bytes
500
        # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes
501
        # ... and the Bitcoin client is coded to understand:
502
        # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string
503
        # ... but I don't think it actually handles any strings that big.
504
        if self.input is None:
5✔
505
            raise SerializationError("call write(bytes) before trying to deserialize")
5✔
506

507
        length = self.read_compact_size()
5✔
508

509
        return self.read_bytes(length).decode(encoding)
5✔
510

511
    def write_string(self, string, encoding='ascii'):
5✔
512
        string = to_bytes(string, encoding)
5✔
513
        # Length-encoded as with read-string
514
        self.write_compact_size(len(string))
5✔
515
        self.write(string)
5✔
516

517
    def read_bytes(self, length: int) -> bytes:
5✔
518
        if self.input is None:
5✔
519
            raise SerializationError("call write(bytes) before trying to deserialize")
5✔
520
        assert length >= 0
5✔
521
        input_len = len(self.input)
5✔
522
        read_begin = self.read_cursor
5✔
523
        read_end = read_begin + length
5✔
524
        if 0 <= read_begin <= read_end <= input_len:
5✔
525
            result = self.input[read_begin:read_end]  # type: bytearray
5✔
526
            self.read_cursor += length
5✔
527
            return bytes(result)
5✔
528
        else:
529
            raise SerializationError('attempt to read past end of buffer')
5✔
530

531
    def write_bytes(self, _bytes: Union[bytes, bytearray], length: int):
5✔
532
        assert len(_bytes) == length, len(_bytes)
×
533
        self.write(_bytes)
×
534

535
    def can_read_more(self) -> bool:
5✔
536
        if not self.input:
5✔
537
            return False
×
538
        return self.read_cursor < len(self.input)
5✔
539

540
    def read_boolean(self) -> bool: return self.read_bytes(1) != b'\x00'
5✔
541
    def read_int16(self): return self._read_num('<h')
5✔
542
    def read_uint16(self): return self._read_num('<H')
5✔
543
    def read_int32(self): return self._read_num('<i')
5✔
544
    def read_uint32(self): return self._read_num('<I')
5✔
545
    def read_int64(self): return self._read_num('<q')
5✔
546
    def read_uint64(self): return self._read_num('<Q')
5✔
547

548
    def write_boolean(self, val): return self.write(b'\x01' if val else b'\x00')
5✔
549
    def write_int16(self, val): return self._write_num('<h', val)
5✔
550
    def write_uint16(self, val): return self._write_num('<H', val)
5✔
551
    def write_int32(self, val): return self._write_num('<i', val)
5✔
552
    def write_uint32(self, val): return self._write_num('<I', val)
5✔
553
    def write_int64(self, val): return self._write_num('<q', val)
5✔
554
    def write_uint64(self, val): return self._write_num('<Q', val)
5✔
555

556
    def read_compact_size(self):
5✔
557
        try:
5✔
558
            size = self.input[self.read_cursor]
5✔
559
            self.read_cursor += 1
5✔
560
            if size == 253:
5✔
561
                size = self._read_num('<H')
5✔
562
            elif size == 254:
5✔
563
                size = self._read_num('<I')
5✔
564
            elif size == 255:
5✔
565
                size = self._read_num('<Q')
5✔
566
            return size
5✔
567
        except IndexError as e:
5✔
568
            raise SerializationError("attempt to read past end of buffer") from e
5✔
569

570
    def write_compact_size(self, size):
5✔
571
        if size < 0:
5✔
572
            raise SerializationError("attempt to write size < 0")
5✔
573
        elif size < 253:
5✔
574
            self.write(bytes([size]))
5✔
575
        elif size < 2**16:
5✔
576
            self.write(b'\xfd')
5✔
577
            self._write_num('<H', size)
5✔
578
        elif size < 2**32:
5✔
579
            self.write(b'\xfe')
5✔
580
            self._write_num('<I', size)
5✔
581
        elif size < 2**64:
5✔
582
            self.write(b'\xff')
5✔
583
            self._write_num('<Q', size)
5✔
584
        else:
585
            raise Exception(f"size {size} too large for compact_size")
×
586

587
    def _read_num(self, format):
5✔
588
        try:
5✔
589
            (i,) = struct.unpack_from(format, self.input, self.read_cursor)
5✔
590
            self.read_cursor += struct.calcsize(format)
5✔
591
        except Exception as e:
×
592
            raise SerializationError(e) from e
×
593
        return i
5✔
594

595
    def _write_num(self, format, num):
5✔
596
        s = struct.pack(format, num)
5✔
597
        self.write(s)
5✔
598

599

600
def script_GetOp(_bytes : bytes):
5✔
601
    i = 0
5✔
602
    while i < len(_bytes):
5✔
603
        vch = None
5✔
604
        opcode = _bytes[i]
5✔
605
        i += 1
5✔
606

607
        if opcode <= opcodes.OP_PUSHDATA4:
5✔
608
            nSize = opcode
5✔
609
            if opcode == opcodes.OP_PUSHDATA1:
5✔
610
                try: nSize = _bytes[i]
5✔
611
                except IndexError: raise MalformedBitcoinScript()
×
612
                i += 1
5✔
613
            elif opcode == opcodes.OP_PUSHDATA2:
5✔
614
                try: (nSize,) = struct.unpack_from('<H', _bytes, i)
5✔
615
                except struct.error: raise MalformedBitcoinScript()
×
616
                i += 2
5✔
617
            elif opcode == opcodes.OP_PUSHDATA4:
5✔
618
                try: (nSize,) = struct.unpack_from('<I', _bytes, i)
5✔
619
                except struct.error: raise MalformedBitcoinScript()
×
620
                i += 4
5✔
621
            if i + nSize > len(_bytes):
5✔
622
                raise MalformedBitcoinScript(
5✔
623
                    f"Push of data element that is larger than remaining data: {nSize} vs {len(_bytes) - i}")
624
            vch = _bytes[i:i + nSize]
5✔
625
            i += nSize
5✔
626

627
        yield opcode, vch, i
5✔
628

629

630
class OPPushDataGeneric:
5✔
631
    def __init__(self, pushlen: Callable=None):
5✔
632
        if pushlen is not None:
5✔
633
            self.check_data_len = pushlen
5✔
634

635
    @classmethod
5✔
636
    def check_data_len(cls, datalen: int) -> bool:
5✔
637
        # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent.
638
        return opcodes.OP_PUSHDATA4 >= datalen >= 0
×
639

640
    @classmethod
5✔
641
    def is_instance(cls, item):
5✔
642
        # accept objects that are instances of this class
643
        # or other classes that are subclasses
644
        return isinstance(item, cls) \
5✔
645
               or (isinstance(item, type) and issubclass(item, cls))
646

647

648
class OPGeneric:
5✔
649
    def __init__(self, matcher: Callable=None):
5✔
650
        if matcher is not None:
5✔
651
            self.matcher = matcher
5✔
652

653
    def match(self, op) -> bool:
5✔
654
        return self.matcher(op)
5✔
655

656
    @classmethod
5✔
657
    def is_instance(cls, item):
5✔
658
        # accept objects that are instances of this class
659
        # or other classes that are subclasses
660
        return isinstance(item, cls) \
5✔
661
               or (isinstance(item, type) and issubclass(item, cls))
662

663
OPPushDataPubkey = OPPushDataGeneric(lambda x: x in (33, 65))
5✔
664
OP_ANYSEGWIT_VERSION = OPGeneric(lambda x: x in list(range(opcodes.OP_1, opcodes.OP_16 + 1)))
5✔
665

666
SCRIPTPUBKEY_TEMPLATE_P2PKH = [opcodes.OP_DUP, opcodes.OP_HASH160,
5✔
667
                               OPPushDataGeneric(lambda x: x == 20),
668
                               opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG]
669
SCRIPTPUBKEY_TEMPLATE_P2SH = [opcodes.OP_HASH160, OPPushDataGeneric(lambda x: x == 20), opcodes.OP_EQUAL]
5✔
670
SCRIPTPUBKEY_TEMPLATE_WITNESS_V0 = [opcodes.OP_0, OPPushDataGeneric(lambda x: x in (20, 32))]
5✔
671
SCRIPTPUBKEY_TEMPLATE_P2WPKH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 20)]
5✔
672
SCRIPTPUBKEY_TEMPLATE_P2WSH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 32)]
5✔
673
SCRIPTPUBKEY_TEMPLATE_ANYSEGWIT = [OP_ANYSEGWIT_VERSION, OPPushDataGeneric(lambda x: x in list(range(2, 40 + 1)))]
5✔
674

675

676
def check_scriptpubkey_template_and_dust(scriptpubkey, amount: Optional[int]):
5✔
677
    if match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_P2PKH):
5✔
678
        dust_limit = bitcoin.DUST_LIMIT_P2PKH
×
679
    elif match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_P2SH):
5✔
680
        dust_limit = bitcoin.DUST_LIMIT_P2SH
×
681
    elif match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_P2WSH):
5✔
682
        dust_limit = bitcoin.DUST_LIMIT_P2WSH
×
683
    elif match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_P2WPKH):
5✔
684
        dust_limit = bitcoin.DUST_LIMIT_P2WPKH
5✔
685
    elif match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_ANYSEGWIT):
×
686
        dust_limit = bitcoin.DUST_LIMIT_UNKNOWN_SEGWIT
×
687
    else:
688
        raise Exception(f'scriptpubkey does not conform to any template: {scriptpubkey.hex()}')
×
689
    if amount < dust_limit:
5✔
690
        raise Exception(f'amount ({amount}) is below dust limit for scriptpubkey type ({dust_limit})')
×
691

692
def merge_duplicate_tx_outputs(outputs: Iterable['PartialTxOutput']) -> List['PartialTxOutput']:
5✔
693
    """Merges outputs that are paying to the same address by replacing them with a single larger output."""
694
    output_dict = {}
5✔
695
    for output in outputs:
5✔
696
        assert isinstance(output.value, int), "tx outputs with spend-max-like str cannot be merged"
5✔
697
        if output.scriptpubkey in output_dict:
5✔
698
            output_dict[output.scriptpubkey].value += output.value
5✔
699
        else:
700
            output_dict[output.scriptpubkey] = copy.copy(output)
5✔
701
    return list(output_dict.values())
5✔
702

703
def match_script_against_template(script, template, debug=False) -> bool:
5✔
704
    """Returns whether 'script' matches 'template'."""
705
    if script is None:
5✔
706
        return False
×
707
    # optionally decode script now:
708
    if isinstance(script, (bytes, bytearray)):
5✔
709
        try:
5✔
710
            script = [x for x in script_GetOp(script)]
5✔
711
        except MalformedBitcoinScript:
×
712
            if debug:
×
713
                _logger.debug(f"malformed script")
×
714
            return False
×
715
    if debug:
5✔
716
        _logger.debug(f"match script against template: {script}")
×
717
    if len(script) != len(template):
5✔
718
        if debug:
5✔
719
            _logger.debug(f"length mismatch {len(script)} != {len(template)}")
×
720
        return False
5✔
721
    for i in range(len(script)):
5✔
722
        template_item = template[i]
5✔
723
        script_item = script[i]
5✔
724
        if OPPushDataGeneric.is_instance(template_item) and template_item.check_data_len(script_item[0]):
5✔
725
            continue
5✔
726
        if OPGeneric.is_instance(template_item) and template_item.match(script_item[0]):
5✔
727
            continue
5✔
728
        if template_item != script_item[0]:
5✔
729
            if debug:
5✔
730
                _logger.debug(f"item mismatch at position {i}: {template_item} != {script_item[0]}")
×
731
            return False
5✔
732
    return True
5✔
733

734
def get_script_type_from_output_script(_bytes: bytes) -> Optional[str]:
5✔
735
    if _bytes is None:
5✔
736
        return None
×
737
    try:
5✔
738
        decoded = [x for x in script_GetOp(_bytes)]
5✔
739
    except MalformedBitcoinScript:
×
740
        return None
×
741
    if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2PKH):
5✔
742
        return 'p2pkh'
5✔
743
    if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2SH):
5✔
744
        return 'p2sh'
×
745
    if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2WPKH):
5✔
746
        return 'p2wpkh'
5✔
747
    if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2WSH):
×
748
        return 'p2wsh'
×
749
    return None
×
750

751
def get_address_from_output_script(_bytes: bytes, *, net=None) -> Optional[str]:
5✔
752
    try:
5✔
753
        decoded = [x for x in script_GetOp(_bytes)]
5✔
754
    except MalformedBitcoinScript:
×
755
        return None
×
756

757
    # p2pkh
758
    if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2PKH):
5✔
759
        return hash160_to_p2pkh(decoded[2][1], net=net)
5✔
760

761
    # p2sh
762
    if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2SH):
5✔
763
        return hash160_to_p2sh(decoded[1][1], net=net)
5✔
764

765
    # segwit address (version 0)
766
    if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
5✔
767
        return hash_to_segwit_addr(decoded[1][1], witver=0, net=net)
5✔
768

769
    # segwit address (version 1-16)
770
    future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1))
5✔
771
    for witver, opcode in enumerate(future_witness_versions, start=1):
5✔
772
        match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)]
5✔
773
        if match_script_against_template(decoded, match):
5✔
774
            return hash_to_segwit_addr(decoded[1][1], witver=witver, net=net)
5✔
775

776
    return None
5✔
777

778

779
def parse_input(vds: BCDataStream) -> TxInput:
5✔
780
    prevout_hash = vds.read_bytes(32)[::-1]
5✔
781
    prevout_n = vds.read_uint32()
5✔
782
    prevout = TxOutpoint(txid=prevout_hash, out_idx=prevout_n)
5✔
783
    script_sig = vds.read_bytes(vds.read_compact_size())
5✔
784
    nsequence = vds.read_uint32()
5✔
785
    return TxInput(prevout=prevout, script_sig=script_sig, nsequence=nsequence)
5✔
786

787

788
def parse_witness(vds: BCDataStream, txin: TxInput) -> None:
5✔
789
    n = vds.read_compact_size()
5✔
790
    witness_elements = list(vds.read_bytes(vds.read_compact_size()) for i in range(n))
5✔
791
    txin.witness = construct_witness(witness_elements)
5✔
792

793

794
def parse_output(vds: BCDataStream) -> TxOutput:
5✔
795
    value = vds.read_int64()
5✔
796
    if value > TOTAL_COIN_SUPPLY_LIMIT_IN_BTC * COIN:
5✔
797
        raise SerializationError('invalid output amount (too large)')
×
798
    if value < 0:
5✔
799
        raise SerializationError('invalid output amount (negative)')
×
800
    scriptpubkey = vds.read_bytes(vds.read_compact_size())
5✔
801
    return TxOutput(value=value, scriptpubkey=scriptpubkey)
5✔
802

803

804
# pay & redeem scripts
805

806
def multisig_script(public_keys: Sequence[str], m: int) -> bytes:
5✔
807
    n = len(public_keys)
5✔
808
    assert 1 <= m <= n <= 15, f'm {m}, n {n}'
5✔
809
    return construct_script([m, *public_keys, n, opcodes.OP_CHECKMULTISIG])
5✔
810

811

812
class Transaction:
5✔
813
    _cached_network_ser: Optional[str]
5✔
814

815
    def __str__(self):
5✔
816
        return self.serialize()
5✔
817

818
    def __init__(self, raw):
5✔
819
        if raw is None:
5✔
820
            self._cached_network_ser = None
5✔
821
        elif isinstance(raw, str):
5✔
822
            self._cached_network_ser = raw.strip() if raw else None
5✔
823
            assert is_hex_str(self._cached_network_ser)
5✔
824
        elif isinstance(raw, (bytes, bytearray)):
5✔
825
            self._cached_network_ser = raw.hex()
5✔
826
        else:
827
            raise Exception(f"cannot initialize transaction from {raw}")
×
828
        self._inputs = None  # type: List[TxInput]
5✔
829
        self._outputs = None  # type: List[TxOutput]
5✔
830
        self._locktime = 0
5✔
831
        self._version = 2
5✔
832

833
        self._cached_txid = None  # type: Optional[str]
5✔
834

835
    @property
5✔
836
    def locktime(self):
5✔
837
        self.deserialize()
5✔
838
        return self._locktime
5✔
839

840
    @locktime.setter
5✔
841
    def locktime(self, value: int):
5✔
842
        assert isinstance(value, int), f"locktime must be int, not {value!r}"
5✔
843
        self._locktime = value
5✔
844
        self.invalidate_ser_cache()
5✔
845

846
    @property
5✔
847
    def version(self):
5✔
848
        self.deserialize()
5✔
849
        return self._version
5✔
850

851
    @version.setter
5✔
852
    def version(self, value):
5✔
853
        self._version = value
5✔
854
        self.invalidate_ser_cache()
5✔
855

856
    def to_json(self) -> dict:
5✔
857
        d = {
5✔
858
            'version': self.version,
859
            'locktime': self.locktime,
860
            'inputs': [txin.to_json() for txin in self.inputs()],
861
            'outputs': [txout.to_json() for txout in self.outputs()],
862
        }
863
        return d
5✔
864

865
    def inputs(self) -> Sequence[TxInput]:
5✔
866
        if self._inputs is None:
5✔
867
            self.deserialize()
5✔
868
        return self._inputs
5✔
869

870
    def outputs(self) -> Sequence[TxOutput]:
5✔
871
        if self._outputs is None:
5✔
872
            self.deserialize()
5✔
873
        return self._outputs
5✔
874

875
    def deserialize(self) -> None:
5✔
876
        if self._cached_network_ser is None:
5✔
877
            return
5✔
878
        if self._inputs is not None:
5✔
879
            return
5✔
880

881
        raw_bytes = bfh(self._cached_network_ser)
5✔
882
        vds = BCDataStream()
5✔
883
        vds.write(raw_bytes)
5✔
884
        self._version = vds.read_int32()
5✔
885
        n_vin = vds.read_compact_size()
5✔
886
        is_segwit = (n_vin == 0)
5✔
887
        if is_segwit:
5✔
888
            marker = vds.read_bytes(1)
5✔
889
            if marker != b'\x01':
5✔
890
                raise SerializationError('invalid txn marker byte: {}'.format(marker))
×
891
            n_vin = vds.read_compact_size()
5✔
892
        if n_vin < 1:
5✔
893
            raise SerializationError('tx needs to have at least 1 input')
×
894
        txins = [parse_input(vds) for i in range(n_vin)]
5✔
895
        n_vout = vds.read_compact_size()
5✔
896
        if n_vout < 1:
5✔
897
            raise SerializationError('tx needs to have at least 1 output')
×
898
        self._outputs = [parse_output(vds) for i in range(n_vout)]
5✔
899
        if is_segwit:
5✔
900
            for txin in txins:
5✔
901
                parse_witness(vds, txin)
5✔
902
        self._inputs = txins  # only expose field after witness is parsed, for sanity
5✔
903
        self._locktime = vds.read_uint32()
5✔
904
        if vds.can_read_more():
5✔
905
            raise SerializationError('extra junk at the end')
×
906

907
    @classmethod
5✔
908
    def serialize_witness(cls, txin: TxInput, *, estimate_size=False) -> bytes:
5✔
909
        if txin.witness is not None:
5✔
910
            return txin.witness
5✔
911
        if txin.is_coinbase_input():
5✔
912
            return b""
×
913
        assert isinstance(txin, PartialTxInput)
5✔
914

915
        if not txin.is_segwit():
5✔
916
            return construct_witness([])
5✔
917

918
        if estimate_size and hasattr(txin, 'make_witness'):
5✔
919
            sig_dummy = b'\x00' * 71  # DER-encoded ECDSA sig, with low S and low R
5✔
920
            txin.witness_sizehint = len(txin.make_witness(sig_dummy))
5✔
921

922
        if estimate_size and txin.witness_sizehint is not None:
5✔
923
            return bytes(txin.witness_sizehint)
5✔
924

925
        dummy_desc = None
5✔
926
        if estimate_size:
5✔
927
            dummy_desc = create_dummy_descriptor_from_address(txin.address)
5✔
928
        if desc := (txin.script_descriptor or dummy_desc):
5✔
929
            sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.sigs_ecdsa)
5✔
930
            if sol.witness is not None:
5✔
931
                return sol.witness
5✔
932
            return construct_witness([])
×
933
        raise UnknownTxinType("cannot construct witness")
×
934

935
    @classmethod
5✔
936
    def input_script(self, txin: TxInput, *, estimate_size=False) -> bytes:
5✔
937
        if txin.script_sig is not None:
5✔
938
            return txin.script_sig
5✔
939
        if txin.is_coinbase_input():
5✔
940
            return b""
×
941
        assert isinstance(txin, PartialTxInput)
5✔
942

943
        if txin.is_p2sh_segwit() and txin.redeem_script:
5✔
944
            return construct_script([txin.redeem_script])
5✔
945
        if txin.is_native_segwit():
5✔
946
            return b""
5✔
947

948
        dummy_desc = None
5✔
949
        if estimate_size:
5✔
950
            dummy_desc = create_dummy_descriptor_from_address(txin.address)
5✔
951
        if desc := (txin.script_descriptor or dummy_desc):
5✔
952
            if desc.is_segwit():
5✔
953
                if redeem_script := desc.expand().redeem_script:
5✔
954
                    return construct_script([redeem_script])
5✔
955
                return b""
5✔
956
            sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.sigs_ecdsa)
5✔
957
            if sol.script_sig is not None:
5✔
958
                return sol.script_sig
5✔
959
            return b""
×
960
        raise UnknownTxinType("cannot construct scriptSig")
×
961

962
    @classmethod
5✔
963
    def get_preimage_script(cls, txin: 'PartialTxInput') -> bytes:
5✔
964
        if txin.witness_script:
5✔
965
            if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.witness_script)]:
5✔
966
                raise Exception('OP_CODESEPARATOR black magic is not supported')
×
967
            return txin.witness_script
5✔
968
        if not txin.is_segwit() and txin.redeem_script:
5✔
969
            if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.redeem_script)]:
5✔
970
                raise Exception('OP_CODESEPARATOR black magic is not supported')
×
971
            return txin.redeem_script
5✔
972

973
        if desc := txin.script_descriptor:
5✔
974
            sc = desc.expand()
5✔
975
            if script := sc.scriptcode_for_sighash:
5✔
976
                return script
5✔
977
            raise Exception(f"don't know scriptcode for descriptor: {desc.to_string()}")
×
978
        raise UnknownTxinType(f'cannot construct preimage_script')
×
979

980
    def is_segwit(self, *, guess_for_address=False):
5✔
981
        return any(txin.is_segwit(guess_for_address=guess_for_address)
5✔
982
                   for txin in self.inputs())
983

984
    def invalidate_ser_cache(self):
5✔
985
        self._cached_network_ser = None
5✔
986
        self._cached_txid = None
5✔
987

988
    def serialize(self) -> str:
5✔
989
        if not self._cached_network_ser:
5✔
990
            self._cached_network_ser = self.serialize_to_network(estimate_size=False, include_sigs=True)
5✔
991
        return self._cached_network_ser
5✔
992

993
    def serialize_as_bytes(self) -> bytes:
5✔
994
        return bfh(self.serialize())
5✔
995

996
    def serialize_to_network(self, *, estimate_size=False, include_sigs=True, force_legacy=False) -> str:
5✔
997
        """Serialize the transaction as used on the Bitcoin network, into hex.
998
        `include_sigs` signals whether to include scriptSigs and witnesses.
999
        `force_legacy` signals to use the pre-segwit format
1000
        note: (not include_sigs) implies force_legacy
1001
        """
1002
        self.deserialize()
5✔
1003
        nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True).hex()
5✔
1004
        nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False).hex()
5✔
1005
        inputs = self.inputs()
5✔
1006
        outputs = self.outputs()
5✔
1007

1008
        def create_script_sig(txin: TxInput) -> bytes:
5✔
1009
            if include_sigs:
5✔
1010
                script_sig = self.input_script(txin, estimate_size=estimate_size)
5✔
1011
                return script_sig
5✔
1012
            return b""
5✔
1013
        txins = var_int(len(inputs)).hex() + ''.join(
5✔
1014
            txin.serialize_to_network(script_sig=create_script_sig(txin)).hex()
1015
            for txin in inputs)
1016
        txouts = var_int(len(outputs)).hex() + ''.join(o.serialize_to_network().hex() for o in outputs)
5✔
1017

1018
        use_segwit_ser_for_estimate_size = estimate_size and self.is_segwit(guess_for_address=True)
5✔
1019
        use_segwit_ser_for_actual_use = not estimate_size and self.is_segwit()
5✔
1020
        use_segwit_ser = use_segwit_ser_for_estimate_size or use_segwit_ser_for_actual_use
5✔
1021
        if include_sigs and not force_legacy and use_segwit_ser:
5✔
1022
            marker = '00'
5✔
1023
            flag = '01'
5✔
1024
            witness = ''.join(self.serialize_witness(x, estimate_size=estimate_size).hex() for x in inputs)
5✔
1025
            return nVersion + marker + flag + txins + txouts + witness + nLocktime
5✔
1026
        else:
1027
            return nVersion + txins + txouts + nLocktime
5✔
1028

1029
    def to_qr_data(self) -> Tuple[str, bool]:
5✔
1030
        """Returns (serialized_tx, is_complete). The tx is serialized to be put inside a QR code. No side-effects.
1031
        As space in a QR code is limited, some data might have to be omitted. This is signalled via is_complete=False.
1032
        """
1033
        is_complete = True
5✔
1034
        tx = copy.deepcopy(self)  # make copy as we mutate tx
5✔
1035
        if isinstance(tx, PartialTransaction):
5✔
1036
            # this makes QR codes a lot smaller (or just possible in the first place!)
1037
            # note: will not apply if all inputs are taproot, due to new sighash.
1038
            tx.convert_all_utxos_to_witness_utxos()
5✔
1039
            is_complete = False
5✔
1040
        tx_bytes = tx.serialize_as_bytes()
5✔
1041
        return base_encode(tx_bytes, base=43), is_complete
5✔
1042

1043
    def txid(self) -> Optional[str]:
5✔
1044
        if self._cached_txid is None:
5✔
1045
            self.deserialize()
5✔
1046
            all_segwit = all(txin.is_segwit() for txin in self.inputs())
5✔
1047
            if not all_segwit and not self.is_complete():
5✔
1048
                return None
5✔
1049
            try:
5✔
1050
                ser = self.serialize_to_network(force_legacy=True)
5✔
1051
            except UnknownTxinType:
×
1052
                # we might not know how to construct scriptSig for some scripts
1053
                return None
×
1054
            self._cached_txid = sha256d(bfh(ser))[::-1].hex()
5✔
1055
        return self._cached_txid
5✔
1056

1057
    def wtxid(self) -> Optional[str]:
5✔
1058
        self.deserialize()
5✔
1059
        if not self.is_complete():
5✔
1060
            return None
×
1061
        try:
5✔
1062
            ser = self.serialize_to_network()
5✔
1063
        except UnknownTxinType:
×
1064
            # we might not know how to construct scriptSig/witness for some scripts
1065
            return None
×
1066
        return sha256d(bfh(ser))[::-1].hex()
5✔
1067

1068
    def add_info_from_wallet(self, wallet: 'Abstract_Wallet', **kwargs) -> None:
5✔
1069
        # populate prev_txs
1070
        for txin in self.inputs():
5✔
1071
            wallet.add_input_info(txin)
5✔
1072

1073
    async def add_info_from_network(
5✔
1074
        self,
1075
        network: Optional['Network'],
1076
        *,
1077
        ignore_network_issues: bool = True,
1078
        progress_cb: Callable[[TxinDataFetchProgress], None] = None,
1079
        timeout=None,
1080
    ) -> None:
1081
        """note: it is recommended to call add_info_from_wallet first, as this can save some network requests"""
1082
        if not self.is_missing_info_from_network():
5✔
1083
            return
5✔
1084
        if progress_cb is None:
5✔
1085
            progress_cb = lambda *args, **kwargs: None
5✔
1086
        num_tasks_done = 0
5✔
1087
        num_tasks_total = 0
5✔
1088
        has_errored = False
5✔
1089
        has_finished = False
5✔
1090
        async def add_info_to_txin(txin: TxInput):
5✔
1091
            nonlocal num_tasks_done, has_errored
1092
            progress_cb(TxinDataFetchProgress(num_tasks_done, num_tasks_total, has_errored, has_finished))
5✔
1093
            success = await txin.add_info_from_network(
5✔
1094
                network=network,
1095
                ignore_network_issues=ignore_network_issues,
1096
                timeout=timeout,
1097
            )
1098
            if success:
5✔
1099
                num_tasks_done += 1
5✔
1100
            else:
1101
                has_errored = True
×
1102
            progress_cb(TxinDataFetchProgress(num_tasks_done, num_tasks_total, has_errored, has_finished))
5✔
1103
        # schedule a network task for each txin
1104
        try:
5✔
1105
            async with OldTaskGroup() as group:
5✔
1106
                for txin in self.inputs():
5✔
1107
                    if txin.utxo is None:
5✔
1108
                        num_tasks_total += 1
5✔
1109
                        await group.spawn(add_info_to_txin(txin=txin))
5✔
1110
        except Exception as e:
×
1111
            has_errored = True
×
1112
            _logger.error(f"tx.add_info_from_network() got exc: {e!r}")
×
1113
        finally:
1114
            has_finished = True
5✔
1115
            progress_cb(TxinDataFetchProgress(num_tasks_done, num_tasks_total, has_errored, has_finished))
5✔
1116

1117
    def is_missing_info_from_network(self) -> bool:
5✔
1118
        return any(txin.utxo is None for txin in self.inputs())
5✔
1119

1120
    def add_info_from_wallet_and_network(
5✔
1121
        self, *, wallet: 'Abstract_Wallet', show_error: Callable[[str], None],
1122
    ) -> bool:
1123
        """Returns whether successful.
1124
        note: This is sort of a legacy hack... doing network requests in non-async code.
1125
              Relatedly, this should *not* be called from the network thread.
1126
        """
1127
        # note side-effect: tx is being mutated
1128
        from .network import NetworkException, Network
×
1129
        self.add_info_from_wallet(wallet)
×
1130
        try:
×
1131
            if self.is_missing_info_from_network():
×
1132
                Network.run_from_another_thread(
×
1133
                    self.add_info_from_network(wallet.network, ignore_network_issues=False))
1134
        except NetworkException as e:
×
1135
            show_error(repr(e))
×
1136
            return False
×
1137
        return True
×
1138

1139
    def is_rbf_enabled(self) -> bool:
5✔
1140
        """Whether the tx explicitly signals BIP-0125 replace-by-fee."""
1141
        return any([txin.nsequence < 0xffffffff - 1 for txin in self.inputs()])
5✔
1142

1143
    def estimated_size(self) -> int:
5✔
1144
        """Return an estimated virtual tx size in vbytes.
1145
        BIP-0141 defines 'Virtual transaction size' to be weight/4 rounded up.
1146
        This definition is only for humans, and has little meaning otherwise.
1147
        If we wanted sub-byte precision, fee calculation should use transaction
1148
        weights, but for simplicity we approximate that with (virtual_size)x4
1149
        """
1150
        weight = self.estimated_weight()
5✔
1151
        return self.virtual_size_from_weight(weight)
5✔
1152

1153
    @classmethod
5✔
1154
    def estimated_input_weight(cls, txin: TxInput, is_segwit_tx: bool) -> int:
5✔
1155
        '''Return an estimate of serialized input weight in weight units.'''
1156
        script_sig = cls.input_script(txin, estimate_size=True)
5✔
1157
        input_size = len(txin.serialize_to_network(script_sig=script_sig))
5✔
1158

1159
        if txin.is_segwit(guess_for_address=True):
5✔
1160
            witness_size = len(cls.serialize_witness(txin, estimate_size=True))
5✔
1161
        else:
1162
            witness_size = 1 if is_segwit_tx else 0
5✔
1163

1164
        return 4 * input_size + witness_size
5✔
1165

1166
    @classmethod
5✔
1167
    def estimated_output_size_for_address(cls, address: str) -> int:
5✔
1168
        """Return an estimate of serialized output size in bytes."""
1169
        script = bitcoin.address_to_script(address)
5✔
1170
        return cls.estimated_output_size_for_script(script)
5✔
1171

1172
    @classmethod
5✔
1173
    def estimated_output_size_for_script(cls, script: bytes) -> int:
5✔
1174
        """Return an estimate of serialized output size in bytes."""
1175
        # 8 byte value + varint script len + script
1176
        script_len = len(script)
5✔
1177
        var_int_len = len(var_int(script_len))
5✔
1178
        return 8 + var_int_len + script_len
5✔
1179

1180
    @classmethod
5✔
1181
    def virtual_size_from_weight(cls, weight: int) -> int:
5✔
1182
        return weight // 4 + (weight % 4 > 0)
5✔
1183

1184
    @classmethod
5✔
1185
    def satperbyte_from_satperkw(cls, feerate_kw):
5✔
1186
        """Converts feerate from sat/kw to sat/vbyte."""
1187
        return feerate_kw * 4 / 1000
×
1188

1189
    def estimated_total_size(self):
5✔
1190
        """Return an estimated total transaction size in bytes."""
1191
        if not self.is_complete() or self._cached_network_ser is None:
5✔
1192
            return len(self.serialize_to_network(estimate_size=True)) // 2
5✔
1193
        else:
1194
            return len(self._cached_network_ser) // 2  # ASCII hex string
5✔
1195

1196
    def estimated_witness_size(self):
5✔
1197
        """Return an estimate of witness size in bytes."""
1198
        estimate = not self.is_complete()
5✔
1199
        if not self.is_segwit(guess_for_address=estimate):
5✔
1200
            return 0
5✔
1201
        inputs = self.inputs()
5✔
1202
        witness = b"".join(self.serialize_witness(x, estimate_size=estimate) for x in inputs)
5✔
1203
        witness_size = len(witness) + 2  # include marker and flag
5✔
1204
        return witness_size
5✔
1205

1206
    def estimated_base_size(self):
5✔
1207
        """Return an estimated base transaction size in bytes."""
1208
        return self.estimated_total_size() - self.estimated_witness_size()
5✔
1209

1210
    def estimated_weight(self):
5✔
1211
        """Return an estimate of transaction weight."""
1212
        total_tx_size = self.estimated_total_size()
5✔
1213
        base_tx_size = self.estimated_base_size()
5✔
1214
        return 3 * base_tx_size + total_tx_size
5✔
1215

1216
    def is_complete(self) -> bool:
5✔
1217
        return True
5✔
1218

1219
    def get_output_idxs_from_scriptpubkey(self, script: bytes) -> Set[int]:
5✔
1220
        """Returns the set indices of outputs with given script."""
1221
        assert isinstance(script, bytes)
5✔
1222
        # build cache if there isn't one yet
1223
        # note: can become stale and return incorrect data
1224
        #       if the tx is modified later; that's out of scope.
1225
        if not hasattr(self, '_script_to_output_idx'):
5✔
1226
            d = defaultdict(set)
5✔
1227
            for output_idx, o in enumerate(self.outputs()):
5✔
1228
                o_script = o.scriptpubkey
5✔
1229
                d[o_script].add(output_idx)
5✔
1230
            self._script_to_output_idx = d
5✔
1231
        return set(self._script_to_output_idx[script])  # copy
5✔
1232

1233
    def get_output_idxs_from_address(self, addr: str) -> Set[int]:
5✔
1234
        script = bitcoin.address_to_script(addr)
5✔
1235
        return self.get_output_idxs_from_scriptpubkey(script)
5✔
1236

1237
    def replace_output_address(self, old_address: str, new_address: str) -> None:
5✔
1238
        idx = list(self.get_output_idxs_from_address(old_address))
×
1239
        assert len(idx) == 1
×
1240
        amount = self._outputs[idx[0]].value
×
1241
        funding_output = PartialTxOutput.from_address_and_value(new_address, amount)
×
1242
        old_output = PartialTxOutput.from_address_and_value(old_address, amount)
×
1243
        self._outputs.remove(old_output)
×
1244
        self.add_outputs([funding_output])
×
1245
        delattr(self, '_script_to_output_idx')
×
1246

1247
    def get_change_outputs(self):
5✔
1248
        return  [o for o in self._outputs if o.is_change]
5✔
1249

1250
    def has_change(self):
5✔
1251
        return len(self.get_change_outputs()) > 0
5✔
1252

1253
    def get_dummy_output(self, dummy_addr: str) -> Optional['PartialTxOutput']:
5✔
1254
        idxs = self.get_output_idxs_from_address(dummy_addr)
×
1255
        if not idxs:
×
1256
            return
×
1257
        assert len(idxs) == 1
×
1258
        for i in idxs:
×
1259
            return self.outputs()[i]
×
1260

1261
    def output_value_for_address(self, addr):
5✔
1262
        # assumes exactly one output has that address
1263
        for o in self.outputs():
5✔
1264
            if o.address == addr:
5✔
1265
                return o.value
5✔
1266
        else:
1267
            raise Exception('output not found', addr)
×
1268

1269
    def input_value(self) -> int:
5✔
1270
        input_values = [txin.value_sats() for txin in self.inputs()]
5✔
1271
        if any([val is None for val in input_values]):
5✔
1272
            raise MissingTxInputAmount()
×
1273
        return sum(input_values)
5✔
1274

1275
    def output_value(self) -> int:
5✔
1276
        return sum(o.value for o in self.outputs())
5✔
1277

1278
    def get_fee(self) -> Optional[int]:
5✔
1279
        try:
5✔
1280
            return self.input_value() - self.output_value()
5✔
1281
        except MissingTxInputAmount:
×
1282
            return None
×
1283

1284
    def get_input_idx_that_spent_prevout(self, prevout: TxOutpoint) -> Optional[int]:
5✔
1285
        # build cache if there isn't one yet
1286
        # note: can become stale and return incorrect data
1287
        #       if the tx is modified later; that's out of scope.
1288
        if not hasattr(self, '_prevout_to_input_idx'):
×
1289
            d = {}  # type: Dict[TxOutpoint, int]
×
1290
            for i, txin in enumerate(self.inputs()):
×
1291
                d[txin.prevout] = i
×
1292
            self._prevout_to_input_idx = d
×
1293
        idx = self._prevout_to_input_idx.get(prevout)
×
1294
        if idx is not None:
×
1295
            assert self.inputs()[idx].prevout == prevout
×
1296
        return idx
×
1297

1298

1299
def convert_raw_tx_to_hex(raw: Union[str, bytes]) -> str:
5✔
1300
    """Sanitizes tx-describing input (hex/base43/base64) into
1301
    raw tx hex string."""
1302
    if not raw:
5✔
1303
        raise ValueError("empty string")
×
1304
    raw_unstripped = raw
5✔
1305
    raw = raw.strip()
5✔
1306
    # try hex
1307
    try:
5✔
1308
        return binascii.unhexlify(raw).hex()
5✔
1309
    except Exception:
5✔
1310
        pass
5✔
1311
    # try base43
1312
    try:
5✔
1313
        return base_decode(raw, base=43).hex()
5✔
1314
    except Exception:
5✔
1315
        pass
5✔
1316
    # try base64
1317
    if raw[0:6] in ('cHNidP', b'cHNidP'):  # base64 psbt
5✔
1318
        try:
5✔
1319
            return base64.b64decode(raw).hex()
5✔
1320
        except Exception:
×
1321
            pass
×
1322
    # raw bytes (do not strip whitespaces in this case)
1323
    if isinstance(raw_unstripped, bytes):
5✔
1324
        return raw_unstripped.hex()
5✔
1325
    raise ValueError(f"failed to recognize transaction encoding for txt: {raw[:30]}...")
×
1326

1327

1328
def tx_from_any(raw: Union[str, bytes], *,
5✔
1329
                deserialize: bool = True) -> Union['PartialTransaction', 'Transaction']:
1330
    if isinstance(raw, bytearray):
5✔
1331
        raw = bytes(raw)
×
1332
    raw = convert_raw_tx_to_hex(raw)
5✔
1333
    try:
5✔
1334
        return PartialTransaction.from_raw_psbt(raw)
5✔
1335
    except BadHeaderMagic:
5✔
1336
        if raw[:10] == b'EPTF\xff'.hex():
5✔
1337
            raise SerializationError("Partial transactions generated with old Electrum versions "
×
1338
                                     "(< 4.0) are no longer supported. Please upgrade Electrum on "
1339
                                     "the other machine where this transaction was created.")
1340
    try:
5✔
1341
        tx = Transaction(raw)
5✔
1342
        if deserialize:
5✔
1343
            tx.deserialize()
5✔
1344
        return tx
5✔
1345
    except Exception as e:
5✔
1346
        raise SerializationError(f"Failed to recognise tx encoding, or to parse transaction. "
5✔
1347
                                 f"raw: {raw[:30]}...") from e
1348

1349

1350
class PSBTGlobalType(IntEnum):
5✔
1351
    UNSIGNED_TX = 0
5✔
1352
    XPUB = 1
5✔
1353
    VERSION = 0xFB
5✔
1354

1355

1356
class PSBTInputType(IntEnum):
5✔
1357
    NON_WITNESS_UTXO = 0
5✔
1358
    WITNESS_UTXO = 1
5✔
1359
    PARTIAL_SIG = 2
5✔
1360
    SIGHASH_TYPE = 3
5✔
1361
    REDEEM_SCRIPT = 4
5✔
1362
    WITNESS_SCRIPT = 5
5✔
1363
    BIP32_DERIVATION = 6
5✔
1364
    FINAL_SCRIPTSIG = 7
5✔
1365
    FINAL_SCRIPTWITNESS = 8
5✔
1366
    TAP_KEY_SIG = 0x13
5✔
1367
    TAP_MERKLE_ROOT = 0x18
5✔
1368
    SLIP19_OWNERSHIP_PROOF = 0x19
5✔
1369

1370

1371
class PSBTOutputType(IntEnum):
5✔
1372
    REDEEM_SCRIPT = 0
5✔
1373
    WITNESS_SCRIPT = 1
5✔
1374
    BIP32_DERIVATION = 2
5✔
1375

1376

1377
# Serialization/deserialization tools
1378
def deser_compact_size(f) -> Optional[int]:
5✔
1379
    # note: ~inverse of bitcoin.var_int
1380
    try:
5✔
1381
        nit = f.read(1)[0]
5✔
1382
    except IndexError:
5✔
1383
        return None     # end of file
5✔
1384

1385
    if nit == 253:
5✔
1386
        nit = struct.unpack("<H", f.read(2))[0]
5✔
1387
    elif nit == 254:
5✔
1388
        nit = struct.unpack("<I", f.read(4))[0]
×
1389
    elif nit == 255:
5✔
1390
        nit = struct.unpack("<Q", f.read(8))[0]
×
1391
    return nit
5✔
1392

1393

1394
class PSBTSection:
5✔
1395

1396
    def _populate_psbt_fields_from_fd(self, fd=None):
5✔
1397
        if not fd: return
5✔
1398

1399
        while True:
5✔
1400
            try:
5✔
1401
                key_type, key, val = self.get_next_kv_from_fd(fd)
5✔
1402
            except StopIteration:
5✔
1403
                break
5✔
1404
            self.parse_psbt_section_kv(key_type, key, val)
5✔
1405

1406
    @classmethod
5✔
1407
    def get_next_kv_from_fd(cls, fd) -> Tuple[int, bytes, bytes]:
5✔
1408
        key_size = deser_compact_size(fd)
5✔
1409
        if key_size == 0:
5✔
1410
            raise StopIteration()
5✔
1411
        if key_size is None:
5✔
1412
            raise UnexpectedEndOfStream()
5✔
1413

1414
        full_key = fd.read(key_size)
5✔
1415
        key_type, key = cls.get_keytype_and_key_from_fullkey(full_key)
5✔
1416

1417
        val_size = deser_compact_size(fd)
5✔
1418
        if val_size is None: raise UnexpectedEndOfStream()
5✔
1419
        val = fd.read(val_size)
5✔
1420

1421
        return key_type, key, val
5✔
1422

1423
    @classmethod
5✔
1424
    def create_psbt_writer(cls, fd):
5✔
1425
        def wr(key_type: int, val: bytes, key: bytes = b''):
5✔
1426
            full_key = cls.get_fullkey_from_keytype_and_key(key_type, key)
5✔
1427
            fd.write(var_int(len(full_key)))  # key_size
5✔
1428
            fd.write(full_key)  # key
5✔
1429
            fd.write(var_int(len(val)))  # val_size
5✔
1430
            fd.write(val)  # val
5✔
1431
        return wr
5✔
1432

1433
    @classmethod
5✔
1434
    def get_keytype_and_key_from_fullkey(cls, full_key: bytes) -> Tuple[int, bytes]:
5✔
1435
        with io.BytesIO(full_key) as key_stream:
5✔
1436
            key_type = deser_compact_size(key_stream)
5✔
1437
            if key_type is None: raise UnexpectedEndOfStream()
5✔
1438
            key = key_stream.read()
5✔
1439
        return key_type, key
5✔
1440

1441
    @classmethod
5✔
1442
    def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes:
5✔
1443
        key_type_bytes = var_int(key_type)
5✔
1444
        return key_type_bytes + key
5✔
1445

1446
    def _serialize_psbt_section(self, fd):
5✔
1447
        wr = self.create_psbt_writer(fd)
5✔
1448
        self.serialize_psbt_section_kvs(wr)
5✔
1449
        fd.write(b'\x00')  # section-separator
5✔
1450

1451
    def parse_psbt_section_kv(self, kt: int, key: bytes, val: bytes) -> None:
5✔
1452
        raise NotImplementedError()  # implemented by subclasses
×
1453

1454
    def serialize_psbt_section_kvs(self, wr) -> None:
5✔
1455
        raise NotImplementedError()  # implemented by subclasses
×
1456

1457

1458
class PartialTxInput(TxInput, PSBTSection):
5✔
1459
    def __init__(self, *args, **kwargs):
5✔
1460
        TxInput.__init__(self, *args, **kwargs)
5✔
1461
        self._witness_utxo = None  # type: Optional[TxOutput]
5✔
1462
        self.sigs_ecdsa = {}  # type: Dict[bytes, bytes]  # pubkey -> sig
5✔
1463
        self.tap_key_sig = None  # type: Optional[bytes]  # sig for taproot key-path-spending
5✔
1464
        self.sighash = None  # type: Optional[int]
5✔
1465
        self.bip32_paths = {}  # type: Dict[bytes, Tuple[bytes, Sequence[int]]]  # pubkey -> (xpub_fingerprint, path)
5✔
1466
        self.redeem_script = None  # type: Optional[bytes]
5✔
1467
        self.witness_script = None  # type: Optional[bytes]
5✔
1468
        self.tap_merkle_root = None  # type: Optional[bytes]
5✔
1469
        self.slip_19_ownership_proof = None  # type: Optional[bytes]
5✔
1470
        self._unknown = {}  # type: Dict[bytes, bytes]
5✔
1471

1472
        self._script_descriptor = None  # type: Optional[Descriptor]
5✔
1473
        self.is_mine = False  # type: bool  # whether the wallet considers the input to be ismine
5✔
1474
        self._trusted_value_sats = None  # type: Optional[int]
5✔
1475
        self._trusted_address = None  # type: Optional[str]
5✔
1476
        self._is_p2sh_segwit = None  # type: Optional[bool]  # None means unknown
5✔
1477
        self._is_native_segwit = None  # type: Optional[bool]  # None means unknown
5✔
1478
        self._is_taproot = None  # type: Optional[bool]  # None means unknown
5✔
1479
        self.witness_sizehint = None  # type: Optional[int]  # byte size of serialized complete witness, for tx size est
5✔
1480

1481
    @property
5✔
1482
    def witness_utxo(self):
5✔
1483
        return self._witness_utxo
5✔
1484

1485
    @witness_utxo.setter
5✔
1486
    def witness_utxo(self, value: Optional[TxOutput]):
5✔
1487
        self.validate_data(witness_utxo=value)
5✔
1488
        self._witness_utxo = value
5✔
1489

1490
    @property
5✔
1491
    def pubkeys(self) -> Set[bytes]:
5✔
1492
        if desc := self.script_descriptor:
5✔
1493
            return desc.get_all_pubkeys()
5✔
1494
        return set()
5✔
1495

1496
    @property
5✔
1497
    def script_descriptor(self):
5✔
1498
        return self._script_descriptor
5✔
1499

1500
    @script_descriptor.setter
5✔
1501
    def script_descriptor(self, desc: Optional[Descriptor]):
5✔
1502
        self._script_descriptor = desc
5✔
1503
        if desc:
5✔
1504
            if self.redeem_script is None:
5✔
1505
                self.redeem_script = desc.expand().redeem_script
5✔
1506
            if self.witness_script is None:
5✔
1507
                self.witness_script = desc.expand().witness_script
5✔
1508

1509
    def to_json(self):
5✔
1510
        d = super().to_json()
5✔
1511
        d.update({
5✔
1512
            'height': self.block_height,
1513
            'value_sats': self.value_sats(),
1514
            'address': self.address,
1515
            'desc': self.script_descriptor.to_string() if self.script_descriptor else None,
1516
            'utxo': str(self.utxo) if self.utxo else None,
1517
            'witness_utxo': self.witness_utxo.serialize_to_network().hex() if self.witness_utxo else None,
1518
            'sighash': self.sighash,
1519
            'redeem_script': self.redeem_script.hex() if self.redeem_script else None,
1520
            'witness_script': self.witness_script.hex() if self.witness_script else None,
1521
            'sigs_ecdsa': {pubkey.hex(): sig.hex() for pubkey, sig in self.sigs_ecdsa.items()},
1522
            'tap_key_sig': self.tap_key_sig.hex() if self.tap_key_sig else None,
1523
            'tap_merkle_root': self.tap_merkle_root.hex() if self.tap_merkle_root else None,
1524
            'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
1525
                            for pubkey, (xfp, path) in self.bip32_paths.items()},
1526
            'slip_19_ownership_proof': self.slip_19_ownership_proof.hex() if self.slip_19_ownership_proof else None,
1527
            'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()},
1528
        })
1529
        return d
5✔
1530

1531
    @classmethod
5✔
1532
    def from_txin(cls, txin: TxInput, *, strip_witness: bool = True) -> 'PartialTxInput':
5✔
1533
        # FIXME: if strip_witness is True, res.is_segwit() will return False,
1534
        # and res.estimated_size() will return an incorrect value. These methods
1535
        # will return the correct values after we call add_input_info(). (see dscancel and bump_fee)
1536
        # This is very fragile: the value returned by estimate_size() depends on the calling order.
1537
        res = PartialTxInput(prevout=txin.prevout,
5✔
1538
                             script_sig=None if strip_witness else txin.script_sig,
1539
                             nsequence=txin.nsequence,
1540
                             witness=None if strip_witness else txin.witness,
1541
                             is_coinbase_output=txin.is_coinbase_output())
1542
        res.utxo = txin.utxo
5✔
1543
        return res
5✔
1544

1545
    def validate_data(
5✔
1546
        self,
1547
        *,
1548
        for_signing=False,
1549
        # allow passing provisional fields for 'self', before setting them:
1550
        utxo: Optional[Transaction] = None,
1551
        witness_utxo: Optional[TxOutput] = None,
1552
    ) -> None:
1553
        utxo = utxo or self.utxo
5✔
1554
        witness_utxo = witness_utxo or self.witness_utxo
5✔
1555
        if utxo:
5✔
1556
            if self.prevout.txid.hex() != utxo.txid():
5✔
1557
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
×
1558
                                                  f"If a non-witness UTXO is provided, its hash must match the hash specified in the prevout")
1559
            if witness_utxo:
5✔
1560
                if utxo.outputs()[self.prevout.out_idx] != witness_utxo:
5✔
1561
                    raise PSBTInputConsistencyFailure(f"PSBT input validation: "
5✔
1562
                                                      f"If both non-witness UTXO and witness UTXO are provided, they must be consistent")
1563
        # The following test is disabled, so we are willing to sign non-segwit inputs
1564
        # without verifying the input amount. This means, given a maliciously modified PSBT,
1565
        # for non-segwit inputs, we might end up burning coins as miner fees.
1566
        if for_signing and False:
5✔
1567
            if not self.is_segwit() and witness_utxo:
1568
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
1569
                                                  f"If a witness UTXO is provided, no non-witness signature may be created")
1570
        if self.redeem_script and self.address:
5✔
1571
            addr = hash160_to_p2sh(hash_160(self.redeem_script))
5✔
1572
            if self.address != addr:
5✔
1573
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
5✔
1574
                                                  f"If a redeemScript is provided, the scriptPubKey must be for that redeemScript")
1575
        if self.witness_script:
5✔
1576
            if self.redeem_script:
5✔
1577
                if self.redeem_script != bitcoin.p2wsh_nested_script(self.witness_script):
5✔
1578
                    raise PSBTInputConsistencyFailure(f"PSBT input validation: "
5✔
1579
                                                      f"If a witnessScript is provided, the redeemScript must be for that witnessScript")
1580
            elif self.address:
5✔
1581
                if self.address != bitcoin.script_to_p2wsh(self.witness_script):
5✔
1582
                    raise PSBTInputConsistencyFailure(f"PSBT input validation: "
×
1583
                                                      f"If a witnessScript is provided, the scriptPubKey must be for that witnessScript")
1584

1585
    def parse_psbt_section_kv(self, kt, key, val):
5✔
1586
        try:
5✔
1587
            kt = PSBTInputType(kt)
5✔
1588
        except ValueError:
5✔
1589
            pass  # unknown type
5✔
1590
        if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
5✔
1591
        if kt == PSBTInputType.NON_WITNESS_UTXO:
5✔
1592
            if self.utxo is not None:
5✔
1593
                raise SerializationError(f"duplicate key: {repr(kt)}")
5✔
1594
            self.utxo = Transaction(val)
5✔
1595
            self.utxo.deserialize()
5✔
1596
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
1597
        elif kt == PSBTInputType.WITNESS_UTXO:
5✔
1598
            if self.witness_utxo is not None:
5✔
1599
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1600
            self.witness_utxo = TxOutput.from_network_bytes(val)
5✔
1601
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
1602
        elif kt == PSBTInputType.PARTIAL_SIG:
5✔
1603
            if key in self.sigs_ecdsa:
5✔
1604
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1605
            if len(key) not in (33, 65):
5✔
1606
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
5✔
1607
            self.sigs_ecdsa[key] = val
5✔
1608
        elif kt == PSBTInputType.TAP_KEY_SIG:
5✔
1609
            if self.tap_key_sig is not None:
×
1610
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1611
            if len(val) not in (64, 65):
×
1612
                raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}")
×
1613
            self.tap_key_sig = val
×
1614
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
×
1615
        elif kt == PSBTInputType.TAP_MERKLE_ROOT:
5✔
1616
            if self.tap_merkle_root is not None:
×
1617
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1618
            if len(val) != 32:
×
1619
                raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}")
×
1620
            self.tap_merkle_root = val
×
1621
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
×
1622
        elif kt == PSBTInputType.SIGHASH_TYPE:
5✔
1623
            if self.sighash is not None:
5✔
1624
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1625
            if len(val) != 4:
5✔
1626
                raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}")
5✔
1627
            self.sighash = struct.unpack("<I", val)[0]
5✔
1628
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
1629
        elif kt == PSBTInputType.BIP32_DERIVATION:
5✔
1630
            if key in self.bip32_paths:
5✔
1631
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1632
            if len(key) not in (33, 65):
5✔
1633
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
5✔
1634
            self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val)
5✔
1635
        elif kt == PSBTInputType.REDEEM_SCRIPT:
5✔
1636
            if self.redeem_script is not None:
5✔
1637
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1638
            self.redeem_script = val
5✔
1639
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
1640
        elif kt == PSBTInputType.WITNESS_SCRIPT:
5✔
1641
            if self.witness_script is not None:
5✔
1642
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1643
            self.witness_script = val
5✔
1644
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
1645
        elif kt == PSBTInputType.FINAL_SCRIPTSIG:
5✔
1646
            if self.script_sig is not None:
5✔
1647
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1648
            self.script_sig = val
5✔
1649
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
1650
        elif kt == PSBTInputType.FINAL_SCRIPTWITNESS:
5✔
1651
            if self.witness is not None:
5✔
1652
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1653
            self.witness = val
5✔
1654
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
1655
        elif kt == PSBTInputType.SLIP19_OWNERSHIP_PROOF:
5✔
1656
            if self.slip_19_ownership_proof is not None:
×
1657
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1658
            self.slip_19_ownership_proof = val
×
1659
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
×
1660
        else:
1661
            full_key = self.get_fullkey_from_keytype_and_key(kt, key)
5✔
1662
            if full_key in self._unknown:
5✔
1663
                raise SerializationError(f'duplicate key. PSBT input key for unknown type: {full_key}')
×
1664
            self._unknown[full_key] = val
5✔
1665

1666
    def serialize_psbt_section_kvs(self, wr):
5✔
1667
        if self.witness_utxo:
5✔
1668
            wr(PSBTInputType.WITNESS_UTXO, self.witness_utxo.serialize_to_network())
5✔
1669
        if self.utxo:
5✔
1670
            wr(PSBTInputType.NON_WITNESS_UTXO, bfh(self.utxo.serialize_to_network(include_sigs=True)))
5✔
1671
        for pk, val in sorted(self.sigs_ecdsa.items()):
5✔
1672
            wr(PSBTInputType.PARTIAL_SIG, val, pk)
5✔
1673
        if self.tap_key_sig is not None:
5✔
1674
            wr(PSBTInputType.TAP_KEY_SIG, self.tap_key_sig)
×
1675
        if self.tap_merkle_root is not None:
5✔
1676
            wr(PSBTInputType.TAP_MERKLE_ROOT, self.tap_merkle_root)
×
1677
        if self.sighash is not None:
5✔
1678
            wr(PSBTInputType.SIGHASH_TYPE, struct.pack('<I', self.sighash))
5✔
1679
        if self.redeem_script is not None:
5✔
1680
            wr(PSBTInputType.REDEEM_SCRIPT, self.redeem_script)
5✔
1681
        if self.witness_script is not None:
5✔
1682
            wr(PSBTInputType.WITNESS_SCRIPT, self.witness_script)
5✔
1683
        for k in sorted(self.bip32_paths):
5✔
1684
            packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k])
5✔
1685
            wr(PSBTInputType.BIP32_DERIVATION, packed_path, k)
5✔
1686
        if self.script_sig is not None:
5✔
1687
            wr(PSBTInputType.FINAL_SCRIPTSIG, self.script_sig)
5✔
1688
        if self.witness is not None:
5✔
1689
            wr(PSBTInputType.FINAL_SCRIPTWITNESS, self.witness)
5✔
1690
        if self.slip_19_ownership_proof:
5✔
1691
            wr(PSBTInputType.SLIP19_OWNERSHIP_PROOF, self.slip_19_ownership_proof)
×
1692
        for full_key, val in sorted(self._unknown.items()):
5✔
1693
            key_type, key = self.get_keytype_and_key_from_fullkey(full_key)
5✔
1694
            wr(key_type, val, key=key)
5✔
1695

1696
    def value_sats(self) -> Optional[int]:
5✔
1697
        if (val := super().value_sats()) is not None:
5✔
1698
            return val
5✔
1699
        if self._trusted_value_sats is not None:
5✔
1700
            return self._trusted_value_sats
5✔
1701
        if self.witness_utxo:
5✔
1702
            return self.witness_utxo.value
5✔
1703
        return None
×
1704

1705
    @property
5✔
1706
    def address(self) -> Optional[str]:
5✔
1707
        if (addr := super().address) is not None:
5✔
1708
            return addr
5✔
1709
        if self._trusted_address is not None:
5✔
1710
            return self._trusted_address
5✔
1711
        if self.witness_utxo:
5✔
1712
            return self.witness_utxo.address
5✔
1713
        return None
5✔
1714

1715
    @property
5✔
1716
    def scriptpubkey(self) -> Optional[bytes]:
5✔
1717
        if (spk := super().scriptpubkey) is not None:
5✔
1718
            return spk
5✔
1719
        if self._trusted_address is not None:
5✔
1720
            return bitcoin.address_to_script(self._trusted_address)
×
1721
        if self.witness_utxo:
5✔
1722
            return self.witness_utxo.scriptpubkey
5✔
1723
        return None
×
1724

1725
    def is_complete(self) -> bool:
5✔
1726
        if self.script_sig is not None and self.witness is not None:
5✔
1727
            return True
5✔
1728
        if self.is_coinbase_input():
5✔
1729
            return True
×
1730
        if self.script_sig is not None and not self.is_segwit():
5✔
1731
            return True
5✔
1732
        if desc := self.script_descriptor:
5✔
1733
            try:
5✔
1734
                desc.satisfy(allow_dummy=False, sigdata=self.sigs_ecdsa)
5✔
1735
            except MissingSolutionPiece:
5✔
1736
                pass
5✔
1737
            else:
1738
                return True
5✔
1739
        return False
5✔
1740

1741
    def get_satisfaction_progress(self) -> Tuple[int, int]:
5✔
1742
        if desc := self.script_descriptor:
5✔
1743
            return desc.get_satisfaction_progress(sigdata=self.sigs_ecdsa)
5✔
1744
        return 0, 0
×
1745

1746
    def finalize(self) -> None:
5✔
1747
        def clear_fields_when_finalized():
5✔
1748
            # BIP-174: "All other data except the UTXO and unknown fields in the
1749
            #           input key-value map should be cleared from the PSBT"
1750
            self.sigs_ecdsa = {}
5✔
1751
            self.tap_key_sig = None
5✔
1752
            self.tap_merkle_root = None
5✔
1753
            self.sighash = None
5✔
1754
            self.bip32_paths = {}
5✔
1755
            self.redeem_script = None
5✔
1756
            # FIXME: side effect interfers with make_witness
1757
            # self.witness_script = None
1758

1759
        if self.script_sig is not None and self.witness is not None:
5✔
1760
            clear_fields_when_finalized()
5✔
1761
            return  # already finalized
5✔
1762
        if self.is_complete():
5✔
1763
            self.script_sig = Transaction.input_script(self)
5✔
1764
            self.witness = Transaction.serialize_witness(self)
5✔
1765
            clear_fields_when_finalized()
5✔
1766

1767
    def combine_with_other_txin(self, other_txin: 'TxInput') -> None:
5✔
1768
        assert self.prevout == other_txin.prevout
5✔
1769
        if other_txin.script_sig is not None:
5✔
1770
            self.script_sig = other_txin.script_sig
5✔
1771
        if other_txin.witness is not None:
5✔
1772
            self.witness = other_txin.witness
5✔
1773
        if isinstance(other_txin, PartialTxInput):
5✔
1774
            if other_txin.witness_utxo:
5✔
1775
                self.witness_utxo = other_txin.witness_utxo
5✔
1776
            if other_txin.utxo:
5✔
1777
                self.utxo = other_txin.utxo
5✔
1778
            self.sigs_ecdsa.update(other_txin.sigs_ecdsa)
5✔
1779
            if other_txin.sighash is not None:
5✔
1780
                self.sighash = other_txin.sighash
5✔
1781
            if other_txin.tap_key_sig is not None:
5✔
1782
                self.tap_key_sig = other_txin.tap_key_sig
×
1783
            if other_txin.tap_merkle_root is not None:
5✔
1784
                self.tap_merkle_root = other_txin.tap_merkle_root
×
1785
            self.bip32_paths.update(other_txin.bip32_paths)
5✔
1786
            if other_txin.redeem_script is not None:
5✔
1787
                self.redeem_script = other_txin.redeem_script
5✔
1788
            if other_txin.witness_script is not None:
5✔
1789
                self.witness_script = other_txin.witness_script
5✔
1790
            self._unknown.update(other_txin._unknown)
5✔
1791
        self.validate_data()
5✔
1792
        # try to finalize now
1793
        self.finalize()
5✔
1794

1795
    def convert_utxo_to_witness_utxo(self) -> None:
5✔
1796
        if self.utxo:
5✔
1797
            self._witness_utxo = self.utxo.outputs()[self.prevout.out_idx]
5✔
1798
            self._utxo = None  # type: Optional[Transaction]
5✔
1799

1800
    def is_native_segwit(self) -> Optional[bool]:
5✔
1801
        """Whether this input is native segwit (any witness version). None means inconclusive."""
1802
        if self._is_native_segwit is None:
5✔
1803
            if self.address:
5✔
1804
                self._is_native_segwit = bitcoin.is_segwit_address(self.address)
5✔
1805
        return self._is_native_segwit
5✔
1806

1807
    def is_p2sh_segwit(self) -> Optional[bool]:
5✔
1808
        """Whether this input is p2sh-embedded-segwit. None means inconclusive."""
1809
        if self._is_p2sh_segwit is None:
5✔
1810
            def calc_if_p2sh_segwit_now():
5✔
1811
                if not (self.address and self.redeem_script):
5✔
1812
                    return None
5✔
1813
                if self.address != bitcoin.hash160_to_p2sh(hash_160(self.redeem_script)):
5✔
1814
                    # not p2sh address
1815
                    return False
×
1816
                try:
5✔
1817
                    decoded = [x for x in script_GetOp(self.redeem_script)]
5✔
1818
                except MalformedBitcoinScript:
×
1819
                    decoded = None
×
1820
                # witness version 0
1821
                if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
5✔
1822
                    return True
5✔
1823
                # witness version 1-16
1824
                future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1))
5✔
1825
                for witver, opcode in enumerate(future_witness_versions, start=1):
5✔
1826
                    match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)]
5✔
1827
                    if match_script_against_template(decoded, match):
5✔
1828
                        return True
×
1829
                return False
5✔
1830

1831
            self._is_p2sh_segwit = calc_if_p2sh_segwit_now()
5✔
1832
        return self._is_p2sh_segwit
5✔
1833

1834
    def is_segwit(self, *, guess_for_address=False) -> bool:
5✔
1835
        """Whether this input is segwit (any witness version)."""
1836
        if super().is_segwit():
5✔
1837
            return True
5✔
1838
        if self.is_native_segwit() or self.is_p2sh_segwit():
5✔
1839
            return True
5✔
1840
        if self.is_native_segwit() is False and self.is_p2sh_segwit() is False:
5✔
1841
            return False
5✔
1842
        if self.witness_script:
5✔
1843
            return True
5✔
1844
        if desc := self.script_descriptor:
5✔
1845
            return desc.is_segwit()
5✔
1846
        if guess_for_address:
5✔
1847
            dummy_desc = create_dummy_descriptor_from_address(self.address)
5✔
1848
            return dummy_desc.is_segwit()
5✔
1849
        return False  # can be false-negative
5✔
1850

1851
    def is_taproot(self) -> bool:
5✔
1852
        if self._is_taproot is None:
5✔
1853
            if self.address:
5✔
1854
                self._is_taproot = bitcoin.is_taproot_address(self.address)
5✔
1855
        if desc := self.script_descriptor:
5✔
1856
            return desc.is_taproot()
5✔
1857
        return self._is_taproot
5✔
1858

1859
    def already_has_some_signatures(self) -> bool:
5✔
1860
        """Returns whether progress has been made towards completing this input."""
1861
        return (self.sigs_ecdsa
5✔
1862
                or self.tap_key_sig is not None
1863
                or self.script_sig is not None
1864
                or self.witness is not None)
1865

1866

1867
class PartialTxOutput(TxOutput, PSBTSection):
5✔
1868
    def __init__(self, *args, **kwargs):
5✔
1869
        TxOutput.__init__(self, *args, **kwargs)
5✔
1870
        self.redeem_script = None  # type: Optional[bytes]
5✔
1871
        self.witness_script = None  # type: Optional[bytes]
5✔
1872
        self.bip32_paths = {}  # type: Dict[bytes, Tuple[bytes, Sequence[int]]]  # pubkey -> (xpub_fingerprint, path)
5✔
1873
        self._unknown = {}  # type: Dict[bytes, bytes]
5✔
1874

1875
        self._script_descriptor = None  # type: Optional[Descriptor]
5✔
1876
        self.is_mine = False  # type: bool  # whether the wallet considers the output to be ismine
5✔
1877
        self.is_change = False  # type: bool  # whether the wallet considers the output to be change
5✔
1878
        self.is_utxo_reserve = False  # type: bool  # whether this is a change output added to satisfy anchor channel requirements
5✔
1879

1880
    @property
5✔
1881
    def pubkeys(self) -> Set[bytes]:
5✔
1882
        if desc := self.script_descriptor:
×
1883
            return desc.get_all_pubkeys()
×
1884
        return set()
×
1885

1886
    @property
5✔
1887
    def script_descriptor(self):
5✔
1888
        return self._script_descriptor
5✔
1889

1890
    @script_descriptor.setter
5✔
1891
    def script_descriptor(self, desc: Optional[Descriptor]):
5✔
1892
        self._script_descriptor = desc
5✔
1893
        if desc:
5✔
1894
            if self.redeem_script is None:
5✔
1895
                self.redeem_script = desc.expand().redeem_script
5✔
1896
            if self.witness_script is None:
5✔
1897
                self.witness_script = desc.expand().witness_script
5✔
1898

1899
    def to_json(self):
5✔
1900
        d = super().to_json()
5✔
1901
        d.update({
5✔
1902
            'desc': self.script_descriptor.to_string() if self.script_descriptor else None,
1903
            'redeem_script': self.redeem_script.hex() if self.redeem_script else None,
1904
            'witness_script': self.witness_script.hex() if self.witness_script else None,
1905
            'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
1906
                            for pubkey, (xfp, path) in self.bip32_paths.items()},
1907
            'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()},
1908
        })
1909
        return d
5✔
1910

1911
    @classmethod
5✔
1912
    def from_txout(cls, txout: TxOutput) -> 'PartialTxOutput':
5✔
1913
        res = PartialTxOutput(scriptpubkey=txout.scriptpubkey,
5✔
1914
                              value=txout.value)
1915
        return res
5✔
1916

1917
    def parse_psbt_section_kv(self, kt, key, val):
5✔
1918
        try:
5✔
1919
            kt = PSBTOutputType(kt)
5✔
1920
        except ValueError:
5✔
1921
            pass  # unknown type
5✔
1922
        if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
5✔
1923
        if kt == PSBTOutputType.REDEEM_SCRIPT:
5✔
1924
            if self.redeem_script is not None:
5✔
1925
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1926
            self.redeem_script = val
5✔
1927
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
1928
        elif kt == PSBTOutputType.WITNESS_SCRIPT:
5✔
1929
            if self.witness_script is not None:
5✔
1930
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1931
            self.witness_script = val
5✔
1932
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
1933
        elif kt == PSBTOutputType.BIP32_DERIVATION:
5✔
1934
            if key in self.bip32_paths:
5✔
1935
                raise SerializationError(f"duplicate key: {repr(kt)}")
×
1936
            if len(key) not in (33, 65):
5✔
1937
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
5✔
1938
            self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val)
5✔
1939
        else:
1940
            full_key = self.get_fullkey_from_keytype_and_key(kt, key)
5✔
1941
            if full_key in self._unknown:
5✔
1942
                raise SerializationError(f'duplicate key. PSBT output key for unknown type: {full_key}')
×
1943
            self._unknown[full_key] = val
5✔
1944

1945
    def serialize_psbt_section_kvs(self, wr):
5✔
1946
        if self.redeem_script is not None:
5✔
1947
            wr(PSBTOutputType.REDEEM_SCRIPT, self.redeem_script)
5✔
1948
        if self.witness_script is not None:
5✔
1949
            wr(PSBTOutputType.WITNESS_SCRIPT, self.witness_script)
5✔
1950
        for k in sorted(self.bip32_paths):
5✔
1951
            packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k])
5✔
1952
            wr(PSBTOutputType.BIP32_DERIVATION, packed_path, k)
5✔
1953
        for full_key, val in sorted(self._unknown.items()):
5✔
1954
            key_type, key = self.get_keytype_and_key_from_fullkey(full_key)
5✔
1955
            wr(key_type, val, key=key)
5✔
1956

1957
    def combine_with_other_txout(self, other_txout: 'TxOutput') -> None:
5✔
1958
        assert self.scriptpubkey == other_txout.scriptpubkey
5✔
1959
        if not isinstance(other_txout, PartialTxOutput):
5✔
1960
            return
×
1961
        if other_txout.redeem_script is not None:
5✔
1962
            self.redeem_script = other_txout.redeem_script
5✔
1963
        if other_txout.witness_script is not None:
5✔
1964
            self.witness_script = other_txout.witness_script
5✔
1965
        self.bip32_paths.update(other_txout.bip32_paths)
5✔
1966
        self._unknown.update(other_txout._unknown)
5✔
1967

1968

1969
class PartialTransaction(Transaction):
5✔
1970

1971
    def __init__(self):
5✔
1972
        Transaction.__init__(self, None)
5✔
1973
        self.xpubs = {}  # type: Dict[BIP32Node, Tuple[bytes, Sequence[int]]]  # intermediate bip32node -> (xfp, der_prefix)
5✔
1974
        self._inputs = []  # type: List[PartialTxInput]
5✔
1975
        self._outputs = []  # type: List[PartialTxOutput]
5✔
1976
        self._unknown = {}  # type: Dict[bytes, bytes]
5✔
1977
        self.rbf_merge_txid = None
5✔
1978

1979
    def to_json(self) -> dict:
5✔
1980
        d = super().to_json()
5✔
1981
        d.update({
5✔
1982
            'xpubs': {bip32node.to_xpub(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
1983
                      for bip32node, (xfp, path) in self.xpubs.items()},
1984
            'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()},
1985
        })
1986
        return d
5✔
1987

1988
    @classmethod
5✔
1989
    def from_tx(cls, tx: Transaction) -> 'PartialTransaction':
5✔
1990
        assert tx
5✔
1991
        res = cls()
5✔
1992
        res._inputs = [PartialTxInput.from_txin(txin, strip_witness=True)
5✔
1993
                       for txin in tx.inputs()]
1994
        res._outputs = [PartialTxOutput.from_txout(txout) for txout in tx.outputs()]
5✔
1995
        res.version = tx.version
5✔
1996
        res.locktime = tx.locktime
5✔
1997
        return res
5✔
1998

1999
    @classmethod
5✔
2000
    def from_raw_psbt(cls, raw) -> 'PartialTransaction':
5✔
2001
        # auto-detect and decode Base64 and Hex.
2002
        if raw[0:10].lower() in (b'70736274ff', '70736274ff'):  # hex
5✔
2003
            raw = bytes.fromhex(raw)
5✔
2004
        elif raw[0:6] in (b'cHNidP', 'cHNidP'):  # base64
5✔
2005
            raw = base64.b64decode(raw)
5✔
2006
        if not isinstance(raw, (bytes, bytearray)) or raw[0:5] != b'psbt\xff':
5✔
2007
            raise BadHeaderMagic("bad magic")
5✔
2008

2009
        tx = None  # type: Optional[PartialTransaction]
5✔
2010

2011
        # We parse the raw stream twice. The first pass is used to find the
2012
        # PSBT_GLOBAL_UNSIGNED_TX key in the global section and set 'tx'.
2013
        # The second pass does everything else.
2014
        with io.BytesIO(raw[5:]) as fd:  # parsing "first pass"
5✔
2015
            while True:
5✔
2016
                try:
5✔
2017
                    kt, key, val = PSBTSection.get_next_kv_from_fd(fd)
5✔
2018
                except StopIteration:
5✔
2019
                    break
5✔
2020
                try:
5✔
2021
                    kt = PSBTGlobalType(kt)
5✔
2022
                except ValueError:
5✔
2023
                    pass  # unknown type
5✔
2024
                if kt == PSBTGlobalType.UNSIGNED_TX:
5✔
2025
                    if tx is not None:
5✔
2026
                        raise SerializationError(f"duplicate key: {repr(kt)}")
×
2027
                    if key: raise SerializationError(f"key for {repr(kt)} must be empty")
5✔
2028
                    unsigned_tx = Transaction(val.hex())
5✔
2029
                    for txin in unsigned_tx.inputs():
5✔
2030
                        if txin.script_sig or txin.witness:
5✔
2031
                            raise SerializationError(f"PSBT {repr(kt)} must have empty scriptSigs and witnesses")
5✔
2032
                    tx = PartialTransaction.from_tx(unsigned_tx)
5✔
2033

2034
        if tx is None:
5✔
2035
            raise SerializationError(f"PSBT missing required global section PSBT_GLOBAL_UNSIGNED_TX")
5✔
2036

2037
        with io.BytesIO(raw[5:]) as fd:  # parsing "second pass"
5✔
2038
            # global section
2039
            while True:
5✔
2040
                try:
5✔
2041
                    kt, key, val = PSBTSection.get_next_kv_from_fd(fd)
5✔
2042
                except StopIteration:
5✔
2043
                    break
5✔
2044
                try:
5✔
2045
                    kt = PSBTGlobalType(kt)
5✔
2046
                except ValueError:
5✔
2047
                    pass  # unknown type
5✔
2048
                if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
5✔
2049
                if kt == PSBTGlobalType.UNSIGNED_TX:
5✔
2050
                    pass  # already handled during "first" parsing pass
5✔
2051
                elif kt == PSBTGlobalType.XPUB:
5✔
2052
                    bip32node = BIP32Node.from_bytes(key)
5✔
2053
                    if bip32node in tx.xpubs:
5✔
2054
                        raise SerializationError(f"duplicate key: {repr(kt)}")
×
2055
                    xfp, path = unpack_bip32_root_fingerprint_and_int_path(val)
5✔
2056
                    if bip32node.depth != len(path):
5✔
2057
                        raise SerializationError(f"PSBT global xpub has mismatching depth ({bip32node.depth}) "
×
2058
                                                 f"and derivation prefix len ({len(path)})")
2059
                    child_number_of_xpub = int.from_bytes(bip32node.child_number, 'big')
5✔
2060
                    if not ((bip32node.depth == 0 and child_number_of_xpub == 0)
5✔
2061
                            or (bip32node.depth != 0 and child_number_of_xpub == path[-1])):
2062
                        raise SerializationError(f"PSBT global xpub has inconsistent child_number and derivation prefix")
×
2063
                    tx.xpubs[bip32node] = xfp, path
5✔
2064
                elif kt == PSBTGlobalType.VERSION:
5✔
2065
                    if len(val) > 4:
×
2066
                        raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)} > 4")
×
2067
                    psbt_version = int.from_bytes(val, byteorder='little', signed=False)
×
2068
                    if psbt_version > 0:
×
2069
                        raise SerializationError(f"Only PSBTs with version 0 are supported. Found version: {psbt_version}")
×
2070
                    if key: raise SerializationError(f"key for {repr(kt)} must be empty")
×
2071
                else:
2072
                    full_key = PSBTSection.get_fullkey_from_keytype_and_key(kt, key)
5✔
2073
                    if full_key in tx._unknown:
5✔
2074
                        raise SerializationError(f'duplicate key. PSBT global key for unknown type: {full_key}')
×
2075
                    tx._unknown[full_key] = val
5✔
2076
            try:
5✔
2077
                # inputs sections
2078
                for txin in tx.inputs():
5✔
2079
                    if DEBUG_PSBT_PARSING: print("-> new input starts")
5✔
2080
                    txin._populate_psbt_fields_from_fd(fd)
5✔
2081
                # outputs sections
2082
                for txout in tx.outputs():
5✔
2083
                    if DEBUG_PSBT_PARSING: print("-> new output starts")
5✔
2084
                    txout._populate_psbt_fields_from_fd(fd)
5✔
2085
            except UnexpectedEndOfStream:
5✔
2086
                raise UnexpectedEndOfStream('Unexpected end of stream. Num input and output maps provided does not match unsigned tx.') from None
5✔
2087

2088
            if fd.read(1) != b'':
5✔
2089
                raise SerializationError("extra junk at the end of PSBT")
×
2090

2091
        for txin in tx.inputs():
5✔
2092
            txin.validate_data()
5✔
2093

2094
        return tx
5✔
2095

2096
    @classmethod
5✔
2097
    def from_io(cls, inputs: Sequence[PartialTxInput], outputs: Sequence[PartialTxOutput], *,
5✔
2098
                locktime: int = None, version: int = None, BIP69_sort: bool = True):
2099
        self = cls()
5✔
2100
        self._inputs = list(inputs)
5✔
2101
        self._outputs = list(outputs)
5✔
2102
        if locktime is not None:
5✔
2103
            self.locktime = locktime
5✔
2104
        if version is not None:
5✔
2105
            self.version = version
5✔
2106
        if BIP69_sort:
5✔
2107
            self.BIP69_sort()
5✔
2108
        return self
5✔
2109

2110
    def _serialize_psbt(self, fd) -> None:
5✔
2111
        wr = PSBTSection.create_psbt_writer(fd)
5✔
2112
        fd.write(b'psbt\xff')
5✔
2113
        # global section
2114
        wr(PSBTGlobalType.UNSIGNED_TX, bfh(self.serialize_to_network(include_sigs=False)))
5✔
2115
        for bip32node, (xfp, path) in sorted(self.xpubs.items()):
5✔
2116
            val = pack_bip32_root_fingerprint_and_int_path(xfp, path)
5✔
2117
            wr(PSBTGlobalType.XPUB, val, key=bip32node.to_bytes())
5✔
2118
        for full_key, val in sorted(self._unknown.items()):
5✔
2119
            key_type, key = PSBTSection.get_keytype_and_key_from_fullkey(full_key)
5✔
2120
            wr(key_type, val, key=key)
5✔
2121
        fd.write(b'\x00')  # section-separator
5✔
2122
        # input sections
2123
        for inp in self._inputs:
5✔
2124
            inp._serialize_psbt_section(fd)
5✔
2125
        # output sections
2126
        for outp in self._outputs:
5✔
2127
            outp._serialize_psbt_section(fd)
5✔
2128

2129
    def finalize_psbt(self) -> None:
5✔
2130
        for txin in self.inputs():
5✔
2131
            txin.finalize()
5✔
2132

2133
    def combine_with_other_psbt(self, other_tx: 'Transaction') -> None:
5✔
2134
        """Pulls in all data from other_tx we don't yet have (e.g. signatures).
2135
        other_tx must be concerning the same unsigned tx.
2136
        """
2137
        if self.serialize_to_network(include_sigs=False) != other_tx.serialize_to_network(include_sigs=False):
5✔
2138
            raise Exception('A Combiner must not combine two different PSBTs.')
×
2139
        # BIP-174: "The resulting PSBT must contain all of the key-value pairs from each of the PSBTs.
2140
        #           The Combiner must remove any duplicate key-value pairs, in accordance with the specification."
2141
        # global section
2142
        if isinstance(other_tx, PartialTransaction):
5✔
2143
            self.xpubs.update(other_tx.xpubs)
5✔
2144
            self._unknown.update(other_tx._unknown)
5✔
2145
        # input sections
2146
        for txin, other_txin in zip(self.inputs(), other_tx.inputs()):
5✔
2147
            txin.combine_with_other_txin(other_txin)
5✔
2148
        # output sections
2149
        for txout, other_txout in zip(self.outputs(), other_tx.outputs()):
5✔
2150
            txout.combine_with_other_txout(other_txout)
5✔
2151
        self.invalidate_ser_cache()
5✔
2152

2153
    def join_with_other_psbt(self, other_tx: 'PartialTransaction', *, config: 'SimpleConfig') -> None:
5✔
2154
        """Adds inputs and outputs from other_tx into this one."""
2155
        if not isinstance(other_tx, PartialTransaction):
5✔
2156
            raise Exception('Can only join partial transactions.')
×
2157
        # make sure there are no duplicate prevouts
2158
        prevouts = set()
5✔
2159
        for txin in itertools.chain(self.inputs(), other_tx.inputs()):
5✔
2160
            prevout_str = txin.prevout.to_str()
5✔
2161
            if prevout_str in prevouts:
5✔
2162
                raise Exception(f"Duplicate inputs! "
×
2163
                                f"Transactions that spend the same prevout cannot be joined.")
2164
            prevouts.add(prevout_str)
5✔
2165
        # copy global PSBT section
2166
        self.xpubs.update(other_tx.xpubs)
5✔
2167
        self._unknown.update(other_tx._unknown)
5✔
2168
        # copy and add inputs and outputs
2169
        self.add_inputs(list(other_tx.inputs()))
5✔
2170
        self.add_outputs(list(other_tx.outputs()), merge_duplicates=config.WALLET_MERGE_DUPLICATE_OUTPUTS)
5✔
2171
        self.remove_signatures()
5✔
2172
        self.invalidate_ser_cache()
5✔
2173

2174
    def inputs(self) -> Sequence[PartialTxInput]:
5✔
2175
        return self._inputs
5✔
2176

2177
    def outputs(self) -> Sequence[PartialTxOutput]:
5✔
2178
        return self._outputs
5✔
2179

2180
    def add_inputs(self, inputs: List[PartialTxInput], BIP69_sort=True) -> None:
5✔
2181
        self._inputs.extend(inputs)
5✔
2182
        if BIP69_sort:
5✔
2183
            self.BIP69_sort(outputs=False)
5✔
2184
        self.invalidate_ser_cache()
5✔
2185

2186
    def add_outputs(self, outputs: List[PartialTxOutput], *, merge_duplicates: bool = False, BIP69_sort: bool = True) -> None:
5✔
2187
        self._outputs.extend(outputs)
5✔
2188
        if merge_duplicates:
5✔
2189
            self._outputs = merge_duplicate_tx_outputs(self._outputs)
5✔
2190
        if BIP69_sort:
5✔
2191
            self.BIP69_sort(inputs=False)
5✔
2192
        self.invalidate_ser_cache()
5✔
2193

2194
    def set_rbf(self, rbf: bool) -> None:
5✔
2195
        nSequence = 0xffffffff - (2 if rbf else 1)
5✔
2196
        for txin in self.inputs():
5✔
2197
            txin.nsequence = nSequence
5✔
2198
        self.invalidate_ser_cache()
5✔
2199

2200
    def BIP69_sort(self, inputs=True, outputs=True):
5✔
2201
        # NOTE: other parts of the code rely on these sorts being *stable* sorts
2202
        if inputs:
5✔
2203
            self._inputs.sort(key = lambda i: (i.prevout.txid, i.prevout.out_idx))
5✔
2204
        if outputs:
5✔
2205
            self._outputs.sort(key = lambda o: (o.value, o.scriptpubkey))
5✔
2206
        self.invalidate_ser_cache()
5✔
2207

2208
    def serialize_preimage(
5✔
2209
        self,
2210
        txin_index: int,
2211
        *,
2212
        sighash_cache: SighashCache = None,
2213
    ) -> bytes:
2214
        nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True)
5✔
2215
        nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False)
5✔
2216
        inputs = self.inputs()
5✔
2217
        outputs = self.outputs()
5✔
2218
        txin = inputs[txin_index]
5✔
2219
        sighash = txin.sighash
5✔
2220
        if sighash is None:
5✔
2221
            sighash = Sighash.DEFAULT if txin.is_taproot() else Sighash.ALL
5✔
2222
        if not Sighash.is_valid(sighash, is_taproot=txin.is_taproot()):
5✔
2223
            raise Exception(f"SIGHASH_FLAG ({sighash}) not supported!")
×
2224
        if sighash_cache is None:
5✔
2225
            sighash_cache = SighashCache()
5✔
2226
        if txin.is_segwit():
5✔
2227
            if txin.is_taproot():
5✔
2228
                scache = sighash_cache.get_witver1_data_for_tx(self)
5✔
2229
                sighash_epoch = b"\x00"
5✔
2230
                hash_type = int.to_bytes(sighash, length=1, byteorder="little", signed=False)
5✔
2231
                # txdata
2232
                preimage_txdata = bytearray()
5✔
2233
                preimage_txdata += nVersion
5✔
2234
                preimage_txdata += nLocktime
5✔
2235
                if sighash & 0x80 != Sighash.ANYONECANPAY:
5✔
2236
                    preimage_txdata += scache.sha_prevouts
5✔
2237
                    preimage_txdata += scache.sha_amounts
5✔
2238
                    preimage_txdata += scache.sha_scriptpubkeys
5✔
2239
                    preimage_txdata += scache.sha_sequences
5✔
2240
                if sighash & 3 not in (Sighash.NONE, Sighash.SINGLE):
5✔
2241
                    preimage_txdata += scache.sha_outputs
5✔
2242
                # inputdata
2243
                preimage_inputdata = bytearray()
5✔
2244
                spend_type = bytes([0])  # (ext_flag * 2) + annex_present
5✔
2245
                preimage_inputdata += spend_type
5✔
2246
                if sighash & 0x80 == Sighash.ANYONECANPAY:
5✔
2247
                    preimage_inputdata += txin.prevout.serialize_to_network()
5✔
2248
                    preimage_inputdata += int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
5✔
2249
                    preimage_inputdata += var_int(len(txin.scriptpubkey)) + txin.scriptpubkey
5✔
2250
                    preimage_inputdata += int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
5✔
2251
                else:
2252
                    preimage_inputdata += int.to_bytes(txin_index, length=4, byteorder="little", signed=False)
5✔
2253
                # TODO sha_annex
2254
                # outputdata
2255
                preimage_outputdata = bytearray()
5✔
2256
                if sighash & 3 == Sighash.SINGLE:
5✔
2257
                    try:
5✔
2258
                        txout = outputs[txin_index]
5✔
2259
                    except IndexError:
×
2260
                        raise Exception("Using SIGHASH_SINGLE without a corresponding output") from None
×
2261
                    # note: we could cache this to avoid some potential DOS vectors:
2262
                    preimage_outputdata += sha256(txout.serialize_to_network())
5✔
2263
                return bytes(sighash_epoch + hash_type + preimage_txdata + preimage_inputdata + preimage_outputdata)
5✔
2264
            else:  # segwit (witness v0)
2265
                scache = sighash_cache.get_witver0_data_for_tx(self)
5✔
2266
                if not (sighash & Sighash.ANYONECANPAY):
5✔
2267
                    hashPrevouts = scache.hashPrevouts
5✔
2268
                else:
2269
                    hashPrevouts = bytes(32)
5✔
2270
                if not (sighash & Sighash.ANYONECANPAY) and (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE:
5✔
2271
                    hashSequence = scache.hashSequence
5✔
2272
                else:
2273
                    hashSequence = bytes(32)
5✔
2274
                if (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE:
5✔
2275
                    hashOutputs = scache.hashOutputs
5✔
2276
                elif (sighash & 0x1f) == Sighash.SINGLE and txin_index < len(outputs):
5✔
2277
                    # note: we could cache this to avoid some potential DOS vectors:
2278
                    hashOutputs = sha256d(outputs[txin_index].serialize_to_network())
5✔
2279
                else:
2280
                    hashOutputs = bytes(32)
5✔
2281
                outpoint = txin.prevout.serialize_to_network()
5✔
2282
                preimage_script = self.get_preimage_script(txin)
5✔
2283
                scriptCode = var_int(len(preimage_script)) + preimage_script
5✔
2284
                amount = int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False)
5✔
2285
                nSequence = int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False)
5✔
2286
                nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False)
5✔
2287
                preimage = nVersion + hashPrevouts + hashSequence + outpoint + scriptCode + amount + nSequence + hashOutputs + nLocktime + nHashType
5✔
2288
                return preimage
5✔
2289
        else:  # legacy sighash (pre-segwit)
2290
            if sighash != Sighash.ALL:
5✔
2291
                raise Exception(f"SIGHASH_FLAG ({sighash}) not supported! (for legacy sighash)")
×
2292
            preimage_script = self.get_preimage_script(txin)
5✔
2293
            txins = var_int(len(inputs)) + b"".join(
5✔
2294
                txin.serialize_to_network(script_sig=preimage_script if txin_index==k else b"")
2295
                for k, txin in enumerate(inputs))
2296
            txouts = var_int(len(outputs)) + b"".join(o.serialize_to_network() for o in outputs)
5✔
2297
            nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False)
5✔
2298
            preimage = nVersion + txins + txouts + nLocktime + nHashType
5✔
2299
            return preimage
5✔
2300
        raise Exception("should not reach this")
2301

2302
    def sign(self, keypairs: Mapping[bytes, bytes]) -> None:
5✔
2303
        # keypairs:  pubkey_bytes -> secret_bytes
2304
        sighash_cache = SighashCache()
5✔
2305
        for i, txin in enumerate(self.inputs()):
5✔
2306
            for pubkey in txin.pubkeys:
5✔
2307
                if txin.is_complete():
5✔
2308
                    break
4✔
2309
                if pubkey not in keypairs:
5✔
2310
                    continue
5✔
2311
                _logger.info(f"adding signature for {pubkey.hex()}. spending utxo {txin.prevout.to_str()}")
5✔
2312
                sec = keypairs[pubkey]
5✔
2313
                sig = self.sign_txin(i, sec, sighash_cache=sighash_cache)
5✔
2314
                self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey, sig=sig)
5✔
2315

2316
        _logger.debug(f"tx.sign() finished. is_complete={self.is_complete()}")
5✔
2317
        self.invalidate_ser_cache()
5✔
2318

2319
    def sign_txin(
5✔
2320
        self,
2321
        txin_index: int,
2322
        privkey_bytes: bytes,
2323
        *,
2324
        sighash_cache: SighashCache = None,
2325
    ) -> bytes:
2326
        txin = self.inputs()[txin_index]
5✔
2327
        txin.validate_data(for_signing=True)
5✔
2328
        pre_hash = self.serialize_preimage(txin_index, sighash_cache=sighash_cache)
5✔
2329
        if txin.is_taproot():
5✔
2330
            # note: privkey_bytes is the internal key
2331
            merkle_root = txin.tap_merkle_root or bytes()
5✔
2332
            output_privkey_bytes = taproot_tweak_seckey(privkey_bytes, merkle_root)
5✔
2333
            output_privkey = ecc.ECPrivkey(output_privkey_bytes)
5✔
2334
            msg_hash = bip340_tagged_hash(b"TapSighash", pre_hash)
5✔
2335
            sig = output_privkey.schnorr_sign(msg_hash)
5✔
2336
            sighash = txin.sighash if txin.sighash is not None else Sighash.DEFAULT
5✔
2337
        else:
2338
            privkey = ecc.ECPrivkey(privkey_bytes)
5✔
2339
            msg_hash = sha256d(pre_hash)
5✔
2340
            sig = privkey.ecdsa_sign(msg_hash, sigencode=ecc.ecdsa_der_sig_from_r_and_s)
5✔
2341
            sighash = txin.sighash if txin.sighash is not None else Sighash.ALL
5✔
2342
        return sig + Sighash.to_sigbytes(sighash)
5✔
2343

2344
    def is_complete(self) -> bool:
5✔
2345
        return all([txin.is_complete() for txin in self.inputs()])
5✔
2346

2347
    def signature_count(self) -> Tuple[int, int]:
5✔
2348
        nhave, nreq = 0, 0
5✔
2349
        for txin in self.inputs():
5✔
2350
            a, b = txin.get_satisfaction_progress()
5✔
2351
            nhave += a
5✔
2352
            nreq += b
5✔
2353
        return nhave, nreq
5✔
2354

2355
    def serialize(self) -> str:
5✔
2356
        """Returns PSBT as base64 text, or raw hex of network tx (if complete)."""
2357
        self.finalize_psbt()
5✔
2358
        if self.is_complete():
5✔
2359
            return Transaction.serialize(self)
5✔
2360
        return self._serialize_as_base64()
5✔
2361

2362
    def serialize_as_bytes(self, *, force_psbt: bool = False) -> bytes:
5✔
2363
        """Returns PSBT as raw bytes, or raw bytes of network tx (if complete)."""
2364
        self.finalize_psbt()
5✔
2365
        if force_psbt or not self.is_complete():
5✔
2366
            with io.BytesIO() as fd:
5✔
2367
                self._serialize_psbt(fd)
5✔
2368
                return fd.getvalue()
5✔
2369
        else:
2370
            return Transaction.serialize_as_bytes(self)
5✔
2371

2372
    def _serialize_as_base64(self) -> str:
5✔
2373
        raw_bytes = self.serialize_as_bytes()
5✔
2374
        return base64.b64encode(raw_bytes).decode('ascii')
5✔
2375

2376
    def update_signatures(self, signatures: Sequence[Union[bytes, None]]) -> None:
5✔
2377
        """Add new signatures to a transaction
2378

2379
        `signatures` is expected to be a list of sigs with signatures[i]
2380
        intended for self._inputs[i].
2381
        This is used by the Trezor, KeepKey and Safe-T plugins.
2382
        """
2383
        if self.is_complete():
5✔
2384
            return
×
2385
        if len(self.inputs()) != len(signatures):
5✔
2386
            raise Exception('expected {} signatures; got {}'.format(len(self.inputs()), len(signatures)))
×
2387
        for i, txin in enumerate(self.inputs()):
5✔
2388
            sig = signatures[i]
5✔
2389
            if sig is None:
5✔
2390
                continue
×
2391
            if sig in list(txin.sigs_ecdsa.values()):
5✔
2392
                continue
×
2393
            msg_hash = sha256d(self.serialize_preimage(i))
5✔
2394
            sig64 = ecc.ecdsa_sig64_from_der_sig(sig[:-1])
5✔
2395
            for recid in range(4):
5✔
2396
                try:
5✔
2397
                    public_key = ecc.ECPubkey.from_ecdsa_sig64(sig64, recid, msg_hash)
5✔
2398
                except ecc.InvalidECPointException:
×
2399
                    # the point might not be on the curve for some recid values
2400
                    continue
×
2401
                pubkey_bytes = public_key.get_public_key_bytes(compressed=True)
5✔
2402
                if pubkey_bytes in txin.pubkeys:
5✔
2403
                    if not public_key.ecdsa_verify(sig64, msg_hash):
5✔
2404
                        continue
×
2405
                    _logger.info(f"adding sig: txin_idx={i}, signing_pubkey={pubkey_bytes.hex()}, sig={sig.hex()}")
5✔
2406
                    self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_bytes, sig=sig)
5✔
2407
                    break
5✔
2408
        # redo raw
2409
        self.invalidate_ser_cache()
5✔
2410

2411
    def add_signature_to_txin(self, *, txin_idx: int, signing_pubkey: bytes, sig: bytes) -> None:
5✔
2412
        txin = self._inputs[txin_idx]
5✔
2413
        txin.sigs_ecdsa[signing_pubkey] = sig
5✔
2414
        # force re-serialization
2415
        txin.script_sig = None
5✔
2416
        txin.witness = None
5✔
2417
        self.invalidate_ser_cache()
5✔
2418

2419
    def add_info_from_wallet(
5✔
2420
            self,
2421
            wallet: 'Abstract_Wallet',
2422
            *,
2423
            include_xpubs: bool = False,
2424
    ) -> None:
2425
        if self.is_complete():
5✔
2426
            return
5✔
2427
        # only include xpubs for multisig wallets; currently only they need it in practice
2428
        # note: coldcard fw have a limitation that if they are included then all
2429
        #       inputs are assumed to be multisig... https://github.com/spesmilo/electrum/pull/5440#issuecomment-549504761
2430
        # note: trezor plugin needs xpubs included, if there are multisig inputs/change_outputs
2431
        from .wallet import Multisig_Wallet
5✔
2432
        if include_xpubs and isinstance(wallet, Multisig_Wallet):
5✔
2433
            from .keystore import Xpub
5✔
2434
            for ks in wallet.get_keystores():
5✔
2435
                if isinstance(ks, Xpub):
5✔
2436
                    fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(
5✔
2437
                        der_suffix=[], only_der_suffix=False)
2438
                    xpub = ks.get_xpub_to_be_used_in_partial_tx(only_der_suffix=False)
5✔
2439
                    bip32node = BIP32Node.from_xkey(xpub)
5✔
2440
                    self.xpubs[bip32node] = (fp_bytes, der_full)
5✔
2441
        for txin in self.inputs():
5✔
2442
            wallet.add_input_info(
5✔
2443
                txin,
2444
                only_der_suffix=False,
2445
            )
2446
        for txout in self.outputs():
5✔
2447
            wallet.add_output_info(
5✔
2448
                txout,
2449
                only_der_suffix=False,
2450
            )
2451

2452
    def remove_xpubs_and_bip32_paths(self) -> None:
5✔
2453
        self.xpubs.clear()
5✔
2454
        for txin in self.inputs():
5✔
2455
            txin.bip32_paths.clear()
5✔
2456
        for txout in self.outputs():
5✔
2457
            txout.bip32_paths.clear()
5✔
2458

2459
    def prepare_for_export_for_coinjoin(self) -> None:
5✔
2460
        """Removes all sensitive details."""
2461
        # globals
2462
        self.xpubs.clear()
5✔
2463
        self._unknown.clear()
5✔
2464
        # inputs
2465
        for txin in self.inputs():
5✔
2466
            txin.bip32_paths.clear()
5✔
2467
        # outputs
2468
        for txout in self.outputs():
5✔
2469
            txout.redeem_script = None
5✔
2470
            txout.witness_script = None
5✔
2471
            txout.bip32_paths.clear()
5✔
2472
            txout._unknown.clear()
5✔
2473

2474
    async def prepare_for_export_for_hardware_device(self, wallet: 'Abstract_Wallet') -> None:
5✔
2475
        self.add_info_from_wallet(wallet, include_xpubs=True)
5✔
2476
        await self.add_info_from_network(wallet.network)
5✔
2477
        # log warning if PSBT_*_BIP32_DERIVATION fields cannot be filled with full path due to missing info
2478
        from .keystore import Xpub
5✔
2479
        def is_ks_missing_info(ks):
5✔
2480
            return (isinstance(ks, Xpub) and (ks.get_root_fingerprint() is None
5✔
2481
                                              or ks.get_derivation_prefix() is None))
2482
        if any([is_ks_missing_info(ks) for ks in wallet.get_keystores()]):
5✔
2483
            _logger.warning('PSBT was requested to be filled with full bip32 paths but '
5✔
2484
                            'some keystores lacked either the derivation prefix or the root fingerprint')
2485

2486
    def convert_all_utxos_to_witness_utxos(self) -> None:
5✔
2487
        """Replaces all NON-WITNESS-UTXOs with WITNESS-UTXOs.
2488
        This will likely make an exported PSBT invalid spec-wise,
2489
        but it makes e.g. QR codes significantly smaller.
2490
        """
2491
        for txin in self.inputs():
5✔
2492
            txin.convert_utxo_to_witness_utxo()
5✔
2493

2494
    def remove_signatures(self):
5✔
2495
        for txin in self.inputs():
5✔
2496
            txin.sigs_ecdsa = {}
5✔
2497
            txin.tap_key_sig = None
5✔
2498
            txin.script_sig = None
5✔
2499
            txin.witness = None
5✔
2500
        assert not self.is_complete()
5✔
2501
        self.invalidate_ser_cache()
5✔
2502

2503

2504
def pack_bip32_root_fingerprint_and_int_path(xfp: bytes, path: Sequence[int]) -> bytes:
5✔
2505
    if len(xfp) != 4:
5✔
2506
        raise Exception(f'unexpected xfp length. xfp={xfp}')
×
2507
    return xfp + b''.join(i.to_bytes(4, byteorder='little', signed=False) for i in path)
5✔
2508

2509

2510
def unpack_bip32_root_fingerprint_and_int_path(path: bytes) -> Tuple[bytes, Sequence[int]]:
5✔
2511
    if len(path) % 4 != 0:
5✔
2512
        raise Exception(f'unexpected packed path length. path={path.hex()}')
×
2513
    xfp = path[0:4]
5✔
2514
    int_path = [int.from_bytes(b, byteorder='little', signed=False) for b in chunks(path[4:], 4)]
5✔
2515
    return xfp, int_path
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc