spesmilo / electrum / 5763146712875008

07 Mar 2025 09:24AM UTC coverage: 60.719% (+0.3%) from 60.38%

Pull Request #9586: Add mechanism to block htlcs from settling back
CI: CirrusCI
f321x: reduce scope of dont_settle_htlc_keys

6 of 10 new or added lines in 2 files covered. (60.0%)

3812 existing lines in 21 files now uncovered.

20679 of 34057 relevant lines covered (60.72%)

2.43 hits per line
Source File: /electrum/interface.py (file coverage: 37.15%)
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 thomasv@gitorious
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import os
4✔
26
import re
4✔
27
import ssl
4✔
28
import sys
4✔
29
import traceback
4✔
30
import asyncio
4✔
31
import socket
4✔
32
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
4✔
33
from collections import defaultdict
4✔
34
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
4✔
35
import itertools
4✔
36
import logging
4✔
37
import hashlib
4✔
38
import functools
4✔
39

40
import aiorpcx
4✔
41
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
4✔
42
from aiorpcx.curio import timeout_after, TaskTimeout
4✔
43
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
4✔
44
from aiorpcx.rawsocket import RSClient
4✔
45
import certifi
4✔
46

47
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
4✔
48
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
49
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup)
50
from . import util
4✔
51
from . import x509
4✔
52
from . import pem
4✔
53
from . import version
4✔
54
from . import blockchain
4✔
55
from .blockchain import Blockchain, HEADER_SIZE
4✔
56
from . import bitcoin
4✔
57
from . import constants
4✔
58
from .i18n import _
4✔
59
from .logging import Logger
4✔
60
from .transaction import Transaction
4✔
61

62
if TYPE_CHECKING:
4✔
UNCOV
63
    from .network import Network
×
64
    from .simple_config import SimpleConfig
×
65

66

67
ca_path = certifi.where()
4✔
68

69
BUCKET_NAME_OF_ONION_SERVERS = 'onion'
4✔
70

71
_KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
4✔
72
PREFERRED_NETWORK_PROTOCOL = 's'
4✔
73
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS
4✔
74

75

76
class NetworkTimeout:
4✔
77
    # seconds
78
    class Generic:
4✔
79
        NORMAL = 30
4✔
80
        RELAXED = 45
4✔
81
        MOST_RELAXED = 600
4✔
82

83
    class Urgent(Generic):
4✔
84
        NORMAL = 10
4✔
85
        RELAXED = 20
4✔
86
        MOST_RELAXED = 60
4✔
87

88

89
def assert_non_negative_integer(val: Any) -> None:
4✔
UNCOV
90
    if not is_non_negative_integer(val):
×
91
        raise RequestCorrupted(f'{val!r} should be a non-negative integer')
×
92

93

94
def assert_integer(val: Any) -> None:
4✔
UNCOV
95
    if not is_integer(val):
×
96
        raise RequestCorrupted(f'{val!r} should be an integer')
×
97

98

99
def assert_int_or_float(val: Any) -> None:
4✔
UNCOV
100
    if not is_int_or_float(val):
×
101
        raise RequestCorrupted(f'{val!r} should be int or float')
×
102

103

104
def assert_non_negative_int_or_float(val: Any) -> None:
4✔
UNCOV
105
    if not is_non_negative_int_or_float(val):
×
106
        raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
×
107

108

109
def assert_hash256_str(val: Any) -> None:
4✔
UNCOV
110
    if not is_hash256_str(val):
×
111
        raise RequestCorrupted(f'{val!r} should be a hash256 str')
×
112

113

114
def assert_hex_str(val: Any) -> None:
4✔
UNCOV
115
    if not is_hex_str(val):
×
116
        raise RequestCorrupted(f'{val!r} should be a hex str')
×
117

118

119
def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
4✔
UNCOV
120
    if not isinstance(d, dict):
×
121
        raise RequestCorrupted(f'{d!r} should be a dict')
×
122
    if field_name not in d:
×
123
        raise RequestCorrupted(f'required field {field_name!r} missing from dict')
×
124
    return d[field_name]
×
125

126

127
def assert_list_or_tuple(val: Any) -> None:
4✔
UNCOV
128
    if not isinstance(val, (list, tuple)):
×
129
        raise RequestCorrupted(f'{val!r} should be a list or tuple')
×
130

131

132
class NotificationSession(RPCSession):
4✔
133

134
    def __init__(self, *args, interface: 'Interface', **kwargs):
4✔
UNCOV
135
        super(NotificationSession, self).__init__(*args, **kwargs)
×
136
        self.subscriptions = defaultdict(list)
×
137
        self.cache = {}
×
138
        self._msg_counter = itertools.count(start=1)
×
139
        self.interface = interface
×
140
        self.cost_hard_limit = 0  # disable aiorpcx resource limits
×
141

142
    async def handle_request(self, request):
4✔
UNCOV
143
        self.maybe_log(f"--> {request}")
×
144
        try:
×
145
            if isinstance(request, Notification):
×
146
                params, result = request.args[:-1], request.args[-1]
×
147
                key = self.get_hashable_key_for_rpc_call(request.method, params)
×
148
                if key in self.subscriptions:
×
149
                    self.cache[key] = result
×
150
                    for queue in self.subscriptions[key]:
×
151
                        await queue.put(request.args)
×
152
                else:
UNCOV
153
                    raise Exception(f'unexpected notification')
×
154
            else:
UNCOV
155
                raise Exception(f'unexpected request. not a notification')
×
156
        except Exception as e:
×
157
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
×
158
            await self.close()
×
159

160
    async def send_request(self, *args, timeout=None, **kwargs):
4✔
161
        # note: semaphores/timeouts/backpressure etc are handled by
162
        # aiorpcx. the timeout arg here in most cases should not be set
UNCOV
163
        msg_id = next(self._msg_counter)
×
164
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
×
165
        try:
×
166
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
167
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
UNCOV
168
            response = await util.wait_for2(
×
169
                super().send_request(*args, **kwargs),
170
                timeout)
UNCOV
171
        except (TaskTimeout, asyncio.TimeoutError) as e:
×
172
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
×
173
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
×
174
        except CodeMessageError as e:
×
175
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
×
176
            raise
×
177
        except BaseException as e:  # cancellations, etc. are useful for debugging
×
178
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
×
179
            raise
×
180
        else:
UNCOV
181
            self.maybe_log(f"--> {response} (id: {msg_id})")
×
182
            return response
×
183

184
    def set_default_timeout(self, timeout):
4✔
UNCOV
185
        assert hasattr(self, "sent_request_timeout")  # in base class
×
186
        self.sent_request_timeout = timeout
×
187
        assert hasattr(self, "max_send_delay")        # in base class
×
188
        self.max_send_delay = timeout
×
189

190
    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
4✔
191
        # note: until the cache is written for the first time,
192
        # each 'subscribe' call might make a request on the network.
UNCOV
193
        key = self.get_hashable_key_for_rpc_call(method, params)
×
194
        self.subscriptions[key].append(queue)
×
195
        if key in self.cache:
×
196
            result = self.cache[key]
×
197
        else:
UNCOV
198
            result = await self.send_request(method, params)
×
199
            self.cache[key] = result
×
200
        await queue.put(params + [result])
×
201

202
    def unsubscribe(self, queue):
4✔
203
        """Unsubscribe a callback to free object references to enable GC."""
204
        # note: we can't unsubscribe from the server, so we keep receiving
205
        # subsequent notifications
UNCOV
206
        for v in self.subscriptions.values():
×
207
            if queue in v:
×
208
                v.remove(queue)
×
209

210
    @classmethod
4✔
211
    def get_hashable_key_for_rpc_call(cls, method, params):
4✔
212
        """Hashable index for subscriptions and cache"""
UNCOV
213
        return str(method) + repr(params)
×
214

215
    def maybe_log(self, msg: str) -> None:
4✔
UNCOV
216
        if not self.interface: return
×
217
        if self.interface.debug or self.interface.network.debug:
×
218
            self.interface.logger.debug(msg)
×
219

220
    def default_framer(self):
4✔
221
        # overridden so that max_size can be customized
UNCOV
222
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
×
223
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
×
224
        return NewlineFramer(max_size=max_size)
×
225

226
    async def close(self, *, force_after: int = None):
4✔
227
        """Closes the connection and waits for it to be closed.
228
        We try to flush buffered data to the wire, which can take some time.
229
        """
UNCOV
230
        if force_after is None:
×
231
            # We give up after a while and just abort the connection.
232
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
233
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
234
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
235
            #       wait until this timeout is triggered
UNCOV
236
            force_after = 1  # seconds
×
237
        await super().close(force_after=force_after)
×
238

239

240
class NetworkException(Exception): pass
4✔
241

242

243
class GracefulDisconnect(NetworkException):
4✔
244
    log_level = logging.INFO
4✔
245

246
    def __init__(self, *args, log_level=None, **kwargs):
4✔
247
        Exception.__init__(self, *args, **kwargs)
4✔
248
        if log_level is not None:
4✔
UNCOV
249
            self.log_level = log_level
×
250

251

252
class RequestTimedOut(GracefulDisconnect):
4✔
253
    def __str__(self):
4✔
UNCOV
254
        return _("Network request timed out.")
×
255

256

257
class RequestCorrupted(Exception): pass
4✔
258

259
class ErrorParsingSSLCert(Exception): pass
4✔
260
class ErrorGettingSSLCertFromServer(Exception): pass
4✔
261
class ErrorSSLCertFingerprintMismatch(Exception): pass
4✔
262
class InvalidOptionCombination(Exception): pass
4✔
263
class ConnectError(NetworkException): pass
4✔
264

265

266
class _RSClient(RSClient):
4✔
267
    async def create_connection(self):
4✔
UNCOV
268
        try:
×
269
            return await super().create_connection()
×
270
        except OSError as e:
×
271
            # note: using "from e" here will set __cause__ of ConnectError
UNCOV
272
            raise ConnectError(e) from e
×
273

274

275
class ServerAddr:
4✔
276

277
    def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
4✔
278
        assert isinstance(host, str), repr(host)
4✔
279
        if protocol is None:
4✔
UNCOV
280
            protocol = 's'
×
281
        if not host:
4✔
UNCOV
282
            raise ValueError('host must not be empty')
×
283
        if host[0] == '[' and host[-1] == ']':  # IPv6
4✔
284
            host = host[1:-1]
4✔
285
        try:
4✔
286
            net_addr = NetAddress(host, port)  # this validates host and port
4✔
287
        except Exception as e:
4✔
288
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
4✔
289
        if protocol not in _KNOWN_NETWORK_PROTOCOLS:
4✔
UNCOV
290
            raise ValueError(f"invalid network protocol: {protocol}")
×
291
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
4✔
292
        self.port = int(net_addr.port)
4✔
293
        self.protocol = protocol
4✔
294
        self._net_addr_str = str(net_addr)
4✔
295

296
    @classmethod
4✔
297
    def from_str(cls, s: str) -> 'ServerAddr':
4✔
298
        """Constructs a ServerAddr or raises ValueError."""
299
        # host might be IPv6 address, hence do rsplit:
300
        host, port, protocol = str(s).rsplit(':', 2)
4✔
301
        return ServerAddr(host=host, port=port, protocol=protocol)
4✔
302

303
    @classmethod
4✔
304
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
4✔
305
        """Construct ServerAddr from str, guessing missing details.
306
        Does not raise - just returns None if guessing failed.
307
        Ongoing compatibility not guaranteed.
308
        """
309
        if not s:
4✔
UNCOV
310
            return None
×
311
        host = ""
4✔
312
        if s[0] == "[" and "]" in s:  # IPv6 address
4✔
313
            host_end = s.index("]")
4✔
314
            host = s[1:host_end]
4✔
315
            s = s[host_end+1:]
4✔
316
        items = str(s).rsplit(':', 2)
4✔
317
        if len(items) < 2:
4✔
318
            return None  # although maybe we could guess the port too?
4✔
319
        host = host or items[0]
4✔
320
        port = items[1]
4✔
321
        if len(items) >= 3:
4✔
322
            protocol = items[2]
4✔
323
        else:
324
            protocol = PREFERRED_NETWORK_PROTOCOL
4✔
325
        try:
4✔
326
            return ServerAddr(host=host, port=port, protocol=protocol)
4✔
327
        except ValueError:
4✔
328
            return None
4✔
329

330
    def to_friendly_name(self) -> str:
4✔
331
        # note: this method is closely linked to from_str_with_inference
332
        if self.protocol == 's':  # hide trailing ":s"
4✔
333
            return self.net_addr_str()
4✔
334
        return str(self)
4✔
335

336
    def __str__(self):
4✔
337
        return '{}:{}'.format(self.net_addr_str(), self.protocol)
4✔
338

339
    def to_json(self) -> str:
4✔
UNCOV
340
        return str(self)
×
341

342
    def __repr__(self):
4✔
UNCOV
343
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'
×
344

345
    def net_addr_str(self) -> str:
4✔
346
        return self._net_addr_str
4✔
347

348
    def __eq__(self, other):
4✔
349
        if not isinstance(other, ServerAddr):
4✔
UNCOV
350
            return False
×
351
        return (self.host == other.host
4✔
352
                and self.port == other.port
353
                and self.protocol == other.protocol)
354

355
    def __ne__(self, other):
4✔
UNCOV
356
        return not (self == other)
×
357

358
    def __hash__(self):
4✔
UNCOV
359
        return hash((self.host, self.port, self.protocol))
×
360

361

362
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
4✔
363
    filename = host
4✔
364
    try:
4✔
365
        ip = ip_address(host)
4✔
366
    except ValueError:
4✔
367
        pass
4✔
368
    else:
UNCOV
369
        if isinstance(ip, IPv6Address):
×
370
            filename = f"ipv6_{ip.packed.hex()}"
×
371
    return os.path.join(config.path, 'certs', filename)
4✔
372

373

374
class Interface(Logger):
4✔
375

376
    LOGGING_SHORTCUT = 'i'
4✔
377

378
    def __init__(self, *, network: 'Network', server: ServerAddr):
4✔
379
        self.ready = network.asyncio_loop.create_future()
4✔
380
        self.got_disconnected = asyncio.Event()
4✔
381
        self.server = server
4✔
382
        Logger.__init__(self)
4✔
383
        assert network.config.path
4✔
384
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
4✔
385
        self.blockchain = None  # type: Optional[Blockchain]
4✔
386
        self._requested_chunks = set()  # type: Set[int]
4✔
387
        self.network = network
4✔
388
        self.session = None  # type: Optional[NotificationSession]
4✔
389
        self._ipaddr_bucket = None
4✔
390
        # Set up proxy.
391
        # - for servers running on localhost, the proxy is not used. If user runs their own server
392
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
393
        #   note: we could maybe relax this further and bypass the proxy for all private
394
        #         addresses...? e.g. 192.168.x.x
395
        if util.is_localhost(server.host):
4✔
UNCOV
396
            self.logger.info(f"looks like localhost: not using proxy for this server")
×
397
            self.proxy = None
×
398
        else:
399
            self.proxy = ESocksProxy.from_network_settings(network)
4✔
400

401
        # Latest block header and corresponding height, as claimed by the server.
402
        # Note that these values are updated before they are verified.
403
        # Especially during initial header sync, verification can take a long time.
404
        # Failing verification will get the interface closed.
405
        self.tip_header = None
4✔
406
        self.tip = 0
4✔
407

408
        self.fee_estimates_eta = {}  # type: Dict[int, int]
4✔
409

410
        # Dump network messages (only for this interface).  Set at runtime from the console.
411
        self.debug = False
4✔
412

413
        self.taskgroup = OldTaskGroup()
4✔
414

415
        async def spawn_task():
4✔
416
            task = await self.network.taskgroup.spawn(self.run())
4✔
417
            task.set_name(f"interface::{str(server)}")
4✔
418
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
4✔
419

420
    @property
4✔
421
    def host(self):
4✔
422
        return self.server.host
4✔
423

424
    @property
4✔
425
    def port(self):
4✔
UNCOV
426
        return self.server.port
×
427

428
    @property
4✔
429
    def protocol(self):
4✔
UNCOV
430
        return self.server.protocol
×
431

432
    def diagnostic_name(self):
4✔
433
        return self.server.net_addr_str()
4✔
434

435
    def __str__(self):
4✔
UNCOV
436
        return f"<Interface {self.diagnostic_name()}>"
×
437

438
    async def is_server_ca_signed(self, ca_ssl_context):
4✔
439
        """Given a CA enforcing SSL context, returns True if the connection
440
        can be established. Returns False if the server has a self-signed
441
        certificate but otherwise is okay. Any other failures raise.
442
        """
UNCOV
443
        try:
×
444
            await self.open_session(ca_ssl_context, exit_early=True)
×
445
        except ConnectError as e:
×
446
            cause = e.__cause__
×
447
            if (isinstance(cause, ssl.SSLCertVerificationError)
×
448
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
449
                    and cause.verify_code == 18):  # "self signed certificate"
450
                # Good. We will use this server as self-signed.
UNCOV
451
                return False
×
452
            # Not good. Cannot use this server.
UNCOV
453
            raise
×
454
        # Good. We will use this server as CA-signed.
UNCOV
455
        return True
×
456

457
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context):
4✔
UNCOV
458
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
×
459
        if ca_signed:
×
460
            if self._get_expected_fingerprint():
×
461
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
×
462
            with open(self.cert_path, 'w') as f:
×
463
                # empty file means this is CA signed, not self-signed
UNCOV
464
                f.write('')
×
465
        else:
UNCOV
466
            await self._save_certificate()
×
467

468
    def _is_saved_ssl_cert_available(self):
4✔
UNCOV
469
        if not os.path.exists(self.cert_path):
×
470
            return False
×
471
        with open(self.cert_path, 'r') as f:
×
472
            contents = f.read()
×
473
        if contents == '':  # CA signed
×
474
            if self._get_expected_fingerprint():
×
475
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
×
476
            return True
×
477
        # pinned self-signed cert
UNCOV
478
        try:
×
479
            b = pem.dePem(contents, 'CERTIFICATE')
×
480
        except SyntaxError as e:
×
481
            self.logger.info(f"error parsing already saved cert: {e}")
×
482
            raise ErrorParsingSSLCert(e) from e
×
483
        try:
×
484
            x = x509.X509(b)
×
485
        except Exception as e:
×
486
            self.logger.info(f"error parsing already saved cert: {e}")
×
487
            raise ErrorParsingSSLCert(e) from e
×
488
        try:
×
489
            x.check_date()
×
490
        except x509.CertificateError as e:
×
491
            self.logger.info(f"certificate has expired: {e}")
×
492
            os.unlink(self.cert_path)  # delete pinned cert only in this case
×
493
            return False
×
494
        self._verify_certificate_fingerprint(bytearray(b))
×
495
        return True
×
496

497
    async def _get_ssl_context(self):
4✔
UNCOV
498
        if self.protocol != 's':
×
499
            # using plaintext TCP
UNCOV
500
            return None
×
501

502
        # see if we already have cert for this server; or get it for the first time
UNCOV
503
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
×
504
        if not self._is_saved_ssl_cert_available():
×
505
            try:
×
506
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
×
507
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
×
508
                raise ErrorGettingSSLCertFromServer(e) from e
×
509
        # now we have a file saved in our certificate store
UNCOV
510
        siz = os.stat(self.cert_path).st_size
×
511
        if siz == 0:
×
512
            # CA signed cert
UNCOV
513
            sslc = ca_sslc
×
514
        else:
515
            # pinned self-signed cert
UNCOV
516
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
×
517
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
518
            #       We explicitly disable it as it breaks lots of servers.
UNCOV
519
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
×
520
            sslc.check_hostname = False
×
521
        return sslc
×
522

523
    def handle_disconnect(func):
4✔
524
        @functools.wraps(func)
4✔
525
        async def wrapper_func(self: 'Interface', *args, **kwargs):
4✔
UNCOV
526
            try:
×
527
                return await func(self, *args, **kwargs)
×
528
            except GracefulDisconnect as e:
×
529
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
×
530
            except aiorpcx.jsonrpc.RPCError as e:
×
531
                self.logger.warning(f"disconnecting due to {repr(e)}")
×
532
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
×
533
            finally:
UNCOV
534
                self.got_disconnected.set()
×
535
                await self.network.connection_down(self)
×
536
                # if was not 'ready' yet, schedule waiting coroutines:
UNCOV
537
                self.ready.cancel()
×
538
        return wrapper_func
4✔
539

540
    @ignore_exceptions  # do not kill network.taskgroup
4✔
541
    @log_exceptions
4✔
542
    @handle_disconnect
4✔
543
    async def run(self):
4✔
UNCOV
544
        try:
×
545
            ssl_context = await self._get_ssl_context()
×
546
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
×
547
            self.logger.info(f'disconnecting due to: {repr(e)}')
×
548
            return
×
549
        try:
×
550
            await self.open_session(ssl_context)
×
551
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
×
552
            # make SSL errors for main interface more visible (to help server ops debug cert pinning issues)
UNCOV
553
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
×
554
                    and self.is_main_server() and not self.network.auto_connect):
UNCOV
555
                self.logger.warning(f'Cannot connect to main server due to SSL error '
×
556
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
557
            else:
UNCOV
558
                self.logger.info(f'disconnecting due to: {repr(e)}')
×
559
            return
×
560

561
    def _mark_ready(self) -> None:
4✔
UNCOV
562
        if self.ready.cancelled():
×
563
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
×
564
        if self.ready.done():
×
565
            return
×
566

UNCOV
567
        assert self.tip_header
×
568
        chain = blockchain.check_header(self.tip_header)
×
569
        if not chain:
×
570
            self.blockchain = blockchain.get_best_chain()
×
571
        else:
UNCOV
572
            self.blockchain = chain
×
573
        assert self.blockchain is not None
×
574

UNCOV
575
        self.logger.info(f"set blockchain with height {self.blockchain.height()}")
×
576

UNCOV
577
        self.ready.set_result(1)
×
578

579
    def is_connected_and_ready(self) -> bool:
4✔
UNCOV
580
        return self.ready.done() and not self.got_disconnected.is_set()
×
581

582
    async def _save_certificate(self) -> None:
4✔
UNCOV
583
        if not os.path.exists(self.cert_path):
×
584
            # we may need to retry this a few times, in case the handshake hasn't completed
UNCOV
585
            for _ in range(10):
×
586
                dercert = await self._fetch_certificate()
×
587
                if dercert:
×
588
                    self.logger.info("succeeded in getting cert")
×
589
                    self._verify_certificate_fingerprint(dercert)
×
590
                    with open(self.cert_path, 'w') as f:
×
591
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
×
592
                        # workaround android bug
UNCOV
593
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
×
594
                        f.write(cert)
×
595
                        # even though close flushes, we can't fsync when closed.
596
                        # and we must flush before fsyncing, cause flush flushes to OS buffer
597
                        # fsync writes to OS buffer to disk
UNCOV
598
                        f.flush()
×
599
                        os.fsync(f.fileno())
×
600
                    break
×
601
                await asyncio.sleep(1)
×
602
            else:
UNCOV
603
                raise GracefulDisconnect("could not get certificate after 10 tries")
×
604

605
    async def _fetch_certificate(self) -> bytes:
4✔
UNCOV
606
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
×
607
        sslc.check_hostname = False
×
608
        sslc.verify_mode = ssl.CERT_NONE
×
609
        async with _RSClient(session_factory=RPCSession,
×
610
                             host=self.host, port=self.port,
611
                             ssl=sslc, proxy=self.proxy) as session:
UNCOV
612
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
×
613
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
×
614
            return ssl_object.getpeercert(binary_form=True)
×
615

616
    def _get_expected_fingerprint(self) -> Optional[str]:
4✔
UNCOV
617
        if self.is_main_server():
×
618
            return self.network.config.NETWORK_SERVERFINGERPRINT
×
619

620
    def _verify_certificate_fingerprint(self, certificate):
4✔
UNCOV
621
        expected_fingerprint = self._get_expected_fingerprint()
×
622
        if not expected_fingerprint:
×
623
            return
×
624
        fingerprint = hashlib.sha256(certificate).hexdigest()
×
625
        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
×
626
        if not fingerprints_match:
×
627
            util.trigger_callback('cert_mismatch')
×
628
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
×
629
        self.logger.info("cert fingerprint verification passed")
×
630

631
    async def get_block_header(self, height, assert_mode):
4✔
UNCOV
632
        if not is_non_negative_integer(height):
×
633
            raise Exception(f"{repr(height)} is not a block height")
×
634
        self.logger.info(f'requesting block header {height} in mode {assert_mode}')
×
635
        # use lower timeout as we usually have network.bhi_lock here
UNCOV
636
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
×
637
        res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
×
638
        return blockchain.deserialize_header(bytes.fromhex(res), height)
×
639

640
    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
4✔
UNCOV
641
        if not is_non_negative_integer(height):
×
642
            raise Exception(f"{repr(height)} is not a block height")
×
643
        index = height // 2016
×
644
        if can_return_early and index in self._requested_chunks:
×
645
            return
×
646
        self.logger.info(f"requesting chunk from height {height}")
×
647
        size = 2016
×
648
        if tip is not None:
×
649
            size = min(size, tip - index * 2016 + 1)
×
650
            size = max(size, 0)
×
651
        try:
×
652
            self._requested_chunks.add(index)
×
653
            res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
×
654
        finally:
UNCOV
655
            self._requested_chunks.discard(index)
×
656
        assert_dict_contains_field(res, field_name='count')
×
657
        assert_dict_contains_field(res, field_name='hex')
×
658
        assert_dict_contains_field(res, field_name='max')
×
659
        assert_non_negative_integer(res['count'])
×
660
        assert_non_negative_integer(res['max'])
×
661
        assert_hex_str(res['hex'])
×
662
        if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
×
663
            raise RequestCorrupted('inconsistent chunk hex and count')
×
664
        # we never request more than 2016 headers, but we enforce that they fit in a single response
UNCOV
665
        if res['max'] < 2016:
×
666
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < 2016")
×
667
        if res['count'] != size:
×
668
            raise RequestCorrupted(f"expected {size} headers but only got {res['count']}")
×
669
        conn = self.blockchain.connect_chunk(index, res['hex'])
×
670
        if not conn:
×
671
            return conn, 0
×
672
        return conn, res['count']
×
673

674
    def is_main_server(self) -> bool:
4✔
UNCOV
675
        return (self.network.interface == self or
×
676
                self.network.interface is None and self.network.default_server == self.server)
677

678
    async def open_session(self, sslc, exit_early=False):
4✔
UNCOV
679
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
×
680
        async with _RSClient(session_factory=session_factory,
×
681
                             host=self.host, port=self.port,
682
                             ssl=sslc, proxy=self.proxy) as session:
UNCOV
683
            self.session = session  # type: NotificationSession
×
684
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
×
685
            try:
×
686
                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
×
687
            except aiorpcx.jsonrpc.RPCError as e:
×
688
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
×
689
            if exit_early:
×
690
                return
×
691
            if ver[1] != version.PROTOCOL_VERSION:
×
692
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
×
693
                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
UNCOV
694
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
×
695
                raise GracefulDisconnect(f'too many connected servers already '
×
696
                                         f'in bucket {self.bucket_based_on_ipaddress()}')
UNCOV
697
            self.logger.info(f"connection established. version: {ver}")
×
698

UNCOV
699
            try:
×
700
                async with self.taskgroup as group:
×
701
                    await group.spawn(self.ping)
×
702
                    await group.spawn(self.request_fee_estimates)
×
703
                    await group.spawn(self.run_fetch_blocks)
×
704
                    await group.spawn(self.monitor_connection)
×
705
            except aiorpcx.jsonrpc.RPCError as e:
×
706
                if e.code in (
×
707
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
708
                    JSONRPC.SERVER_BUSY,
709
                    JSONRPC.METHOD_NOT_FOUND,
710
                    JSONRPC.INTERNAL_ERROR,
711
                ):
UNCOV
712
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
×
713
                    raise GracefulDisconnect(e, log_level=log_level) from e
×
714
                raise
×
715
            finally:
UNCOV
716
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits
×
717

718
    async def monitor_connection(self):
4✔
UNCOV
719
        while True:
×
720
            await asyncio.sleep(1)
×
721
            # If the session/transport is no longer open, we disconnect.
722
            # e.g. if the remote cleanly sends EOF, we would handle that here.
723
            # note: If the user pulls the ethernet cable or disconnects wifi,
724
            #       ideally we would detect that here, so that the GUI/etc can reflect that.
725
            #       - On Android, this seems to work reliably, where asyncio.BaseProtocol.connection_lost()
726
            #         gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
727
            #       - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
728
            #         Hence, in practice the connection issue will only be detected the next time we try
729
            #         to send a message (plus timeout), which can take minutes...
UNCOV
730
            if not self.session or self.session.is_closing():
×
731
                raise GracefulDisconnect('session was closed')
×
732

733
    async def ping(self):
4✔
UNCOV
734
        while True:
×
735
            await asyncio.sleep(300)
×
736
            await self.session.send_request('server.ping')
×
737

738
    async def request_fee_estimates(self):
4✔
UNCOV
739
        from .simple_config import FEE_ETA_TARGETS
×
740
        while True:
×
741
            async with OldTaskGroup() as group:
×
742
                fee_tasks = []
×
743
                for i in FEE_ETA_TARGETS:
×
744
                    fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
×
745
            for nblock_target, task in fee_tasks:
×
746
                fee = task.result()
×
747
                if fee < 0: continue
×
748
                assert isinstance(fee, int)
×
749
                self.fee_estimates_eta[nblock_target] = fee
×
750
            self.network.update_fee_estimates()
×
751
            await asyncio.sleep(60)
×
752

753
    async def close(self, *, force_after: int = None):
4✔
754
        """Closes the connection and waits for it to be closed.
755
        We try to flush buffered data to the wire, which can take some time.
756
        """
757
        if self.session:
×
758
            await self.session.close(force_after=force_after)
×
759
        # monitor_connection will cancel tasks
760

761
    async def run_fetch_blocks(self):
4✔
762
        header_queue = asyncio.Queue()
×
763
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
×
764
        while True:
×
765
            item = await header_queue.get()
×
766
            raw_header = item[0]
×
767
            height = raw_header['height']
×
768
            header = blockchain.deserialize_header(bfh(raw_header['hex']), height)
×
769
            self.tip_header = header
×
770
            self.tip = height
×
771
            if self.tip < constants.net.max_checkpoint():
×
772
                raise GracefulDisconnect('server tip below max checkpoint')
×
773
            self._mark_ready()
×
774
            blockchain_updated = await self._process_header_at_tip()
×
775
            # header processing done
776
            if blockchain_updated:
×
777
                util.trigger_callback('blockchain_updated')
×
778
            util.trigger_callback('network_updated')
×
779
            await self.network.switch_unwanted_fork_interface()
×
780
            await self.network.switch_lagging_interface()
×
781

782
    async def _process_header_at_tip(self) -> bool:
4✔
783
        """Returns:
784
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
785
        True - new header we didn't have, or reorg
786
        """
787
        height, header = self.tip, self.tip_header
×
788
        async with self.network.bhi_lock:
×
789
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
×
790
                # another interface amended the blockchain
791
                return False
×
792
            _, height = await self.step(height, header)
×
793
            # in the simple case, height == self.tip+1
794
            if height <= self.tip:
×
795
                await self.sync_until(height)
×
796
            return True
×
797

798
    async def sync_until(self, height, next_height=None):
4✔
799
        if next_height is None:
4✔
800
            next_height = self.tip
×
801
        last = None
4✔
802
        while last is None or height <= next_height:
4✔
803
            prev_last, prev_height = last, height
4✔
804
            if next_height > height + 10:
4✔
805
                could_connect, num_headers = await self.request_chunk(height, next_height)
×
806
                if not could_connect:
×
807
                    if height <= constants.net.max_checkpoint():
×
808
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
×
809
                    last, height = await self.step(height)
×
810
                    continue
×
811
                util.trigger_callback('blockchain_updated')
×
812
                util.trigger_callback('network_updated')
×
813
                height = (height // 2016 * 2016) + num_headers
×
814
                assert height <= next_height+1, (height, self.tip)
×
815
                last = 'catchup'
×
816
            else:
817
                last, height = await self.step(height)
4✔
818
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
4✔
819
        return last, height
4✔
820

821
    async def step(self, height, header=None):
4✔
822
        assert 0 <= height <= self.tip, (height, self.tip)
4✔
823
        if header is None:
4✔
824
            header = await self.get_block_header(height, 'catchup')
4✔
825

826
        chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
4✔
827
        if chain:
4✔
828
            self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
×
829
            # note: there is an edge case here that is not handled.
830
            # we might know the blockhash (enough for check_header) but
831
            # not have the header itself. e.g. regtest chain with only genesis.
832
            # this situation resolves itself on the next block
833
            return 'catchup', height+1
×
834

835
        can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
4✔
836
        if not can_connect:
4✔
837
            self.logger.info(f"can't connect new block: {height=}")
4✔
838
            height, header, bad, bad_header = await self._search_headers_backwards(height, header)
4✔
839
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
4✔
840
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
4✔
841
            assert chain or can_connect
4✔
842
        if can_connect:
4✔
843
            self.logger.info(f"new block: {height=}")
4✔
844
            height += 1
4✔
845
            if isinstance(can_connect, Blockchain):  # not when mocking
4✔
846
                self.blockchain = can_connect
×
847
                self.blockchain.save_header(header)
×
848
            return 'catchup', height
4✔
849

850
        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
4✔
851
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
4✔
852

853
    async def _search_headers_binary(self, height, bad, bad_header, chain):
4✔
854
        assert bad == bad_header['block_height']
4✔
855
        _assert_header_does_not_check_against_any_chain(bad_header)
4✔
856

857
        self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
4✔
858
        good = height
4✔
859
        while True:
4✔
860
            assert good < bad, (good, bad)
4✔
861
            height = (good + bad) // 2
4✔
862
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
4✔
863
            header = await self.get_block_header(height, 'binary')
4✔
864
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
4✔
865
            if chain:
4✔
866
                self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
4✔
867
                good = height
4✔
868
            else:
869
                bad = height
4✔
870
                bad_header = header
4✔
871
            if good + 1 == bad:
4✔
872
                break
4✔
873

874
        mock = 'mock' in bad_header and bad_header['mock']['connect'](height)
4✔
875
        real = not mock and self.blockchain.can_connect(bad_header, check_height=False)
4✔
876
        if not real and not mock:
4✔
877
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
×
878
        _assert_header_does_not_check_against_any_chain(bad_header)
4✔
879

880
        self.logger.info(f"binary search exited. good {good}, bad {bad}")
4✔
881
        return good, bad, bad_header
4✔
882

883
    async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header):
4✔
884
        assert good + 1 == bad
4✔
885
        assert bad == bad_header['block_height']
4✔
886
        _assert_header_does_not_check_against_any_chain(bad_header)
4✔
887
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
888
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.
889

890
        bh = self.blockchain.height()
4✔
891
        assert bh >= good, (bh, good)
4✔
892
        if bh == good:
4✔
893
            height = good + 1
×
894
            self.logger.info(f"catching up from {height}")
×
895
            return 'no_fork', height
×
896

897
        # this is a new fork we don't yet have
898
        height = bad + 1
4✔
899
        self.logger.info(f"new fork at bad height {bad}")
4✔
900
        forkfun = self.blockchain.fork if 'mock' not in bad_header else bad_header['mock']['fork']
4✔
901
        b = forkfun(bad_header)  # type: Blockchain
4✔
902
        self.blockchain = b
4✔
903
        assert b.forkpoint == bad
4✔
904
        return 'fork', height
4✔
905

906
    async def _search_headers_backwards(self, height, header):
4✔
907
        async def iterate():
4✔
908
            nonlocal height, header
909
            checkp = False
4✔
910
            if height <= constants.net.max_checkpoint():
4✔
911
                height = constants.net.max_checkpoint()
×
912
                checkp = True
×
913
            header = await self.get_block_header(height, 'backward')
4✔
914
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
4✔
915
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
4✔
916
            if chain or can_connect:
4✔
917
                return False
4✔
918
            if checkp:
4✔
919
                raise GracefulDisconnect("server chain conflicts with checkpoints")
×
920
            return True
4✔
921

922
        bad, bad_header = height, header
4✔
923
        _assert_header_does_not_check_against_any_chain(bad_header)
4✔
924
        with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values())
4✔
925
        local_max = max([0] + [x.height() for x in chains]) if 'mock' not in header else float('inf')
4✔
926
        height = min(local_max + 1, height - 1)
4✔
927
        while await iterate():
4✔
928
            bad, bad_header = height, header
4✔
929
            delta = self.tip - height
4✔
930
            height = self.tip - 2 * delta
4✔
931

932
        _assert_header_does_not_check_against_any_chain(bad_header)
4✔
933
        self.logger.info(f"exiting backward mode at {height}")
4✔
934
        return height, header, bad, bad_header
4✔
935

936
    @classmethod
4✔
937
    def client_name(cls) -> str:
4✔
938
        return f'electrum/{version.ELECTRUM_VERSION}'
×
939

940
    def is_tor(self):
4✔
941
        return self.host.endswith('.onion')
×
942

943
    def ip_addr(self) -> Optional[str]:
4✔
944
        session = self.session
×
945
        if not session: return None
×
946
        peer_addr = session.remote_address()
×
947
        if not peer_addr: return None
×
948
        return str(peer_addr.host)
×
949

950
    def bucket_based_on_ipaddress(self) -> str:
4✔
951
        def do_bucket():
×
952
            if self.is_tor():
×
953
                return BUCKET_NAME_OF_ONION_SERVERS
×
954
            try:
×
955
                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
×
956
            except ValueError:
×
957
                return ''
×
958
            if not ip_addr:
×
959
                return ''
×
960
            if ip_addr.is_loopback:  # localhost is exempt
×
961
                return ''
×
962
            if ip_addr.version == 4:
×
963
                slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
×
964
                return str(slash16)
×
965
            elif ip_addr.version == 6:
×
966
                slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
×
967
                return str(slash48)
×
968
            return ''
×
969

970
        if not self._ipaddr_bucket:
×
971
            self._ipaddr_bucket = do_bucket()
×
972
        return self._ipaddr_bucket
×
973

974
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
4✔
975
        if not is_hash256_str(tx_hash):
×
976
            raise Exception(f"{repr(tx_hash)} is not a txid")
×
977
        if not is_non_negative_integer(tx_height):
×
978
            raise Exception(f"{repr(tx_height)} is not a block height")
×
979
        # do request
980
        res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
×
981
        # check response
982
        block_height = assert_dict_contains_field(res, field_name='block_height')
×
983
        merkle = assert_dict_contains_field(res, field_name='merkle')
×
984
        pos = assert_dict_contains_field(res, field_name='pos')
×
985
        # note: tx_height was just a hint to the server, don't enforce the response to match it
986
        assert_non_negative_integer(block_height)
×
987
        assert_non_negative_integer(pos)
×
988
        assert_list_or_tuple(merkle)
×
989
        for item in merkle:
×
990
            assert_hash256_str(item)
×
991
        return res
×
992

993
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
4✔
994
        if not is_hash256_str(tx_hash):
×
995
            raise Exception(f"{repr(tx_hash)} is not a txid")
×
996
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
×
997
        # validate response
998
        if not is_hex_str(raw):
×
999
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
×
1000
        tx = Transaction(raw)
×
1001
        try:
×
1002
            tx.deserialize()  # see if raises
×
1003
        except Exception as e:
×
1004
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
×
1005
        if tx.txid() != tx_hash:
×
1006
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
×
1007
        return raw
×
1008

1009
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
4✔
1010
        if not is_hash256_str(sh):
×
1011
            raise Exception(f"{repr(sh)} is not a scripthash")
×
1012
        # do request
1013
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
×
1014
        # check response
1015
        assert_list_or_tuple(res)
×
1016
        prev_height = 1
×
1017
        for tx_item in res:
×
1018
            height = assert_dict_contains_field(tx_item, field_name='height')
×
1019
            assert_dict_contains_field(tx_item, field_name='tx_hash')
×
1020
            assert_integer(height)
×
1021
            assert_hash256_str(tx_item['tx_hash'])
×
1022
            if height in (-1, 0):
×
1023
                assert_dict_contains_field(tx_item, field_name='fee')
×
1024
                assert_non_negative_integer(tx_item['fee'])
×
1025
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
×
1026
            else:
1027
                # check monotonicity of heights
1028
                if height < prev_height:
×
1029
                    raise RequestCorrupted(f'heights of confirmed txs must be in increasing order')
×
1030
                prev_height = height
×
1031
        hashes = set(map(lambda item: item['tx_hash'], res))
×
1032
        if len(hashes) != len(res):
×
1033
            # Either server is sending garbage... or maybe if server is race-prone
1034
            # a recently mined tx could be included in both last block and mempool?
1035
            # Still, it's simplest to just disregard the response.
1036
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
×
1037
        return res
×
1038

1039
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
4✔
1040
        if not is_hash256_str(sh):
×
1041
            raise Exception(f"{repr(sh)} is not a scripthash")
×
1042
        # do request
1043
        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
×
1044
        # check response
1045
        assert_list_or_tuple(res)
×
1046
        for utxo_item in res:
×
1047
            assert_dict_contains_field(utxo_item, field_name='tx_pos')
×
1048
            assert_dict_contains_field(utxo_item, field_name='value')
×
1049
            assert_dict_contains_field(utxo_item, field_name='tx_hash')
×
1050
            assert_dict_contains_field(utxo_item, field_name='height')
×
1051
            assert_non_negative_integer(utxo_item['tx_pos'])
×
1052
            assert_non_negative_integer(utxo_item['value'])
×
1053
            assert_non_negative_integer(utxo_item['height'])
×
1054
            assert_hash256_str(utxo_item['tx_hash'])
×
1055
        return res
×
1056

1057
    async def get_balance_for_scripthash(self, sh: str) -> dict:
4✔
1058
        if not is_hash256_str(sh):
×
1059
            raise Exception(f"{repr(sh)} is not a scripthash")
×
1060
        # do request
1061
        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
×
1062
        # check response
1063
        assert_dict_contains_field(res, field_name='confirmed')
×
1064
        assert_dict_contains_field(res, field_name='unconfirmed')
×
1065
        assert_non_negative_integer(res['confirmed'])
×
1066
        assert_integer(res['unconfirmed'])
×
1067
        return res
×
1068

1069
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
4✔
1070
        if not is_non_negative_integer(tx_height):
×
1071
            raise Exception(f"{repr(tx_height)} is not a block height")
×
1072
        if not is_non_negative_integer(tx_pos):
×
1073
            raise Exception(f"{repr(tx_pos)} should be non-negative integer")
×
1074
        # do request
1075
        res = await self.session.send_request(
×
1076
            'blockchain.transaction.id_from_pos',
1077
            [tx_height, tx_pos, merkle],
1078
        )
1079
        # check response
1080
        if merkle:
×
1081
            assert_dict_contains_field(res, field_name='tx_hash')
×
1082
            assert_dict_contains_field(res, field_name='merkle')
×
1083
            assert_hash256_str(res['tx_hash'])
×
1084
            assert_list_or_tuple(res['merkle'])
×
1085
            for node_hash in res['merkle']:
×
1086
                assert_hash256_str(node_hash)
×
1087
        else:
1088
            assert_hash256_str(res)
×
1089
        return res
×
1090

1091
    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
4✔
1092
        # do request
1093
        res = await self.session.send_request('mempool.get_fee_histogram')
×
1094
        # check response
1095
        assert_list_or_tuple(res)
×
1096
        prev_fee = float('inf')
×
1097
        for fee, s in res:
×
1098
            assert_non_negative_int_or_float(fee)
×
1099
            assert_non_negative_integer(s)
×
1100
            if fee >= prev_fee:  # check monotonicity
×
1101
                raise RequestCorrupted(f'fees must be in decreasing order')
×
1102
            prev_fee = fee
×
1103
        return res
×
1104

1105
    async def get_server_banner(self) -> str:
4✔
1106
        # do request
1107
        res = await self.session.send_request('server.banner')
×
1108
        # check response
1109
        if not isinstance(res, str):
×
1110
            raise RequestCorrupted(f'{res!r} should be a str')
×
1111
        return res
×
1112

1113
    async def get_donation_address(self) -> str:
4✔
1114
        # do request
1115
        res = await self.session.send_request('server.donation_address')
×
1116
        # check response
1117
        if not res:  # ignore empty string
×
1118
            return ''
×
1119
        if not bitcoin.is_address(res):
×
1120
            # note: do not hard-fail -- allow server to use future-type
1121
            #       bitcoin address we do not recognize
1122
            self.logger.info(f"invalid donation address from server: {repr(res)}")
×
1123
            res = ''
×
1124
        return res
×
1125

1126
    async def get_relay_fee(self) -> int:
4✔
1127
        """Returns the min relay feerate in sat/kbyte."""
1128
        # do request
1129
        res = await self.session.send_request('blockchain.relayfee')
×
1130
        # check response
1131
        assert_non_negative_int_or_float(res)
×
1132
        relayfee = int(res * bitcoin.COIN)
×
1133
        relayfee = max(0, relayfee)
×
1134
        return relayfee
×
1135

1136
    async def get_estimatefee(self, num_blocks: int) -> int:
4✔
1137
        """Returns a feerate estimate for getting confirmed within
1138
        num_blocks blocks, in sat/kbyte.
1139
        Returns -1 if the server could not provide an estimate.
1140
        """
1141
        if not is_non_negative_integer(num_blocks):
×
1142
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
×
1143
        # do request
1144
        try:
×
1145
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
×
1146
        except aiorpcx.jsonrpc.ProtocolError as e:
×
1147
            # The protocol spec says the server itself should already have returned -1
1148
            # if it cannot provide an estimate, however apparently "electrs" does not conform
1149
            # and sends an error instead. Convert it here:
1150
            if "cannot estimate fee" in e.message:
×
1151
                res = -1
×
1152
            else:
1153
                raise
×
1154
        except aiorpcx.jsonrpc.RPCError as e:
×
1155
            # The protocol spec says the server itself should already have returned -1
1156
            # if it cannot provide an estimate. "Fulcrum" often sends:
1157
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
1158
            if e.code == JSONRPC.INTERNAL_ERROR:
×
1159
                res = -1
×
1160
            else:
1161
                raise
×
1162
        # check response
1163
        if res != -1:
×
1164
            assert_non_negative_int_or_float(res)
×
1165
            res = int(res * bitcoin.COIN)
×
1166
        return res
×
1167

1168

1169
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
4✔
1170
    chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
4✔
1171
    if chain_bad:
4✔
1172
        raise Exception('bad_header must not check!')
×
1173

1174

1175
def check_cert(host, cert):
4✔
1176
    try:
×
1177
        b = pem.dePem(cert, 'CERTIFICATE')
×
1178
        x = x509.X509(b)
×
1179
    except Exception:
×
1180
        traceback.print_exc(file=sys.stdout)
×
1181
        return
×
1182

1183
    try:
×
1184
        x.check_date()
×
1185
        expired = False
×
1186
    except Exception:
×
1187
        expired = True
×
1188

1189
    m = "host: %s\n"%host
×
1190
    m += "has_expired: %s\n"% expired
×
1191
    util.print_msg(m)
×
1192

1193

1194
# Used by tests
1195
def _match_hostname(name, val):
4✔
1196
    if val == name:
×
1197
        return True
×
1198

1199
    return val.startswith('*.') and name.endswith(val[1:])
×
1200

1201

1202
def test_certificates():
4✔
1203
    from .simple_config import SimpleConfig
×
1204
    config = SimpleConfig()
×
1205
    mydir = os.path.join(config.path, "certs")
×
1206
    certs = os.listdir(mydir)
×
1207
    for c in certs:
×
1208
        p = os.path.join(mydir,c)
×
1209
        with open(p, encoding='utf-8') as f:
×
1210
            cert = f.read()
×
1211
        check_cert(c, cert)
×
1212

1213
if __name__ == "__main__":
4✔
1214
    test_certificates()
×