• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5707590002278400

25 Apr 2025 10:46AM UTC coverage: 60.343% (+0.05%) from 60.296%
5707590002278400

Pull #9751

CirrusCI

ecdsa
fixes
Pull Request #9751: Txbatcher without password in memory

42 of 67 new or added lines in 4 files covered. (62.69%)

1196 existing lines in 9 files now uncovered.

21659 of 35893 relevant lines covered (60.34%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.98
/electrum/network.py
1
# Electrum - Lightweight Bitcoin Client
2
# Copyright (c) 2011-2016 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import asyncio
5✔
24
import time
5✔
25
import queue
5✔
26
import os
5✔
27
import random
5✔
28
import re
5✔
29
from collections import defaultdict
5✔
30
import threading
5✔
31
import socket
5✔
32
import json
5✔
33
import sys
5✔
34
from typing import (
5✔
35
    NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any, TypeVar,
36
    Callable
37
)
38
import traceback
5✔
39
import concurrent
5✔
40
from concurrent import futures
5✔
41
import copy
5✔
42
import functools
5✔
43
from enum import IntEnum
5✔
44
from contextlib import nullcontext
5✔
45

46
import aiorpcx
5✔
47
from aiorpcx import ignore_after, NetAddress
5✔
48
from aiohttp import ClientResponse
5✔
49

50
from . import util
5✔
51
from .util import (log_exceptions, ignore_exceptions, OldTaskGroup,
5✔
52
                   bfh, make_aiohttp_session, send_exception_to_crash_reporter,
53
                   is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
54
                   error_text_str_to_safe_str, detect_tor_socks_proxy)
55
from .bitcoin import COIN, DummyAddress, DummyAddressUsedInTxException
5✔
56
from . import constants
5✔
57
from . import blockchain
5✔
58
from . import bitcoin
5✔
59
from . import dns_hacks
5✔
60
from .transaction import Transaction
5✔
61
from .blockchain import Blockchain, HEADER_SIZE
5✔
62
from .interface import (Interface, PREFERRED_NETWORK_PROTOCOL,
5✔
63
                        RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
64
                        NetworkException, RequestCorrupted, ServerAddr)
65
from .version import PROTOCOL_VERSION
5✔
66
from .i18n import _
5✔
67
from .logging import get_logger, Logger
5✔
68
from .fee_policy import FeeHistogram, FeeTimeEstimates, FEE_ETA_TARGETS
5✔
69

70

71
if TYPE_CHECKING:
5✔
UNCOV
72
    from collections.abc import Coroutine
×
73

UNCOV
74
    from .channel_db import ChannelDB
×
UNCOV
75
    from .lnrouter import LNPathFinder
×
UNCOV
76
    from .lnworker import LNGossip
×
77
    #from .lnwatcher import WatchTower
UNCOV
78
    from .daemon import Daemon
×
UNCOV
79
    from .simple_config import SimpleConfig
×
80

81

82
_logger = get_logger(__name__)
5✔
83

84

85
# NOTE(review): presumably the number of simultaneous server connections aimed for;
# its use is outside the visible part of this file.
NUM_TARGET_CONNECTED_SERVERS = 10
# Recent servers are given connection priority until this many of them are in use.
NUM_STICKY_SERVERS = 4
# Added to the number of hardcoded servers to cap how many advertised peers are accepted.
NUM_RECENT_SERVERS = 20
5✔
88

89
T = TypeVar('T')
5✔
90

91

92
class ConnectionState(IntEnum):
    """Connection status values, as stored in ``Network.connection_status``."""
    DISCONNECTED  = 0
    CONNECTING    = 1
    CONNECTED     = 2
5✔
96

97

98
def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
    """Build a ``{host: info}`` dict from a "server.peers.subscribe" response.

    For every entry the host is taken from index 1 and the optional feature
    list from index 2. Features starting with 's'/'t' give a protocol and
    port (validated by constructing a ServerAddr), 'v...' gives the version
    and 'p...' the pruning level. Entries that advertise no protocol at all
    are left out of the result.
    """
    parsed = {}
    for entry in result:
        hostname = entry[1]
        info = {}
        server_version = None
        pruning = '-'
        features = entry[2] if len(entry) > 2 else ()
        for feature in features:
            if re.match(r"[st]\d*", feature):
                proto = feature[0]
                port_str = feature[1:]
                if port_str == '':
                    # no explicit port: fall back to the network default for this protocol
                    port_str = constants.net.DEFAULT_PORTS[proto]
                ServerAddr(hostname, port_str, protocol=proto)  # check if raises
                info[proto] = port_str
            elif re.match("v(.?)+", feature):
                server_version = feature[1:]
            elif re.match(r"p\d*", feature):
                pruning = feature[1:]
            if pruning == '':
                # a bare "p" feature means pruning level zero
                pruning = '0'
        if not info:
            continue
        info['pruning'] = pruning
        info['version'] = server_version
        parsed[hostname] = info
    return parsed
×
125

126

127
def filter_version(servers):
    """Return the subset of *servers* whose 'version' is at least PROTOCOL_VERSION.

    Servers with a missing or unparsable version are dropped.
    """
    def meets_minimum(candidate):
        try:
            found = util.versiontuple(candidate)
            required = util.versiontuple(PROTOCOL_VERSION)
            return found >= required
        except Exception:
            return False

    recent = {}
    for host, info in servers.items():
        if meets_minimum(info.get('version')):
            recent[host] = info
    return recent
×
134

135

136
def filter_noonion(servers):
    """Return a new dict with every host ending in '.onion' removed."""
    kept = {}
    for host, info in servers.items():
        if host.endswith('.onion'):
            continue
        kept[host] = info
    return kept
×
138

139

140
def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence[ServerAddr]:
    """List a ServerAddr for every (host, protocol) pair of *hostmap* that has a port.

    Only protocols in *allowed_protocols* are considered; when it is None,
    just PREFERRED_NETWORK_PROTOCOL is used.
    """
    if allowed_protocols is None:
        allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
    return [
        ServerAddr(host, portmap[protocol], protocol=protocol)
        for host, portmap in hostmap.items()
        for protocol in allowed_protocols
        if portmap.get(protocol)
    ]
×
151

152

153
def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
                       exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]:
    """Pick a random server speaking one of *allowed_protocols*.

    *hostmap* defaults to the hardcoded servers of the active network.
    Servers in *exclude_set* are never returned. Returns None when no
    candidate is left.
    """
    source = constants.net.DEFAULT_SERVERS if hostmap is None else hostmap
    excluded = set() if exclude_set is None else exclude_set
    candidates = set(filter_protocol(source, allowed_protocols=allowed_protocols))
    remaining = list(candidates - excluded)
    if not remaining:
        return None
    return random.choice(remaining)
×
162

163

164
def is_valid_port(ps: str):
    """Return True if *ps* converts to an integer port number in 1..65535.

    Values that ``int()`` cannot convert (non-numeric strings, ``None``)
    are reported as invalid instead of raising.
    """
    try:
        port = int(ps)
    except (ValueError, TypeError):
        return False
    # 65535 is the highest valid port number, so the upper bound is inclusive
    return 0 < port <= 65535
×
169

170

171
def is_valid_host(ph: str):
    """Return True if *ph* is accepted as a host by ``NetAddress``.

    The check is done by constructing a NetAddress with a dummy port '1';
    a ValueError from the constructor means the host is invalid.
    """
    try:
        NetAddress(ph, '1')
    except ValueError:
        return False
    else:
        return True
×
177

178

179
class ProxySettings:
5✔
180
    MODES = ['socks4', 'socks5']
5✔
181

182
    probe_fut = None
5✔
183

184
    def __init__(self):
5✔
185
        self.enabled = False
5✔
186
        self.mode = 'socks5'
5✔
187
        self.host = ''
5✔
188
        self.port = ''
5✔
189
        self.user = None
5✔
190
        self.password = None
5✔
191

192
    def set_defaults(self):
5✔
193
        self.__init__()  # call __init__ for default values
×
194

195
    def serialize_proxy_cfgstr(self):
5✔
UNCOV
196
        return ':'.join([self.mode, self.host, self.port])
×
197

198
    def deserialize_proxy_cfgstr(self, s: Optional[str], user: str = None, password: str = None) -> None:
5✔
UNCOV
199
        if s is None or (isinstance(s, str) and s.lower() == 'none'):
×
200
            self.set_defaults()
×
201
            self.user = user
×
202
            self.password = password
×
203
            return
×
204

UNCOV
205
        if not isinstance(s, str):
×
206
            raise ValueError('proxy config not a string')
×
207

208
        args = s.split(':')
×
209
        if args[0] in ProxySettings.MODES:
×
210
            self.mode = args[0]
×
UNCOV
211
            args = args[1:]
×
212

213
        # detect migrate from old settings
214
        if len(args) == 4 and is_valid_host(args[0]) and is_valid_port(args[1]):  # host:port:user:pass,
×
215
            self.host = args[0]
×
UNCOV
216
            self.port = args[1]
×
217
            self.user = args[2]
×
218
            self.password = args[3]
×
219
        else:
UNCOV
220
            self.host = ':'.join(args[:-1])
×
221
            self.port = args[-1]
×
UNCOV
222
            self.user = user
×
UNCOV
223
            self.password = password
×
224

UNCOV
225
        if not is_valid_host(self.host) or not is_valid_port(self.port):
×
UNCOV
226
            self.enabled = False
×
227

228
    def to_dict(self):
5✔
UNCOV
229
        return {
×
230
            'enabled': self.enabled,
231
            'mode': self.mode,
232
            'host': self.host,
233
            'port': self.port,
234
            'user': self.user,
235
            'password': self.password
236
        }
237

238
    @classmethod
5✔
239
    def from_config(cls, config: 'SimpleConfig') -> 'ProxySettings':
5✔
UNCOV
240
        proxy = ProxySettings()
×
UNCOV
241
        proxy.deserialize_proxy_cfgstr(
×
242
            config.NETWORK_PROXY, config.NETWORK_PROXY_USER, config.NETWORK_PROXY_PASSWORD
243
        )
UNCOV
244
        proxy.enabled = config.NETWORK_PROXY_ENABLED
×
UNCOV
245
        return proxy
×
246

247
    @classmethod
5✔
248
    def from_dict(cls, d: dict) -> 'ProxySettings':
5✔
249
        proxy = ProxySettings()
5✔
250
        proxy.enabled = d.get('enabled', proxy.enabled)
5✔
251
        proxy.mode = d.get('mode', proxy.mode)
5✔
252
        proxy.host = d.get('host', proxy.host)
5✔
253
        proxy.port = d.get('port', proxy.port)
5✔
254
        proxy.user = d.get('user', proxy.user)
5✔
255
        proxy.password = d.get('password', proxy.password)
5✔
256
        return proxy
5✔
257

258
    @classmethod
5✔
259
    def probe_tor(cls, on_finished: Callable[[str | None, int | None], None]):
5✔
260
        async def detect_task(finished: Callable[[str | None, int | None], None]):
×
UNCOV
261
            try:
×
262
                net_addr = await detect_tor_socks_proxy()
×
UNCOV
263
                if net_addr is None:
×
264
                    finished('', -1)
×
265
                else:
266
                    host = net_addr[0]
×
UNCOV
267
                    port = net_addr[1]
×
UNCOV
268
                    finished(host, port)
×
269
            finally:
UNCOV
270
                cls.probe_fut = None
×
271

UNCOV
272
        if cls.probe_fut:  # one probe at a time
×
UNCOV
273
            return
×
UNCOV
274
        cls.probe_fut = asyncio.run_coroutine_threadsafe(detect_task(on_finished), util.get_asyncio_loop())
×
275

276
    def __eq__(self, other):
5✔
277
        return self.enabled == other.enabled \
5✔
278
            and self.mode == other.mode \
279
            and self.host == other.host \
280
            and self.port == other.port \
281
            and self.user == other.user \
282
            and self.password == other.password
283

284
    def __str__(self):
5✔
UNCOV
285
        return f'{self.enabled=} {self.mode=} {self.host=} {self.port=} {self.user=}'
×
286

287

288
class NetworkParameters(NamedTuple):
    """Bundle of network settings, as returned by ``Network.get_parameters``."""
    server: ServerAddr  # the default (main) server
    proxy: ProxySettings
    auto_connect: bool
    oneserver: bool = False
5✔
293

294

295
# NOTE(review): presumably raised when a best-effort network request ultimately
# fails; the raise sites are outside the visible part of this file.
class BestEffortRequestFailed(NetworkException): pass
5✔
296

297

298
class TxBroadcastError(NetworkException):
    """Base class for errors that occur while broadcasting a transaction."""
    def get_message_for_gui(self):
        """Return a message to show to the user; subclasses must override this."""
        raise NotImplementedError()
×
301

302

303
class TxBroadcastHashMismatch(TxBroadcastError):
    """Broadcast error: the server answered with an unexpected transaction ID."""
    def get_message_for_gui(self):
        headline = _("The server returned an unexpected transaction ID when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        # two translated lines, a blank line, then the exception text
        return "\n".join([headline, advice, "", str(self)])
309

310

311
class TxBroadcastServerReturnedError(TxBroadcastError):
    """Broadcast error: the server answered the broadcast with an error."""
    def get_message_for_gui(self):
        headline = _("The server returned an error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        # two translated lines, a blank line, then the exception text
        return "\n".join([headline, advice, "", str(self)])
317

318

319
class TxBroadcastUnknownError(TxBroadcastError):
    """Broadcast error of unknown cause; the GUI message carries no exception text."""
    def get_message_for_gui(self):
        headline = _("Unknown error when broadcasting the transaction.")
        advice = _("Consider trying to connect to a different server, or updating Electrum.")
        return "\n".join([headline, advice])
324

325

326
class UntrustedServerReturnedError(NetworkException):
    """Wraps an error reported by a server.

    The server-supplied text is untrusted: str() and repr() only ever give
    a fixed generic message, and the original exception is exposed solely
    through get_untrusted_message().
    """

    def __init__(self, *, original_exception):
        self.original_exception = original_exception

    def get_message_for_gui(self) -> str:
        """Return the generic, safe-to-display message."""
        return str(self)

    def get_untrusted_message(self) -> str:
        """Return a sanitised rendering of the original exception, flagged as untrusted."""
        sanitised = error_text_str_to_safe_str(repr(self.original_exception))
        return (f"<UntrustedServerReturnedError "
                f"[DO NOT TRUST THIS MESSAGE] original_exception: {sanitised}>")

    def __str__(self):
        # Never include the untrusted text from self.original_exception here,
        # so that it cannot end up in the GUI by accident.
        return _("The server returned an error.")

    def __repr__(self):
        # Same precaution as in __str__: only the generic message is embedded.
        generic = str(self)
        return "<UntrustedServerReturnedError {!r}>".format(generic)
×
347

348

349
# Module-level singleton: assigned in Network.__init__, returned by Network.get_instance().
_INSTANCE = None
5✔
350

351

352
class Network(Logger, NetworkRetryManager[ServerAddr]):
5✔
353
    """The Network class manages a set of connections to remote electrum
354
    servers, each connected socket is handled by an Interface() object.
355
    """
356

357
    LOGGING_SHORTCUT = 'n'
5✔
358

359
    taskgroup: Optional[OldTaskGroup]
5✔
360
    interface: Optional[Interface]
5✔
361
    interfaces: Dict[ServerAddr, Interface]
5✔
362
    _connecting_ifaces: Set[ServerAddr]
5✔
363
    _closing_ifaces: Set[ServerAddr]
5✔
364
    default_server: ServerAddr
5✔
365
    _recent_servers: List[ServerAddr]
5✔
366

367
    channel_db: Optional['ChannelDB'] = None
5✔
368
    lngossip: Optional['LNGossip'] = None
5✔
369
    path_finder: Optional['LNPathFinder'] = None
5✔
370

371
    def __init__(self, config: 'SimpleConfig', *, daemon: 'Daemon' = None):
        """Create the singleton Network instance.

        Must be called while the asyncio event loop returned by
        ``util.get_asyncio_loop()`` is already running (asserted), and at
        most once per process (asserted via the module-level ``_INSTANCE``).
        Besides setting attributes, this reads the blockchains from disk,
        loads the persisted recent servers and creates the 'certs'
        directory under ``config.path``.
        """
        global _INSTANCE
        assert _INSTANCE is None, "Network is a singleton!"
        _INSTANCE = self

        Logger.__init__(self)
        NetworkRetryManager.__init__(
            self,
            max_retry_delay_normal=600,
            init_retry_delay_normal=15,
            max_retry_delay_urgent=10,
            init_retry_delay_urgent=1,
        )

        self.asyncio_loop = util.get_asyncio_loop()
        assert self.asyncio_loop.is_running(), "event loop not running"

        self.config = config
        self.daemon = daemon

        blockchain.read_blockchains(self.config)
        blockchain.init_headers_file_for_best_chain()
        self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
        self._blockchain_preferred_block = self.config.BLOCKCHAIN_PREFERRED_BLOCK  # type: Dict[str, Any]
        if self._blockchain_preferred_block is None:
            self._set_preferred_chain(None)
        self._blockchain = blockchain.get_best_chain()

        self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}

        # self.proxy must exist before _init_parameters_from_config(), which
        # compares the configured proxy against it in _set_proxy().
        self.proxy = ProxySettings()
        self.is_proxy_tor = None  # type: Optional[bool]  # tri-state. None means unknown.
        self._init_parameters_from_config()

        self.taskgroup = None

        # locks
        self.restart_lock = asyncio.Lock()
        self.bhi_lock = asyncio.Lock()
        self.recent_servers_lock = threading.RLock()       # <- re-entrant
        self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces

        self.server_peers = {}  # returned by interface (servers that the main interface knows about)
        self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock

        self.banner = ''
        self.donation_address = ''
        self.relay_fee = None  # type: Optional[int]

        dir_path = os.path.join(self.config.path, 'certs')
        util.make_dir(dir_path)

        # the main server we are currently communicating with
        self.interface = None
        self.default_server_changed_event = asyncio.Event()
        # Set of servers we have an ongoing connection with.
        # For any ServerAddr, at most one corresponding Interface object
        # can exist at any given time. Depending on the state of that Interface,
        # the ServerAddr can be found in one of the following sets.
        # Note: during a transition, the ServerAddr can appear in two sets momentarily.
        self._connecting_ifaces = set()
        self.interfaces = {}  # these are the ifaces in "initialised and usable" state
        self._closing_ifaces = set()

        # Dump network messages (all interfaces).  Set at runtime from the console.
        self.debug = False

        self._set_status(ConnectionState.DISCONNECTED)
        self._has_ever_managed_to_connect_to_server = False
        self._was_started = False

        self.mempool_fees = FeeHistogram()
        self.fee_estimates = FeeTimeEstimates()
        self.last_time_fee_estimates_requested = 0  # zero ensures immediate fees
×
445

446
    def has_internet_connection(self) -> bool:
        """Guess whether the device is online, based on the
        ``_has_ever_managed_to_connect_to_server`` flag.
        """
        ever_connected = self._has_ever_managed_to_connect_to_server
        return ever_connected
×
449

450
    def has_channel_db(self):
        """Return True if a channel database is currently set on this network."""
        db = self.channel_db
        return db is not None
×
452

453
    def start_gossip(self):
        """Create the channel DB, path finder and gossip worker, then start the worker.

        No-op when LIGHTNING_USE_GOSSIP is disabled in the config or when
        gossip has already been set up.
        """
        from . import lnrouter
        from . import channel_db
        from . import lnworker
        if not self.config.LIGHTNING_USE_GOSSIP:
            return
        if self.lngossip is not None:
            return
        self.channel_db = channel_db.ChannelDB(self)
        self.path_finder = lnrouter.LNPathFinder(self.channel_db)
        self.channel_db.load_data()
        gossip = lnworker.LNGossip(self.config)
        self.lngossip = gossip
        gossip.start_network(self)
×
465

466
    async def stop_gossip(self, *, full_shutdown: bool = False):
        """Stop the gossip worker and drop the channel DB and path finder.

        With *full_shutdown* set, also wait for the channel DB's
        ``stopped_event`` before releasing it. No-op if gossip is not running.
        """
        if not self.lngossip:
            return
        await self.lngossip.stop()
        self.lngossip = None
        self.channel_db.stop()
        if full_shutdown:
            await self.channel_db.stopped_event.wait()
        self.channel_db = None
        self.path_finder = None
×
475

476
    @classmethod
    def run_from_another_thread(cls, coro: 'Coroutine[Any, Any, T]', *, timeout=None) -> T:
        """Run *coro* on the asyncio loop and block until its result is available.

        Must not be called from the thread running that loop (asserted).
        *timeout* is passed to the future's ``result()``.
        """
        target_loop = util.get_asyncio_loop()
        assert util.get_running_loop() != target_loop, 'must not be called from asyncio thread'
        return asyncio.run_coroutine_threadsafe(coro, target_loop).result(timeout)
×
482

483
    @staticmethod
    def get_instance() -> Optional["Network"]:
        """Return the process-wide Network singleton, or None if there is none.

        None is a normal result: when running with the --offline flag no
        Network is ever created.
        """
        instance = _INSTANCE
        return instance
×
489

490
    def with_recent_servers_lock(func):
        """Decorator: run the wrapped method while holding ``self.recent_servers_lock``."""
        @functools.wraps(func)  # keep the wrapped method's name and docstring
        def func_wrapper(self, *args, **kwargs):
            with self.recent_servers_lock:
                return func(self, *args, **kwargs)
        return func_wrapper
5✔
495

496
    def _read_recent_servers(self) -> List[ServerAddr]:
        """Load the persisted recent servers from "<config.path>/recent_servers".

        Returns an empty list when there is no config path, or when the file
        cannot be read or parsed.
        """
        if not self.config.path:
            return []
        recent_path = os.path.join(self.config.path, "recent_servers")
        try:
            with open(recent_path, "r", encoding='utf-8') as fh:
                raw_entries = json.loads(fh.read())
            return [ServerAddr.from_str(entry) for entry in raw_entries]
        except Exception:
            return []
×
507

508
    @with_recent_servers_lock
    def _save_recent_servers(self):
        """Write ``self._recent_servers`` as JSON to "<config.path>/recent_servers".

        Best-effort: does nothing without a config path, and a failed write
        is logged instead of raised.
        """
        if not self.config.path:
            return
        path = os.path.join(self.config.path, "recent_servers")
        s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
        try:
            with open(path, "w", encoding='utf-8') as f:
                f.write(s)
        except Exception as e:
            # Losing this file is harmless, so do not propagate — but leave a
            # trace instead of failing silently.
            self.logger.warning(f"failed to save recent servers to {path!r}: {e!r}")
×
519

520
    async def _server_is_lagging(self) -> bool:
        """Return True if the main server is more than one block behind the local height.

        A missing server height is also treated as lagging.
        """
        server_height = self.get_server_height()
        if not server_height:
            self.logger.info('no height for main interface')
            return True
        local_height = self.get_local_height()
        if (local_height - server_height) > 1:
            self.logger.info(f'{self.default_server} is lagging ({server_height} vs {local_height})')
            return True
        return False
×
530

531
    def _set_status(self, status):
        """Store the new connection status and fire the 'status' callback."""
        self.connection_status = status
        util.trigger_callback('status')
×
534

535
    def is_connected(self):
        """Return whether a main interface exists and reports itself connected and ready."""
        # read the attribute once so the check and the call see the same object
        iface = self.interface
        if iface is None:
            return False
        return iface.is_connected_and_ready()
×
538

539
    def is_connecting(self):
        """Return True while the connection status is CONNECTING."""
        current = self.connection_status
        return current == ConnectionState.CONNECTING
×
541

542
    def get_connection_status_for_GUI(self):
        """Return the translated, human-readable label of the current connection status."""
        labels = {
            ConnectionState.DISCONNECTED: _('Disconnected'),
            ConnectionState.CONNECTING: _('Connecting'),
            ConnectionState.CONNECTED: _('Connected'),
        }
        current = self.connection_status
        return labels[current]
×
549

550
    async def _request_server_info(self, interface: 'Interface'):
        """Fetch banner, donation address, peers, relay fee and fee data from *interface*.

        Waits for the interface to become ready, then runs all requests
        concurrently in one task group.
        """
        await interface.ready
        session = interface.session

        async def fetch_banner():
            self.banner = await interface.get_server_banner()
            util.trigger_callback('banner', self.banner)

        async def fetch_donation_address():
            self.donation_address = await interface.get_donation_address()

        async def fetch_server_peers():
            peers = await session.send_request('server.peers.subscribe')
            # keep a random subset so that the server cannot flood us with peers
            random.shuffle(peers)
            max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
            peers = peers[:max_accepted_peers]
            # note that 'parse_servers' also validates the data (which is untrusted input!)
            self.server_peers = parse_servers(peers)
            util.trigger_callback('servers', self.get_servers())

        async def fetch_relay_fee():
            self.relay_fee = await interface.get_relay_fee()

        async with OldTaskGroup() as group:
            await group.spawn(fetch_banner)
            await group.spawn(fetch_donation_address)
            await group.spawn(fetch_server_peers)
            await group.spawn(fetch_relay_fee)
            await group.spawn(self._request_fee_estimates(interface))
×
575
            await group.spawn(self._request_fee_estimates(interface))
×
576

577
    async def _request_fee_estimates(self, interface):
        """Fetch the fee histogram from *interface* and publish it.

        Records the request time first, then stores the histogram in
        ``self.mempool_fees`` and fires the 'fee_histogram' callback.
        """
        self.requested_fee_estimates()
        hist = await interface.get_fee_histogram()
        self.mempool_fees.set_data(hist)
        self.logger.info(f'fee_histogram {len(hist)}')
        util.trigger_callback('fee_histogram', self.mempool_fees)
×
583

584
    def is_fee_estimates_update_required(self):
        """Return True if more than 60 seconds have passed since fee
        estimates were last requested.
        """
        elapsed = time.time() - self.last_time_fee_estimates_requested
        return elapsed > 60
×
590

591
    def has_fee_etas(self):
        """Return whether ``self.fee_estimates`` holds any data."""
        estimates = self.fee_estimates
        return estimates.has_data()
×
593

594
    def has_fee_mempool(self) -> bool:
        """Return whether ``self.mempool_fees`` holds any data."""
        histogram = self.mempool_fees
        return histogram.has_data()
×
596

597
    def requested_fee_estimates(self):
        """Record the current time as the moment fee estimates were last requested."""
        now = time.time()
        self.last_time_fee_estimates_requested = now
×
599

600
    def get_parameters(self) -> NetworkParameters:
        """Return the current server, proxy, auto_connect and oneserver settings."""
        server = self.default_server
        proxy = self.proxy
        auto_connect = self.auto_connect
        oneserver = self.oneserver
        return NetworkParameters(
            server=server,
            proxy=proxy,
            auto_connect=auto_connect,
            oneserver=oneserver,
        )
605

606
    def _init_parameters_from_config(self) -> None:
        """Initialise auto_connect, the default server and the proxy from ``self.config``.

        NOTE(review): ``_maybe_set_oneserver`` is defined outside the visible
        part of the file; presumably it sets ``self.oneserver``.
        """
        dns_hacks.configure_dns_resolver()
        self.auto_connect = self.config.NETWORK_AUTO_CONNECT
        self._set_default_server()
        self._set_proxy(ProxySettings.from_config(self.config))
        self._maybe_set_oneserver()
×
612

613
    def get_donation_address(self):
        """Return the stored donation address, or None when not connected."""
        if not self.is_connected():
            return None
        return self.donation_address
×
616

617
    def get_interfaces(self) -> List[ServerAddr]:
        """Return a snapshot list of the servers of all connected interfaces."""
        with self.interfaces_lock:
            snapshot = [*self.interfaces]
        return snapshot
×
621

622
    def get_status(self):
        """Return a translated one-line summary of how many nodes we are connected to."""
        count = len(self.get_interfaces())
        if count > 1:
            return _("Connected to {0} nodes.").format(count)
        if count == 1:
            return _("Connected to {0} node.").format(count)
        return _("Not connected")
×
625

626
    def get_fee_estimates(self):
        """Return the ETA-based fee estimates as a ``{target: fee}`` dict.

        With auto_connect, each target (all FEE_ETA_TARGETS but the last)
        gets the median of the non-empty estimates reported by the
        connected interfaces; targets for which that fails are omitted.
        Without auto_connect, the main interface's estimates are returned
        as-is (an empty dict if there is no main interface).
        """
        from statistics import median
        if not self.auto_connect:
            if not self.interface:
                return {}
            return self.interface.fee_estimates_eta
        with self.interfaces_lock:
            estimates = {}
            for target in FEE_ETA_TARGETS[0:-1]:
                try:
                    reported = [iface.fee_estimates_eta.get(target) for iface in self.interfaces.values()]
                    estimates[target] = int(median(filter(None, reported)))
                except Exception:
                    # e.g. no interface has an estimate for this target
                    continue
            return estimates
×
641

642
    def update_fee_estimates(self, *, fee_est: Dict[int, int] = None):
        """Store new fee estimates and fire the 'fee' callback.

        *fee_est* defaults to the result of get_fee_estimates(). The values
        are logged only when they differ from the previous call.
        """
        if fee_est is None:
            fee_est = self.get_fee_estimates()
        for nblock_target, fee in fee_est.items():
            self.fee_estimates.set_data(nblock_target, fee)
        unset = object()  # marks "no previous estimates stored yet"
        previous = getattr(self, "_prev_fee_est", unset)
        if previous is unset or previous != fee_est:
            self._prev_fee_est = copy.copy(fee_est)
            self.logger.info(f'fee_estimates {fee_est}')
        util.trigger_callback('fee', self.fee_estimates)
×
651

652

653
    @with_recent_servers_lock
    def get_servers(self):
        """Return a ``{host: {protocol: port, ...}}`` map of all known servers.

        Sources, in order of increasing precedence: peers advertised by the
        main server (filtered by protocol version), the hardcoded default
        servers, recently used servers, and bookmarked servers from the
        config. Onion hosts are removed when NETWORK_NOONION is set.
        Unparsable bookmarks are skipped.
        """
        # note: order of sources when adding servers here is crucial!
        # don't let "server_peers" overwrite anything,
        # otherwise main server can eclipse the client
        out = dict()
        # add servers received from main interface
        server_peers = self.server_peers
        if server_peers:
            out.update(filter_version(server_peers.copy()))
        # hardcoded servers
        out.update(constants.net.DEFAULT_SERVERS)
        # Copy the per-host dicts: they are shared with self.server_peers and
        # constants.net.DEFAULT_SERVERS, and the merging below must not
        # modify those in place.
        out = {host: dict(portmap) for host, portmap in out.items()}
        # add recent servers
        for server in self._recent_servers:
            out.setdefault(server.host, {})[server.protocol] = str(server.port)
        # add bookmarks
        bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
        for server_str in bookmarks:
            try:
                server = ServerAddr.from_str(server_str)
            except ValueError:
                continue
            out.setdefault(server.host, {})[server.protocol] = str(server.port)
        # potentially filter out some
        if self.config.NETWORK_NOONION:
            out = filter_noonion(out)
        return out
×
688

689
    def _get_next_server_to_try(self) -> Optional[ServerAddr]:
        """Choose the next server to open a connection to, or None if there is none.

        Servers that are already connected, connecting or closing are never
        returned, nor are servers whose retry delay has not elapsed yet.
        """
        now = time.time()
        with self.interfaces_lock:
            busy = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
        # First try from recent servers. (which are persisted)
        # As these are servers we successfully connected to recently, they are
        # most likely to work. This also makes servers "sticky".
        # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
        #       however if they succeed, the eclipsing would persist. To try to balance this,
        #       we only give priority to recent_servers up to NUM_STICKY_SERVERS.
        with self.recent_servers_lock:
            recent = list(self._recent_servers)
        recent = [s for s in recent if s.protocol in self._allowed_protocols]
        if len(busy & set(recent)) < NUM_STICKY_SERVERS:
            sticky = next(
                (s for s in recent if s not in busy and self._can_retry_addr(s, now=now)),
                None,
            )
            if sticky is not None:
                return sticky
        # try all servers we know about, pick one at random
        hostmap = self.get_servers()
        candidates = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - busy)
        random.shuffle(candidates)
        return next((s for s in candidates if self._can_retry_addr(s, now=now)), None)
×
718

719
    def _set_default_server(self) -> None:
        """Initialise ``self.default_server`` (the server for addresses and transactions).

        Uses the server string from the config if there is one; an unparsable
        string falls back to "localhost:1:s". Without a configured server a
        random one is picked among the allowed protocols.
        """
        configured = self.config.NETWORK_SERVER
        if not configured:
            self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
        else:
            # sanitize the configured value
            try:
                self.default_server = ServerAddr.from_str(configured)
            except Exception:
                self.logger.warning(f'failed to parse server-string ({configured!r}); falling back to localhost:1:s.')
                self.default_server = ServerAddr.from_str("localhost:1:s")
        assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"
×
732

733
    def _set_proxy(self, proxy: ProxySettings):
        """Store new proxy settings, re-run Tor detection and notify listeners.

        Does nothing when ``proxy`` equals the settings already in use.
        """
        already_active = self.proxy == proxy
        if not already_active:
            self.logger.info(f'setting proxy {proxy}')
            self.proxy = proxy

            # whether the new proxy is Tor is unknown until it is probed again
            self.is_proxy_tor = None
            self._detect_if_proxy_is_tor()

            util.trigger_callback('proxy_set', self.proxy)
×
745

746
    def _detect_if_proxy_is_tor(self) -> None:
        """Schedule a probe on the asyncio loop to find out if the proxy is Tor.

        The probe is only scheduled for an enabled proxy in 'socks5' mode.
        Its result is recorded in ``self.is_proxy_tor`` and announced via the
        'tor_probed' callback, but only if the probed proxy is still the
        active one when the probe completes.
        """
        candidate = self.proxy
        if not (candidate and candidate.enabled and candidate.mode == 'socks5'):
            return

        async def tor_probe_task(p):
            assert p is not None
            is_tor = await util.is_tor_socks_port(p.host, int(p.port))
            if self.proxy != p:
                # the proxy was changed while we were probing; result is stale
                return
            if self.is_proxy_tor != is_tor:
                negation = "" if is_tor else "not "
                self.logger.info(f'Proxy is {negation}TOR')
                self.is_proxy_tor = is_tor
            util.trigger_callback('tor_probed', is_tor)

        asyncio.run_coroutine_threadsafe(tor_probe_task(candidate), self.asyncio_loop)
×
759

760
    @log_exceptions
    async def set_parameters(self, net_params: NetworkParameters):
        """Persist new network parameters to the config and apply them.

        The proxy settings are sanity-checked first (known mode, integer
        port); if that check raises, the proxy is disabled. The parameters are
        then written to the config. If a value read back from the config
        differs from what was written (the change was not allowed by the
        config), the method returns without touching the running network.

        Otherwise the parameters are re-read from the config. If the network
        was already started, then under ``restart_lock`` it is restarted
        (proxy or oneserver changed), switched to the new default server
        (server changed), or checked for a lagging interface (nothing of the
        above changed). Finally the 'network_updated' callback is triggered.
        """
        proxy = net_params.proxy
        proxy_str = proxy.serialize_proxy_cfgstr()
        proxy_user = proxy.user
        proxy_pass = proxy.password
        server = net_params.server
        # sanitize parameters
        try:
            if proxy:
                # raises ValueError for an unknown proxy mode
                ProxySettings.MODES.index(proxy.mode)
                # raises if the port is not an integer
                int(proxy.port)
        except Exception:
            proxy.enabled = False
        # Read the enabled flag only *after* sanitization, so that a proxy that
        # was just disabled above is also stored as disabled in the config.
        proxy_enabled = proxy.enabled
        self.config.NETWORK_AUTO_CONNECT = net_params.auto_connect
        self.config.NETWORK_ONESERVER = net_params.oneserver
        self.config.NETWORK_PROXY_ENABLED = proxy_enabled
        self.config.NETWORK_PROXY = proxy_str
        self.config.NETWORK_PROXY_USER = proxy_user
        self.config.NETWORK_PROXY_PASSWORD = proxy_pass
        self.config.NETWORK_SERVER = str(server)
        # abort if changes were not allowed by config
        if self.config.NETWORK_SERVER != str(server) \
                or self.config.NETWORK_PROXY_ENABLED != proxy_enabled \
                or self.config.NETWORK_PROXY != proxy_str \
                or self.config.NETWORK_PROXY_USER != proxy_user \
                or self.config.NETWORK_PROXY_PASSWORD != proxy_pass \
                or self.config.NETWORK_ONESERVER != net_params.oneserver:
            return

        # compare against the current state *before* it is reloaded from config
        proxy_changed = self.proxy != proxy
        oneserver_changed = self.oneserver != net_params.oneserver
        default_server_changed = self.default_server != server
        self._init_parameters_from_config()
        if not self._was_started:
            return

        async with self.restart_lock:
            if proxy_changed or oneserver_changed:
                # Restart the network
                await self.stop(full_shutdown=False)
                await self._start()
            elif default_server_changed:
                await self.switch_to_interface(server)
            else:
                await self.switch_lagging_interface()
        util.trigger_callback('network_updated')
×
811

812
    def _maybe_set_oneserver(self) -> None:
        """Load the oneserver flag from the config and derive ``self.num_server``.

        With oneserver set, ``num_server`` is 0; otherwise it is
        NUM_TARGET_CONNECTED_SERVERS.
        """
        self.oneserver = single_server_mode = self.config.NETWORK_ONESERVER
        if single_server_mode:
            self.num_server = 0
        else:
            self.num_server = NUM_TARGET_CONNECTED_SERVERS
×
816

817
    def is_server_bookmarked(self, server: ServerAddr) -> bool:
        """Return whether ``str(server)`` is among the bookmarked servers in the config."""
        stored = self.config.NETWORK_BOOKMARKED_SERVERS
        known = stored if stored else []
        needle = str(server)
        return needle in known
×
820

821
    def set_server_bookmark(self, server: ServerAddr, *, add: bool) -> None:
        """Add ``server`` to, or remove it from, the bookmarked servers in the config.

        The read-modify-write of the bookmark list happens under the config lock.
        Adding an existing bookmark or removing a missing one leaves the list as is;
        the list is written back to the config in every case.
        """
        key = str(server)
        with self.config.lock:
            entries = self.config.NETWORK_BOOKMARKED_SERVERS or []
            present = key in entries
            if add and not present:
                entries.append(key)
            elif not add and present:
                entries.remove(key)
            self.config.NETWORK_BOOKMARKED_SERVERS = entries
×
832

833
    async def _switch_to_random_interface(self):
5✔
834
        '''Switch to a random connected server other than the current one'''
UNCOV
835
        servers = self.get_interfaces()    # Those in connected state
×
UNCOV
836
        if self.default_server in servers:
×
UNCOV
837
            servers.remove(self.default_server)
×
838
        if servers:
×
UNCOV
839
            await self.switch_to_interface(random.choice(servers))
×
840

841
    async def switch_lagging_interface(self):
        """If auto_connect is on and the server is lagging, switch interface (only within fork).

        The new main interface is picked at random among the interfaces whose
        tip header equals the tip header of our current blockchain.
        """
        if not self.auto_connect:
            return
        if not await self._server_is_lagging():
            return
        # compare by header (not by height), so we stay on the same fork
        tip = self.blockchain().header_at_tip()
        with self.interfaces_lock:
            ifaces = list(self.interfaces.values())
        on_tip = [iface for iface in ifaces if iface.tip_header == tip]
        if on_tip:
            replacement = random.choice(on_tip)
            await self.switch_to_interface(replacement.server)
×
851

852
    async def switch_unwanted_fork_interface(self) -> None:
        """If auto_connect is on, maybe switch the main interface to another fork/chain.

        Candidate chains are those containing the preferred block, followed by
        the best chain. Going through them in that order, we stop as soon as
        the main interface is already on the chain, or as soon as we could
        switch to a (random) interface that is on it.
        """
        if not (self.auto_connect and self.interface):
            return
        with self.interfaces_lock:
            ifaces = list(self.interfaces.values())
        preferred = self._blockchain_preferred_block
        wanted_height = preferred['height']
        wanted_hash = preferred['hash']
        # shortcut for common case
        if wanted_height == 0:
            return
        # most desirable chains first, the best chain as last resort
        candidates = list(blockchain.get_chains_that_contain_header(wanted_height, wanted_hash))
        candidates.append(blockchain.get_best_chain())
        for rank, chain in enumerate(candidates):
            if self.interface.blockchain == chain:
                # main interface is already on this fork
                return
            on_chain = [iface for iface in ifaces if iface.blockchain == chain]
            if not on_chain:
                continue
            self.logger.info(f"switching to (more) preferred fork (rank {rank})")
            replacement = random.choice(on_chain)
            await self.switch_to_interface(replacement.server)
            return
        self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")
×
878

879
    async def switch_to_interface(self, server: ServerAddr):
        """Make ``server`` our main interface.

        If there is no connection to ``server`` yet, starting one is queued and
        the actual switch happens once that interface becomes ready.
        """
        self.default_server = server
        previous = self.interface
        previous_server = None
        if previous:
            previous_server = previous.server

        # Stop the current interface (if it is a different server), to terminate
        # its subscriptions and to cancel the tasks in its taskgroup.
        if previous_server and previous_server != server:
            # closing might be slow, so it is spawned instead of awaited here
            await self.taskgroup.spawn(self._close_interface(previous))

        if server not in self.interfaces:
            self.interface = None
            await self.taskgroup.spawn(self._run_new_interface(server))
            return

        target = self.interfaces[server]
        needs_switch = previous != target
        if not needs_switch or not target.is_connected_and_ready():
            return

        self.logger.info(f"switching to {server}")
        chain_differs = target.blockchain != self.blockchain()
        self.interface = target
        try:
            await target.taskgroup.spawn(self._request_server_info(target))
        except RuntimeError as exc:  # see #7677
            if exc.args and exc.args[0] == 'task group terminated':
                self.logger.warning(f"tried to switch to {server} but interface.taskgroup is already dead.")
                self.interface = None
                return
            raise
        util.trigger_callback('default_server_changed')
        # set+clear wakes up the current waiters and leaves the event unset
        self.default_server_changed_event.set()
        self.default_server_changed_event.clear()
        self._set_status(ConnectionState.CONNECTED)
        util.trigger_callback('network_updated')
        if chain_differs:
            util.trigger_callback('blockchain_updated')
×
921

922
    async def _close_interface(self, interface: Optional[Interface]):
        """Forget ``interface`` and close it, waiting until it got disconnected.

        No-op for a falsy interface, or if its server is already being closed.
        While the close is in progress, the server is listed in
        ``self._closing_ifaces``.
        """
        if not interface or interface.server in self._closing_ifaces:
            return
        addr = interface.server
        self._closing_ifaces.add(addr)
        with self.interfaces_lock:
            # only drop the entry if it still refers to this very interface
            if self.interfaces.get(addr) == interface:
                del self.interfaces[addr]
        if interface == self.interface:
            self.interface = None
        try:
            # this can take some time if server/connection is slow:
            await interface.close()
            await interface.got_disconnected.wait()
        finally:
            self._closing_ifaces.discard(interface.server)
×
939

940
    @with_recent_servers_lock
    def _add_recent_server(self, server: ServerAddr) -> None:
        """Move ``server`` to the front of the recent-servers list and save the list.

        The list is kept ordered (most recent first) and is truncated to
        NUM_RECENT_SERVERS entries.
        """
        self._on_connection_successfully_established(server)
        history = self._recent_servers
        if server in history:
            history.remove(server)
        history.insert(0, server)
        self._recent_servers = history[:NUM_RECENT_SERVERS]
        self._save_recent_servers()
×
949

950
    async def connection_down(self, interface: Interface):
        """Handle a connection to a server that went down, or that was never made.

        Sets the status to DISCONNECTED if this was the default server, closes
        the interface and triggers 'network_updated'. No-op for a falsy interface.
        """
        if not interface:
            return
        was_default = interface.server == self.default_server
        if was_default:
            self._set_status(ConnectionState.DISCONNECTED)
        await self._close_interface(interface)
        util.trigger_callback('network_updated')
×
958

959
    def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
        """Return the timeout to use for a network request of ``request_type``.

        A truthy NETWORK_TIMEOUT in the config takes precedence. Otherwise the
        MOST_RELAXED value is used for oneserver without auto_connect, the
        RELAXED value when a proxy is enabled, and the NORMAL value in all
        other cases.
        """
        override = self.config.NETWORK_TIMEOUT
        if override:
            return override
        if self.oneserver and not self.auto_connect:
            chosen = request_type.MOST_RELAXED
        elif self.proxy and self.proxy.enabled:
            chosen = request_type.RELAXED
        else:
            chosen = request_type.NORMAL
        return chosen
×
967

968
    @ignore_exceptions  # do not kill outer taskgroup
    @log_exceptions
    async def _run_new_interface(self, server: ServerAddr):
        """Create an interface for ``server`` and register it once it is ready.

        No-op if ``server`` is already connected, connecting, or closing.
        If the interface does not become ready in time (or fails otherwise),
        it is closed again and nothing is registered.
        """
        tracked_in = (self.interfaces, self._connecting_ifaces, self._closing_ifaces)
        if any(server in pool for pool in tracked_in):
            return
        self._connecting_ifaces.add(server)
        if server == self.default_server:
            self.logger.info(f"connecting to {server} as new interface")
            self._set_status(ConnectionState.CONNECTING)
        self._trying_addr_now(server)

        new_iface = Interface(network=self, server=server)
        # note: using longer timeouts here as DNS can sometimes be slow!
        ready_timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        try:
            await util.wait_for2(new_iface.ready, ready_timeout)
        except BaseException as exc:
            self.logger.info(f"couldn't launch iface {server} -- {exc!r}")
            await new_iface.close()
            return
        else:
            with self.interfaces_lock:
                assert server not in self.interfaces
                self.interfaces[server] = new_iface
        finally:
            self._connecting_ifaces.discard(server)

        if server == self.default_server:
            await self.switch_to_interface(server)

        self._has_ever_managed_to_connect_to_server = True
        self._add_recent_server(server)
        util.trigger_callback('network_updated')
        # The proxy (if any) might have been unreachable when the proxy settings
        # were set, giving a false-negative Tor detection. As we just managed to
        # connect to a server, test again now.
        self._detect_if_proxy_is_tor()
×
1006

1007
    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
        """Return whether ``iface_to_check`` fits a healthy spread of connected servers.

        The other connected interfaces are grouped into buckets based on their
        IP address. An onion interface is rejected if there are already more
        than NUM_TARGET_CONNECTED_SERVERS // 2 onion servers; any other
        interface is rejected if its bucket is already occupied.
        """
        # main interface is exempt. this makes switching servers easier
        if iface_to_check.is_main_server():
            return True
        if not iface_to_check.bucket_based_on_ipaddress():
            return True
        # bucket the *other* connected interfaces
        with self.interfaces_lock:
            others = list(self.interfaces.values())
        if iface_to_check in others:
            others.remove(iface_to_check)
        buckets = defaultdict(list)
        for other in others:
            buckets[other.bucket_based_on_ipaddress()].append(other)
        # check proposed server against buckets
        onion_peers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
        if iface_to_check.is_tor():
            # keep number of onion servers below half of all connected servers
            return not len(onion_peers) > NUM_TARGET_CONNECTED_SERVERS // 2
        own_bucket = iface_to_check.bucket_based_on_ipaddress()
        return not len(buckets[own_bucket]) > 0
×
1032

1033
    def best_effort_reliable(func):
        """Decorator that retries a network request on the main interface, up to 10 times.

        An attempt is retried when there is no main interface yet, when the
        request raises RequestTimedOut or RequestCorrupted (the interface is
        closed in both cases), or when the request task did not finish.
        After the last attempt BestEffortRequestFailed is raised.
        """
        async def _drop_interface(iface):
            # close the interface and wait until it is really gone
            await iface.close()
            await iface.got_disconnected.wait()

        @functools.wraps(func)
        async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
            for _attempt in range(10):
                iface = self.interface
                if not iface:
                    # no main interface yet: wait (at most 1s) for one, then retry
                    async with ignore_after(1):
                        await self.default_server_changed_event.wait()
                    continue
                assert iface.ready.done(), "interface not ready yet"
                # The actual request races against the disconnection of the
                # interface; the group is done as soon as either one finishes.
                try:
                    async with OldTaskGroup(wait=any) as group:
                        request_task = await group.spawn(func(self, *args, **kwargs))
                        await group.spawn(iface.got_disconnected.wait())
                except RequestTimedOut:
                    await _drop_interface(iface)
                    continue  # try again
                except RequestCorrupted as exc:
                    # TODO ban server?
                    iface.logger.exception(f"RequestCorrupted: {exc}")
                    await _drop_interface(iface)
                    continue  # try again
                if request_task.done() and not request_task.cancelled():
                    return request_task.result()
                # otherwise; try again
            raise BestEffortRequestFailed('cannot establish a connection... gave up.')
        return make_reliable_wrapper
5✔
1064

1065
    def catch_server_exceptions(func):
        """Decorator that converts server errors into UntrustedServerReturnedError.

        This avoids showing untrusted arbitrary text to users.
        """
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            try:
                result = await func(self, *args, **kwargs)
            except aiorpcx.jsonrpc.CodeMessageError as server_error:
                wrapped_exc = UntrustedServerReturnedError(original_exception=server_error)
                # log (sanitized) untrusted error text now, to ease debugging
                untrusted_text = wrapped_exc.get_untrusted_message()
                self.logger.debug(f"got error from server for {func.__qualname__}: {untrusted_text!r}")
                raise wrapped_exc from server_error
            return result
        return wrapper
5✔
1079

1080
    @best_effort_reliable
    @catch_server_exceptions
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Forward the merkle request for ``tx_hash`` at ``tx_height`` to the main interface.

        Raises RequestTimedOut if there is no main interface.
        """
        main_iface = self.interface
        if main_iface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        return await main_iface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)
×
1086

1087
    @best_effort_reliable
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """Broadcast ``tx`` through the main interface.

        Without an explicit ``timeout`` the "Urgent" network timeout is used.
        Transactions paying to a dummy address are refused.

        caller should handle TxBroadcastError
        """
        if self.interface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        if timeout is None:
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
        for txout in tx.outputs():
            if DummyAddress.is_dummy_address(txout.address):
                raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
        session = self.interface.session
        try:
            # note: both the response and exception messages are untrusted input from the server
            response = await session.send_request('blockchain.transaction.broadcast', [tx.serialize()], timeout=timeout)
        except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
            raise  # pass-through
        except aiorpcx.jsonrpc.CodeMessageError as exc:
            safe_text = error_text_str_to_safe_str(repr(exc))
            self.logger.info(f"broadcast_transaction error [DO NOT TRUST THIS MESSAGE]: {safe_text}")
            raise TxBroadcastServerReturnedError(self.sanitize_tx_broadcast_response(exc.message)) from exc
        except BaseException as exc:  # intentional BaseException for sanity!
            safe_text = error_text_str_to_safe_str(repr(exc))
            self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {safe_text}")
            send_exception_to_crash_reporter(exc)
            raise TxBroadcastUnknownError() from exc
        if response == tx.txid():
            return
        self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
                         f"{error_text_str_to_safe_str(response)} != {tx.txid()}")
        raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
×
1112

1113
    async def try_broadcasting(self, tx, name) -> bool:
        """Broadcast ``tx`` and return whether that succeeded.

        The outcome is logged either way; ``name`` is only used in the log
        message. Exceptions (subclasses of Exception) raised by the broadcast
        are swallowed and reported as False.
        """
        try:
            await self.broadcast_transaction(tx)
        except Exception as exc:
            self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {exc!s}')
            return False
        self.logger.info(f'success: broadcasting {name} {tx.txid()}')
        return True
×
1122

1123
    @staticmethod
5✔
1124
    def sanitize_tx_broadcast_response(server_msg) -> str:
5✔
1125
        # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
1126
        # So, we use substring matching to grok the error message.
1127
        # server_msg is untrusted input so it should not be shown to the user. see #4968
UNCOV
1128
        server_msg = str(server_msg)
×
UNCOV
1129
        server_msg = server_msg.replace("\n", r"\n")
×
1130

1131
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
UNCOV
1132
        script_error_messages = {
×
1133
            r"Script evaluated without error but finished with a false/empty top stack element",
1134
            r"Script failed an OP_VERIFY operation",
1135
            r"Script failed an OP_EQUALVERIFY operation",
1136
            r"Script failed an OP_CHECKMULTISIGVERIFY operation",
1137
            r"Script failed an OP_CHECKSIGVERIFY operation",
1138
            r"Script failed an OP_NUMEQUALVERIFY operation",
1139
            r"Script is too big",
1140
            r"Push value size limit exceeded",
1141
            r"Operation limit exceeded",
1142
            r"Stack size limit exceeded",
1143
            r"Signature count negative or greater than pubkey count",
1144
            r"Pubkey count negative or limit exceeded",
1145
            r"Opcode missing or not understood",
1146
            r"Attempted to use a disabled opcode",
1147
            r"Operation not valid with the current stack size",
1148
            r"Operation not valid with the current altstack size",
1149
            r"OP_RETURN was encountered",
1150
            r"Invalid OP_IF construction",
1151
            r"Negative locktime",
1152
            r"Locktime requirement not satisfied",
1153
            r"Signature hash type missing or not understood",
1154
            r"Non-canonical DER signature",
1155
            r"Data push larger than necessary",
1156
            r"Only push operators allowed in signatures",
1157
            r"Non-canonical signature: S value is unnecessarily high",
1158
            r"Dummy CHECKMULTISIG argument must be zero",
1159
            r"OP_IF/NOTIF argument must be minimal",
1160
            r"Signature must be zero for failed CHECK(MULTI)SIG operation",
1161
            r"NOPx reserved for soft-fork upgrades",
1162
            r"Witness version reserved for soft-fork upgrades",
1163
            r"Taproot version reserved for soft-fork upgrades",
1164
            r"OP_SUCCESSx reserved for soft-fork upgrades",
1165
            r"Public key version reserved for soft-fork upgrades",
1166
            r"Public key is neither compressed or uncompressed",
1167
            r"Stack size must be exactly one after execution",
1168
            r"Extra items left on stack after execution",
1169
            r"Witness program has incorrect length",
1170
            r"Witness program was passed an empty witness",
1171
            r"Witness program hash mismatch",
1172
            r"Witness requires empty scriptSig",
1173
            r"Witness requires only-redeemscript scriptSig",
1174
            r"Witness provided for non-witness script",
1175
            r"Using non-compressed keys in segwit",
1176
            r"Invalid Schnorr signature size",
1177
            r"Invalid Schnorr signature hash type",
1178
            r"Invalid Schnorr signature",
1179
            r"Invalid Taproot control block size",
1180
            r"Too much signature validation relative to witness weight",
1181
            r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
1182
            r"OP_IF/NOTIF argument must be minimal in tapscript",
1183
            r"Using OP_CODESEPARATOR in non-witness script",
1184
            r"Signature is found in scriptCode",
1185
        }
UNCOV
1186
        for substring in script_error_messages:
×
UNCOV
1187
            if substring in server_msg:
×
1188
                return substring
×
1189
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
1190
        # grep "REJECT_"
1191
        # grep "TxValidationResult"
1192
        # should come after script_error.cpp (due to e.g. "non-mandatory-script-verify-flag")
UNCOV
1193
        validation_error_messages = {
×
1194
            r"coinbase": None,
1195
            r"tx-size-small": None,
1196
            r"non-final": None,
1197
            r"txn-already-in-mempool": None,
1198
            r"txn-mempool-conflict": None,
1199
            r"txn-already-known": None,
1200
            r"non-BIP68-final": None,
1201
            r"bad-txns-nonstandard-inputs": None,
1202
            r"bad-witness-nonstandard": None,
1203
            r"bad-txns-too-many-sigops": None,
1204
            r"mempool min fee not met":
1205
                ("mempool min fee not met\n" +
1206
                 _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
1207
                   "fit it into its mempool. The mempool is already full of hundreds of megabytes "
1208
                   "of transactions that all pay higher fees. Try to increase the fee.")),
1209
            r"min relay fee not met": None,
1210
            r"absurdly-high-fee": None,
1211
            r"max-fee-exceeded": None,
1212
            r"too-long-mempool-chain": None,
1213
            r"bad-txns-spends-conflicting-tx": None,
1214
            r"insufficient fee": ("insufficient fee\n" +
1215
                 _("Your transaction is trying to replace another one in the mempool but it "
1216
                   "does not meet the rules to do so. Try to increase the fee.")),
1217
            r"too many potential replacements": None,
1218
            r"replacement-adds-unconfirmed": None,
1219
            r"mempool full": None,
1220
            r"non-mandatory-script-verify-flag": None,
1221
            r"mandatory-script-verify-flag-failed": None,
1222
            r"Transaction check failed": None,
1223
        }
UNCOV
1224
        for substring in validation_error_messages:
×
UNCOV
1225
            if substring in server_msg:
×
UNCOV
1226
                msg = validation_error_messages[substring]
×
1227
                return msg if msg else substring
×
1228
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
1229
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
1230
        # grep "RPC_TRANSACTION"
1231
        # grep "RPC_DESERIALIZATION_ERROR"
UNCOV
1232
        rawtransaction_error_messages = {
×
1233
            r"Missing inputs": None,
1234
            r"Inputs missing or spent": None,
1235
            r"transaction already in block chain": None,
1236
            r"Transaction already in block chain": None,
1237
            r"TX decode failed": None,
1238
            r"Peer-to-peer functionality missing or disabled": None,
1239
            r"Transaction rejected by AcceptToMemoryPool": None,
1240
            r"AcceptToMemoryPool failed": None,
1241
            r"Fee exceeds maximum configured by user": None,
1242
        }
UNCOV
1243
        for substring in rawtransaction_error_messages:
×
UNCOV
1244
            if substring in server_msg:
×
UNCOV
1245
                msg = rawtransaction_error_messages[substring]
×
1246
                return msg if msg else substring
×
1247
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
1248
        # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
1249
        # grep "REJECT_"
1250
        # grep "TxValidationResult"
UNCOV
1251
        tx_verify_error_messages = {
×
1252
            r"bad-txns-vin-empty": None,
1253
            r"bad-txns-vout-empty": None,
1254
            r"bad-txns-oversize": None,
1255
            r"bad-txns-vout-negative": None,
1256
            r"bad-txns-vout-toolarge": None,
1257
            r"bad-txns-txouttotal-toolarge": None,
1258
            r"bad-txns-inputs-duplicate": None,
1259
            r"bad-cb-length": None,
1260
            r"bad-txns-prevout-null": None,
1261
            r"bad-txns-inputs-missingorspent":
1262
                ("bad-txns-inputs-missingorspent\n" +
1263
                 _("You might have a local transaction in your wallet that this transaction "
1264
                   "builds on top. You need to either broadcast or remove the local tx.")),
1265
            r"bad-txns-premature-spend-of-coinbase": None,
1266
            r"bad-txns-inputvalues-outofrange": None,
1267
            r"bad-txns-in-belowout": None,
1268
            r"bad-txns-fee-outofrange": None,
1269
        }
UNCOV
1270
        for substring in tx_verify_error_messages:
×
UNCOV
1271
            if substring in server_msg:
×
UNCOV
1272
                msg = tx_verify_error_messages[substring]
×
1273
                return msg if msg else substring
×
1274
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
1275
        # grep "reason ="
1276
        # should come after validation.cpp (due to "tx-size" vs "tx-size-small")
1277
        # should come after script_error.cpp (due to e.g. "version")
UNCOV
1278
        policy_error_messages = {
×
1279
            r"version": _("Transaction uses non-standard version."),
1280
            r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
1281
            r"scriptsig-size": None,
1282
            r"scriptsig-not-pushonly": None,
1283
            r"scriptpubkey":
1284
                ("scriptpubkey\n" +
1285
                 _("Some of the outputs pay to a non-standard script.")),
1286
            r"bare-multisig": None,
1287
            r"dust":
1288
                (_("Transaction could not be broadcast due to dust outputs.\n"
1289
                   "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
1290
                   "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
1291
            r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
1292
        }
1293
        for substring in policy_error_messages:
×
UNCOV
1294
            if substring in server_msg:
×
UNCOV
1295
                msg = policy_error_messages[substring]
×
UNCOV
1296
                return msg if msg else substring
×
1297
        # otherwise:
1298
        return _("Unknown error")
×
1299

1300
    @best_effort_reliable
5✔
1301
    @catch_server_exceptions
5✔
1302
    async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
        """Forward a ``request_chunk`` call to the main interface.

        The arguments are passed through unchanged and the interface's result
        is returned as-is.

        Raises RequestTimedOut if there is currently no main interface.
        """
        main_iface = self.interface
        if main_iface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        return await main_iface.request_chunk(
            height,
            tip=tip,
            can_return_early=can_return_early,
        )
×
1306

1307
    @best_effort_reliable
5✔
1308
    @catch_server_exceptions
5✔
1309
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Forward a ``get_transaction`` call to the main interface.

        ``tx_hash`` and ``timeout`` are passed on as keyword arguments and the
        interface's result is returned as-is.

        Raises RequestTimedOut if there is currently no main interface.
        """
        main_iface = self.interface
        if main_iface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        raw_tx = await main_iface.get_transaction(tx_hash=tx_hash, timeout=timeout)
        return raw_tx
×
1313

1314
    @best_effort_reliable
5✔
1315
    @catch_server_exceptions
5✔
1316
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        """Forward a ``get_history_for_scripthash`` call to the main interface.

        Raises RequestTimedOut if there is currently no main interface.
        """
        main_iface = self.interface
        if main_iface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        history = await main_iface.get_history_for_scripthash(sh)
        return history
×
1320

1321
    @best_effort_reliable
5✔
1322
    @catch_server_exceptions
5✔
1323
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        """Forward a ``listunspent_for_scripthash`` call to the main interface.

        Raises RequestTimedOut if there is currently no main interface.
        """
        main_iface = self.interface
        if main_iface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        unspent = await main_iface.listunspent_for_scripthash(sh)
        return unspent
×
1327

1328
    @best_effort_reliable
5✔
1329
    @catch_server_exceptions
5✔
1330
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        """Forward a ``get_balance_for_scripthash`` call to the main interface.

        Raises RequestTimedOut if there is currently no main interface.
        """
        main_iface = self.interface
        if main_iface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        balance = await main_iface.get_balance_for_scripthash(sh)
        return balance
×
1334

1335
    @best_effort_reliable
5✔
1336
    @catch_server_exceptions
5✔
1337
    async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
        """Forward a ``get_txid_from_txpos`` call to the main interface.

        All three arguments are passed through positionally.

        Raises RequestTimedOut if there is currently no main interface.
        """
        main_iface = self.interface
        if main_iface is None:  # handled by best_effort_reliable
            raise RequestTimedOut()
        result = await main_iface.get_txid_from_txpos(tx_height, tx_pos, merkle)
        return result
×
1341

1342
    def blockchain(self) -> Blockchain:
        """Return the blockchain to use, refreshing the cached one if possible.

        If there is a main interface and its ``blockchain`` is not None, that
        value is stored in ``self._blockchain`` first. Otherwise the previously
        cached ``self._blockchain`` is returned unchanged.
        """
        main_iface = self.interface
        current_chain = main_iface.blockchain if main_iface else None
        if current_chain is not None:
            self._blockchain = current_chain
        return self._blockchain
×
1347

1348
    def get_blockchains(self):
        """Map each blockchain id to the interfaces currently on that chain.

        Chains that no interface is on are left out of the result.
        """
        # take snapshots under the respective locks, then work on the copies
        with blockchain.blockchains_lock:
            known_chains = list(blockchain.blockchains.items())
        with self.interfaces_lock:
            connected = list(self.interfaces.values())
        ifaces_by_chain = {}  # blockchain_id -> list(interfaces)
        for chain_id, chain in known_chains:
            on_this_chain = [iface for iface in connected if iface.blockchain == chain]
            if on_this_chain:
                ifaces_by_chain[chain_id] = on_this_chain
        return ifaces_by_chain
×
1357

1358
    def _set_preferred_chain(self, chain: Optional[Blockchain]):
5✔
1359
        if chain:
×
1360
            height = chain.get_max_forkpoint()
×
UNCOV
1361
            header_hash = chain.get_hash(height)
×
1362
        else:
UNCOV
1363
            height = 0
×
1364
            header_hash = constants.net.GENESIS
×
UNCOV
1365
        self._blockchain_preferred_block = {
×
1366
            'height': height,
1367
            'hash': header_hash,
1368
        }
1369
        self.config.BLOCKCHAIN_PREFERRED_BLOCK = self._blockchain_preferred_block
×
1370

1371
    async def follow_chain_given_id(self, chain_id: str) -> None:
        """Prefer the chain with id `chain_id` and switch to a server on it.

        The chain is recorded as preferred even if no connected interface is on
        it; in that case the server setting is left alone. Otherwise one of the
        interfaces on that chain is picked at random and its server is applied
        through ``set_parameters``.

        Raises Exception if `chain_id` is not a known blockchain.
        """
        chain = blockchain.blockchains.get(chain_id)
        if not chain:
            raise Exception('blockchain {} not found'.format(chain_id))
        self._set_preferred_chain(chain)
        # select server on this chain
        with self.interfaces_lock:
            connected = list(self.interfaces.values())
        candidates = [iface for iface in connected if iface.blockchain == chain]
        if not candidates:
            return
        chosen_iface = random.choice(candidates)  # type: Interface
        # switch to server (and save to config)
        updated_params = self.get_parameters()._replace(server=chosen_iface.server)
        await self.set_parameters(updated_params)
×
1385

1386
    async def follow_chain_given_server(self, server: ServerAddr) -> None:
        """Prefer the chain of the interface for `server`, then switch to it.

        Does nothing if `server` has no entry in ``self.interfaces``.
        """
        # note that server_str should correspond to a connected interface
        target_iface = self.interfaces.get(server)
        if target_iface is None:
            return
        self._set_preferred_chain(target_iface.blockchain)
        # switch to server (and save to config)
        updated_params = self.get_parameters()._replace(server=server)
        await self.set_parameters(updated_params)
×
1396

1397
    def get_server_height(self) -> int:
        """Length of header chain, as claimed by main interface.

        Returns 0 when there is no main interface.
        """
        main_iface = self.interface
        if not main_iface:
            return 0
        return main_iface.tip
×
1401

1402
    def get_local_height(self) -> int:
        """Length of header chain, POW-verified.

        In case of a chain split, this is for the branch the main interface is on,
        but it is the tip of that branch (even if main interface is behind).
        """
        chain = self.blockchain()
        return chain.height()
×
1408

1409
    def export_checkpoints(self, path):
        """Run manually to generate blockchain checkpoints.

        Writes the checkpoints of the current blockchain to `path` as
        UTF-8 encoded JSON, indented by 4 spaces.
        Kept for console use only.
        """
        checkpoints = self.blockchain().get_checkpoints()
        with open(path, 'w', encoding='utf-8') as out_file:
            serialized = json.dumps(checkpoints, indent=4)
            out_file.write(serialized)
×
1416

1417
    async def _start(self):
        """Bring the network up on a fresh task group.

        Asserts that no task group, interfaces or pending interface sets are
        left over, reloads parameters from config, spawns an interface for the
        default server, schedules the long-running task group on
        ``self.asyncio_loop`` and finally triggers 'network_updated'.
        """
        assert not self.taskgroup
        taskgroup = OldTaskGroup()
        self.taskgroup = taskgroup
        assert not self.interface
        assert not self.interfaces
        assert not self._connecting_ifaces
        assert not self._closing_ifaces
        self.logger.info('starting network')
        self._clear_addr_retry_times()
        self._init_parameters_from_config()
        await self.taskgroup.spawn(self._run_new_interface(self.default_server))

        async def run_taskgroup():
            self.logger.info(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                # note: if a task finishes with CancelledError, that
                # will NOT raise, and the group will keep the other tasks running
                async with taskgroup as group:
                    await group.spawn(self._maintain_sessions())
                    for job in self._jobs:
                        await group.spawn(job)
            except Exception:
                self.logger.exception(f"taskgroup died ({hex(id(taskgroup))}).")
            finally:
                self.logger.info(f"taskgroup stopped ({hex(id(taskgroup))}).")

        # the group runs as its own task; this coroutine does not wait for it
        asyncio.run_coroutine_threadsafe(run_taskgroup(), self.asyncio_loop)

        util.trigger_callback('network_updated')
×
1443

1444
    def start(self, jobs: Iterable = None):
        """Schedule starting the network, along with the given job co-routines.

        Only schedules ``self._start()`` on ``self.asyncio_loop``; it does not
        wait for the network to be up.

        Note: the jobs will *restart* every time the network restarts, e.g. on proxy
        setting changes.
        """
        self._was_started = True
        self._jobs = jobs if jobs else []
        start_coro = self._start()
        asyncio.run_coroutine_threadsafe(start_coro, self.asyncio_loop)
×
1453

1454
    @log_exceptions
5✔
1455
    async def stop(self, *, full_shutdown: bool = True):
        """Stop the network and reset its connection state.

        Does nothing (apart from logging) if the network was never started.

        :param full_shutdown: if True, gossip is stopped as well and no
            timeout is applied here. If False, the cancellation is wrapped in
            ``ignore_after(1)`` and 'network_updated' is triggered at the end.
        """
        if not self._was_started:
            self.logger.info("not stopping network as it was never started")
            return
        self.logger.info("stopping network")
        # timeout: if full_shutdown, it is up to the caller to time us out,
        #          otherwise if e.g. restarting due to proxy changes, we time out fast
        async with (nullcontext() if full_shutdown else ignore_after(1)):
            async with OldTaskGroup() as group:
                await group.spawn(self.taskgroup.cancel_remaining())
                if full_shutdown:
                    await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
        # drop all connection state, whether or not the block above timed out
        self.taskgroup = None
        self.interface = None
        self.interfaces = {}
        self._connecting_ifaces.clear()
        self._closing_ifaces.clear()
        if not full_shutdown:
            util.trigger_callback('network_updated')
×
1474

1475
    async def _ensure_there_is_a_main_interface(self):
5✔
UNCOV
1476
        if self.interface:
×
1477
            return
×
1478
        # if auto_connect is set, try a different server
1479
        if self.auto_connect and not self.is_connecting():
×
UNCOV
1480
            await self._switch_to_random_interface()
×
1481
        # if auto_connect is not set, or still no main interface, retry current
1482
        if not self.interface and not self.is_connecting():
×
1483
            if self._can_retry_addr(self.default_server, urgent=True):
×
1484
                await self.switch_to_interface(self.default_server)
×
1485

1486
    async def _maintain_sessions(self) -> None:
        """Endless maintenance loop for the server connections.

        Each iteration runs the three helpers below in order and then sleeps
        for 0.1 seconds. The loop has no exit condition of its own.
        """
        async def maybe_start_new_interfaces():
            # interfaces that are connected, still connecting, or being closed all count
            num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces)
            # range() is empty if we already have self.num_server interfaces or more
            for i in range(self.num_server - num_existing_ifaces):
                # FIXME this should try to honour "healthy spread of connected servers"
                server = self._get_next_server_to_try()
                if server:
                    await self.taskgroup.spawn(self._run_new_interface(server))
        async def maintain_healthy_spread_of_connected_servers():
            # iterate over a snapshot taken under the lock, in random order
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
            random.shuffle(interfaces)
            for iface in interfaces:
                if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
                    self.logger.info(f"disconnecting from {iface.server}. too many connected "
                                     f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
                    await self._close_interface(iface)
        async def maintain_main_interface():
            await self._ensure_there_is_a_main_interface()
            if self.is_connected():
                if self.is_fee_estimates_update_required():
                    # the request is spawned on the main interface's own taskgroup
                    await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)

        while True:
            await maybe_start_new_interfaces()
            await maintain_healthy_spread_of_connected_servers()
            await maintain_main_interface()
            await asyncio.sleep(0.1)
×
1513

1514
    @classmethod
5✔
1515
    async def async_send_http_on_proxy(
            cls, method: str, url: str, *,
            params: dict = None,
            body: bytes = None,
            json: dict = None,
            headers=None,
            on_finish=None,
            timeout=None,
    ):
        """Send an HTTP 'get' or 'post' request and return ``on_finish(resp)``.

        The session is created with the proxy of the instance returned by
        ``cls.get_instance()``, or without a proxy if there is no instance.

        :param method: either 'get' or 'post' (compared case-sensitively).
        :param params: query parameters, only used for 'get'.
        :param body: raw payload for 'post'; takes precedence over `json`.
        :param json: JSON payload for 'post', used when `body` is None.
        :param headers: request headers; defaults to an empty dict.
        :param on_finish: coroutine function called with the response; the
            default raises for error statuses and returns the response text.
        :param timeout: passed to ``make_aiohttp_session``.
        :raises Exception: if `method` is neither 'get' nor 'post'.
        """
        async def default_on_finish(resp: ClientResponse):
            resp.raise_for_status()
            return await resp.text()

        headers = {} if headers is None else headers
        on_finish = default_on_finish if on_finish is None else on_finish
        network = cls.get_instance()
        proxy = network.proxy if network else None
        async with make_aiohttp_session(proxy, timeout=timeout) as session:
            if method == 'get':
                pending_request = session.get(url, params=params, headers=headers)
            elif method == 'post':
                assert body is not None or json is not None, 'body or json must be supplied if method is post'
                if body is not None:
                    pending_request = session.post(url, data=body, headers=headers)
                elif json is not None:
                    pending_request = session.post(url, json=json, headers=headers)
                else:
                    # only reachable when asserts are disabled: nothing is sent
                    return None
            else:
                raise Exception(f"unexpected {method=!r}")
            async with pending_request as resp:
                return await on_finish(resp)
×
1547

1548
    @classmethod
5✔
1549
    def send_http_on_proxy(cls, method, url, **kwargs):
        """Blocking variant of ``async_send_http_on_proxy``.

        Schedules the request on the loop returned by ``util.get_asyncio_loop()``
        and blocks until its result is available. Must not be called from the
        asyncio thread.
        """
        event_loop = util.get_asyncio_loop()
        assert util.get_running_loop() != event_loop, 'must not be called from asyncio thread'
        request = cls.async_send_http_on_proxy(method, url, **kwargs)
        future = asyncio.run_coroutine_threadsafe(request, event_loop)
        # note: _send_http_on_proxy has its own timeout, so no timeout here:
        return future.result()
×
1555

1556
    # methods used in scripts
1557
    async def get_peers(self):
        """Return the parsed reply to 'server.peers.subscribe' from the main interface.

        Waits, polling once per second, until the network is connected.
        """
        while not self.is_connected():
            await asyncio.sleep(1)
        main_session = self.interface.session
        raw_peers = await main_session.send_request('server.peers.subscribe')
        return parse_servers(raw_peers)
×
1562

1563
    async def send_multiple_requests(
            self,
            servers: Sequence[ServerAddr],
            method: str,
            params: Sequence,
            *,
            timeout: int = None,
    ):
        """Send one request to each of `servers`, each over a new Interface.

        :param timeout: how long to wait for each interface to become ready;
            defaults to the urgent network timeout. The request itself always
            uses a fixed timeout of 10.
        :return: dict mapping ``interface.server`` to the response, or to the
            exception raised by the request. Servers whose interface did not
            become ready are absent from the dict.
        """
        if timeout is None:
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
        results = {}

        async def query_server(server: ServerAddr):
            iface = Interface(network=self, server=server)
            try:
                await util.wait_for2(iface.ready, timeout)
            except BaseException:
                # NOTE(review): this also swallows CancelledError/KeyboardInterrupt
                # -- confirm that is intended
                await iface.close()
                return
            try:
                outcome = await iface.session.send_request(method, params, timeout=10)
            except Exception as exc:
                outcome = exc
            # NOTE(review): the interface is not closed here after a reply
            # -- verify that it gets cleaned up elsewhere
            results[iface.server] = outcome

        async with OldTaskGroup() as group:
            for server in servers:
                await group.spawn(query_server(server))
        return results
×
1590

1591
    async def prune_offline_servers(self, hostmap):
        """Return the entries of `hostmap` whose host answered a probe.

        The servers selected by ``filter_protocol`` (protocols "t" and "s") are
        each sent 'blockchain.headers.subscribe'. An entry is kept when its key
        equals the host of a server present in the replies.
        """
        candidates = filter_protocol(hostmap, allowed_protocols=("t", "s",))
        probe_timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        replies = await self.send_multiple_requests(
            candidates, 'blockchain.headers.subscribe', [], timeout=probe_timeout)
        responsive_hosts = {server_addr.host for server_addr in replies}
        return {
            host: details
            for host, details in hostmap.items()
            if host in responsive_hosts
        }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc