/electrum/interface.py
#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2011 thomasv@gitorious
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import re
import ssl
import sys
import time
import traceback
import asyncio
import socket
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
from collections import defaultdict
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
import itertools
import logging
import hashlib
import functools
import random
import enum

import aiorpcx
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
from aiorpcx.curio import timeout_after, TaskTimeout
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
from aiorpcx.rawsocket import RSClient, RSTransport
import certifi

from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup,
                   send_exception_to_crash_reporter, error_text_str_to_safe_str)
from . import util
from . import x509
from . import pem
from . import version
from . import blockchain
from .blockchain import Blockchain, HEADER_SIZE, CHUNK_SIZE
from . import bitcoin
from .bitcoin import DummyAddress, DummyAddressUsedInTxException
from . import constants
from .i18n import _
from .logging import Logger
from .transaction import Transaction
from .fee_policy import FEE_ETA_TARGETS
from .lrucache import LRUCache

if TYPE_CHECKING:
    from .network import Network
    from .simple_config import SimpleConfig


ca_path = certifi.where()

BUCKET_NAME_OF_ONION_SERVERS = 'onion'

_KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS

MAX_NUM_HEADERS_PER_REQUEST = 2016
assert MAX_NUM_HEADERS_PER_REQUEST >= CHUNK_SIZE


class NetworkTimeout:
    # seconds
    class Generic:
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60

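# Illustrative sketch (editor's example, not part of upstream interface.py):
# callers pick one of the timeout classes and read whichever attribute matches
# how patient they can afford to be; subclassing gives Urgent the same
# attribute names as Generic, just with tighter values.
assert NetworkTimeout.Generic.NORMAL == 30
assert NetworkTimeout.Urgent.NORMAL == 10
assert NetworkTimeout.Urgent.MOST_RELAXED == 60  # overrides Generic's 600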

def assert_non_negative_integer(val: Any) -> None:
    if not is_non_negative_integer(val):
        raise RequestCorrupted(f'{val!r} should be a non-negative integer')


def assert_integer(val: Any) -> None:
    if not is_integer(val):
        raise RequestCorrupted(f'{val!r} should be an integer')


def assert_int_or_float(val: Any) -> None:
    if not is_int_or_float(val):
        raise RequestCorrupted(f'{val!r} should be int or float')


def assert_non_negative_int_or_float(val: Any) -> None:
    if not is_non_negative_int_or_float(val):
        raise RequestCorrupted(f'{val!r} should be a non-negative int or float')


def assert_hash256_str(val: Any) -> None:
    if not is_hash256_str(val):
        raise RequestCorrupted(f'{val!r} should be a hash256 str')


def assert_hex_str(val: Any) -> None:
    if not is_hex_str(val):
        raise RequestCorrupted(f'{val!r} should be a hex str')


def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    if not isinstance(d, dict):
        raise RequestCorrupted(f'{d!r} should be a dict')
    if field_name not in d:
        raise RequestCorrupted(f'required field {field_name!r} missing from dict')
    return d[field_name]


def assert_list_or_tuple(val: Any) -> None:
    if not isinstance(val, (list, tuple)):
        raise RequestCorrupted(f'{val!r} should be a list or tuple')

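# Illustrative sketch (editor's example, not part of upstream interface.py):
# the validators above are chained to sanity-check untrusted server responses
# before any field is used; the response dict below is made up.
def _example_validate_response(res: Any) -> None:
    assert_dict_contains_field(res, field_name='count')
    assert_dict_contains_field(res, field_name='hex')
    assert_non_negative_integer(res['count'])
    assert_hex_str(res['hex'])  # raises RequestCorrupted on bad input

_example_validate_response({'count': 1, 'hex': 'ab'})
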
class ChainResolutionMode(enum.Enum):
    CATCHUP = enum.auto()
    BACKWARD = enum.auto()
    BINARY = enum.auto()
    FORK = enum.auto()
    NO_FORK = enum.auto()


class NotificationSession(RPCSession):

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        self.subscriptions = defaultdict(list)
        self.cache = {}
        self._msg_counter = itertools.count(start=1)
        self.interface = interface
        self.taskgroup = interface.taskgroup
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

    async def handle_request(self, request):
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    raise Exception(f'unexpected notification')
            else:
                raise Exception(f'unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        key = self.get_hashable_key_for_rpc_call(method, params)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            result = await self.send_request(method, params)
            self.cache[key] = result
        await queue.put(params + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        if not self.interface: return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: int = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)

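# Illustrative sketch (editor's example, not part of upstream interface.py):
# subscriptions and the notification cache above are keyed by a plain string
# derived from the method and params, so a subscription request and the
# notifications it later triggers land in the same bucket. The params are made up.
_example_key = NotificationSession.get_hashable_key_for_rpc_call(
    'blockchain.scripthash.subscribe', ['00' * 32])
assert _example_key == "blockchain.scripthash.subscribe['" + '00' * 32 + "']"
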
class NetworkException(Exception): pass


class GracefulDisconnect(NetworkException):
    log_level = logging.INFO

    def __init__(self, *args, log_level=None, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        if log_level is not None:
            self.log_level = log_level


class RequestTimedOut(GracefulDisconnect):
    def __str__(self):
        return _("Network request timed out.")


class RequestCorrupted(Exception): pass

class ErrorParsingSSLCert(Exception): pass
class ErrorGettingSSLCertFromServer(Exception): pass
class ErrorSSLCertFingerprintMismatch(Exception): pass
class InvalidOptionCombination(Exception): pass
class ConnectError(NetworkException): pass


class TxBroadcastError(NetworkException):
    def get_message_for_gui(self):
        raise NotImplementedError()


class TxBroadcastHashMismatch(TxBroadcastError):
    def get_message_for_gui(self):
        return "{}\n{}\n\n{}" \
            .format(_("The server returned an unexpected transaction ID when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."),
                    str(self))


class TxBroadcastServerReturnedError(TxBroadcastError):
    def get_message_for_gui(self):
        return "{}\n{}\n\n{}" \
            .format(_("The server returned an error when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."),
                    str(self))


class TxBroadcastUnknownError(TxBroadcastError):
    def get_message_for_gui(self):
        return "{}\n{}" \
            .format(_("Unknown error when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."))


class _RSClient(RSClient):
    async def create_connection(self):
        try:
            return await super().create_connection()
        except OSError as e:
            # note: using "from e" here will set __cause__ of ConnectError
            raise ConnectError(e) from e

class PaddedRSTransport(RSTransport):
    """A raw socket transport that provides basic countermeasures against traffic analysis
    by padding the jsonrpc payload with whitespaces to have ~uniform-size TCP packets.
    (it is assumed that a network observer does not see plaintext transport contents,
    due to it being wrapped e.g. in TLS)
    """

    MIN_PACKET_SIZE = 1024
    WAIT_FOR_BUFFER_GROWTH_SECONDS = 1.0
    # (unpadded) number of bytes sent instantly before we start polling.
    # This speeds up the initial handshake, where a few small messages are exchanged.
    WARMUP_BUDGET_SIZE = 1024

    session: Optional['RPCSession']

    def __init__(self, *args, **kwargs):
        RSTransport.__init__(self, *args, **kwargs)
        self._sbuffer = bytearray()  # "send buffer"
        self._sbuffer_task = None  # type: Optional[asyncio.Task]
        self._sbuffer_has_data_evt = asyncio.Event()
        self._last_send = time.monotonic()
        self._force_send = False  # type: bool

    # note: this does not call super().write() but is a complete reimplementation
    async def write(self, message):
        await self._can_send.wait()
        if self.is_closing():
            return
        framed_message = self._framer.frame(message)
        self._sbuffer += framed_message
        self._sbuffer_has_data_evt.set()
        self._maybe_consume_sbuffer()

    def _maybe_consume_sbuffer(self) -> None:
        """Maybe take some data from sbuffer and send it on the wire."""
        if not self._can_send.is_set() or self.is_closing():
            return
        buf = self._sbuffer
        if not buf:
            return
        # if there is enough data in the buffer, or if we haven't sent in a while, send now:
        if not (
            self._force_send
            or len(buf) >= self.MIN_PACKET_SIZE
            or self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS < time.monotonic()
            or self.session.send_size < self.WARMUP_BUDGET_SIZE
        ):
            return
        assert buf[-2:] in (b"}\n", b"]\n"), f"unexpected json-rpc terminator: {buf[-2:]=!r}"
        # either (1) pad length to next power of two, to create "lsize" packet:
        payload_lsize = len(buf)
        total_lsize = max(self.MIN_PACKET_SIZE, 2 ** (payload_lsize.bit_length()))
        npad_lsize = total_lsize - payload_lsize
        # or if that wasted a lot of bandwidth with padding, (2) defer sending some messages
        # and create a packet with half that size ("ssize", s for small)
        total_ssize = max(self.MIN_PACKET_SIZE, total_lsize // 2)
        payload_ssize = buf.rfind(b"\n", 0, total_ssize)
        if payload_ssize != -1:
            payload_ssize += 1  # for "\n" char
            npad_ssize = total_ssize - payload_ssize
        else:
            npad_ssize = float("inf")
        # decide between (1) and (2):
        if self._force_send or npad_lsize <= npad_ssize:
            # (1) create "lsize" packet: consume full buffer
            npad = npad_lsize
            p_idx = payload_lsize
        else:
            # (2) create "ssize" packet: consume some, but defer some for later
            npad = npad_ssize
            p_idx = payload_ssize
        # pad by adding spaces near end
        # self.session.maybe_log(
        #     f"PaddedRSTransport. calling low-level write(). "
        #     f"chose between (lsize:{payload_lsize}+{npad_lsize}, ssize:{payload_ssize}+{npad_ssize}). "
        #     f"won: {'tie' if npad_lsize == npad_ssize else 'lsize' if npad_lsize < npad_ssize else 'ssize'}."
        # )
        json_rpc_terminator = buf[p_idx-2:p_idx]
        assert json_rpc_terminator in (b"}\n", b"]\n"), f"unexpected {json_rpc_terminator=!r}"
        buf2 = buf[:p_idx-2] + (npad * b" ") + json_rpc_terminator
        self._asyncio_transport.write(buf2)
        self._last_send = time.monotonic()
        del self._sbuffer[:p_idx]
        if not self._sbuffer:
            self._sbuffer_has_data_evt.clear()

    async def _poll_sbuffer(self):
        while not self.is_closing():
            await self._can_send.wait()
            await self._sbuffer_has_data_evt.wait()  # to avoid busy-waiting
            self._maybe_consume_sbuffer()
            # If there is still data in the buffer, sleep until it would time out.
            # note: If the transport is ~idle, when we wake up, we will send the current buf data,
            #       but if busy, we might wake up to completely new buffer contents. Either is fine.
            if len(self._sbuffer) > 0:
                timeout_abs = self._last_send + self.WAIT_FOR_BUFFER_GROWTH_SECONDS
                timeout_rel = max(0.0, timeout_abs - time.monotonic())
                await asyncio.sleep(timeout_rel)

    def connection_made(self, transport: asyncio.BaseTransport):
        super().connection_made(transport)
        if isinstance(self.session, NotificationSession):
            coro = self.session.taskgroup.spawn(self._poll_sbuffer())
            self._sbuffer_task = self.loop.create_task(coro)
        else:
            # This is a short-lived "fetch_certificate"-type session.
            # No polling here, we always force-empty the buffer.
            self._force_send = True

    async def close(self, *args, **kwargs):
        '''Close the connection and return when closed.'''
        # Flush buffer before disconnecting. This makes ReplyAndDisconnect work:
        self._force_send = True
        self._maybe_consume_sbuffer()
        await super().close(*args, **kwargs)

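# Illustrative sketch (editor's example, not part of upstream interface.py):
# the "lsize" padding rule of _maybe_consume_sbuffer in isolation: the payload
# is grown to the next power of two, with MIN_PACKET_SIZE as the floor, so a
# network observer mostly sees packets from a small set of fixed sizes.
def _example_padded_size(payload_len: int, min_packet_size: int = 1024) -> int:
    return max(min_packet_size, 2 ** payload_len.bit_length())

assert _example_padded_size(300) == 1024   # small messages all become 1024
assert _example_padded_size(1500) == 2048  # 1025..2047 all become 2048
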
class ServerAddr:

    def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in _KNOWN_NETWORK_PROTOCOLS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError."""
        # host might be IPv6 address, hence do rsplit:
        host, port, protocol = str(s).rsplit(':', 2)
        return ServerAddr(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            return ServerAddr(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.host, self.port, self.protocol))

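# Illustrative sketch (editor's example, not part of upstream interface.py):
# from_str() wants the full "host:port:protocol" form, while
# from_str_with_inference() fills in the preferred protocol; the host below is made up.
assert str(ServerAddr.from_str('electrum.example.org:50002:s')) == 'electrum.example.org:50002:s'
assert str(ServerAddr.from_str_with_inference('electrum.example.org:50002')) == 'electrum.example.org:50002:s'
assert ServerAddr.from_str_with_inference('electrum.example.org') is None  # no port given, nothing to infer
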
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    filename = host
    try:
        ip = ip_address(host)
    except ValueError:
        pass
    else:
        if isinstance(ip, IPv6Address):
            filename = f"ipv6_{ip.packed.hex()}"
    return os.path.join(config.path, 'certs', filename)

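# Illustrative sketch (editor's example, not part of upstream interface.py):
# hostnames map to certs/<host>, while IPv6 literals are stored by their packed
# hex form (':' is not a safe filename character on every platform). The config
# stand-in and paths below are made up.
class _ExampleConfig:
    path = '/tmp/electrum_data'

assert _get_cert_path_for_host(config=_ExampleConfig(), host='server.example.com') \
    == os.path.join('/tmp/electrum_data', 'certs', 'server.example.com')
assert _get_cert_path_for_host(config=_ExampleConfig(), host='::1') \
    == os.path.join('/tmp/electrum_data', 'certs', 'ipv6_' + '00' * 15 + '01')
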
class Interface(Logger):

    def __init__(self, *, network: 'Network', server: ServerAddr):
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self._blockchain_updated = asyncio.Event()
        self.server = server
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If the user runs their own
        #   server on the same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None  # type: Optional[dict]
        self.tip = 0

        self._headers_cache = {}  # type: Dict[int, bytes]
        self._rawtx_cache = LRUCache(maxsize=20)  # type: LRUCache[str, bytes]  # txid->rawtx

        self.fee_estimates_eta = {}  # type: Dict[int, int]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)

    @property
    def host(self):
        return self.server.host

    @property
    def port(self):
        return self.server.port

    @property
    def protocol(self):
        return self.server.protocol

    def diagnostic_name(self):
        return self.server.net_addr_str()

    def __str__(self):
        return f"<Interface {self.diagnostic_name()}>"

    async def is_server_ca_signed(self, ca_ssl_context: ssl.SSLContext) -> bool:
        """Given a CA enforcing SSL context, returns True if the connection
        can be established. Returns False if the server has a self-signed
        certificate but otherwise is okay. Any other failures raise.
        """
        try:
            await self.open_session(ssl_context=ca_ssl_context, exit_early=True)
        except ConnectError as e:
            cause = e.__cause__
            if (isinstance(cause, ssl.SSLCertVerificationError)
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                    and cause.verify_code == 18):  # "self signed certificate"
                # Good. We will use this server as self-signed.
                return False
            # Not good. Cannot use this server.
            raise
        # Good. We will use this server as CA-signed.
        return True

    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context: ssl.SSLContext) -> None:
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
        if ca_signed:
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            with open(self.cert_path, 'w') as f:
                # empty file means this is CA signed, not self-signed
                f.write('')
        else:
            await self._save_certificate()

    def _is_saved_ssl_cert_available(self):
        if not os.path.exists(self.cert_path):
            return False
        with open(self.cert_path, 'r') as f:
            contents = f.read()
        if contents == '':  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            b = pem.dePem(contents, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x = x509.X509(b)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(self.cert_path)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytes(b))
        return True

    async def _get_ssl_context(self) -> Optional[ssl.SSLContext]:
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        siz = os.stat(self.cert_path).st_size
        if siz == 0:
            # CA signed cert
            sslc = ca_sslc
        else:
            # pinned self-signed cert
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
            #       We explicitly disable it as it breaks lots of servers.
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
            sslc.check_hostname = False
        return sslc

    def handle_disconnect(func):
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                # Make sure taskgroup gets cleaned-up. This explicit clean-up is needed here
                # in case the "with taskgroup" ctx mgr never got a chance to run:
                await self.taskgroup.cancel_remaining()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func

    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context=ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            # make SSL errors for the main interface more visible (to help server ops debug cert pinning issues)
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
                    and self.is_main_server() and not self.network.auto_connect):
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
            else:
                self.logger.info(f'disconnecting due to: {repr(e)}')
            return

    def _mark_ready(self) -> None:
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return

        assert self.tip_header
        chain = blockchain.check_header(self.tip_header)
        if not chain:
            self.blockchain = blockchain.get_best_chain()
        else:
            self.blockchain = chain
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)

    def is_connected_and_ready(self) -> bool:
        return self.ready.done() and not self.got_disconnected.is_set()

    async def _save_certificate(self) -> None:
        if not os.path.exists(self.cert_path):
            # we may need to retry this a few times, in case the handshake hasn't completed
            for _ in range(10):
                dercert = await self._fetch_certificate()
                if dercert:
                    self.logger.info("succeeded in getting cert")
                    self._verify_certificate_fingerprint(dercert)
                    with open(self.cert_path, 'w') as f:
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
                        # workaround android bug
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
                        f.write(cert)
                        # even though close() flushes, we can't fsync when closed.
                        # we must flush before fsyncing: flush writes to the OS buffer,
                        # and fsync writes the OS buffer to disk.
                        f.flush()
                        os.fsync(f.fileno())
                    break
                await asyncio.sleep(1)
            else:
                raise GracefulDisconnect("could not get certificate after 10 tries")

    async def _fetch_certificate(self) -> bytes:
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(
            session_factory=RPCSession,
            host=self.host, port=self.port,
            ssl=sslc,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)

    def _get_expected_fingerprint(self) -> Optional[str]:
        if self.is_main_server():
            return self.network.config.NETWORK_SERVERFINGERPRINT
        return None

    def _verify_certificate_fingerprint(self, certificate: bytes) -> None:
        expected_fingerprint = self._get_expected_fingerprint()
        if not expected_fingerprint:
            return
        fingerprint = hashlib.sha256(certificate).hexdigest()
        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
        if not fingerprints_match:
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")

    async def _maybe_warm_headers_cache(self, *, from_height: int, to_height: int, mode: ChainResolutionMode) -> None:
        """Populate header cache for block heights in range [from_height, to_height]."""
        assert from_height <= to_height, (from_height, to_height)
        assert to_height - from_height < MAX_NUM_HEADERS_PER_REQUEST
        if all(height in self._headers_cache for height in range(from_height, to_height+1)):
            # cache already has all requested headers
            return
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        count = to_height - from_height + 1
        headers = await self.get_block_headers(start_height=from_height, count=count, timeout=timeout, mode=mode)
        for idx, raw_header in enumerate(headers):
            header_height = from_height + idx
            self._headers_cache[header_height] = raw_header

    async def get_block_header(self, height: int, *, mode: ChainResolutionMode) -> dict:
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        #self.logger.debug(f'get_block_header() {height} in {mode=}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        if raw_header := self._headers_cache.get(height):
            return blockchain.deserialize_header(raw_header, height)
        self.logger.info(f'requesting block header {height} in {mode=}')
        res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(res), height)

    async def get_block_headers(
        self,
        *,
        start_height: int,
        count: int,
        timeout=None,
        mode: Optional[ChainResolutionMode] = None,
    ) -> Sequence[bytes]:
        """Request a number of consecutive block headers, starting at `start_height`.
        `count` is the num of requested headers, BUT note the server might return fewer than this
        (if the range would extend beyond its tip).
        note: the returned headers are not verified or parsed at all.
        """
        if not is_non_negative_integer(start_height):
            raise Exception(f"{repr(start_height)} is not a block height")
        if not is_non_negative_integer(count) or not (0 < count <= MAX_NUM_HEADERS_PER_REQUEST):
            raise Exception(f"{repr(count)} not an int in range ]0, {MAX_NUM_HEADERS_PER_REQUEST}]")
        self.logger.info(
            f"requesting block headers: [{start_height}, {start_height+count-1}], {count=}"
            + (f" (in {mode=})" if mode is not None else "")
        )
        res = await self.session.send_request('blockchain.block.headers', [start_height, count], timeout=timeout)
        # check response
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='hex')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])
        assert_hex_str(res['hex'])
        if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
            raise RequestCorrupted('inconsistent chunk hex and count')
        # we never request more than MAX_NUM_HEADERS_PER_REQUEST headers, but we enforce that those fit in a single response
        if res['max'] < MAX_NUM_HEADERS_PER_REQUEST:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < {MAX_NUM_HEADERS_PER_REQUEST}")
        if res['count'] > count:
            raise RequestCorrupted(f"asked for {count} headers but got more: {res['count']}")
        elif res['count'] < count:
            # we only tolerate getting fewer headers if it is due to reaching the tip
            end_height = start_height + res['count'] - 1
            if end_height < self.tip:  # still below tip. why did the server not send more?!
                raise RequestCorrupted(
                    f"asked for {count} headers but got fewer: {res['count']}. ({start_height=}, {self.tip=})")
        # checks done.
        headers = list(util.chunks(bfh(res['hex']), size=HEADER_SIZE))
        return headers

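    # Illustrative worked example (editor's note, not part of upstream interface.py):
    # each raw header is HEADER_SIZE (80) bytes, i.e. 160 hex chars, so a response
    # claiming count=3 must carry exactly 480 hex chars; util.chunks() then splits
    # the decoded bytes back into three 80-byte headers.
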
    async def request_chunk_below_max_checkpoint(
        self,
        *,
        height: int,
    ) -> None:
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        assert height <= constants.net.max_checkpoint(), f"{height=} must be <= cp={constants.net.max_checkpoint()}"
        index = height // CHUNK_SIZE
        if index in self._requested_chunks:
            return None
        self.logger.debug(f"requesting chunk from height {height}")
        try:
            self._requested_chunks.add(index)
            headers = await self.get_block_headers(start_height=index * CHUNK_SIZE, count=CHUNK_SIZE)
        finally:
            self._requested_chunks.discard(index)
        conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
        if not conn:
            raise RequestCorrupted(f"chunk ({index=}, for {height=}) does not connect to blockchain")
        return None

    async def _fast_forward_chain(
        self,
        *,
        height: int,  # usually local chain tip + 1
        tip: int,  # server tip. we should not request past this.
    ) -> int:
        """Request some headers starting at `height` to grow the blockchain of this interface.
        Returns the number of headers we managed to connect, starting at `height`.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        if not is_non_negative_integer(tip):
            raise Exception(f"{repr(tip)} is not a block height")
        if not (height > constants.net.max_checkpoint()
                or height == 0 == constants.net.max_checkpoint()):
            raise Exception(f"{height=} must be > cp={constants.net.max_checkpoint()}")
        assert height <= tip, f"{height=} must be <= {tip=}"
        # Request a few chunks of headers concurrently.
        # tradeoffs:
        # - more chunks: higher memory requirements
        # - more chunks: higher concurrency => syncing needs fewer network round-trips
        # - if a chunk does not connect, bandwidth for all later chunks is wasted
        async with OldTaskGroup() as group:
            tasks = []  # type: List[Tuple[int, asyncio.Task[Sequence[bytes]]]]
            index0 = height // CHUNK_SIZE
            for chunk_cnt in range(10):
                index = index0 + chunk_cnt
                start_height = index * CHUNK_SIZE
                if start_height > tip:
                    break
                end_height = min(start_height + CHUNK_SIZE - 1, tip)
                size = end_height - start_height + 1
                tasks.append((index, await group.spawn(self.get_block_headers(start_height=start_height, count=size))))
        # try to connect chunks
        num_headers = 0
        for index, task in tasks:
            headers = task.result()
            conn = self.blockchain.connect_chunk(index, data=b"".join(headers))
            if not conn:
                break
            num_headers += len(headers)
        # We started at a chunk boundary, instead of the requested `height`. Need to correct for that.
        offset = height - index0 * CHUNK_SIZE
        return max(0, num_headers - offset)

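    # Illustrative worked example (editor's note, not part of upstream interface.py):
    # with CHUNK_SIZE=2016, a call with height=4100 starts fetching at chunk
    # index0=2, i.e. at height 4032. If that whole chunk connects, num_headers
    # is 2016, but only 2016 - (4100 - 4032) = 1948 headers count as progress
    # past the requested height, hence the offset correction above.
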
    def is_main_server(self) -> bool:
        return (self.network.interface == self or
                self.network.interface is None and self.network.default_server == self.server)

    async def open_session(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext],
        exit_early: bool = False,
    ):
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(
            session_factory=session_factory,
            host=self.host, port=self.port,
            ssl=ssl_context,
            proxy=self.proxy,
            transport=PaddedRSTransport,
        ) as session:
            start = time.perf_counter()
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            try:
                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            if ver[1] != version.PROTOCOL_VERSION:
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')

            try:
                features = await session.send_request('server.features')
                server_genesis_hash = assert_dict_contains_field(features, field_name='genesis_hash')
            except (aiorpcx.jsonrpc.RPCError, RequestCorrupted) as e:
                raise GracefulDisconnect(e)
            if server_genesis_hash != constants.net.GENESIS:
                raise GracefulDisconnect(f'server on different chain: {server_genesis_hash=}. ours: {constants.net.GENESIS}')
            self.logger.info(f"connection established. version: {ver}, handshake duration: {(time.perf_counter() - start) * 1000:.2f} ms")

            try:
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits

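    # Illustrative sketch (editor's note, not part of upstream interface.py):
    # the handshake in open_session above, as a message sequence (version strings made up):
    #   --> server.version ["electrum/4.x.x", PROTOCOL_VERSION]
    #   <-- [server_software, negotiated_version]   # negotiated_version must equal ours
    #   --> server.features
    #   <-- {"genesis_hash": ..., ...}              # must match constants.net.GENESIS
    # only then are the long-lived tasks (ping, request_fee_estimates,
    # run_fetch_blocks, monitor_connection) spawned into the taskgroup.
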
    async def monitor_connection(self):
        while True:
            await asyncio.sleep(1)
            # If the session/transport is no longer open, we disconnect.
            # e.g. if the remote cleanly sends EOF, we would handle that here.
            # note: If the user pulls the ethernet cable or disconnects wifi,
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
            #       - On Android, this seems to work reliably, where asyncio.BaseProtocol.connection_lost()
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
            #         Hence, in practice the connection issue will only be detected the next time we try
            #         to send a message (plus timeout), which can take minutes...
            if not self.session or self.session.is_closing():
                raise GracefulDisconnect('session was closed')

    async def ping(self):
        # We periodically send a "ping" msg to make sure the server knows we are still here.
        # Adding a bit of randomness generates some noise against traffic analysis.
        while True:
            await asyncio.sleep(random.random() * 300)
            await self.session.send_request('server.ping')
            await self._maybe_send_noise()

    async def _maybe_send_noise(self):
        while random.random() < 0.2:
            await asyncio.sleep(random.random())
            await self.session.send_request('server.ping')

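    # Illustrative worked example (editor's note, not part of upstream interface.py):
    # _maybe_send_noise above sends at least k extra pings with probability
    # 0.2**k, i.e. 0.2/(1 - 0.2) = 0.25 extra pings per call on average: cheap,
    # but it makes message timing less regular for a traffic-analysis observer.
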
    async def request_fee_estimates(self):
        while True:
            async with OldTaskGroup() as group:
                fee_tasks = []
                for i in FEE_ETA_TARGETS[0:-1]:
                    fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
            for nblock_target, task in fee_tasks:
                fee = task.result()
                if fee < 0: continue
                assert isinstance(fee, int)
                self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)

    async def close(self, *, force_after: int = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if self.session:
            await self.session.close(force_after=force_after)
        # monitor_connection will cancel tasks

    async def run_fetch_blocks(self):
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            raw_header = item[0]
            height = raw_header['height']
            header_bytes = bfh(raw_header['hex'])
            header_dict = blockchain.deserialize_header(header_bytes, height)
            self.tip_header = header_dict
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect(
                    f"server tip below max checkpoint. ({self.tip} < {constants.net.max_checkpoint()})")
            self._mark_ready()
            self._headers_cache.clear()  # tip changed, so assume anything could have happened with chain
            self._headers_cache[height] = header_bytes
            try:
                blockchain_updated = await self._process_header_at_tip()
            finally:
                self._headers_cache.clear()  # to reduce memory usage
            # header processing done
            if self.is_main_server() or blockchain_updated:
                self.logger.info(f"new chain tip. {height=}")
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
            util.trigger_callback('network_updated')
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
            await self.taskgroup.spawn(self._maybe_send_noise())

    async def _process_header_at_tip(self) -> bool:
        """Returns:
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
        True - new header we didn't have, or reorg
        """
        height, header = self.tip, self.tip_header
        async with self.network.bhi_lock:
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
                # another interface amended the blockchain
                return False
            await self.sync_until(height)
            return True

    async def sync_until(
        self,
        height: int,
        *,
        next_height: Optional[int] = None,  # sync target. typically the tip, except in unit tests
    ) -> Tuple[ChainResolutionMode, int]:
        if next_height is None:
            next_height = self.tip
        last = None  # type: Optional[ChainResolutionMode]
        while last is None or height <= next_height:
            prev_last, prev_height = last, height
            if next_height > height + 144:
                # We are far from the tip.
                # It is more efficient to process headers in large batches (CPU/disk_usage/logging).
                # (but this wastes a little bandwidth, if we are not on a chunk boundary)
                num_headers = await self._fast_forward_chain(
                    height=height, tip=next_height)
                if num_headers == 0:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    last, height = await self.step(height)
                    continue
                # report progress to gui/etc
                util.trigger_callback('blockchain_updated')
                self._blockchain_updated.set()
                self._blockchain_updated.clear()
                util.trigger_callback('network_updated')
                height += num_headers
                assert height <= next_height+1, (height, self.tip)
                last = ChainResolutionMode.CATCHUP
            else:
                # We are close to the tip, so process headers one-by-one.
                # (note: due to the headers_cache, this can still batch-request headers to save network latency)
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height

1141
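    # A minimal sketch of the catch-up strategy in sync_until() above
    # (illustrative only, not part of this module): far behind the tip we
    # batch-fetch headers; close to the tip we fall back to step(), which can
    # also detect and resolve forks. `fetch_batch` and `resolve_one` are
    # hypothetical stand-ins for _fast_forward_chain() and step().
    #
    #     async def catch_up(height: int, tip: int) -> int:
    #         while height <= tip:
    #             if tip > height + 144:  # roughly a day of blocks behind
    #                 height += await fetch_batch(height)  # many headers per request
    #             else:
    #                 height = await resolve_one(height)   # one header; may fork
    #         return height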
    async def step(
        self,
        height: int,
    ) -> Tuple[ChainResolutionMode, int]:
        assert 0 <= height <= self.tip, (height, self.tip)
        await self._maybe_warm_headers_cache(
            from_height=height,
            to_height=min(self.tip, height+MAX_NUM_HEADERS_PER_REQUEST-1),
            mode=ChainResolutionMode.CATCHUP,
        )
        header = await self.get_block_header(height, mode=ChainResolutionMode.CATCHUP)

        chain = blockchain.check_header(header)
        if chain:
            self.blockchain = chain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return ChainResolutionMode.CATCHUP, height+1

        can_connect = blockchain.can_connect(header)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            height, header, bad, bad_header = await self._search_headers_backwards(height, header=header)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            assert chain or can_connect
        if can_connect:
            height += 1
            self.blockchain = can_connect
            self.blockchain.save_header(header)
            return ChainResolutionMode.CATCHUP, height

        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)

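    # How step() classifies the header at `height` (a rough summary, assuming
    # check_header() means "matches a header we already store" and
    # can_connect() means "validly extends a chain we store"):
    #
    #     check_header  -> already have it: fast-forward to height+1
    #     can_connect   -> new tip for a known chain: save_header(), advance
    #     neither       -> search backwards for a connectable ancestor, then
    #                      binary-search the exact forkpoint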
    async def _search_headers_binary(
        self,
        height: int,
        bad: int,
        bad_header: dict,
        chain: Optional[Blockchain],
    ) -> Tuple[int, int, dict]:
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain
        good = height
        while True:
            assert 0 <= good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            if bad - good + 1 <= MAX_NUM_HEADERS_PER_REQUEST:  # if interval is small, trade some bandwidth for lower latency
                await self._maybe_warm_headers_cache(
                    from_height=good, to_height=bad, mode=ChainResolutionMode.BINARY)
            header = await self.get_block_header(height, mode=ChainResolutionMode.BINARY)
            chain = blockchain.check_header(header)
            if chain:
                self.blockchain = chain
                good = height
            else:
                bad = height
                bad_header = header
            if good + 1 == bad:
                break

        if not self.blockchain.can_connect(bad_header, check_height=False):
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}. {chain=}")
        return good, bad, bad_header

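    # Invariant of the loop above: `good` is always a height whose header
    # checks against a chain we store, `bad` one whose header does not, and
    # good < bad. Each probe halves the interval, so the forkpoint is found in
    # O(log n) header requests. A self-contained sketch of the same bisection
    # (illustrative only; `have_header` is a hypothetical predicate):
    #
    #     def find_forkpoint(good: int, bad: int, have_header) -> int:
    #         while good + 1 < bad:
    #             mid = (good + bad) // 2
    #             if have_header(mid):
    #                 good = mid
    #             else:
    #                 bad = mid
    #         return bad  # lowest height where the server's chain diverges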
    async def _resolve_potential_chain_fork_given_forkpoint(
        self,
        good: int,
        bad: int,
        bad_header: dict,
    ) -> Tuple[ChainResolutionMode, int]:
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        bh = self.blockchain.height()
        assert bh >= good, (bh, good)
        if bh == good:
            height = good + 1
            self.logger.info(f"catching up from {height}")
            return ChainResolutionMode.NO_FORK, height

        # this is a new fork we don't yet have
        height = bad + 1
        self.logger.info(f"new fork at bad height {bad}")
        b = self.blockchain.fork(bad_header)  # type: Blockchain
        self.blockchain = b
        assert b.forkpoint == bad
        return ChainResolutionMode.FORK, height

    async def _search_headers_backwards(
        self,
        height: int,
        *,
        header: dict,
    ) -> Tuple[int, dict, int, dict]:
        async def iterate():
            nonlocal height, header
            checkp = False
            if height <= constants.net.max_checkpoint():
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, mode=ChainResolutionMode.BACKWARD)
            chain = blockchain.check_header(header)
            can_connect = blockchain.can_connect(header)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        with blockchain.blockchains_lock:
            chains = list(blockchain.blockchains.values())
        local_max = max([0] + [x.height() for x in chains])
        height = min(local_max + 1, height - 1)
        assert height >= 0

        await self._maybe_warm_headers_cache(
            from_height=max(0, height-10), to_height=height, mode=ChainResolutionMode.BACKWARD)

        delta = 2
        while await iterate():
            bad, bad_header = height, header
            height -= delta
            delta *= 2

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header

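    # The backward search above steps back with exponentially growing strides
    # (2, 4, 8, ...), so a connectable ancestor is found in O(log d) requests,
    # where d is the depth of the divergence. A sketch (illustrative only;
    # `connects` is a hypothetical stand-in for check_header/can_connect, and
    # clamping to the checkpoint floor is omitted for brevity):
    #
    #     def walk_back(height: int, connects) -> int:
    #         delta = 2
    #         while not connects(height):
    #             height -= delta
    #             delta *= 2
    #         return height  # known-good height; the forkpoint lies above it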
    @classmethod
    def client_name(cls) -> str:
        return f'electrum/{version.ELECTRUM_VERSION}'

    def is_tor(self):
        return self.host.endswith('.onion')

    def ip_addr(self) -> Optional[str]:
        session = self.session
        if not session: return None
        peer_addr = session.remote_address()
        if not peer_addr: return None
        return str(peer_addr.host)

    def bucket_based_on_ipaddress(self) -> str:
        def do_bucket():
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                return ''
            if not ip_addr:
                return ''
            if ip_addr.is_loopback:  # localhost is exempt
                return ''
            if ip_addr.version == 4:
                slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
                return str(slash16)
            elif ip_addr.version == 6:
                slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
                return str(slash48)
            return ''

        if not self._ipaddr_bucket:
            self._ipaddr_bucket = do_bucket()
        return self._ipaddr_bucket

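    # Example of the bucketing above (IPv4 servers share a bucket per /16,
    # IPv6 servers per /48), using only the stdlib ipaddress module:
    #
    #     >>> from ipaddress import IPv4Network, IPv6Network
    #     >>> str(IPv4Network('203.0.113.7').supernet(prefixlen_diff=32-16))
    #     '203.0.0.0/16'
    #     >>> str(IPv6Network('2001:db8:1234::1').supernet(prefixlen_diff=128-48))
    #     '2001:db8:1234::/48'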
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # check response
        block_height = assert_dict_contains_field(res, field_name='block_height')
        merkle = assert_dict_contains_field(res, field_name='merkle')
        pos = assert_dict_contains_field(res, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(pos)
        assert_list_or_tuple(merkle)
        for item in merkle:
            assert_hash256_str(item)
        return res

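    # How a client can check the proof returned above (a sketch of the usual
    # SPV verification; Electrum's actual implementation lives in verifier.py,
    # not here): fold the branch into the txid with double-SHA256, letting the
    # bits of `pos` pick the concatenation order, then compare the result to
    # the merkle_root in the header at `block_height`.
    #
    #     import hashlib
    #
    #     def sha256d(b: bytes) -> bytes:
    #         return hashlib.sha256(hashlib.sha256(b).digest()).digest()
    #
    #     def merkle_root_from_branch(txid_hex: str, branch: list, pos: int) -> str:
    #         h = bytes.fromhex(txid_hex)[::-1]  # txids are displayed byte-reversed
    #         for item in branch:
    #             other = bytes.fromhex(item)[::-1]
    #             h = sha256d(other + h) if pos & 1 else sha256d(h + other)
    #             pos >>= 1
    #         return h[::-1].hex()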
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if rawtx_bytes := self._rawtx_cache.get(tx_hash):
            return rawtx_bytes.hex()
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # validate response
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as e:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
        if tx.txid() != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
        self._rawtx_cache[tx_hash] = bytes.fromhex(raw)
        return raw

    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """caller should handle TxBroadcastError and RequestTimedOut"""
        txid_calc = tx.txid()
        assert txid_calc is not None
        rawtx = tx.serialize()
        assert is_hex_str(rawtx)
        if timeout is None:
            timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
            raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        try:
            out = await self.session.send_request('blockchain.transaction.broadcast', [rawtx], timeout=timeout)
            # note: both 'out' and exception messages are untrusted input from the server
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as e:
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            raise TxBroadcastServerReturnedError(sanitize_tx_broadcast_response(e.message)) from e
        except BaseException as e:  # intentional BaseException for sanity!
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
            send_exception_to_crash_reporter(e)
            raise TxBroadcastUnknownError() from e
        if out != txid_calc:
            self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
                             f"{error_text_str_to_safe_str(out)} != {txid_calc}. tx={str(tx)}")
            raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
        # broadcast succeeded.
        # We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
        # the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
        self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx)

    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # check response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted('heights of confirmed txs must be in increasing order')
                prev_height = height
        hashes = set(map(lambda item: item['tx_hash'], res))
        if len(hashes) != len(res):
            # Either the server is sending garbage... or maybe, if the server is race-prone,
            # a recently mined tx could be included in both the last block and the mempool?
            # Either way, it is simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res

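    # In the Electrum protocol, the 'height' of a history item encodes
    # confirmation status: a positive value is the confirming block height,
    # 0 means the tx is in the mempool with all inputs confirmed, and -1 means
    # it spends at least one unconfirmed output -- which is why a 'fee' field
    # is required (and checked above) exactly for heights -1 and 0.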
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # check response
        assert_list_or_tuple(res)
        for utxo_item in res:
            assert_dict_contains_field(utxo_item, field_name='tx_pos')
            assert_dict_contains_field(utxo_item, field_name='value')
            assert_dict_contains_field(utxo_item, field_name='tx_hash')
            assert_dict_contains_field(utxo_item, field_name='height')
            assert_non_negative_integer(utxo_item['tx_pos'])
            assert_non_negative_integer(utxo_item['value'])
            assert_non_negative_integer(utxo_item['height'])
            assert_hash256_str(utxo_item['tx_hash'])
        return res

    async def get_balance_for_scripthash(self, sh: str) -> dict:
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # check response
        assert_dict_contains_field(res, field_name='confirmed')
        assert_dict_contains_field(res, field_name='unconfirmed')
        assert_non_negative_integer(res['confirmed'])
        assert_integer(res['unconfirmed'])
        return res

    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be a non-negative integer")
        # do request
        res = await self.session.send_request(
            'blockchain.transaction.id_from_pos',
            [tx_height, tx_pos, merkle],
        )
        # check response
        if merkle:
            assert_dict_contains_field(res, field_name='tx_hash')
            assert_dict_contains_field(res, field_name='merkle')
            assert_hash256_str(res['tx_hash'])
            assert_list_or_tuple(res['merkle'])
            for node_hash in res['merkle']:
                assert_hash256_str(node_hash)
        else:
            assert_hash256_str(res)
        return res

    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        prev_fee = float('inf')
        for fee, s in res:
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(s)
            if fee >= prev_fee:  # check monotonicity
                raise RequestCorrupted('fees must be in decreasing order')
            prev_fee = fee
        return res

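    # The histogram is a list of [fee_rate, vsize] pairs, ordered from highest
    # fee rate (sat/vbyte) to lowest, where each vsize is the total virtual
    # size of mempool transactions in that fee bucket. A server might return,
    # e.g. (values illustrative):
    #
    #     [[53.0, 102030], [38.2, 405060], [1.1, 2900000]]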
    async def get_server_banner(self) -> str:
        # do request
        res = await self.session.send_request('server.banner')
        # check response
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        return res

    async def get_donation_address(self) -> str:
        # do request
        res = await self.session.send_request('server.donation_address')
        # check response
        if not res:  # ignore empty string
            return ''
        if not bitcoin.is_address(res):
            # note: do not hard-fail -- allow the server to use a future-type
            #       bitcoin address we do not recognize
            self.logger.info(f"invalid donation address from server: {repr(res)}")
            res = ''
        return res

    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte."""
        # do request
        res = await self.session.send_request('blockchain.relayfee')
        # check response
        assert_non_negative_int_or_float(res)
        relayfee = int(res * bitcoin.COIN)
        relayfee = max(0, relayfee)
        return relayfee

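    # The server reports the relay feerate in BTC/kbyte; multiplying by
    # bitcoin.COIN (100_000_000 sat per BTC) converts it to sat/kbyte. E.g.
    # bitcoind's common default of 0.00001 BTC/kB:
    #
    #     >>> int(0.00001 * 100_000_000)
    #     1000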
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a valid num_blocks")
        # do request
        try:
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" in e.message:
                res = -1
            else:
                raise
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code == JSONRPC.INTERNAL_ERROR:
                res = -1
            else:
                raise
        # check response
        if res != -1:
            assert_non_negative_int_or_float(res)
            res = int(res * bitcoin.COIN)
        return res


def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
    chain_bad = blockchain.check_header(header)
    if chain_bad:
        raise Exception('bad_header must not check!')


def sanitize_tx_broadcast_response(server_msg) -> str:
    # Unfortunately, bitcoind (and hence the Electrum protocol) doesn't return a useful error code.
    # So, we use substring matching to grok the error message.
    # server_msg is untrusted input, so it must not be shown to the user. see #4968
    server_msg = str(server_msg)
    server_msg = server_msg.replace("\n", r"\n")

    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
    script_error_messages = {
        r"Script evaluated without error but finished with a false/empty top stack element",
        r"Script failed an OP_VERIFY operation",
        r"Script failed an OP_EQUALVERIFY operation",
        r"Script failed an OP_CHECKMULTISIGVERIFY operation",
        r"Script failed an OP_CHECKSIGVERIFY operation",
        r"Script failed an OP_NUMEQUALVERIFY operation",
        r"Script is too big",
        r"Push value size limit exceeded",
        r"Operation limit exceeded",
        r"Stack size limit exceeded",
        r"Signature count negative or greater than pubkey count",
        r"Pubkey count negative or limit exceeded",
        r"Opcode missing or not understood",
        r"Attempted to use a disabled opcode",
        r"Operation not valid with the current stack size",
        r"Operation not valid with the current altstack size",
        r"OP_RETURN was encountered",
        r"Invalid OP_IF construction",
        r"Negative locktime",
        r"Locktime requirement not satisfied",
        r"Signature hash type missing or not understood",
        r"Non-canonical DER signature",
        r"Data push larger than necessary",
        r"Only push operators allowed in signatures",
        r"Non-canonical signature: S value is unnecessarily high",
        r"Dummy CHECKMULTISIG argument must be zero",
        r"OP_IF/NOTIF argument must be minimal",
        r"Signature must be zero for failed CHECK(MULTI)SIG operation",
        r"NOPx reserved for soft-fork upgrades",
        r"Witness version reserved for soft-fork upgrades",
        r"Taproot version reserved for soft-fork upgrades",
        r"OP_SUCCESSx reserved for soft-fork upgrades",
        r"Public key version reserved for soft-fork upgrades",
        r"Public key is neither compressed or uncompressed",
        r"Stack size must be exactly one after execution",
        r"Extra items left on stack after execution",
        r"Witness program has incorrect length",
        r"Witness program was passed an empty witness",
        r"Witness program hash mismatch",
        r"Witness requires empty scriptSig",
        r"Witness requires only-redeemscript scriptSig",
        r"Witness provided for non-witness script",
        r"Using non-compressed keys in segwit",
        r"Invalid Schnorr signature size",
        r"Invalid Schnorr signature hash type",
        r"Invalid Schnorr signature",
        r"Invalid Taproot control block size",
        r"Too much signature validation relative to witness weight",
        r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
        r"OP_IF/NOTIF argument must be minimal in tapscript",
        r"Using OP_CODESEPARATOR in non-witness script",
        r"Signature is found in scriptCode",
    }
    for substring in script_error_messages:
        if substring in server_msg:
            return substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
    validation_error_messages = {
        r"coinbase": None,
        r"tx-size-small": None,
        r"non-final": None,
        r"txn-already-in-mempool": None,
        r"txn-mempool-conflict": None,
        r"txn-already-known": None,
        r"non-BIP68-final": None,
        r"bad-txns-nonstandard-inputs": None,
        r"bad-witness-nonstandard": None,
        r"bad-txns-too-many-sigops": None,
        r"mempool min fee not met":
            ("mempool min fee not met\n" +
             _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
               "fit it into its mempool. The mempool is already full of hundreds of megabytes "
               "of transactions that all pay higher fees. Try to increase the fee.")),
        r"min relay fee not met": None,
        r"absurdly-high-fee": None,
        r"max-fee-exceeded": None,
        r"too-long-mempool-chain": None,
        r"bad-txns-spends-conflicting-tx": None,
        r"insufficient fee": ("insufficient fee\n" +
             _("Your transaction is trying to replace another one in the mempool but it "
               "does not meet the rules to do so. Try to increase the fee.")),
        r"too many potential replacements": None,
        r"replacement-adds-unconfirmed": None,
        r"mempool full": None,
        r"non-mandatory-script-verify-flag": None,
        r"mandatory-script-verify-flag-failed": None,
        r"Transaction check failed": None,
    }
    for substring in validation_error_messages:
        if substring in server_msg:
            msg = validation_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
    # https://github.com/bitcoin/bitcoin/blob/3f83c744ac28b700090e15b5dda2260724a56f49/src/common/messages.cpp#L126
    # grep "RPC_TRANSACTION"
    # grep "RPC_DESERIALIZATION_ERROR"
    # grep "TransactionError"
    rawtransaction_error_messages = {
        r"Missing inputs": None,
        r"Inputs missing or spent": None,
        r"transaction already in block chain": None,
        r"Transaction already in block chain": None,
        r"Transaction outputs already in utxo set": None,
        r"TX decode failed": None,
        r"Peer-to-peer functionality missing or disabled": None,
        r"Transaction rejected by AcceptToMemoryPool": None,
        r"AcceptToMemoryPool failed": None,
        r"Transaction rejected by mempool": None,
        r"Mempool internal error": None,
        r"Fee exceeds maximum configured by user": None,
        r"Unspendable output exceeds maximum configured by user": None,
        r"Transaction rejected due to invalid package": None,
    }
    for substring in rawtransaction_error_messages:
        if substring in server_msg:
            msg = rawtransaction_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
    # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
    # grep "REJECT_"
    # grep "TxValidationResult"
    tx_verify_error_messages = {
        r"bad-txns-vin-empty": None,
        r"bad-txns-vout-empty": None,
        r"bad-txns-oversize": None,
        r"bad-txns-vout-negative": None,
        r"bad-txns-vout-toolarge": None,
        r"bad-txns-txouttotal-toolarge": None,
        r"bad-txns-inputs-duplicate": None,
        r"bad-cb-length": None,
        r"bad-txns-prevout-null": None,
        r"bad-txns-inputs-missingorspent":
            ("bad-txns-inputs-missingorspent\n" +
             _("You might have a local transaction in your wallet that this transaction "
               "builds on top of. You need to either broadcast or remove the local tx.")),
        r"bad-txns-premature-spend-of-coinbase": None,
        r"bad-txns-inputvalues-outofrange": None,
        r"bad-txns-in-belowout": None,
        r"bad-txns-fee-outofrange": None,
    }
    for substring in tx_verify_error_messages:
        if substring in server_msg:
            msg = tx_verify_error_messages[substring]
            return msg if msg else substring
    # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
    # grep "reason ="
    # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
    # should come after script_error.cpp (due to e.g. "version")
    policy_error_messages = {
        r"version": _("Transaction uses non-standard version."),
        r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
        r"scriptsig-size": None,
        r"scriptsig-not-pushonly": None,
        r"scriptpubkey":
            ("scriptpubkey\n" +
             _("Some of the outputs pay to a non-standard script.")),
        r"bare-multisig": None,
        r"dust":
            (_("Transaction could not be broadcast due to dust outputs.\n"
               "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
               "Check the units and make sure you haven't confused e.g. mBTC and BTC.")),
        r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
    }
    for substring in policy_error_messages:
        if substring in server_msg:
            msg = policy_error_messages[substring]
            return msg if msg else substring
    # otherwise:
    return _("Unknown error")

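# Example of the sanitizer above: the raw server message is untrusted and is
# never shown to the user; only a known substring (or a canned explanation)
# ever gets through. Sample inputs are illustrative:
#
#     >>> sanitize_tx_broadcast_response(
#     ...     "the request failed: min relay fee not met, 100 < 141 (code -26)")
#     'min relay fee not met'
#     >>> sanitize_tx_broadcast_response("<script>alert(1)</script>")
#     'Unknown error'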

def check_cert(host, cert):
    try:
        b = pem.dePem(cert, 'CERTIFICATE')
        x = x509.X509(b)
    except Exception:
        traceback.print_exc(file=sys.stdout)
        return

    try:
        x.check_date()
        expired = False
    except Exception:
        expired = True

    m = "host: %s\n" % host
    m += "has_expired: %s\n" % expired
    util.print_msg(m)


# Used by tests
def _match_hostname(name, val):
    if val == name:
        return True

    return val.startswith('*.') and name.endswith(val[1:])

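# Example of the wildcard matching above: '*.foo.com' is turned into the
# suffix '.foo.com', so any name ending in '.foo.com' matches:
#
#     >>> _match_hostname('electrum.foo.com', '*.foo.com')
#     True
#     >>> _match_hostname('foo.com', '*.foo.com')
#     False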

def test_certificates():
    from .simple_config import SimpleConfig
    config = SimpleConfig()
    mydir = os.path.join(config.path, "certs")
    certs = os.listdir(mydir)
    for c in certs:
        p = os.path.join(mydir, c)
        with open(p, encoding='utf-8') as f:
            cert = f.read()
        check_cert(c, cert)

if __name__ == "__main__":
    test_certificates()