• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5497433947373568

08 May 2025 06:34PM UTC coverage: 59.715% (+0.05%) from 59.662%
5497433947373568

push

CirrusCI

SomberNight
Revert "interface: add padding and some noise to protocol messages"

Unforeseen issues. Needs more work..

This reverts commit 097eabed1.

1 of 2 new or added lines in 1 file covered. (50.0%)

27 existing lines in 6 files now uncovered.

21523 of 36043 relevant lines covered (59.71%)

2.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.27
/electrum/interface.py
1
#!/usr/bin/env python
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2011 thomasv@gitorious
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25
import os
5✔
26
import re
5✔
27
import ssl
5✔
28
import sys
5✔
29
import traceback
5✔
30
import asyncio
5✔
31
import socket
5✔
32
from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence, Dict
5✔
33
from collections import defaultdict
5✔
34
from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
5✔
35
import itertools
5✔
36
import logging
5✔
37
import hashlib
5✔
38
import functools
5✔
39

40
import aiorpcx
5✔
41
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
5✔
42
from aiorpcx.curio import timeout_after, TaskTimeout
5✔
43
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
5✔
44
from aiorpcx.rawsocket import RSClient
5✔
45
import certifi
5✔
46

47
from .util import (ignore_exceptions, log_exceptions, bfh, ESocksProxy,
5✔
48
                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
49
                   is_int_or_float, is_non_negative_int_or_float, OldTaskGroup)
50
from . import util
5✔
51
from . import x509
5✔
52
from . import pem
5✔
53
from . import version
5✔
54
from . import blockchain
5✔
55
from .blockchain import Blockchain, HEADER_SIZE
5✔
56
from . import bitcoin
5✔
57
from . import constants
5✔
58
from .i18n import _
5✔
59
from .logging import Logger
5✔
60
from .transaction import Transaction
5✔
61
from .fee_policy import FEE_ETA_TARGETS
5✔
62

63
if TYPE_CHECKING:
5✔
64
    from .network import Network
×
65
    from .simple_config import SimpleConfig
×
66

67

68
# Path of the CA bundle shipped with certifi; used to validate CA-signed server certs.
ca_path = certifi.where()

# Bucket name under which all ".onion" (Tor) servers are grouped
# (see bucket_based_on_ipaddress / connection-diversity checks).
BUCKET_NAME_OF_ONION_SERVERS = 'onion'

# Supported transport protocols: 's' = SSL/TLS, 't' = plaintext TCP
# (see Interface._get_ssl_context, which returns None for non-'s').
_KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
PREFERRED_NETWORK_PROTOCOL = 's'
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS
75

76

77
class NetworkTimeout:
    # seconds
    # Timeout profiles for network requests. Generic is the default;
    # Urgent is used where failing fast is preferable to waiting
    # (e.g. get_block_header, which usually runs while holding network.bhi_lock).
    class Generic:
        NORMAL = 30
        RELAXED = 45
        MOST_RELAXED = 600

    class Urgent(Generic):
        NORMAL = 10
        RELAXED = 20
        MOST_RELAXED = 60
88

89

90
def assert_non_negative_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a non-negative integer."""
    if is_non_negative_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative integer')
93

94

95
def assert_integer(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is an integer."""
    if is_integer(val):
        return
    raise RequestCorrupted(f'{val!r} should be an integer')
98

99

100
def assert_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is an int or a float."""
    if is_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be int or float')
103

104

105
def assert_non_negative_int_or_float(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a non-negative int or float."""
    if is_non_negative_int_or_float(val):
        return
    raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
108

109

110
def assert_hash256_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a hash256 hex string."""
    if is_hash256_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hash256 str')
113

114

115
def assert_hex_str(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a hex string."""
    if is_hex_str(val):
        return
    raise RequestCorrupted(f'{val!r} should be a hex str')
118

119

120
def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
    """Return d[field_name]; raise RequestCorrupted if *d* is not a dict
    or does not contain *field_name*."""
    if not isinstance(d, dict):
        raise RequestCorrupted(f'{d!r} should be a dict')
    if field_name in d:
        return d[field_name]
    raise RequestCorrupted(f'required field {field_name!r} missing from dict')
126

127

128
def assert_list_or_tuple(val: Any) -> None:
    """Raise RequestCorrupted unless *val* is a list or a tuple."""
    if isinstance(val, (list, tuple)):
        return
    raise RequestCorrupted(f'{val!r} should be a list or tuple')
131

132

133
class NotificationSession(RPCSession):
    """JSON-RPC session to an Electrum server with subscription support.

    Subscriptions are keyed on (method, params); each key maps to a list of
    asyncio.Queues that receive every matching server notification.  The
    latest result per key is cached so repeat subscribe() calls for the same
    key can be answered without a network round-trip.
    """

    def __init__(self, *args, interface: 'Interface', **kwargs):
        super(NotificationSession, self).__init__(*args, **kwargs)
        self.subscriptions = defaultdict(list)   # key -> list of subscriber queues
        self.cache = {}                          # key -> latest notification result
        self._msg_counter = itertools.count(start=1)  # local ids for log correlation
        self.interface = interface
        self.cost_hard_limit = 0  # disable aiorpcx resource limits

    async def handle_request(self, request):
        """Dispatch an incoming server message.

        Only notifications for keys we subscribed to are accepted; anything
        else is treated as a protocol violation and closes the session.
        """
        self.maybe_log(f"--> {request}")
        try:
            if isinstance(request, Notification):
                # by convention the last arg is the result, the rest are the sub key
                params, result = request.args[:-1], request.args[-1]
                key = self.get_hashable_key_for_rpc_call(request.method, params)
                if key in self.subscriptions:
                    self.cache[key] = result
                    for queue in self.subscriptions[key]:
                        await queue.put(request.args)
                else:
                    raise Exception('unexpected notification')
            else:
                raise Exception('unexpected request. not a notification')
        except Exception as e:
            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
            await self.close()

    async def send_request(self, *args, timeout=None, **kwargs):
        """Send a request, with logging and timeout normalization.

        Timeouts are re-raised as RequestTimedOut (a GracefulDisconnect),
        since TaskTimeout is a CancelledError subclass that TaskGroups suppress.
        """
        # note: semaphores/timeouts/backpressure etc are handled by
        # aiorpcx. the timeout arg here in most cases should not be set
        msg_id = next(self._msg_counter)
        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
        try:
            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
            response = await util.wait_for2(
                super().send_request(*args, **kwargs),
                timeout)
        except (TaskTimeout, asyncio.TimeoutError) as e:
            self.maybe_log(f"--> request timed out: {args} (id: {msg_id})")
            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
        except CodeMessageError as e:
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        except BaseException as e:  # cancellations, etc. are useful for debugging
            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
            raise
        else:
            self.maybe_log(f"--> {response} (id: {msg_id})")
            return response

    def set_default_timeout(self, timeout):
        """Set both the per-request timeout and the max send delay (base-class attrs)."""
        assert hasattr(self, "sent_request_timeout")  # in base class
        self.sent_request_timeout = timeout
        assert hasattr(self, "max_send_delay")        # in base class
        self.max_send_delay = timeout

    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
        """Subscribe *queue* to notifications for (method, params).

        The initial value is served from cache when available; otherwise a
        request is made and its result seeds the cache. The initial value is
        delivered to the queue in the same `params + [result]` shape as
        subsequent notifications.
        """
        # note: until the cache is written for the first time,
        # each 'subscribe' call might make a request on the network.
        key = self.get_hashable_key_for_rpc_call(method, params)
        self.subscriptions[key].append(queue)
        if key in self.cache:
            result = self.cache[key]
        else:
            result = await self.send_request(method, params)
            self.cache[key] = result
        await queue.put(params + [result])

    def unsubscribe(self, queue):
        """Unsubscribe a callback to free object references to enable GC."""
        # note: we can't unsubscribe from the server, so we keep receiving
        # subsequent notifications
        for v in self.subscriptions.values():
            if queue in v:
                v.remove(queue)

    @classmethod
    def get_hashable_key_for_rpc_call(cls, method, params):
        """Hashable index for subscriptions and cache"""
        return str(method) + repr(params)

    def maybe_log(self, msg: str) -> None:
        """Debug-log *msg* iff message dumping is enabled for this interface or globally."""
        if not self.interface:
            return
        if self.interface.debug or self.interface.network.debug:
            self.interface.logger.debug(msg)

    def default_framer(self):
        # overridden so that max_size can be customized
        max_size = self.interface.network.config.NETWORK_MAX_INCOMING_MSG_SIZE
        assert max_size > 500_000, f"{max_size=} (< 500_000) is too small"
        return NewlineFramer(max_size=max_size)

    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.
        """
        if force_after is None:
            # We give up after a while and just abort the connection.
            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
            # Note: if the ethernet cable was pulled or wifi disconnected, that too might
            #       wait until this timeout is triggered
            force_after = 1  # seconds
        await super().close(force_after=force_after)
239

240

241
class NetworkException(Exception):
    """Base class for network-layer errors in this module."""


class GracefulDisconnect(NetworkException):
    """Raised to cleanly tear down a server connection (logged, not fatal)."""
    log_level = logging.INFO  # class-level default severity

    def __init__(self, *args, log_level=None, **kwargs):
        super().__init__(*args, **kwargs)
        # allow a per-instance override of the class default
        if log_level is not None:
            self.log_level = log_level


class RequestTimedOut(GracefulDisconnect):
    def __str__(self):
        return _("Network request timed out.")


class RequestCorrupted(Exception):
    """The server sent a malformed or inconsistent response."""


class ErrorParsingSSLCert(Exception):
    """A stored/received SSL certificate could not be parsed."""


class ErrorGettingSSLCertFromServer(Exception):
    """The server's SSL certificate could not be fetched."""


class ErrorSSLCertFingerprintMismatch(Exception):
    """The server certificate does not match the pinned fingerprint."""


class InvalidOptionCombination(Exception):
    """Mutually incompatible configuration options were combined."""


class ConnectError(NetworkException):
    """TCP/SSL connection establishment failed (socket-level)."""
5✔
265

266

267
class _RSClient(RSClient):
    """RSClient wrapper that normalizes socket-level errors into ConnectError."""

    async def create_connection(self):
        try:
            conn = await super().create_connection()
        except OSError as exc:
            # note: "from exc" sets __cause__ on the ConnectError; callers
            # inspect it (e.g. to detect SSL verification failures)
            raise ConnectError(exc) from exc
        else:
            return conn
274

275

276
class ServerAddr:
    """Immutable-ish value object identifying an Electrum server:
    (host, port, protocol), where protocol is 't' (TCP) or 's' (SSL).
    Host and port are canonicalized via aiorpcx.NetAddress.
    """

    def __init__(self, host: str, port: Union[int, str], *, protocol: Optional[str] = None):
        """Validate and store host/port/protocol. Raises ValueError on bad input."""
        assert isinstance(host, str), repr(host)
        if protocol is None:
            protocol = 's'
        if not host:
            raise ValueError('host must not be empty')
        if host[0] == '[' and host[-1] == ']':  # IPv6
            host = host[1:-1]
        try:
            net_addr = NetAddress(host, port)  # this validates host and port
        except Exception as e:
            raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
        if protocol not in _KNOWN_NETWORK_PROTOCOLS:
            raise ValueError(f"invalid network protocol: {protocol}")
        self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
        self.port = int(net_addr.port)
        self.protocol = protocol
        self._net_addr_str = str(net_addr)

    @classmethod
    def from_str(cls, s: str) -> 'ServerAddr':
        """Constructs a ServerAddr or raises ValueError.
        Expects the full "host:port:protocol" form."""
        # host might be IPv6 address, hence do rsplit:
        host, port, protocol = str(s).rsplit(':', 2)
        return ServerAddr(host=host, port=port, protocol=protocol)

    @classmethod
    def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
        """Construct ServerAddr from str, guessing missing details.
        Does not raise - just returns None if guessing failed.
        Ongoing compatibility not guaranteed.
        """
        if not s:
            return None
        host = ""
        if s[0] == "[" and "]" in s:  # IPv6 address
            host_end = s.index("]")
            host = s[1:host_end]
            s = s[host_end+1:]
        items = str(s).rsplit(':', 2)
        if len(items) < 2:
            return None  # although maybe we could guess the port too?
        host = host or items[0]
        port = items[1]
        if len(items) >= 3:
            protocol = items[2]
        else:
            # default to the preferred protocol when not given
            protocol = PREFERRED_NETWORK_PROTOCOL
        try:
            return ServerAddr(host=host, port=port, protocol=protocol)
        except ValueError:
            return None

    def to_friendly_name(self) -> str:
        """Short display form; omits ":s" since SSL is the default protocol."""
        # note: this method is closely linked to from_str_with_inference
        if self.protocol == 's':  # hide trailing ":s"
            return self.net_addr_str()
        return str(self)

    def __str__(self):
        # full "host:port:protocol" form; round-trips through from_str
        return '{}:{}'.format(self.net_addr_str(), self.protocol)

    def to_json(self) -> str:
        """JSON-serializable representation (the full string form)."""
        return str(self)

    def __repr__(self):
        return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'

    def net_addr_str(self) -> str:
        """Canonical "host:port" string (without protocol)."""
        return self._net_addr_str

    def __eq__(self, other):
        if not isinstance(other, ServerAddr):
            return False
        return (self.host == other.host
                and self.port == other.port
                and self.protocol == other.protocol)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        # consistent with __eq__: same (host, port, protocol) triple
        return hash((self.host, self.port, self.protocol))
×
361

362

363
def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
    """Return the path under the config dir where *host*'s certificate is pinned."""
    try:
        ip = ip_address(host)
    except ValueError:
        # not an IP literal: the hostname itself serves as the filename
        filename = host
    else:
        # IPv6 literals are hex-encoded (presumably because ':' is awkward in
        # filenames); IPv4 literals are used verbatim
        filename = f"ipv6_{ip.packed.hex()}" if isinstance(ip, IPv6Address) else host
    return os.path.join(config.path, 'certs', filename)
373

374

375
class Interface(Logger):
5✔
376

377
    LOGGING_SHORTCUT = 'i'
5✔
378

379
    def __init__(self, *, network: 'Network', server: ServerAddr):
        """Create the interface and schedule its main task (self.run) on the
        network's task group. Does not wait for the connection itself.
        """
        # resolved in _mark_ready; cancelled by handle_disconnect on teardown
        self.ready = network.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self.server = server
        # NOTE(review): self.server is assigned before Logger.__init__ —
        # diagnostic_name() reads it, presumably during logger setup; keep order.
        Logger.__init__(self)
        assert network.config.path
        self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
        self.blockchain = None  # type: Optional[Blockchain]
        self._requested_chunks = set()  # type: Set[int]
        self.network = network
        self.session = None  # type: Optional[NotificationSession]
        self._ipaddr_bucket = None
        # Set up proxy.
        # - for servers running on localhost, the proxy is not used. If user runs their own server
        #   on same machine, this lets them enable the proxy (which is used for e.g. FX rates).
        #   note: we could maybe relax this further and bypass the proxy for all private
        #         addresses...? e.g. 192.168.x.x
        if util.is_localhost(server.host):
            self.logger.info(f"looks like localhost: not using proxy for this server")
            self.proxy = None
        else:
            self.proxy = ESocksProxy.from_network_settings(network)

        # Latest block header and corresponding height, as claimed by the server.
        # Note that these values are updated before they are verified.
        # Especially during initial header sync, verification can take a long time.
        # Failing verification will get the interface closed.
        self.tip_header = None
        self.tip = 0

        self.fee_estimates_eta = {}  # type: Dict[int, int]

        # Dump network messages (only for this interface).  Set at runtime from the console.
        self.debug = False

        self.taskgroup = OldTaskGroup()

        async def spawn_task():
            task = await self.network.taskgroup.spawn(self.run())
            task.set_name(f"interface::{str(server)}")
        asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
5✔
420

421
    @property
    def host(self):
        """Server hostname or IP address (delegates to self.server)."""
        return self.server.host
5✔
424

425
    @property
    def port(self):
        """Server TCP port (delegates to self.server)."""
        return self.server.port
×
428

429
    @property
    def protocol(self):
        """Transport protocol: 's' (SSL) or 't' (TCP); delegates to self.server."""
        return self.server.protocol
×
432

433
    def diagnostic_name(self):
        """Name used to identify this interface in logs (the server's net-address string)."""
        return self.server.net_addr_str()
5✔
435

436
    def __str__(self):
        # concise human-readable form, e.g. "<Interface host:port>"
        return f"<Interface {self.diagnostic_name()}>"
×
438

439
    async def is_server_ca_signed(self, ca_ssl_context):
        """Given a CA enforcing SSL context, returns True if the connection
        can be established. Returns False if the server has a self-signed
        certificate but otherwise is okay. Any other failures raise.
        """
        try:
            await self.open_session(ca_ssl_context, exit_early=True)
        except ConnectError as e:
            # _RSClient sets __cause__ to the underlying OSError/SSLError
            cause = e.__cause__
            # verify_code 18 is OpenSSL's X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
            if (isinstance(cause, ssl.SSLCertVerificationError)
                    and cause.reason == 'CERTIFICATE_VERIFY_FAILED'
                    and cause.verify_code == 18):  # "self signed certificate"
                # Good. We will use this server as self-signed.
                return False
            # Not good. Cannot use this server.
            raise
        # Good. We will use this server as CA-signed.
        return True
×
457

458
    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context):
        """Probe the server and persist its certificate status at self.cert_path:
        an empty file marks a CA-signed server, otherwise the self-signed
        certificate itself is pinned (via _save_certificate).
        """
        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
        if ca_signed:
            # a pinned fingerprint makes no sense when trusting the CA chain
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            with open(self.cert_path, 'w') as f:
                # empty file means this is CA signed, not self-signed
                f.write('')
        else:
            await self._save_certificate()
×
468

469
    def _is_saved_ssl_cert_available(self):
        """Return whether a usable certificate record exists at self.cert_path.

        Empty file -> CA-signed server. Non-empty -> pinned self-signed cert,
        which is parsed and date-checked; an expired pinned cert is deleted
        and False is returned so it can be re-fetched.
        Raises ErrorParsingSSLCert or InvalidOptionCombination.
        """
        if not os.path.exists(self.cert_path):
            return False
        with open(self.cert_path, 'r') as f:
            contents = f.read()
        if contents == '':  # CA signed
            if self._get_expected_fingerprint():
                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
            return True
        # pinned self-signed cert
        try:
            b = pem.dePem(contents, 'CERTIFICATE')
        except SyntaxError as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x = x509.X509(b)
        except Exception as e:
            self.logger.info(f"error parsing already saved cert: {e}")
            raise ErrorParsingSSLCert(e) from e
        try:
            x.check_date()
        except x509.CertificateError as e:
            self.logger.info(f"certificate has expired: {e}")
            os.unlink(self.cert_path)  # delete pinned cert only in this case
            return False
        self._verify_certificate_fingerprint(bytearray(b))
        return True
×
497

498
    async def _get_ssl_context(self):
        """Build the SSL context for this server, or return None for plaintext TCP.

        On first use, fetches the server's certificate and stores it (or an
        empty marker file for CA-signed servers). Raises
        ErrorGettingSSLCertFromServer or ErrorParsingSSLCert on failure.
        """
        if self.protocol != 's':
            # using plaintext TCP
            return None

        # see if we already have cert for this server; or get it for the first time
        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
        if not self._is_saved_ssl_cert_available():
            try:
                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
                raise ErrorGettingSSLCertFromServer(e) from e
        # now we have a file saved in our certificate store
        siz = os.stat(self.cert_path).st_size
        if siz == 0:
            # CA signed cert
            sslc = ca_sslc
        else:
            # pinned self-signed cert: trust exactly that certificate
            sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
            # note: Flag "ssl.VERIFY_X509_STRICT" is enabled by default in python 3.13+ (disabled in older versions).
            #       We explicitly disable it as it breaks lots of servers.
            sslc.verify_flags &= ~ssl.VERIFY_X509_STRICT
            sslc.check_hostname = False
        return sslc
×
523

524
    def handle_disconnect(func):
        """Decorator (used on methods of this class, hence no `self` here) that
        runs the wrapped coroutine and performs disconnect cleanup no matter
        how it exits: sets got_disconnected, notifies the network, and cancels
        the `ready` future. GracefulDisconnect and RPCError are logged and
        swallowed; other exceptions propagate after cleanup.
        """
        @functools.wraps(func)
        async def wrapper_func(self: 'Interface', *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
            except aiorpcx.jsonrpc.RPCError as e:
                self.logger.warning(f"disconnecting due to {repr(e)}")
                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
            finally:
                self.got_disconnected.set()
                await self.network.connection_down(self)
                # if was not 'ready' yet, schedule waiting coroutines:
                self.ready.cancel()
        return wrapper_func
5✔
540

541
    @ignore_exceptions  # do not kill network.taskgroup
    @log_exceptions
    @handle_disconnect
    async def run(self):
        """Main task for this interface: build the SSL context (if any) and
        open the session. Expected connection failures are logged and end the
        task; cleanup is done by @handle_disconnect.
        """
        try:
            ssl_context = await self._get_ssl_context()
        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
            self.logger.info(f'disconnecting due to: {repr(e)}')
            return
        try:
            await self.open_session(ssl_context)
        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
                    and self.is_main_server() and not self.network.auto_connect):
                self.logger.warning(f'Cannot connect to main server due to SSL error '
                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
            else:
                self.logger.info(f'disconnecting due to: {repr(e)}')
            return
×
561

562
    def _mark_ready(self) -> None:
        """Attach this interface to a Blockchain based on the current tip header
        and resolve the `ready` future (idempotent).
        Raises GracefulDisconnect if `ready` was already cancelled.
        """
        if self.ready.cancelled():
            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
        if self.ready.done():
            return  # already marked ready

        assert self.tip_header
        chain = blockchain.check_header(self.tip_header)
        if not chain:
            # tip not on any known chain yet: provisionally follow the best one
            self.blockchain = blockchain.get_best_chain()
        else:
            self.blockchain = chain
        assert self.blockchain is not None

        self.logger.info(f"set blockchain with height {self.blockchain.height()}")

        self.ready.set_result(1)
×
579

580
    def is_connected_and_ready(self) -> bool:
        """True iff the session became ready and has not disconnected since."""
        if self.got_disconnected.is_set():
            return False
        return self.ready.done()
×
582

583
    async def _save_certificate(self) -> None:
        """Fetch the server's certificate and pin it (PEM) at self.cert_path,
        fsync'ing so the pin survives a crash. Retries up to 10 times since
        the TLS handshake may not have completed yet.
        Raises GracefulDisconnect if no certificate could be obtained.
        """
        if not os.path.exists(self.cert_path):
            # we may need to retry this a few times, in case the handshake hasn't completed
            for _ in range(10):
                dercert = await self._fetch_certificate()
                if dercert:
                    self.logger.info("succeeded in getting cert")
                    self._verify_certificate_fingerprint(dercert)
                    with open(self.cert_path, 'w') as f:
                        cert = ssl.DER_cert_to_PEM_cert(dercert)
                        # workaround android bug
                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
                        f.write(cert)
                        # even though close flushes, we can't fsync when closed.
                        # and we must flush before fsyncing, cause flush flushes to OS buffer
                        # fsync writes to OS buffer to disk
                        f.flush()
                        os.fsync(f.fileno())
                    break
                await asyncio.sleep(1)
            else:
                # for/else: loop exhausted without break
                raise GracefulDisconnect("could not get certificate after 10 tries")
605

606
    async def _fetch_certificate(self) -> bytes:
        """Connect with certificate verification disabled and return the
        server's certificate in DER (binary) form, for pinning."""
        sslc = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        # verification is intentionally off: we are retrieving the cert itself
        sslc.check_hostname = False
        sslc.verify_mode = ssl.CERT_NONE
        async with _RSClient(session_factory=RPCSession,
                             host=self.host, port=self.port,
                             ssl=sslc, proxy=self.proxy) as session:
            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
            return ssl_object.getpeercert(binary_form=True)
×
616

617
    def _get_expected_fingerprint(self) -> Optional[str]:
        """Configured cert fingerprint, or None (only applies to the main server)."""
        if not self.is_main_server():
            return None
        return self.network.config.NETWORK_SERVERFINGERPRINT
×
620

621
    def _verify_certificate_fingerprint(self, certificate):
        """Raise ErrorSSLCertFingerprintMismatch unless *certificate* (DER bytes)
        matches the configured fingerprint; no-op when none is configured."""
        expected = self._get_expected_fingerprint()
        if not expected:
            return
        actual = hashlib.sha256(certificate).hexdigest()
        if actual.lower() != expected.lower():
            # let the GUI know so it can surface the mismatch to the user
            util.trigger_callback('cert_mismatch')
            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
        self.logger.info("cert fingerprint verification passed")
×
631

632
    async def get_block_header(self, height, assert_mode):
        """Request the block header at *height* and return it deserialized.
        `assert_mode` is only used for logging here.
        Raises on a non-sensical height.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        self.logger.info(f'requesting block header {height} in mode {assert_mode}')
        # use lower timeout as we usually have network.bhi_lock here
        timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
        res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
        return blockchain.deserialize_header(bytes.fromhex(res), height)
×
640

641
    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
        """Request the 2016-header chunk containing *height* and connect it to
        self.blockchain.

        If *tip* is given, the requested size is clamped so as not to extend
        past it. With can_return_early, returns None if the same chunk is
        already being requested. Otherwise returns (conn, num_connected)
        where conn is blockchain.connect_chunk's result (falsy on failure,
        paired with 0). Raises RequestCorrupted on an inconsistent reply.
        """
        if not is_non_negative_integer(height):
            raise Exception(f"{repr(height)} is not a block height")
        index = height // 2016
        if can_return_early and index in self._requested_chunks:
            return
        self.logger.info(f"requesting chunk from height {height}")
        size = 2016
        if tip is not None:
            # don't request headers past the server's claimed tip
            size = min(size, tip - index * 2016 + 1)
            size = max(size, 0)
        try:
            # track in-flight chunk requests so concurrent callers can bail early
            self._requested_chunks.add(index)
            res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
        finally:
            self._requested_chunks.discard(index)
        # validate the reply shape before trusting any of it
        assert_dict_contains_field(res, field_name='count')
        assert_dict_contains_field(res, field_name='hex')
        assert_dict_contains_field(res, field_name='max')
        assert_non_negative_integer(res['count'])
        assert_non_negative_integer(res['max'])
        assert_hex_str(res['hex'])
        if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
            raise RequestCorrupted('inconsistent chunk hex and count')
        # we never request more than 2016 headers, but we enforce those fit in a single response
        if res['max'] < 2016:
            raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < 2016")
        if res['count'] != size:
            raise RequestCorrupted(f"expected {size} headers but only got {res['count']}")
        conn = self.blockchain.connect_chunk(index, res['hex'])
        if not conn:
            return conn, 0
        return conn, res['count']
×
674

675
    def is_main_server(self) -> bool:
        """True if this is the network's current interface, or no interface is
        set yet and this server is the configured default."""
        if self.network.interface == self:
            return True
        return self.network.interface is None and self.network.default_server == self.server
678

679
    async def open_session(self, sslc, exit_early=False):
        """Connect to the server, negotiate the protocol version, and run the
        per-interface tasks (ping, fee estimates, header sync, watchdog)
        until disconnect.

        sslc: SSL context (or None) passed to the underlying connection.
        exit_early: if True, return right after the version handshake;
                    the connection is torn down by the context manager.

        Raises GracefulDisconnect on protocol/policy violations.
        """
        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
        async with _RSClient(session_factory=session_factory,
                             host=self.host, port=self.port,
                             ssl=sslc, proxy=self.proxy) as session:
            self.session = session  # type: NotificationSession
            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
            try:
                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
            except aiorpcx.jsonrpc.RPCError as e:
                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
            if exit_early:
                return
            # the server must echo back exactly the protocol version we asked for
            if ver[1] != version.PROTOCOL_VERSION:
                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
            # refuse the connection if we already have too many servers in this IP-range bucket
            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
                raise GracefulDisconnect(f'too many connected servers already '
                                         f'in bucket {self.bucket_based_on_ipaddress()}')
            self.logger.info(f"connection established. version: {ver}")

            try:
                # run the long-lived per-interface tasks; exiting the taskgroup ends the session
                async with self.taskgroup as group:
                    await group.spawn(self.ping)
                    await group.spawn(self.request_fee_estimates)
                    await group.spawn(self.run_fetch_blocks)
                    await group.spawn(self.monitor_connection)
            except aiorpcx.jsonrpc.RPCError as e:
                if e.code in (
                    JSONRPC.EXCESSIVE_RESOURCE_USAGE,
                    JSONRPC.SERVER_BUSY,
                    JSONRPC.METHOD_NOT_FOUND,
                    JSONRPC.INTERNAL_ERROR,
                ):
                    # server-side errors we treat as "disconnect politely, not our bug"
                    log_level = logging.WARNING if self.is_main_server() else logging.INFO
                    raise GracefulDisconnect(e, log_level=log_level) from e
                raise
            finally:
                self.got_disconnected.set()  # set this ASAP, ideally before any awaits
×
718

719
    async def monitor_connection(self):
        """Watchdog: poll once a second and raise GracefulDisconnect when the
        session/transport is no longer open.

        e.g. if the remote cleanly sends EOF, we handle that here.
        note: If the user pulls the ethernet cable or disconnects wifi,
        ideally we would detect that here, so that the GUI/etc can reflect that.
        - On Android, this seems to work reliably, where asyncio.BaseProtocol.connection_lost()
          gets called with e.g. ConnectionAbortedError(103, 'Software caused connection abort').
        - On desktop Linux/Win, it seems BaseProtocol.connection_lost() is not called in such cases.
          Hence, in practice the connection issue will only be detected the next time we try
          to send a message (plus timeout), which can take minutes...
        """
        while True:
            await asyncio.sleep(1)
            session = self.session
            if not session or session.is_closing():
                raise GracefulDisconnect('session was closed')
×
733

734
    async def ping(self):
        # Application-level keep-alive: periodically ping so the server
        # does not drop us for inactivity.
        interval = 300  # seconds between pings
        while True:
            await asyncio.sleep(interval)
            await self.session.send_request('server.ping')
×
738

739
    async def request_fee_estimates(self):
        """Poll the server for ETA-based fee estimates once a minute and
        publish them to the network."""
        while True:
            # query all targets concurrently; the last FEE_ETA_TARGETS entry is excluded
            async with OldTaskGroup() as group:
                fee_tasks = [
                    (nblocks, await group.spawn(self.get_estimatefee(nblocks)))
                    for nblocks in FEE_ETA_TARGETS[0:-1]
                ]
            for nblock_target, task in fee_tasks:
                fee = task.result()
                if fee < 0:  # server could not provide an estimate
                    continue
                assert isinstance(fee, int)
                self.fee_estimates_eta[nblock_target] = fee
            self.network.update_fee_estimates()
            await asyncio.sleep(60)
×
752

753
    async def close(self, *, force_after: Optional[int] = None):
        """Closes the connection and waits for it to be closed.
        We try to flush buffered data to the wire, which can take some time.

        force_after: max seconds to wait for a graceful close before forcing.
        """
        session = self.session
        if session:
            await session.close(force_after=force_after)
        # monitor_connection will cancel tasks
760

761
    async def run_fetch_blocks(self):
        """Main header-sync loop: subscribe to new-block notifications and
        keep our local chain in sync with the server's tip. Runs until
        disconnect.
        """
        header_queue = asyncio.Queue()
        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
        while True:
            item = await header_queue.get()
            raw_header = item[0]
            height = raw_header['height']
            header = blockchain.deserialize_header(bfh(raw_header['hex']), height)
            # record the server's advertised tip before trying to connect it
            self.tip_header = header
            self.tip = height
            if self.tip < constants.net.max_checkpoint():
                raise GracefulDisconnect('server tip below max checkpoint')
            self._mark_ready()
            blockchain_updated = await self._process_header_at_tip()
            # header processing done
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
            util.trigger_callback('network_updated')
            # let the network reconsider which interface to follow
            await self.network.switch_unwanted_fork_interface()
            await self.network.switch_lagging_interface()
×
781

782
    async def _process_header_at_tip(self) -> bool:
        """Try to connect self.tip_header to our chain, stepping/forking as needed.

        Returns:
        False - boring fast-forward: we already have this header as part of this blockchain from another interface,
        True - new header we didn't have, or reorg
        """
        height, header = self.tip, self.tip_header
        async with self.network.bhi_lock:
            if self.blockchain.height() >= height and self.blockchain.check_header(header):
                # another interface amended the blockchain
                return False
            _, height = await self.step(height, header)
            # in the simple case, height == self.tip+1
            if height <= self.tip:
                # we are still behind the server tip: keep downloading headers
                await self.sync_until(height)
            return True
×
797

798
    async def sync_until(self, height, next_height=None):
        """Download and connect headers from *height* up to *next_height*
        (defaults to the server's tip). Uses 2016-header chunk requests when
        far behind, otherwise single-header steps.

        Returns the final (mode, height) pair from the last iteration.
        """
        if next_height is None:
            next_height = self.tip
        last = None
        while last is None or height <= next_height:
            # remember previous state to detect a stuck loop (see assert below)
            prev_last, prev_height = last, height
            if next_height > height + 10:
                # far behind: request a whole chunk of headers at once
                could_connect, num_headers = await self.request_chunk(height, next_height)
                if not could_connect:
                    if height <= constants.net.max_checkpoint():
                        raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
                    # chunk did not connect: fall back to single-header stepping
                    last, height = await self.step(height)
                    continue
                util.trigger_callback('blockchain_updated')
                util.trigger_callback('network_updated')
                # advance to just past the chunk we connected
                height = (height // 2016 * 2016) + num_headers
                assert height <= next_height+1, (height, self.tip)
                last = 'catchup'
            else:
                last, height = await self.step(height)
            assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
        return last, height
5✔
820

821
    async def step(self, height, header=None):
        """Try to advance past *height* by one header, resolving forks if needed.

        Returns a (mode, next_height_to_request) pair; mode is 'catchup',
        or 'no_fork'/'fork' via _resolve_potential_chain_fork_given_forkpoint.
        ('mock' headers stub out the chain checks, e.g. in tests.)
        """
        assert 0 <= height <= self.tip, (height, self.tip)
        if header is None:
            header = await self.get_block_header(height, 'catchup')

        chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
        if chain:
            self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
            # note: there is an edge case here that is not handled.
            # we might know the blockhash (enough for check_header) but
            # not have the header itself. e.g. regtest chain with only genesis.
            # this situation resolves itself on the next block
            return 'catchup', height+1

        can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
        if not can_connect:
            self.logger.info(f"can't connect new block: {height=}")
            # walk backwards until we find a header that checks or connects
            height, header, bad, bad_header = await self._search_headers_backwards(height, header)
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
            assert chain or can_connect
        if can_connect:
            self.logger.info(f"new block: {height=}")
            height += 1
            if isinstance(can_connect, Blockchain):  # not when mocking
                self.blockchain = can_connect
                self.blockchain.save_header(header)
            return 'catchup', height

        # header checks against some chain but does not connect to ours:
        # binary-search the forkpoint, then fork/catch up from there
        good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
        return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
5✔
852

853
    async def _search_headers_binary(self, height, bad, bad_header, chain):
        """Binary-search between known-good *height* and known-bad *bad*
        for the exact forkpoint.

        Returns (good, bad, bad_header) with good + 1 == bad, where height
        `good` checks against one of our chains and `bad_header` does not.
        """
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
        good = height
        while True:
            assert good < bad, (good, bad)
            height = (good + bad) // 2
            self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
            header = await self.get_block_header(height, 'binary')
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            if chain:
                self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
                good = height
            else:
                bad = height
                bad_header = header
            if good + 1 == bad:
                break

        # sanity check: bad_header must connect to our chain right after `good`
        mock = 'mock' in bad_header and bad_header['mock']['connect'](height)
        real = not mock and self.blockchain.can_connect(bad_header, check_height=False)
        if not real and not mock:
            raise Exception('unexpected bad header during binary: {}'.format(bad_header))
        _assert_header_does_not_check_against_any_chain(bad_header)

        self.logger.info(f"binary search exited. good {good}, bad {bad}")
        return good, bad, bad_header
5✔
882

883
    async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header):
        """Given adjacent heights good/bad (bad_header at height `bad` not
        checking against any of our chains), either keep catching up on the
        current chain ('no_fork') or create a new fork at `bad` ('fork').

        Returns a (mode, next_height_to_request) pair.
        """
        assert good + 1 == bad
        assert bad == bad_header['block_height']
        _assert_header_does_not_check_against_any_chain(bad_header)
        # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
        # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.

        bh = self.blockchain.height()
        assert bh >= good, (bh, good)
        if bh == good:
            # our chain ends exactly at the forkpoint: simply keep catching up
            height = good + 1
            self.logger.info(f"catching up from {height}")
            return 'no_fork', height

        # this is a new fork we don't yet have
        height = bad + 1
        self.logger.info(f"new fork at bad height {bad}")
        forkfun = self.blockchain.fork if 'mock' not in bad_header else bad_header['mock']['fork']
        b = forkfun(bad_header)  # type: Blockchain
        self.blockchain = b
        assert b.forkpoint == bad
        return 'fork', height
5✔
905

906
    async def _search_headers_backwards(self, height, header):
        """Step backwards from *height* (with exponentially growing strides)
        until finding a header that checks against, or connects to, one of
        our chains. Never goes below the checkpointed region.

        Returns (height, header, bad, bad_header): the accepted header and
        its height, plus the earliest header known NOT to check against any
        chain and its height.
        """
        async def iterate():
            # fetch the header at `height`; True means "keep going back"
            nonlocal height, header
            checkp = False
            if height <= constants.net.max_checkpoint():
                # clamp to the checkpoint boundary; failing there is fatal
                height = constants.net.max_checkpoint()
                checkp = True
            header = await self.get_block_header(height, 'backward')
            chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
            if chain or can_connect:
                return False
            if checkp:
                raise GracefulDisconnect("server chain conflicts with checkpoints")
            return True

        bad, bad_header = height, header
        _assert_header_does_not_check_against_any_chain(bad_header)
        # no point searching above the highest header any local chain has
        with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values())
        local_max = max([0] + [x.height() for x in chains]) if 'mock' not in header else float('inf')
        height = min(local_max + 1, height - 1)
        while await iterate():
            bad, bad_header = height, header
            # double the distance from the tip on every failed attempt
            delta = self.tip - height
            height = self.tip - 2 * delta

        _assert_header_does_not_check_against_any_chain(bad_header)
        self.logger.info(f"exiting backward mode at {height}")
        return height, header, bad, bad_header
5✔
935

936
    @classmethod
    def client_name(cls) -> str:
        """Client identifier sent to servers during the 'server.version' handshake."""
        return 'electrum/' + version.ELECTRUM_VERSION
×
939

940
    def is_tor(self):
        """True if the server is reached via a Tor onion address."""
        host = self.host
        return host.endswith('.onion')
×
942

943
    def ip_addr(self) -> Optional[str]:
        """Return the peer's IP address as a string, or None if not connected."""
        session = self.session
        if not session:
            return None
        peer_addr = session.remote_address()
        if not peer_addr:
            return None
        return str(peer_addr.host)
×
949

950
    def bucket_based_on_ipaddress(self) -> str:
        """Group servers into buckets by IP prefix (/16 for IPv4, /48 for IPv6).

        Onion servers share a single named bucket. Localhost and unparseable
        addresses map to ''. The computed bucket is cached on the instance.
        """
        def compute() -> str:
            if self.is_tor():
                return BUCKET_NAME_OF_ONION_SERVERS
            try:
                ip = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
            except ValueError:
                return ''
            if not ip:
                return ''
            if ip.is_loopback:  # localhost is exempt
                return ''
            if ip.version == 4:
                return str(IPv4Network(ip).supernet(prefixlen_diff=32-16))
            if ip.version == 6:
                return str(IPv6Network(ip).supernet(prefixlen_diff=128-48))
            return ''

        if not self._ipaddr_bucket:
            self._ipaddr_bucket = compute()
        return self._ipaddr_bucket
×
973

974
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Request and validate the merkle proof for a confirmed transaction."""
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        # do request
        res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        # validate the server's response shape
        block_height = assert_dict_contains_field(res, field_name='block_height')
        merkle = assert_dict_contains_field(res, field_name='merkle')
        pos = assert_dict_contains_field(res, field_name='pos')
        # note: tx_height was just a hint to the server, don't enforce the response to match it
        assert_non_negative_integer(block_height)
        assert_non_negative_integer(pos)
        assert_list_or_tuple(merkle)
        for node_hash in merkle:
            assert_hash256_str(node_hash)
        return res
×
992

993
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Fetch a raw transaction (hex) by txid and verify it hashes to the
        requested txid. Raises RequestCorrupted on a bad response."""
        if not is_hash256_str(tx_hash):
            raise Exception(f"{repr(tx_hash)} is not a txid")
        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
        # the answer must be hex, deserializable, and hash to the txid we asked for
        if not is_hex_str(raw):
            raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
        tx = Transaction(raw)
        try:
            tx.deserialize()  # see if raises
        except Exception as e:
            raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
        actual_txid = tx.txid()
        if actual_txid != tx_hash:
            raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {actual_txid})")
        return raw
×
1008

1009
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        """Request the tx history for a scripthash.

        Returns the server's list of {'height': int, 'tx_hash': str, ...}
        items after validating types, height ordering and txid uniqueness.
        Raises RequestCorrupted if validation fails.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
        # check response
        assert_list_or_tuple(res)
        prev_height = 1
        for tx_item in res:
            height = assert_dict_contains_field(tx_item, field_name='height')
            assert_dict_contains_field(tx_item, field_name='tx_hash')
            assert_integer(height)
            assert_hash256_str(tx_item['tx_hash'])
            if height in (-1, 0):
                # heights -1/0 denote unconfirmed items, which must carry a fee
                assert_dict_contains_field(tx_item, field_name='fee')
                assert_non_negative_integer(tx_item['fee'])
                prev_height = float("inf")  # this ensures confirmed txs can't follow mempool txs
            else:
                # check monotonicity of heights
                if height < prev_height:
                    raise RequestCorrupted(f'heights of confirmed txs must be in increasing order')
                prev_height = height
        hashes = set(map(lambda item: item['tx_hash'], res))
        if len(hashes) != len(res):
            # Either server is sending garbage... or maybe if server is race-prone
            # a recently mined tx could be included in both last block and mempool?
            # Still, it's simplest to just disregard the response.
            raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
        return res
×
1038

1039
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        """Request the UTXO set for a scripthash and sanity-check each entry."""
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
        # check response: each utxo must carry these fields with sane values
        assert_list_or_tuple(res)
        for utxo_item in res:
            for field in ('tx_pos', 'value', 'tx_hash', 'height'):
                assert_dict_contains_field(utxo_item, field_name=field)
            for field in ('tx_pos', 'value', 'height'):
                assert_non_negative_integer(utxo_item[field])
            assert_hash256_str(utxo_item['tx_hash'])
        return res
×
1056

1057
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        """Request the balance for a scripthash.

        Returns the server's {'confirmed': ..., 'unconfirmed': ...} dict;
        'confirmed' is validated to be >= 0, 'unconfirmed' may be negative.
        """
        if not is_hash256_str(sh):
            raise Exception(f"{repr(sh)} is not a scripthash")
        # do request
        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
        # check response
        assert_dict_contains_field(res, field_name='confirmed')
        assert_dict_contains_field(res, field_name='unconfirmed')
        assert_non_negative_integer(res['confirmed'])
        assert_integer(res['unconfirmed'])
        return res
×
1068

1069
    async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
        """Look up the txid at (tx_height, tx_pos); if *merkle* is True the
        response also carries a validated merkle branch."""
        if not is_non_negative_integer(tx_height):
            raise Exception(f"{repr(tx_height)} is not a block height")
        if not is_non_negative_integer(tx_pos):
            raise Exception(f"{repr(tx_pos)} should be non-negative integer")
        # do request
        res = await self.session.send_request(
            'blockchain.transaction.id_from_pos',
            [tx_height, tx_pos, merkle],
        )
        # check response
        if not merkle:
            assert_hash256_str(res)
            return res
        assert_dict_contains_field(res, field_name='tx_hash')
        assert_dict_contains_field(res, field_name='merkle')
        assert_hash256_str(res['tx_hash'])
        assert_list_or_tuple(res['merkle'])
        for node_hash in res['merkle']:
            assert_hash256_str(node_hash)
        return res
×
1090

1091
    async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
        """Fetch the mempool fee histogram: (fee, size) pairs with strictly
        decreasing fees. Raises RequestCorrupted on malformed data."""
        # do request
        res = await self.session.send_request('mempool.get_fee_histogram')
        # check response
        assert_list_or_tuple(res)
        last_fee = float('inf')
        for fee, size in res:
            assert_non_negative_int_or_float(fee)
            assert_non_negative_integer(size)
            if fee >= last_fee:  # check monotonicity
                raise RequestCorrupted(f'fees must be in decreasing order')
            last_fee = fee
        return res
×
1104

1105
    async def get_server_banner(self) -> str:
        """Fetch the free-form banner text the server sends for display."""
        # do request
        res = await self.session.send_request('server.banner')
        # check response
        if not isinstance(res, str):
            raise RequestCorrupted(f'{res!r} should be a str')
        return res
×
1112

1113
    async def get_donation_address(self) -> str:
        """Fetch the server's donation address; '' if absent or unrecognized."""
        res = await self.session.send_request('server.donation_address')
        if not res:  # ignore empty string
            return ''
        if bitcoin.is_address(res):
            return res
        # note: do not hard-fail -- allow server to use future-type
        #       bitcoin address we do not recognize
        self.logger.info(f"invalid donation address from server: {repr(res)}")
        return ''
×
1125

1126
    async def get_relay_fee(self) -> int:
        """Returns the min relay feerate in sat/kbyte."""
        # do request
        res = await self.session.send_request('blockchain.relayfee')
        # validate, convert to sat (via bitcoin.COIN), and clamp at zero
        assert_non_negative_int_or_float(res)
        return max(0, int(res * bitcoin.COIN))
×
1135

1136
    async def get_estimatefee(self, num_blocks: int) -> int:
        """Returns a feerate estimate for getting confirmed within
        num_blocks blocks, in sat/kbyte.
        Returns -1 if the server could not provide an estimate.

        Raises for malformed num_blocks and for unexpected server errors.
        """
        if not is_non_negative_integer(num_blocks):
            raise Exception(f"{repr(num_blocks)} is not a num_blocks")
        # do request
        try:
            res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
        except aiorpcx.jsonrpc.ProtocolError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate, however apparently "electrs" does not conform
            # and sends an error instead. Convert it here:
            if "cannot estimate fee" in e.message:
                res = -1
            else:
                raise
        except aiorpcx.jsonrpc.RPCError as e:
            # The protocol spec says the server itself should already have returned -1
            # if it cannot provide an estimate. "Fulcrum" often sends:
            #   aiorpcx.jsonrpc.RPCError: (-32603, 'internal error: bitcoind request timed out')
            if e.code == JSONRPC.INTERNAL_ERROR:
                res = -1
            else:
                raise
        # check response
        if res != -1:
            # convert from coin units to sat (same conversion as get_relay_fee)
            assert_non_negative_int_or_float(res)
            res = int(res * bitcoin.COIN)
        return res
×
1167

1168

1169
def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
5✔
1170
    chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
5✔
1171
    if chain_bad:
5✔
1172
        raise Exception('bad_header must not check!')
×
1173

1174

1175
def check_cert(host, cert):
    """Parse a PEM certificate and print the host plus whether it has expired."""
    try:
        der = pem.dePem(cert, 'CERTIFICATE')
        parsed = x509.X509(der)
    except Exception:
        traceback.print_exc(file=sys.stdout)
        return

    # check_date() raising is how x509 signals an expired certificate
    try:
        parsed.check_date()
        expired = False
    except Exception:
        expired = True

    msg = "host: %s\n" % host
    msg += "has_expired: %s\n" % expired
    util.print_msg(msg)
×
1192

1193

1194
# Used by tests
1195
def _match_hostname(name, val):
5✔
1196
    if val == name:
×
1197
        return True
×
1198

1199
    return val.startswith('*.') and name.endswith(val[1:])
×
1200

1201

1202
def test_certificates():
    """Run check_cert on every certificate saved under the config's certs dir."""
    from .simple_config import SimpleConfig
    config = SimpleConfig()
    mydir = os.path.join(config.path, "certs")
    for fname in os.listdir(mydir):
        path = os.path.join(mydir, fname)
        with open(path, encoding='utf-8') as f:
            cert = f.read()
        check_cert(fname, cert)
×
1212

1213
# Allow running this module directly to sanity-check the saved certificates.
if __name__ == "__main__":
    test_certificates()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc