• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5766584628674560

20 Aug 2025 05:57PM UTC coverage: 61.507% (-0.03%) from 61.535%
5766584628674560

Pull #10159

CirrusCI

SomberNight
logging: add config.LOGS_MAX_TOTAL_SIZE_BYTES: to limit size on disk
Pull Request #10159: logging: add config.LOGS_MAX_TOTAL_SIZE_BYTES: to limit size on disk

7 of 31 new or added lines in 2 files covered. (22.58%)

125 existing lines in 47 files now uncovered.

22810 of 37085 relevant lines covered (61.51%)

3.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.27
/electrum/network.py
1
# Electrum - Lightweight Bitcoin Client
2
# Copyright (c) 2011-2016 Thomas Voegtlin
3
#
4
# Permission is hereby granted, free of charge, to any person
5
# obtaining a copy of this software and associated documentation files
6
# (the "Software"), to deal in the Software without restriction,
7
# including without limitation the rights to use, copy, modify, merge,
8
# publish, distribute, sublicense, and/or sell copies of the Software,
9
# and to permit persons to whom the Software is furnished to do so,
10
# subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be
13
# included in all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
import asyncio
5✔
24
import time
5✔
25
import os
5✔
26
import random
5✔
27
import re
5✔
28
from collections import defaultdict
5✔
29
import threading
5✔
30
import json
5✔
31
from typing import (
5✔
32
    NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any, TypeVar,
33
    Callable
34
)
35
import copy
5✔
36
import functools
5✔
37
from enum import IntEnum
5✔
38
from contextlib import nullcontext
5✔
39

40
import aiorpcx
5✔
41
from aiorpcx import ignore_after, NetAddress
5✔
42
from aiohttp import ClientResponse
5✔
43

44
from . import util
5✔
45
from .util import (
5✔
46
    log_exceptions, ignore_exceptions, OldTaskGroup, make_aiohttp_session, MyEncoder,
47
    NetworkRetryManager, error_text_str_to_safe_str, detect_tor_socks_proxy
48
)
49
from . import constants
5✔
50
from . import blockchain
5✔
51
from . import dns_hacks
5✔
52
from .transaction import Transaction
5✔
53
from .blockchain import Blockchain
5✔
54
from .interface import (
5✔
55
    Interface, PREFERRED_NETWORK_PROTOCOL, RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
56
    NetworkException, RequestCorrupted, ServerAddr, TxBroadcastError,
57
)
58
from .version import PROTOCOL_VERSION
5✔
59
from .i18n import _
5✔
60
from .logging import get_logger, Logger
5✔
61
from .fee_policy import FeeHistogram, FeeTimeEstimates, FEE_ETA_TARGETS
5✔
62

63

64
if TYPE_CHECKING:
2✔
UNCOV
65
    from collections.abc import Coroutine
UNCOV
66
    from .channel_db import ChannelDB
UNCOV
67
    from .lnrouter import LNPathFinder
UNCOV
68
    from .lnworker import LNGossip
UNCOV
69
    from .daemon import Daemon
UNCOV
70
    from .simple_config import SimpleConfig
71

72

73
_logger = get_logger(__name__)
5✔
74

75

76
# Number of simultaneous server connections to aim for.
# NOTE(review): the consumer of this constant is outside this view -- confirm.
NUM_TARGET_CONNECTED_SERVERS = 10
# Recently-used servers are only given priority while fewer than this many
# of them are connected (see Network._get_next_server_to_try).
NUM_STICKY_SERVERS = 4
# Added to the number of hardcoded servers to cap how many peers we accept
# from the main server (see Network._request_server_info).
NUM_RECENT_SERVERS = 20
5✔
79

80
T = TypeVar('T')
5✔
81

82

83
class ConnectionState(IntEnum):
    """Connection status values stored in Network.connection_status."""

    DISCONNECTED = 0
    CONNECTING = 1
    CONNECTED = 2
5✔
87

88

89
def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
    """Turn a "server.peers.subscribe" response into a ``{host: info}`` dict.

    Each item is ``(_, host, features)``. Recognised feature strings are
    ``s<port>``/``t<port>`` (protocol + port), ``v<version>`` and
    ``p<level>`` (pruning). Hosts that advertise no protocol are dropped.
    The data is validated along the way: constructing a ServerAddr raises
    for bad hosts/ports, and that exception propagates to the caller.
    """
    parsed = {}
    for entry in result:
        host = entry[1]
        info = {}
        version = None
        pruning_level = '-'
        features = entry[2] if len(entry) > 2 else ()
        for feature in features:
            if re.match(r"[st]\d*", feature):
                protocol = feature[0]
                # an empty port means "use the network's default for this protocol"
                port = feature[1:] or constants.net.DEFAULT_PORTS[protocol]
                ServerAddr(host, port, protocol=protocol)  # check if raises
                info[protocol] = port
            elif re.match("v(.?)+", feature):
                version = feature[1:]
            elif re.match(r"p\d*", feature):
                # a bare "p" is normalised to pruning level '0'
                pruning_level = feature[1:] or '0'
        if not info:
            continue
        info['pruning'] = pruning_level
        info['version'] = version
        parsed[host] = info
    return parsed
×
116

117

118
def filter_version(servers):
    """Keep only the servers whose 'version' is at least PROTOCOL_VERSION.

    Entries with a missing or unparsable version are dropped.
    """
    def is_recent(version):
        try:
            minimum = util.versiontuple(PROTOCOL_VERSION)
            return util.versiontuple(version) >= minimum
        except Exception:
            return False

    kept = {}
    for host, info in servers.items():
        if is_recent(info.get('version')):
            kept[host] = info
    return kept
×
125

126

127
def filter_noonion(servers):
    """Return a new dict without the hosts whose name ends with '.onion'."""
    clearnet = {}
    for host, info in servers.items():
        if host.endswith('.onion'):
            continue
        clearnet[host] = info
    return clearnet
×
129

130

131
def filter_protocol(hostmap, *, allowed_protocols: Optional[Iterable[str]] = None) -> Sequence[ServerAddr]:
    """Return a ServerAddr for every (host, protocol) pair in *hostmap* that is allowed.

    hostmap: ``{host: {protocol: port, ...}}``.
    allowed_protocols: protocols to accept; defaults to just
        PREFERRED_NETWORK_PROTOCOL when None.

    Pairs whose port is missing or falsy are skipped.
    """
    if allowed_protocols is None:
        protocols = (PREFERRED_NETWORK_PROTOCOL,)
    else:
        # Materialise once: the protocols are re-iterated for every host, so a
        # one-shot iterator would otherwise be exhausted after the first host.
        protocols = tuple(allowed_protocols)
    eligible = []
    for host, portmap in hostmap.items():
        for protocol in protocols:
            port = portmap.get(protocol)
            if port:
                eligible.append(ServerAddr(host, port, protocol=protocol))
    return eligible
×
142

143

144
def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
                       exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]:
    """Pick a random server that speaks an allowed protocol and is not excluded.

    hostmap defaults to the hardcoded constants.net.DEFAULT_SERVERS.
    Returns None when no candidate is left.
    """
    hostmap = constants.net.DEFAULT_SERVERS if hostmap is None else hostmap
    excluded = set() if exclude_set is None else exclude_set
    candidates = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols))
    eligible = list(candidates - excluded)
    if not eligible:
        return None
    return random.choice(eligible)
×
153

154

155
def is_valid_port(ps: str) -> bool:
    """Return True if *ps* parses as an integer TCP port in the range 1..65535.

    Anything that cannot be converted by int() (including None) is
    reported as invalid instead of raising.
    """
    try:
        # 65535 is the highest valid port number, so the upper bound is inclusive
        return 0 < int(ps) <= 65535
    except (ValueError, TypeError):
        return False
×
160

161

162
def is_valid_host(ph: str):
    """Return True if aiorpcx's NetAddress accepts *ph* as a host.

    The host is paired with a dummy port ('1'); a ValueError from the
    NetAddress constructor means the host is invalid.
    """
    try:
        NetAddress(ph, '1')
    except ValueError:
        return False
    else:
        return True
×
168

169

170
class ProxySettings:
    """Settings of the (SOCKS) proxy used for network connections.

    Attributes: ``enabled`` (bool), ``mode`` (one of MODES), ``host``,
    ``port`` (kept as a string by the parsers here), ``user``, ``password``.

    Instances compare by value. As ``__eq__`` is defined without
    ``__hash__``, instances are unhashable.
    """
    MODES = ['socks4', 'socks5']

    # Future of the tor probe started by probe_tor(), or None when no probe
    # is in flight. Class-level: at most one probe runs at a time.
    probe_fut = None

    def __init__(self):
        self.enabled = False
        self.mode = 'socks5'
        self.host = ''
        self.port = ''
        self.user = None
        self.password = None

    def set_defaults(self):
        """Reset every field to its default value."""
        self.__init__()  # call __init__ for default values

    def serialize_proxy_cfgstr(self) -> str:
        """Return the ``mode:host:port`` config string (credentials are not included)."""
        # str(): the port may have been assigned as an int (e.g. via from_dict);
        # str.join would raise TypeError on it.
        return ':'.join([self.mode, self.host, str(self.port)])

    def deserialize_proxy_cfgstr(self, s: Optional[str], user: str = None, password: str = None) -> None:
        """Populate this object from a proxy config string.

        s: ``[mode:]host:port``, the legacy ``[mode:]host:port:user:pass``,
            or None / 'none' (any case) to reset to defaults.
        user, password: credentials to store, unless the legacy format
            supplies its own.

        Raises ValueError if *s* is neither None nor a str. If the parsed
        host or port is invalid, the proxy is marked as disabled.
        """
        if s is None or (isinstance(s, str) and s.lower() == 'none'):
            self.set_defaults()
            self.user = user
            self.password = password
            return

        if not isinstance(s, str):
            raise ValueError('proxy config not a string')

        args = s.split(':')
        if args[0] in ProxySettings.MODES:
            self.mode = args[0]
            args = args[1:]

        # detect migrate from old settings
        if len(args) == 4 and is_valid_host(args[0]) and is_valid_port(args[1]):  # host:port:user:pass,
            self.host = args[0]
            self.port = args[1]
            self.user = args[2]
            self.password = args[3]
        else:
            # everything before the last ':' is the host (it may itself contain ':')
            self.host = ':'.join(args[:-1])
            self.port = args[-1]
            self.user = user
            self.password = password

        if not is_valid_host(self.host) or not is_valid_port(self.port):
            self.enabled = False

    def to_dict(self):
        """Return all fields (including credentials) as a plain dict."""
        return {
            'enabled': self.enabled,
            'mode': self.mode,
            'host': self.host,
            'port': self.port,
            'user': self.user,
            'password': self.password
        }

    @classmethod
    def from_config(cls, config: 'SimpleConfig') -> 'ProxySettings':
        """Build an instance from the NETWORK_PROXY* config keys."""
        proxy = cls()
        proxy.deserialize_proxy_cfgstr(
            config.NETWORK_PROXY, config.NETWORK_PROXY_USER, config.NETWORK_PROXY_PASSWORD
        )
        # set last: deserialize_proxy_cfgstr() may have cleared 'enabled'
        proxy.enabled = config.NETWORK_PROXY_ENABLED
        return proxy

    @classmethod
    def from_dict(cls, d: dict) -> 'ProxySettings':
        """Build an instance from a dict as produced by to_dict(); missing keys keep their defaults."""
        proxy = cls()
        proxy.enabled = d.get('enabled', proxy.enabled)
        proxy.mode = d.get('mode', proxy.mode)
        proxy.host = d.get('host', proxy.host)
        proxy.port = d.get('port', proxy.port)
        proxy.user = d.get('user', proxy.user)
        proxy.password = d.get('password', proxy.password)
        return proxy

    @classmethod
    def probe_tor(cls, on_finished: Callable[[str | None, int | None], None]):
        """Schedule a search for a local tor SOCKS proxy on the asyncio loop.

        on_finished is called with ``(host, port)`` on success, or with
        ``('', -1)`` if nothing was detected. It is not called if the
        detection itself raises. While a probe is in flight, further calls
        are ignored.
        """
        async def detect_task(finished: Callable[[str | None, int | None], None]):
            try:
                net_addr = await detect_tor_socks_proxy()
                if net_addr is None:
                    finished('', -1)
                else:
                    host = net_addr[0]
                    port = net_addr[1]
                    finished(host, port)
            finally:
                cls.probe_fut = None

        if cls.probe_fut:  # one probe at a time
            return
        cls.probe_fut = asyncio.run_coroutine_threadsafe(detect_task(on_finished), util.get_asyncio_loop())

    def __eq__(self, other):
        # Defer to the other operand for foreign types instead of raising
        # AttributeError (e.g. when compared against None).
        if not isinstance(other, ProxySettings):
            return NotImplemented
        return self.enabled == other.enabled \
            and self.mode == other.mode \
            and self.host == other.host \
            and self.port == other.port \
            and self.user == other.user \
            and self.password == other.password

    def __str__(self):
        # the password is deliberately left out
        return f'{self.enabled=} {self.mode=} {self.host=} {self.port=} {self.user=}'
×
277

278

279
class NetworkParameters(NamedTuple):
    """Bundle of user-facing network settings (see Network.get_parameters)."""
    # the main server
    server: ServerAddr
    # proxy configuration
    proxy: ProxySettings
    # value of the "auto_connect" option
    auto_connect: bool
    # value of the "oneserver" option
    oneserver: bool = False
5✔
284

285

286
class BestEffortRequestFailed(NetworkException):
    """NetworkException subclass without extra state.

    NOTE(review): raised outside this view -- presumably when a
    best-effort request ultimately fails; confirm against the raise sites.
    """
5✔
287

288

289
class UntrustedServerReturnedError(NetworkException):
    """Wraps an error that was sent by an (untrusted) server.

    str() and repr() only ever produce a fixed, generic text; the wrapped
    exception must be requested explicitly via get_untrusted_message().
    """

    def __init__(self, *, original_exception):
        self.original_exception = original_exception

    def get_message_for_gui(self) -> str:
        """Return the generic (safe) message."""
        return str(self)

    def get_untrusted_message(self) -> str:
        """Return a clearly-labelled, sanitised rendering of the wrapped exception."""
        safe_text = error_text_str_to_safe_str(repr(self.original_exception))
        return (f"<UntrustedServerReturnedError "
                f"[DO NOT TRUST THIS MESSAGE] original_exception: {safe_text}>")

    def __str__(self):
        # Never include the untrusted text of self.original_exception here:
        # str() output can easily end up in the GUI.
        return _("The server returned an error.")

    def __repr__(self):
        # Same reasoning as in __str__: keep the untrusted text out.
        generic = str(self)
        return f"<UntrustedServerReturnedError {generic!r}>"
×
310

311

312
_INSTANCE = None
5✔
313

314

315
class Network(Logger, NetworkRetryManager[ServerAddr]):
5✔
316
    """The Network class manages a set of connections to remote electrum
317
    servers, each connected socket is handled by an Interface() object.
318
    """
319

320
    taskgroup: Optional[OldTaskGroup]
5✔
321
    interface: Optional[Interface]
5✔
322
    interfaces: Dict[ServerAddr, Interface]
5✔
323
    _connecting_ifaces: Set[ServerAddr]
5✔
324
    _closing_ifaces: Set[ServerAddr]
5✔
325
    default_server: ServerAddr
5✔
326
    _recent_servers: List[ServerAddr]
5✔
327

328
    channel_db: Optional['ChannelDB'] = None
5✔
329
    lngossip: Optional['LNGossip'] = None
5✔
330
    path_finder: Optional['LNPathFinder'] = None
5✔
331

332
    def __init__(self, config: 'SimpleConfig', *, daemon: 'Daemon' = None):
        """Create the singleton Network in its initial, disconnected state.

        The asyncio loop returned by util.get_asyncio_loop() must already
        be running. A second instantiation trips the singleton assertion.
        """
        global _INSTANCE
        assert _INSTANCE is None, "Network is a singleton!"
        _INSTANCE = self

        Logger.__init__(self)
        NetworkRetryManager.__init__(
            self,
            max_retry_delay_normal=600,
            init_retry_delay_normal=15,
            max_retry_delay_urgent=10,
            init_retry_delay_urgent=1,
        )

        self.asyncio_loop = util.get_asyncio_loop()
        assert self.asyncio_loop.is_running(), "event loop not running"

        self.config = config
        self.daemon = daemon

        # --- blockchain state ---
        blockchain.read_blockchains(self.config)
        blockchain.init_headers_file_for_best_chain()
        forkpoints = [b.forkpoint for b in blockchain.blockchains.values()]
        self.logger.info(f"blockchains {forkpoints}")
        self._blockchain_preferred_block = self.config.BLOCKCHAIN_PREFERRED_BLOCK  # type: Dict[str, Any]
        if self._blockchain_preferred_block is None:
            self._set_preferred_chain(None)
        self._blockchain = blockchain.get_best_chain()

        self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}

        # --- proxy / server parameters ---
        self.proxy = ProxySettings()
        self.is_proxy_tor = None  # type: Optional[bool]  # tri-state. None means unknown.
        self._init_parameters_from_config()

        self.taskgroup = None

        # --- locks ---
        self.restart_lock = asyncio.Lock()
        self.bhi_lock = asyncio.Lock()
        self.recent_servers_lock = threading.RLock()       # <- re-entrant
        self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces

        self.server_peers = {}  # returned by interface (servers that the main interface knows about)
        self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock

        self.banner = ''
        self.donation_address = ''
        self.relay_fee = None  # type: Optional[int]

        # make sure the 'certs' directory exists below the config dir
        dir_path = os.path.join(self.config.path, 'certs')
        util.make_dir(dir_path)

        # the main server we are currently communicating with
        self.interface = None
        self.default_server_changed_event = asyncio.Event()
        # Set of servers we have an ongoing connection with.
        # For any ServerAddr, at most one corresponding Interface object
        # can exist at any given time. Depending on the state of that Interface,
        # the ServerAddr can be found in one of the following sets.
        # Note: during a transition, the ServerAddr can appear in two sets momentarily.
        self._connecting_ifaces = set()
        self.interfaces = {}  # these are the ifaces in "initialised and usable" state
        self._closing_ifaces = set()

        # Dump network messages (all interfaces).  Set at runtime from the console.
        self.debug = False

        self._set_status(ConnectionState.DISCONNECTED)
        self._has_ever_managed_to_connect_to_server = False
        self._was_started = False

        # --- fee data ---
        self.mempool_fees = FeeHistogram()
        self.fee_estimates = FeeTimeEstimates()
        self.last_time_fee_estimates_requested = 0  # zero ensures immediate fees
×
406

407
    def has_internet_connection(self) -> bool:
        """Guess whether the device is online.

        The guess is simply the flag recording whether we ever managed to
        connect to a server.
        """
        ever_connected = self._has_ever_managed_to_connect_to_server
        return ever_connected
×
410

411
    def has_channel_db(self):
        """Return True if a channel_db object is currently set."""
        if self.channel_db is None:
            return False
        return True
×
413

414
    def start_gossip(self):
        """Create and start the gossip machinery, if enabled and not yet started.

        Does nothing when config.LIGHTNING_USE_GOSSIP is off or when
        self.lngossip already exists.
        """
        from . import lnrouter
        from . import channel_db
        from . import lnworker
        if not self.config.LIGHTNING_USE_GOSSIP:
            return
        if self.lngossip is not None:
            # already started
            return
        self.channel_db = channel_db.ChannelDB(self)
        self.path_finder = lnrouter.LNPathFinder(self.channel_db)
        self.channel_db.load_data()
        self.lngossip = lnworker.LNGossip(self.config)
        self.lngossip.start_network(self)
×
426

427
    async def stop_gossip(self, *, full_shutdown: bool = False):
        """Stop gossip and drop channel_db / path_finder; no-op if gossip is not running.

        With full_shutdown=True, additionally wait for the channel_db's
        stopped_event before dropping it.
        """
        if not self.lngossip:
            return
        await self.lngossip.stop()
        self.lngossip = None
        self.channel_db.stop()
        if full_shutdown:
            await self.channel_db.stopped_event.wait()
        self.channel_db = None
        self.path_finder = None
×
436

437
    @classmethod
    def run_from_another_thread(cls, coro: 'Coroutine[Any, Any, T]', *, timeout=None) -> T:
        """Run *coro* on the asyncio loop and block until its result is available.

        Must not be called from the asyncio thread itself. *timeout* is
        passed on to the future's result() call.
        """
        event_loop = util.get_asyncio_loop()
        assert util.get_running_loop() != event_loop, 'must not be called from asyncio thread'
        return asyncio.run_coroutine_threadsafe(coro, event_loop).result(timeout)
×
443

444
    @staticmethod
    def get_instance() -> Optional["Network"]:
        """Return the global singleton Network, or None if there is none.

        There is no instance when running with the --offline flag.
        """
        instance = _INSTANCE
        return instance
×
450

451
    def with_recent_servers_lock(func):
        """Decorator: run the wrapped method while holding self.recent_servers_lock."""
        # functools.wraps keeps the wrapped method's __name__/__doc__ intact
        @functools.wraps(func)
        def func_wrapper(self, *args, **kwargs):
            with self.recent_servers_lock:
                return func(self, *args, **kwargs)
        return func_wrapper
5✔
456

457
    def _read_recent_servers(self) -> List[ServerAddr]:
5✔
458
        if not self.config.path:
×
459
            return []
×
460
        path = os.path.join(self.config.path, "recent_servers")
×
461
        try:
×
462
            with open(path, "r", encoding='utf-8') as f:
×
463
                data = f.read()
×
464
                servers_list = json.loads(data)
×
465
            return [ServerAddr.from_str(s) for s in servers_list]
×
466
        except Exception:
×
467
            return []
×
468

469
    @with_recent_servers_lock
5✔
470
    def _save_recent_servers(self):
5✔
471
        if not self.config.path:
×
472
            return
×
473
        path = os.path.join(self.config.path, "recent_servers")
×
474
        s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
×
475
        try:
×
476
            with open(path, "w", encoding='utf-8') as f:
×
477
                f.write(s)
×
478
        except Exception:
×
479
            pass
×
480

481
    async def _server_is_lagging(self) -> bool:
5✔
482
        sh = self.get_server_height()
×
483
        if not sh:
×
484
            self.logger.info('no height for main interface')
×
485
            return True
×
486
        lh = self.get_local_height()
×
487
        result = (lh - sh) > 1
×
488
        if result:
×
489
            self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
×
490
        return result
×
491

492
    def _set_status(self, status):
        """Store the new connection status and fire the 'status' callback."""
        self.connection_status = status
        util.trigger_callback('status')
×
495

496
    def is_connected(self):
        """Return whether there is a main interface that is connected and ready."""
        # read once: self.interface might be swapped concurrently
        main_iface = self.interface
        if main_iface is None:
            return False
        return main_iface.is_connected_and_ready()
×
499

500
    def is_connecting(self):
        """Return True while the connection status is CONNECTING."""
        current = self.connection_status
        return current == ConnectionState.CONNECTING
×
502

503
    def get_connection_status_for_GUI(self):
        """Return the translated, human-readable name of the connection status."""
        labels = {
            ConnectionState.DISCONNECTED: _('Disconnected'),
            ConnectionState.CONNECTING: _('Connecting'),
            ConnectionState.CONNECTED: _('Connected'),
        }
        status = self.connection_status
        return labels[status]
×
510

511
    async def _request_server_info(self, interface: 'Interface'):
        """Wait until *interface* is ready, then fetch its server info in one task group.

        Fetched: banner, donation address, server peers, relay fee and
        the fee histogram.
        """
        await interface.ready
        session = interface.session

        async def fetch_banner():
            self.banner = await interface.get_server_banner()
            util.trigger_callback('banner', self.banner)

        async def fetch_donation_address():
            self.donation_address = await interface.get_donation_address()

        async def fetch_server_peers():
            peers = await session.send_request('server.peers.subscribe')
            # keep only a random, size-capped subset of what the server sent
            random.shuffle(peers)
            max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
            peers = peers[:max_accepted_peers]
            # note that 'parse_servers' also validates the data (which is untrusted input!)
            self.server_peers = parse_servers(peers)
            util.trigger_callback('servers', self.get_servers())

        async def fetch_relay_fee():
            self.relay_fee = await interface.get_relay_fee()

        async with OldTaskGroup() as group:
            for job in (fetch_banner, fetch_donation_address, fetch_server_peers, fetch_relay_fee):
                await group.spawn(job)
            await group.spawn(self._request_fee_estimates(interface))
×
540

541
    async def _request_fee_estimates(self, interface):
        """Fetch the fee histogram from *interface* and publish it.

        Records the request time first, then stores the histogram in
        self.mempool_fees and fires the 'fee_histogram' callback.
        """
        self.requested_fee_estimates()
        fee_histogram = await interface.get_fee_histogram()
        self.mempool_fees.set_data(fee_histogram)
        self.logger.info(f'fee_histogram {len(fee_histogram)}')
        util.trigger_callback('fee_histogram', self.mempool_fees)
×
547

548
    def is_fee_estimates_update_required(self):
        """Return True if fee estimates were last requested more than 60 seconds ago."""
        elapsed = time.time() - self.last_time_fee_estimates_requested
        return elapsed > 60
×
554

555
    def has_fee_etas(self):
        """Return whether self.fee_estimates reports having data."""
        estimates = self.fee_estimates
        return estimates.has_data()
×
557

558
    def has_fee_mempool(self) -> bool:
        """Return whether self.mempool_fees reports having data."""
        histogram = self.mempool_fees
        return histogram.has_data()
×
560

561
    def requested_fee_estimates(self):
        """Record the current time as the moment fee estimates were last requested."""
        now = time.time()
        self.last_time_fee_estimates_requested = now
×
563

564
    def get_parameters(self) -> NetworkParameters:
        """Return a snapshot of the current server, proxy, auto_connect and oneserver settings."""
        current = {
            'server': self.default_server,
            'proxy': self.proxy,
            'auto_connect': self.auto_connect,
            'oneserver': self.oneserver,
        }
        return NetworkParameters(**current)
569

570
    def _init_parameters_from_config(self) -> None:
        """(Re)load auto_connect, default server, proxy and oneserver from the config.

        If both "oneserver" and "auto_connect" are enabled, auto_connect
        is turned off and the configured server is cleared.
        """
        dns_hacks.configure_dns_resolver()
        self.auto_connect = self.config.NETWORK_AUTO_CONNECT
        conflicting_options = self.auto_connect and self.config.NETWORK_ONESERVER
        if conflicting_options:
            # enabling both oneserver and auto_connect doesn't really make sense
            # assume oneserver is enabled for privacy reasons, disable auto_connect and assume server is unpredictable
            self.logger.warning(f'both "oneserver" and "auto_connect" options enabled, disabling "auto_connect" and resetting "server".')
            self.config.NETWORK_SERVER = ""  # let _set_default_server set harmless default (localhost)
            self.auto_connect = False

        self._set_default_server()
        proxy_from_config = ProxySettings.from_config(self.config)
        self._set_proxy(proxy_from_config)
        self._maybe_set_oneserver()
×
583

584
    def get_donation_address(self):
        """Return the stored donation address while connected, otherwise None."""
        if not self.is_connected():
            return None
        return self.donation_address
×
587

588
    def get_interfaces(self) -> List[ServerAddr]:
        """Return the servers of the connected interfaces, as a new list."""
        with self.interfaces_lock:
            servers = [*self.interfaces]
        return servers
×
592

593
    def get_status(self):
        """Return a translated one-line summary of how many nodes we are connected to."""
        n = len(self.get_interfaces())
        if n > 1:
            return _("Connected to {0} nodes.").format(n)
        if n == 1:
            return _("Connected to {0} node.").format(n)
        return _("Not connected")
×
596

597
    def get_fee_estimates(self):
        """Return fee ETA estimates as ``{target: fee}``.

        With auto_connect: for each target in FEE_ETA_TARGETS except the
        last, the integer median over all interfaces' truthy estimates;
        targets for which that cannot be computed are left out.
        Without auto_connect: the main interface's own estimates, or {}
        when there is no main interface.
        """
        from statistics import median
        if not self.auto_connect:
            if not self.interface:
                return {}
            return self.interface.fee_estimates_eta
        with self.interfaces_lock:
            out = {}
            for target in FEE_ETA_TARGETS[0:-1]:
                try:
                    samples = [iface.fee_estimates_eta.get(target) for iface in self.interfaces.values()]
                    out[target] = int(median(filter(None, samples)))
                except Exception:
                    # e.g. no interface has an estimate for this target
                    continue
            return out
×
612

613
    def update_fee_estimates(self, *, fee_est: Dict[int, int] = None):
        """Store fee estimates in self.fee_estimates and fire the 'fee' callback.

        fee_est: ``{nblock_target: fee}``; when None, the result of
            get_fee_estimates() is used.

        The estimates are logged only when they differ from the previous call's.
        """
        if fee_est is None:
            fee_est = self.get_fee_estimates()
        for nblock_target, fee in fee_est.items():
            self.fee_estimates.set_data(nblock_target, fee)
        unset = object()
        previous = getattr(self, "_prev_fee_est", unset)
        if previous is unset or previous != fee_est:
            self._prev_fee_est = copy.copy(fee_est)
            self.logger.info(f'fee_estimates {fee_est}')
        util.trigger_callback('fee', self.fee_estimates)
×
622

623

624
    @with_recent_servers_lock
    def get_servers(self):
        """Return the known servers as ``{host: {protocol: port, ...}}``.

        Sources, in order of increasing precedence: peers reported by the
        main interface (version-filtered), the hardcoded servers, the
        recent servers and the bookmarked servers. Onion hosts are removed
        when config.NETWORK_NOONION is set.

        The returned per-host dicts are fresh copies, so merging never
        writes into constants.net.DEFAULT_SERVERS or self.server_peers.
        """
        # note: order of sources when adding servers here is crucial!
        # don't let "server_peers" overwrite anything,
        # otherwise main server can eclipse the client
        out = dict()
        # add servers received from main interface
        server_peers = self.server_peers
        if server_peers:
            out.update(filter_version(server_peers.copy()))
        # hardcoded servers
        out.update(constants.net.DEFAULT_SERVERS)
        # Copy every port-map: the merges below mutate them, and they are
        # otherwise the very dict objects owned by the sources above.
        out = {host: dict(portmap) for host, portmap in out.items()}

        def add_server(server: ServerAddr) -> None:
            # merge a single (host, protocol, port) into the result
            out.setdefault(server.host, {})[server.protocol] = str(server.port)

        # add recent servers
        for server in self._recent_servers:
            add_server(server)
        # add bookmarks
        bookmarks = self.config.NETWORK_BOOKMARKED_SERVERS or []
        for server_str in bookmarks:
            try:
                server = ServerAddr.from_str(server_str)
            except ValueError:
                continue  # skip unparsable bookmarks
            add_server(server)
        # potentially filter out some
        if self.config.NETWORK_NOONION:
            out = filter_noonion(out)
        return out
×
659

660
    def _get_next_server_to_try(self) -> Optional[ServerAddr]:
        """Choose the next server to open a connection to, or None if there is none.

        Servers we are already connected to, connecting to or closing are
        never chosen, nor are servers that _can_retry_addr() rejects.
        """
        now = time.time()
        with self.interfaces_lock:
            in_use = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
        # First try from recent servers. (which are persisted)
        # As these are servers we successfully connected to recently, they are
        # most likely to work. This also makes servers "sticky".
        # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
        #       however if they succeed, the eclipsing would persist. To try to balance this,
        #       we only give priority to recent_servers up to NUM_STICKY_SERVERS.
        with self.recent_servers_lock:
            recent = list(self._recent_servers)
        recent = [s for s in recent if s.protocol in self._allowed_protocols]
        if len(in_use & set(recent)) < NUM_STICKY_SERVERS:
            for candidate in recent:
                if candidate not in in_use and self._can_retry_addr(candidate, now=now):
                    return candidate
        # try all servers we know about, pick one at random
        hostmap = self.get_servers()
        known = set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols))
        remaining = list(known - in_use)
        random.shuffle(remaining)
        for candidate in remaining:
            if self._can_retry_addr(candidate, now=now):
                return candidate
        return None
×
689

690
    def _set_default_server(self) -> None:
        """Set self.default_server from config.NETWORK_SERVER.

        Falls back to "localhost:1:s" when the configured string cannot be
        parsed, or when "oneserver" is on but no server is configured.
        Otherwise, without a configured server, a random one is picked.
        """
        # Server for addresses and transactions
        server = self.config.NETWORK_SERVER
        # Sanitize default server
        if server:
            try:
                self.default_server = ServerAddr.from_str(server)
            except Exception:
                self.logger.warning(f'failed to parse server-string ({server!r}); falling back to localhost:1:s.')
                self.default_server = ServerAddr.from_str("localhost:1:s")
        elif self.config.NETWORK_ONESERVER:
            # if oneserver is enabled but no server specified then don't pick a random server
            self.logger.warning(f'"oneserver" option enabled, but no "server" defined; falling back to localhost:1:s.')
            self.default_server = ServerAddr.from_str("localhost:1:s")
        else:
            self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
        assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"
×
708

709
    def _set_proxy(self, proxy: ProxySettings):
        """Adopt *proxy* as the current proxy, unless it equals the one already set.

        On change: restart tor detection and fire the 'proxy_set' callback.
        """
        unchanged = (self.proxy == proxy)
        if unchanged:
            return

        self.logger.info(f'setting proxy {proxy}')
        self.proxy = proxy

        # reset is_proxy_tor to unknown, and re-detect it:
        self.is_proxy_tor = None
        self._detect_if_proxy_is_tor()

        util.trigger_callback('proxy_set', self.proxy)
×
721

722
    def _detect_if_proxy_is_tor(self) -> None:
        """Schedule a probe to find out whether the current proxy is a tor SOCKS port.

        Only done for an enabled socks5 proxy. The result is stored in
        self.is_proxy_tor, and the 'tor_probed' callback is fired, only if
        the proxy has not been changed while the probe was running.
        """
        async def tor_probe_task(p):
            assert p is not None
            is_tor = await util.is_tor_socks_port(p.host, int(p.port))
            if self.proxy != p:  # a different proxy was set in the meantime
                return
            if self.is_proxy_tor != is_tor:
                negation = "" if is_tor else "not "
                self.logger.info(f'Proxy is {negation}TOR')
                self.is_proxy_tor = is_tor
            util.trigger_callback('tor_probed', is_tor)

        proxy = self.proxy
        if proxy and proxy.enabled and proxy.mode == 'socks5':
            asyncio.run_coroutine_threadsafe(tor_probe_task(proxy), self.asyncio_loop)
×
735

736
    @log_exceptions
    async def set_parameters(self, net_params: NetworkParameters):
        """Persist *net_params* to the config and apply them to the network.

        The values are written to the config and then read back. If any of
        the server / proxy / oneserver values did not stick, the method
        returns without touching the running network. Otherwise the
        in-memory parameters are reloaded from the config and, if the network
        was started: a proxy or oneserver change restarts the network, a
        server change switches the main interface, and in the remaining case
        a lagging main interface may be replaced. 'network_updated' is
        triggered at the end.
        """
        proxy = net_params.proxy
        server = net_params.server
        server_str = str(server)
        proxy_str = proxy.serialize_proxy_cfgstr()
        # NOTE(review): this is captured before the sanitisation below, so the
        # value persisted to the config does not reflect `proxy.enabled = False`
        # being set there -- confirm whether that is intended.
        proxy_enabled = proxy.enabled
        proxy_user = proxy.user
        proxy_pass = proxy.password

        # sanitize parameters: both calls are made only for the exception they
        # raise on bad input (mode not listed in MODES / non-integer port).
        try:
            if proxy:
                ProxySettings.MODES.index(proxy.mode)
                int(proxy.port)
        except Exception:
            proxy.enabled = False

        cfg = self.config
        cfg.NETWORK_AUTO_CONNECT = net_params.auto_connect
        cfg.NETWORK_ONESERVER = net_params.oneserver
        cfg.NETWORK_PROXY_ENABLED = proxy_enabled
        cfg.NETWORK_PROXY = proxy_str
        cfg.NETWORK_PROXY_USER = proxy_user
        cfg.NETWORK_PROXY_PASSWORD = proxy_pass
        cfg.NETWORK_SERVER = server_str

        # abort if changes were not allowed by config
        rejected_by_config = (
            cfg.NETWORK_SERVER != server_str
            or cfg.NETWORK_PROXY_ENABLED != proxy_enabled
            or cfg.NETWORK_PROXY != proxy_str
            or cfg.NETWORK_PROXY_USER != proxy_user
            or cfg.NETWORK_PROXY_PASSWORD != proxy_pass
            or cfg.NETWORK_ONESERVER != net_params.oneserver
        )
        if rejected_by_config:
            return

        # Work out what changed *before* reloading the in-memory state.
        proxy_changed = self.proxy != proxy
        oneserver_changed = self.oneserver != net_params.oneserver
        default_server_changed = self.default_server != server
        self._init_parameters_from_config()
        if not self._was_started:
            return

        async with self.restart_lock:
            if proxy_changed or oneserver_changed:
                # Restart the network
                await self.stop(full_shutdown=False)
                await self._start()
            elif default_server_changed:
                await self.switch_to_interface(server)
            else:
                await self.switch_lagging_interface()
        util.trigger_callback('network_updated')
787

788
    def _maybe_set_oneserver(self) -> None:
5✔
789
        oneserver = self.config.NETWORK_ONESERVER
×
790
        self.oneserver = oneserver
×
791
        self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0
×
792

793
    def is_server_bookmarked(self, server: ServerAddr) -> bool:
        """Return True if ``str(server)`` is listed in the bookmarked servers.

        A falsy config value (e.g. ``None``) is treated as "no bookmarks".
        """
        stored = self.config.NETWORK_BOOKMARKED_SERVERS
        if not stored:
            stored = []
        return str(server) in stored
796

797
    def set_server_bookmark(self, server: ServerAddr, *, add: bool) -> None:
        """Add or remove *server* from the bookmarked servers in the config.

        Args:
            server: the server to (un)bookmark; stored as ``str(server)``.
            add: True to bookmark the server, False to remove the bookmark.

        Adding an already-bookmarked server, or removing one that is not
        bookmarked, leaves the list contents unchanged. The updated list is
        always written back to the config while holding ``self.config.lock``.
        """
        server_str = str(server)
        with self.config.lock:
            # Work on a copy: the list handed out by the config must not be
            # mutated in place before the new value is assigned back.
            bookmarks = list(self.config.NETWORK_BOOKMARKED_SERVERS or [])
            if add:
                if server_str not in bookmarks:
                    bookmarks.append(server_str)
            elif server_str in bookmarks:
                bookmarks.remove(server_str)
            self.config.NETWORK_BOOKMARKED_SERVERS = bookmarks
808

809
    async def _switch_to_random_interface(self):
5✔
810
        '''Switch to a random connected server other than the current one'''
811
        servers = self.get_interfaces()    # Those in connected state
×
812
        if self.default_server in servers:
×
813
            servers.remove(self.default_server)
×
814
        if servers:
×
815
            await self.switch_to_interface(random.choice(servers))
×
816

817
    async def switch_lagging_interface(self):
        """If auto_connect and lagging, switch interface (only within fork).

        A replacement is picked at random among the interfaces whose tip
        header equals the tip header of our current blockchain; if there is
        none, nothing happens.
        """
        if not self.auto_connect:
            return
        if not await self._server_is_lagging():
            return
        # switch to one that has the correct header (not height)
        best_header = self.blockchain().header_at_tip()
        with self.interfaces_lock:
            interfaces = list(self.interfaces.values())
        up_to_date = [iface for iface in interfaces if iface.tip_header == best_header]
        if up_to_date:
            chosen_iface = random.choice(up_to_date)
            await self.switch_to_interface(chosen_iface.server)
827

828
    async def switch_unwanted_fork_interface(self) -> None:
        """If auto_connect, maybe switch to another fork/chain.

        The chains containing the preferred block are tried first, followed
        by the best chain. For the first such chain that either the main
        interface or some other interface is on, we stay put or switch to a
        random interface on it, respectively.
        """
        if not (self.auto_connect and self.interface):
            return
        with self.interfaces_lock:
            interfaces = list(self.interfaces.values())
        pref_height = self._blockchain_preferred_block['height']
        pref_hash = self._blockchain_preferred_block['hash']
        # shortcut for common case
        if pref_height == 0:
            return
        # maybe try switching chains; starting with most desirable first
        matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash)
        chains_to_try = [*matching_chains, blockchain.get_best_chain()]
        for rank, chain in enumerate(chains_to_try):
            # Nothing to do if the main interface is already on this fork.
            if self.interface.blockchain == chain:
                return
            # Otherwise pick a random interface that is on this fork, if any.
            on_this_fork = [iface for iface in interfaces if iface.blockchain == chain]
            if not on_this_fork:
                continue
            self.logger.info(f"switching to (more) preferred fork (rank {rank})")
            chosen_iface = random.choice(on_this_fork)
            await self.switch_to_interface(chosen_iface.server)
            return
        self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")
854

855
    async def switch_to_interface(self, server: ServerAddr):
        """Make *server* our main interface.

        *server* becomes ``self.default_server`` immediately. If there is no
        interface for it yet, one is queued to be started and the method
        returns; the actual switch then happens once that interface is ready.
        If an interface exists but is not connected and ready, nothing more
        is done. On a successful switch the relevant callbacks are triggered
        and the status is set to CONNECTED.
        """
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
        self.default_server = server
        previous_iface = self.interface
        previous_server = previous_iface.server if previous_iface else None

        # Stop any current interface in order to terminate subscriptions,
        # and to cancel tasks in interface.taskgroup.
        if previous_server and previous_server != server:
            # Closing is spawned as a separate task, as waiting for the old
            # interface to close might be slow.
            await self.taskgroup.spawn(self._close_interface(previous_iface))

        if server not in self.interfaces:
            self.interface = None
            await self.taskgroup.spawn(self._run_new_interface(server))
            return

        new_iface = self.interfaces[server]
        if previous_iface != new_iface:
            if not new_iface.is_connected_and_ready():
                return
            self.logger.info(f"switching to {server}")
            # Must be evaluated before self.interface is replaced below.
            blockchain_updated = new_iface.blockchain != self.blockchain()
            self.interface = new_iface
            try:
                await new_iface.taskgroup.spawn(self._request_server_info(new_iface))
            except RuntimeError as e:  # see #7677
                if len(e.args) >= 1 and e.args[0] == 'task group terminated':
                    self.logger.warning(f"tried to switch to {server} but interface.taskgroup is already dead.")
                    self.interface = None
                    return
                raise
            util.trigger_callback('default_server_changed')
            # Set and immediately clear the event.
            self.default_server_changed_event.set()
            self.default_server_changed_event.clear()
            self._set_status(ConnectionState.CONNECTED)
            util.trigger_callback('network_updated')
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
898

899
    async def _close_interface(self, interface: Optional[Interface]):
        """Unregister *interface* and close it.

        No-op for a falsy *interface* or when a close for the same server is
        already in progress. The interface is removed from
        ``self.interfaces`` (only if it is the one registered for its server)
        and, if it was the main interface, ``self.interface`` is cleared.
        The server is tracked in ``self._closing_ifaces`` for the duration of
        the close.
        """
        if not interface:
            return
        server = interface.server
        if server in self._closing_ifaces:
            return
        self._closing_ifaces.add(server)
        with self.interfaces_lock:
            if self.interfaces.get(server) == interface:
                self.interfaces.pop(server)
        if interface == self.interface:
            self.interface = None
        try:
            # this can take some time if server/connection is slow:
            await interface.close()
            await interface.got_disconnected.wait()
        finally:
            self._closing_ifaces.discard(interface.server)
916

917
    @with_recent_servers_lock
    def _add_recent_server(self, server: ServerAddr) -> None:
        """Record *server* as the most recently used server and save the list.

        The list is ordered most-recent-first and is truncated to
        ``NUM_RECENT_SERVERS`` entries.
        """
        self._on_connection_successfully_established(server)
        # Move the server to the front (the list is ordered).
        try:
            self._recent_servers.remove(server)
        except ValueError:
            pass  # not in the list yet
        self._recent_servers.insert(0, server)
        self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
        self._save_recent_servers()
926

927
    async def connection_down(self, interface: Interface):
        """Handle a connection that either went down or was never made.

        If the interface belongs to the default server, the status is set to
        DISCONNECTED. The interface is then closed and 'network_updated' is
        triggered. A falsy *interface* is ignored.
        """
        if not interface:
            return
        is_default = interface.server == self.default_server
        if is_default:
            self._set_status(ConnectionState.DISCONNECTED)
        await self._close_interface(interface)
        util.trigger_callback('network_updated')
935

936
    def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
        """Return the timeout to use for a request of the given type.

        A truthy ``config.NETWORK_TIMEOUT`` overrides everything. Otherwise
        the most relaxed value is used in oneserver mode without
        auto_connect, the relaxed value when a proxy is enabled, and the
        normal value in all other cases.
        """
        configured = self.config.NETWORK_TIMEOUT
        if configured:
            return configured
        if self.oneserver and not self.auto_connect:
            return request_type.MOST_RELAXED
        using_proxy = self.proxy and self.proxy.enabled
        if using_proxy:
            return request_type.RELAXED
        return request_type.NORMAL
944

945
    @ignore_exceptions  # do not kill outer taskgroup
    @log_exceptions
    async def _run_new_interface(self, server: ServerAddr):
        """Create an interface for *server* and register it once it is ready.

        Returns early if the server already has an interface, or is currently
        being connected to or closed. If the interface does not become ready
        within the timeout (or anything else is raised while waiting), it is
        closed and not registered. If *server* is the default server, we
        switch to it once it is registered.
        """
        assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
        already_handled = (
            server in self.interfaces
            or server in self._connecting_ifaces
            or server in self._closing_ifaces
        )
        if already_handled:
            return
        self._connecting_ifaces.add(server)
        if server == self.default_server:
            self.logger.info(f"connecting to {server} as new interface")
            self._set_status(ConnectionState.CONNECTING)
        self._trying_addr_now(server)

        interface = Interface(network=self, server=server)
        # note: using longer timeouts here as DNS can sometimes be slow!
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        try:
            await util.wait_for2(interface.ready, timeout)
        except BaseException as e:
            self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
            await interface.close()
            return
        else:
            with self.interfaces_lock:
                assert server not in self.interfaces
                self.interfaces[server] = interface
        finally:
            # Whatever the outcome, the server is no longer "connecting".
            self._connecting_ifaces.discard(server)

        if server == self.default_server:
            await self.switch_to_interface(server)

        self._has_ever_managed_to_connect_to_server = True
        self._add_recent_server(server)
        util.trigger_callback('network_updated')
        # When the proxy settings were set, the proxy (if any) might have been unreachable,
        # resulting in a false-negative for Tor-detection. Given we just connected to a server, re-test now.
        self._detect_if_proxy_is_tor()
984

985
    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
        """Return whether keeping *iface_to_check* preserves a healthy server spread.

        The main interface is always accepted, as is an interface without a
        bucket. Otherwise the other connected interfaces are grouped by
        bucket: a Tor interface is rejected once more than half of
        ``NUM_TARGET_CONNECTED_SERVERS`` onion servers are already connected,
        and any other interface is rejected if its bucket is already occupied.
        """
        # main interface is exempt. this makes switching servers easier
        if iface_to_check.is_main_server():
            return True
        if not iface_to_check.bucket_based_on_ipaddress():
            return True
        # bucket the *other* connected interfaces
        with self.interfaces_lock:
            others = list(self.interfaces.values())
        try:
            others.remove(iface_to_check)
        except ValueError:
            pass  # the interface under test is not registered
        buckets = defaultdict(list)
        for iface in others:
            buckets[iface.bucket_based_on_ipaddress()].append(iface)
        # check proposed server against buckets
        onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
        if iface_to_check.is_tor():
            # keep number of onion servers below half of all connected servers
            return not (len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2)
        own_bucket = iface_to_check.bucket_based_on_ipaddress()
        return not buckets[own_bucket]
1010

1011
    def best_effort_reliable(func):
        """Decorator that retries a network request on the main interface.

        Up to 10 attempts are made. An attempt without a main interface waits
        for ``default_server_changed_event`` (bounded by ``ignore_after(1)``)
        and counts as an attempt. ``RequestTimedOut`` and
        ``RequestCorrupted`` close the interface and trigger a retry. If all
        attempts are used up, ``BestEffortRequestFailed`` is raised.
        """
        @functools.wraps(func)
        async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
            for _attempt in range(10):
                iface = self.interface
                # retry until there is a main interface
                if not iface:
                    async with ignore_after(1):
                        await self.default_server_changed_event.wait()
                    continue  # try again
                assert iface.ready.done(), "interface not ready yet"
                # Run the request alongside a watcher for the interface
                # getting disconnected.
                try:
                    async with OldTaskGroup(wait=any) as group:
                        task = await group.spawn(func(self, *args, **kwargs))
                        await group.spawn(iface.got_disconnected.wait())
                except RequestTimedOut:
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                except RequestCorrupted as e:
                    # TODO ban server?
                    iface.logger.exception(f"RequestCorrupted: {e}")
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                if task.done() and not task.cancelled():
                    return task.result()
                # otherwise; try again
            raise BestEffortRequestFailed('cannot establish a connection... gave up.')
        return make_reliable_wrapper
1042

1043
    def catch_server_exceptions(func):
        """Decorator that wraps server errors in UntrustedServerReturnedError.

        This avoids showing untrusted arbitrary text to users. The original
        exception is kept as the cause of the wrapping exception.
        """
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            try:
                result = await func(self, *args, **kwargs)
            except aiorpcx.jsonrpc.CodeMessageError as e:
                wrapped_exc = UntrustedServerReturnedError(original_exception=e)
                # log (sanitized) untrusted error text now, to ease debugging
                self.logger.debug(f"got error from server for {func.__qualname__}: {wrapped_exc.get_untrusted_message()!r}")
                raise wrapped_exc from e
            return result
        return wrapper
1057

1058
    @best_effort_reliable
    @catch_server_exceptions
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Forward a merkle-proof request for a transaction to the main interface."""
        iface = self.interface
        if iface is None:
            # Raised so that best_effort_reliable retries the request.
            raise RequestTimedOut()
        return await iface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)
1064

1065
    @best_effort_reliable
    async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
        """Broadcast *tx* via the main interface.

        The caller should handle TxBroadcastError.
        """
        iface = self.interface
        if iface is None:
            # Raised so that best_effort_reliable retries the request.
            raise RequestTimedOut()
        await iface.broadcast_transaction(tx, timeout=timeout)
1071

1072
    async def try_broadcasting(self, tx, name) -> bool:
        """Broadcast *tx*, logging the outcome instead of raising.

        *name* is only used in the log message. Returns True on success and
        False if the broadcast raised an ``Exception``.
        """
        try:
            await self.broadcast_transaction(tx)
        except Exception as e:
            self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}')
            return False
        self.logger.info(f'success: broadcasting {name} {tx.txid()}')
        return True
1081

1082
    @best_effort_reliable
    @catch_server_exceptions
    async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
        """Forward a request for the transaction *tx_hash* to the main interface."""
        iface = self.interface
        if iface is None:
            # Raised so that best_effort_reliable retries the request.
            raise RequestTimedOut()
        return await iface.get_transaction(tx_hash=tx_hash, timeout=timeout)
1088

1089
    @best_effort_reliable
    @catch_server_exceptions
    async def get_history_for_scripthash(self, sh: str) -> List[dict]:
        """Forward a history request for scripthash *sh* to the main interface."""
        iface = self.interface
        if iface is None:
            # Raised so that best_effort_reliable retries the request.
            raise RequestTimedOut()
        return await iface.get_history_for_scripthash(sh)
1095

1096
    @best_effort_reliable
    @catch_server_exceptions
    async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
        """Forward a list-unspent request for scripthash *sh* to the main interface."""
        iface = self.interface
        if iface is None:
            # Raised so that best_effort_reliable retries the request.
            raise RequestTimedOut()
        return await iface.listunspent_for_scripthash(sh)
1102

1103
    @best_effort_reliable
    @catch_server_exceptions
    async def get_balance_for_scripthash(self, sh: str) -> dict:
        """Forward a balance request for scripthash *sh* to the main interface."""
        iface = self.interface
        if iface is None:
            # Raised so that best_effort_reliable retries the request.
            raise RequestTimedOut()
        return await iface.get_balance_for_scripthash(sh)
1109

1110
    @best_effort_reliable
    @catch_server_exceptions
    async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
        """Forward a txid-from-position request to the main interface."""
        iface = self.interface
        if iface is None:
            # Raised so that best_effort_reliable retries the request.
            raise RequestTimedOut()
        return await iface.get_txid_from_txpos(tx_height, tx_pos, merkle)
1116

1117
    def blockchain(self) -> Blockchain:
        """Return the blockchain we are currently following.

        If there is a main interface that has a blockchain, that chain is
        cached in ``self._blockchain`` first; the cached value is returned.
        """
        iface = self.interface
        current = iface.blockchain if iface else None
        if current is not None:
            self._blockchain = current
        return self._blockchain
1122

1123
    def get_blockchains(self):
        """Return a dict mapping blockchain id -> list of interfaces on that chain.

        Chains that no interface is on are left out. Both the known
        blockchains and the interfaces are snapshotted under their locks.
        """
        with blockchain.blockchains_lock:
            known_chains = list(blockchain.blockchains.items())
        with self.interfaces_lock:
            ifaces = list(self.interfaces.values())
        out = {}  # blockchain_id -> list(interfaces)
        for chain_id, bc in known_chains:
            on_chain = [iface for iface in ifaces if iface.blockchain == bc]
            if on_chain:
                out[chain_id] = on_chain
        return out
1132

1133
    def _set_preferred_chain(self, chain: Optional[Blockchain]):
5✔
1134
        if chain:
×
1135
            height = chain.get_max_forkpoint()
×
1136
            header_hash = chain.get_hash(height)
×
1137
        else:
1138
            height = 0
×
1139
            header_hash = constants.net.GENESIS
×
1140
        self._blockchain_preferred_block = {
×
1141
            'height': height,
1142
            'hash': header_hash,
1143
        }
1144
        self.config.BLOCKCHAIN_PREFERRED_BLOCK = self._blockchain_preferred_block
×
1145

1146
    async def follow_chain_given_id(self, chain_id: str) -> None:
        """Prefer the blockchain with id *chain_id* and switch to a server on it.

        The preference is recorded even when no connected interface is on
        that chain; in that case no server switch takes place.

        Raises:
            Exception: if no blockchain with that id is known.
        """
        bc = blockchain.blockchains.get(chain_id)
        if not bc:
            raise Exception(f'blockchain {chain_id} not found')
        self._set_preferred_chain(bc)
        # select server on this chain
        with self.interfaces_lock:
            interfaces = list(self.interfaces.values())
        interfaces_on_selected_chain = [iface for iface in interfaces if iface.blockchain == bc]
        if not interfaces_on_selected_chain:
            return
        chosen_iface = random.choice(interfaces_on_selected_chain)  # type: Interface
        # switch to server (and save to config)
        net_params = self.get_parameters()._replace(server=chosen_iface.server)
        await self.set_parameters(net_params)
1160

1161
    async def follow_chain_given_server(self, server: ServerAddr) -> None:
        """Prefer the chain that *server* is on and make it our server.

        *server* must correspond to a registered interface; otherwise
        nothing happens.
        """
        iface = self.interfaces.get(server)
        if iface is None:
            return
        self._set_preferred_chain(iface.blockchain)
        # switch to server (and save to config)
        net_params = self.get_parameters()._replace(server=server)
        await self.set_parameters(net_params)
1171

1172
    def get_server_height(self) -> int:
        """Length of header chain, as claimed by main interface.

        Returns 0 when there is no main interface.
        """
        iface = self.interface
        if not iface:
            return 0
        return iface.tip
1176

1177
    def get_local_height(self) -> int:
        """Length of header chain, POW-verified.

        In case of a chain split, this is for the branch the main interface
        is on, but it is the tip of that branch (even if main interface is
        behind).
        """
        chain = self.blockchain()
        return chain.height()
1183

1184
    def export_checkpoints(self, path):
        """Write the current blockchain's checkpoints to *path* as JSON.

        Run manually to generate blockchain checkpoints.
        Kept for console use only.
        """
        checkpoints = self.blockchain().get_checkpoints()
        with open(path, 'w', encoding='utf-8') as f:
            serialized = json.dumps(checkpoints, indent=4)
            f.write(serialized)
1191

1192
    async def _start(self):
        """Create a fresh taskgroup and start the network in it.

        Expects a clean state (no taskgroup, no interfaces, nothing
        connecting or closing). Spawns the connection to the default server,
        then schedules the taskgroup's main coroutine, which runs the session
        maintenance loop and the registered jobs, on ``self.asyncio_loop``.
        """
        assert not self.taskgroup
        self.taskgroup = taskgroup = OldTaskGroup()
        assert not self.interface and not self.interfaces
        assert not self._connecting_ifaces
        assert not self._closing_ifaces
        self.logger.info('starting network')
        self._clear_addr_retry_times()
        self._init_parameters_from_config()
        await self.taskgroup.spawn(self._run_new_interface(self.default_server))

        async def main():
            self.logger.info(f"starting taskgroup ({hex(id(taskgroup))}).")
            try:
                # note: if a task finishes with CancelledError, that
                # will NOT raise, and the group will keep the other tasks running
                async with taskgroup as group:
                    await group.spawn(self._maintain_sessions())
                    for job in self._jobs:
                        await group.spawn(job)
            except Exception:
                self.logger.exception(f"taskgroup died ({hex(id(taskgroup))}).")
            finally:
                self.logger.info(f"taskgroup stopped ({hex(id(taskgroup))}).")
        asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)

        util.trigger_callback('network_updated')
1218

1219
    def start(self, jobs: Iterable = None):
        """Schedule starting the network, along with the given job co-routines.

        Note: the jobs will *restart* every time the network restarts, e.g. on proxy
        setting changes.
        """
        self._was_started = True
        self._jobs = jobs if jobs else []
        startup = self._start()
        asyncio.run_coroutine_threadsafe(startup, self.asyncio_loop)
1228

1229
    @log_exceptions
    async def stop(self, *, full_shutdown: bool = True):
        """Stop the network: cancel the taskgroup and forget all interfaces.

        Args:
            full_shutdown: if True, gossip is stopped as well and no timeout
                is applied here (it is up to the caller to time us out). If
                False (e.g. when restarting due to proxy changes), we time
                out fast and trigger 'network_updated' at the end.

        Does nothing (besides logging) if the network was never started.
        """
        if not self._was_started:
            self.logger.info("not stopping network as it was never started")
            return
        self.logger.info("stopping network")
        timeout_ctx = nullcontext() if full_shutdown else ignore_after(1)
        async with timeout_ctx:
            async with OldTaskGroup() as group:
                # NOTE(review): self.taskgroup is dereferenced without a None
                # check; it is None after a previous stop() and until _start()
                # has run -- confirm callers cannot hit that window.
                await group.spawn(self.taskgroup.cancel_remaining())
                if full_shutdown:
                    await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
        # Reset all connection state.
        self.taskgroup = None
        self.interface = None
        self.interfaces = {}
        self._connecting_ifaces.clear()
        self._closing_ifaces.clear()
        if not full_shutdown:
            util.trigger_callback('network_updated')
1249

1250
    async def _ensure_there_is_a_main_interface(self):
5✔
1251
        if self.interface:
×
1252
            return
×
1253
        # if auto_connect is set, try a different server
1254
        if self.auto_connect and not self.is_connecting():
×
1255
            await self._switch_to_random_interface()
×
1256
        # if auto_connect is not set, or still no main interface, retry current
1257
        if not self.interface and not self.is_connecting():
×
1258
            if self._can_retry_addr(self.default_server, urgent=True):
×
1259
                await self.switch_to_interface(self.default_server)
×
1260

1261
    async def _maintain_sessions(self):
        """Endless maintenance loop of the network.

        Each iteration (followed by a 0.1 sleep) starts new interfaces if we
        have fewer than ``self.num_server``, closes interfaces that violate
        the "healthy spread" check, and makes sure there is a main interface
        (requesting fee estimates on it when an update is required).
        """
        async def maybe_start_new_interfaces():
            # Interfaces that are connecting or closing count as existing.
            num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces)
            num_missing = self.num_server - num_existing_ifaces
            for _ in range(num_missing):
                # FIXME this should try to honour "healthy spread of connected servers"
                server = self._get_next_server_to_try()
                if not server:
                    continue
                assert isinstance(server, ServerAddr), f"expected ServerAddr, got {type(server)}"
                await self.taskgroup.spawn(self._run_new_interface(server))

        async def maintain_healthy_spread_of_connected_servers():
            with self.interfaces_lock:
                interfaces = list(self.interfaces.values())
            random.shuffle(interfaces)
            for iface in interfaces:
                if self.check_interface_against_healthy_spread_of_connected_servers(iface):
                    continue
                self.logger.info(f"disconnecting from {iface.server}. too many connected "
                                 f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
                await self._close_interface(iface)

        async def maintain_main_interface():
            await self._ensure_there_is_a_main_interface()
            if self.is_connected():
                if self.is_fee_estimates_update_required():
                    await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)

        while True:
            await maybe_start_new_interfaces()
            await maintain_healthy_spread_of_connected_servers()
            await maintain_main_interface()
            await asyncio.sleep(0.1)
1289

1290
    @classmethod
    async def async_send_http_on_proxy(
            cls, method: str, url: str, *,
            params: dict = None,
            body: bytes = None,
            json: dict = None,
            headers=None,
            on_finish=None,
            timeout=None,
    ):
        """Send an HTTP request, through the network's proxy if there is one.

        Args:
            method: 'get' or 'post'; anything else raises ``Exception``.
            url: the request URL.
            params: query parameters (used for 'get' only).
            body: raw request body for 'post'; takes precedence over *json*.
            json: JSON payload for 'post'; one of *body*/*json* is required.
            headers: request headers; defaults to an empty dict.
            on_finish: coroutine function called with the response, whose
                result is returned. By default the response status is
                checked and the response text is returned.
            timeout: passed on to ``make_aiohttp_session``.
        """
        async def default_on_finish(resp: ClientResponse):
            resp.raise_for_status()
            return await resp.text()

        headers = {} if headers is None else headers
        on_finish = default_on_finish if on_finish is None else on_finish
        network = cls.get_instance()
        # Without a network instance, no proxy is used.
        proxy = network.proxy if network else None
        async with make_aiohttp_session(proxy, timeout=timeout) as session:
            if method == 'get':
                async with session.get(url, params=params, headers=headers) as resp:
                    return await on_finish(resp)
            elif method == 'post':
                assert body is not None or json is not None, 'body or json must be supplied if method is post'
                if body is not None:
                    async with session.post(url, data=body, headers=headers) as resp:
                        return await on_finish(resp)
                elif json is not None:
                    async with session.post(url, json=json, headers=headers) as resp:
                        return await on_finish(resp)
            else:
                raise Exception(f"unexpected {method=!r}")
1323

1324
    @classmethod
    def send_http_on_proxy(cls, method, url, **kwargs):
        """Blocking variant of ``async_send_http_on_proxy``.

        Schedules the request on the asyncio loop and blocks until the
        result is available. Must not be called from the asyncio thread.
        """
        loop = util.get_asyncio_loop()
        assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
        request = cls.async_send_http_on_proxy(method, url, **kwargs)
        future = asyncio.run_coroutine_threadsafe(request, loop)
        # note: async_send_http_on_proxy takes its own timeout, so none is applied here:
        return future.result()
1331

1332
    # methods used in scripts
1333
    async def get_peers(self):
        """Return the parsed peer list reported by the main server.

        Waits (polling once per second) until we are connected, then sends
        'server.peers.subscribe' on the main interface's session.
        """
        while True:
            if self.is_connected():
                break
            await asyncio.sleep(1)
        session = self.interface.session
        raw_peers = await session.send_request('server.peers.subscribe')
        return parse_servers(raw_peers)
1338

1339
    async def send_multiple_requests(
            self,
            servers: Sequence[ServerAddr],
            method: str,
            params: Sequence,
            *,
            timeout: int = None,
    ):
        """Send the same request to several servers, each over a new interface.

        Args:
            servers: the servers to query.
            method: request method name.
            params: request parameters.
            timeout: how long to wait for each interface to become ready;
                defaults to the "urgent" network timeout.

        Returns:
            A dict mapping server -> response, where the response is the
            exception instance if the request itself raised. Servers whose
            interface did not become ready are left out.
        """
        if timeout is None:
            timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
        responses = {}

        async def get_response(server: ServerAddr):
            interface = Interface(network=self, server=server)
            try:
                await util.wait_for2(interface.ready, timeout)
            except BaseException:
                await interface.close()
                return
            # NOTE(review): on this path the interface is not closed here --
            # confirm it gets cleaned up elsewhere.
            try:
                outcome = await interface.session.send_request(method, params, timeout=10)
            except Exception as e:
                outcome = e
            responses[interface.server] = outcome

        async with OldTaskGroup() as group:
            for server in servers:
                await group.spawn(get_response(server))
        return responses
1366

1367
    async def prune_offline_servers(self, hostmap):
        """Return the entries of *hostmap* whose host replied to a request.

        The servers derived from *hostmap* (protocols "t" and "s") are sent
        'blockchain.headers.subscribe'; hosts with an entry in the replies
        are kept.
        """
        peers = filter_protocol(hostmap, allowed_protocols=("t", "s",))
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        replies = await self.send_multiple_requests(peers, 'blockchain.headers.subscribe', [], timeout=timeout)
        hosts_replied = {serveraddr.host for serveraddr in replies}
        return {host: info for host, info in hostmap.items() if host in hosts_replied}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc