spesmilo / electrum / 4911558783926272

11 Apr 2025 05:06PM UTC coverage: 60.306% (+0.03%) from 60.278%

Pull Request #9729: recursive config file
CirrusCI
ecdsa
move plugin variables into sub dictionaries of user config

26 of 35 new or added lines in 2 files covered. (74.29%)

92 existing lines in 6 files now uncovered.

21586 of 35794 relevant lines covered (60.31%)

3.01 hits per line

Source File
/electrum/lnworker.py (coverage: 50.12%)
1
# Copyright (C) 2018 The Electrum developers
2
# Distributed under the MIT software license, see the accompanying
3
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
4

5
import asyncio
5✔
6
import os
5✔
7
from decimal import Decimal
5✔
8
import random
5✔
9
import time
5✔
10
from enum import IntEnum
5✔
11
from typing import (
5✔
12
    Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING, NamedTuple, Mapping, Any, Iterable, AsyncGenerator,
13
    Callable, Awaitable
14
)
15
import threading
5✔
16
import socket
5✔
17
from functools import partial
5✔
18
from collections import defaultdict
5✔
19
import concurrent
5✔
20
from concurrent import futures
5✔
21
import urllib.parse
5✔
22
import itertools
5✔
23

24
import aiohttp
5✔
25
import dns.resolver
5✔
26
import dns.exception
5✔
27
from aiorpcx import run_in_thread, NetAddress, ignore_after
5✔
28

29
from .logging import Logger
5✔
30
from .i18n import _
5✔
31
from .json_db import stored_in
5✔
32
from .channel_db import UpdateStatus, ChannelDBNotLoaded, get_mychannel_info, get_mychannel_policy
5✔
33

34
from . import constants, util
5✔
35
from .util import (
5✔
36
    profiler, OldTaskGroup, ESocksProxy, NetworkRetryManager, JsonRPCClient, NotEnoughFunds, EventListener,
37
    event_listener, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions, ignore_exceptions,
38
    make_aiohttp_session, timestamp_to_datetime, random_shuffled_copy, is_private_netaddress,
39
    UnrelatedTransactionException, LightningHistoryItem
40
)
41
from .fee_policy import FeePolicy, FixedFeePolicy
5✔
42
from .fee_policy import FEERATE_FALLBACK_STATIC_FEE, FEE_LN_ETA_TARGET, FEE_LN_LOW_ETA_TARGET, FEERATE_PER_KW_MIN_RELAY_LIGHTNING
5✔
43
from .invoices import Invoice, PR_UNPAID, PR_PAID, PR_INFLIGHT, PR_FAILED, LN_EXPIRY_NEVER, BaseInvoice
5✔
44
from .bitcoin import COIN, opcodes, make_op_return, address_to_scripthash, DummyAddress
5✔
45
from .bip32 import BIP32Node
5✔
46
from .address_synchronizer import TX_HEIGHT_LOCAL, TX_TIMESTAMP_INF
5✔
47
from .transaction import (
5✔
48
    Transaction, get_script_type_from_output_script, PartialTxOutput, PartialTransaction, PartialTxInput
49
)
50
from .crypto import (
5✔
51
    sha256, chacha20_encrypt, chacha20_decrypt, pw_encode_with_version_and_mac, pw_decode_with_version_and_mac
52
)
53

54
from .onion_message import OnionMessageManager
5✔
55
from .lntransport import LNTransport, LNResponderTransport, LNTransportBase, LNPeerAddr, split_host_port, extract_nodeid, ConnStringFormatError
5✔
56
from .lnpeer import Peer, LN_P2P_NETWORK_TIMEOUT
5✔
57
from .lnaddr import lnencode, LnAddr, lndecode
5✔
58
from .lnchannel import Channel, AbstractChannel, ChannelState, PeerState, HTLCWithStatus, ChannelBackup
5✔
59
from .lnrater import LNRater
5✔
60
from .lnutil import (
5✔
61
    get_compressed_pubkey_from_bech32, serialize_htlc_key, deserialize_htlc_key, PaymentFailure, generate_keypair,
62
    LnKeyFamily, LOCAL, REMOTE, MIN_FINAL_CLTV_DELTA_FOR_INVOICE, SENT, RECEIVED, HTLCOwner, UpdateAddHtlc, LnFeatures,
63
    ShortChannelID, HtlcLog, NoPathFound, InvalidGossipMsg, FeeBudgetExceeded, ImportedChannelBackupStorage,
64
    OnchainChannelBackupStorage, ln_compare_features, IncompatibleLightningFeatures, PaymentFeeBudget,
65
    NBLOCK_CLTV_DELTA_TOO_FAR_INTO_FUTURE, GossipForwardingMessage, MIN_FUNDING_SAT
66
)
67
from .lnonion import decode_onion_error, OnionFailureCode, OnionRoutingFailure, OnionPacket
5✔
68
from .lnmsg import decode_msg
5✔
69
from .lnrouter import (
5✔
70
    RouteEdge, LNPaymentRoute, LNPaymentPath, is_route_within_budget, NoChannelPolicy, LNPathInconsistent
71
)
72
from .lnwatcher import LNWatcher
5✔
73
from .submarine_swaps import SwapManager
5✔
74
from .mpp_split import suggest_splits, SplitConfigRating
5✔
75
from .trampoline import (
5✔
76
    create_trampoline_route_and_onion, is_legacy_relay, trampolines_by_id, hardcoded_trampoline_nodes,
77
    is_hardcoded_trampoline
78
)
79

80
if TYPE_CHECKING:
5✔
81
    from .network import Network
×
82
    from .wallet import Abstract_Wallet
×
83
    from .channel_db import ChannelDB
×
84
    from .simple_config import SimpleConfig
×
85

86

87
SAVED_PR_STATUS = [PR_PAID, PR_UNPAID]  # status that are persisted
5✔
88

89
NUM_PEERS_TARGET = 4
5✔
90

91
# onchain channel backup data
92
CB_VERSION = 0
5✔
93
CB_MAGIC_BYTES = bytes([0, 0, 0, CB_VERSION])
5✔
94
NODE_ID_PREFIX_LEN = 16
5✔
95

96

97
class PaymentDirection(IntEnum):
5✔
98
    SENT = 0
5✔
99
    RECEIVED = 1
5✔
100
    SELF_PAYMENT = 2
5✔
101
    FORWARDING = 3
5✔
102

103

104
class PaymentInfo(NamedTuple):
5✔
105
    payment_hash: bytes
5✔
106
    amount_msat: Optional[int]
5✔
107
    direction: int
5✔
108
    status: int
5✔
109

110

111
# Note: these states are persisted in the wallet file.
112
# Do not modify them without performing a wallet db upgrade
113
class RecvMPPResolution(IntEnum):
5✔
114
    WAITING = 0
5✔
115
    EXPIRED = 1
5✔
116
    ACCEPTED = 2
5✔
117
    FAILED = 3
5✔
118

119

120
class ReceivedMPPStatus(NamedTuple):
5✔
121
    resolution: RecvMPPResolution
5✔
122
    expected_msat: int
5✔
123
    htlc_set: Set[Tuple[ShortChannelID, UpdateAddHtlc]]
5✔
124

125
    @stored_in('received_mpp_htlcs', tuple)
5✔
126
    def from_tuple(resolution, expected_msat, htlc_list) -> 'ReceivedMPPStatus':
5✔
127
        htlc_set = set([(ShortChannelID(bytes.fromhex(scid)), UpdateAddHtlc.from_tuple(*x)) for (scid, x) in htlc_list])
×
128
        return ReceivedMPPStatus(
×
129
            resolution=RecvMPPResolution(resolution),
130
            expected_msat=expected_msat,
131
            htlc_set=htlc_set)
132

133

134
SentHtlcKey = Tuple[bytes, ShortChannelID, int]  # RHASH, scid, htlc_id
5✔
135

136

137
class SentHtlcInfo(NamedTuple):
5✔
138
    route: LNPaymentRoute
5✔
139
    payment_secret_orig: bytes
5✔
140
    payment_secret_bucket: bytes
5✔
141
    amount_msat: int
5✔
142
    bucket_msat: int
5✔
143
    amount_receiver_msat: int
5✔
144
    trampoline_fee_level: Optional[int]
5✔
145
    trampoline_route: Optional[LNPaymentRoute]
5✔
146

147

148
class ErrorAddingPeer(Exception): pass
5✔
149

150

151
# set some feature flags as baseline for both LNWallet and LNGossip
152
# note that e.g. DATA_LOSS_PROTECT is needed for LNGossip as many peers require it
153
BASE_FEATURES = (
5✔
154
    LnFeatures(0)
155
    | LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT
156
    | LnFeatures.OPTION_STATIC_REMOTEKEY_OPT
157
    | LnFeatures.VAR_ONION_OPT
158
    | LnFeatures.PAYMENT_SECRET_OPT
159
    | LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT
160
)
161

162
# we do not want to receive unrequested gossip (see lnpeer.maybe_save_remote_update)
163
LNWALLET_FEATURES = (
5✔
164
    BASE_FEATURES
165
    | LnFeatures.OPTION_DATA_LOSS_PROTECT_REQ
166
    | LnFeatures.OPTION_STATIC_REMOTEKEY_REQ
167
    | LnFeatures.VAR_ONION_REQ
168
    | LnFeatures.PAYMENT_SECRET_REQ
169
    | LnFeatures.BASIC_MPP_OPT
170
    | LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT_ELECTRUM
171
    | LnFeatures.OPTION_SHUTDOWN_ANYSEGWIT_OPT
172
    | LnFeatures.OPTION_CHANNEL_TYPE_OPT
173
    | LnFeatures.OPTION_SCID_ALIAS_OPT
174
    | LnFeatures.OPTION_SUPPORT_LARGE_CHANNEL_OPT
175
)
176

177
LNGOSSIP_FEATURES = (
5✔
178
    BASE_FEATURES
179
    # LNGossip doesn't serve gossip but weirdly has to signal so
180
    # that peers satisfy our queries
181
    | LnFeatures.GOSSIP_QUERIES_REQ
182
    | LnFeatures.GOSSIP_QUERIES_OPT
183
)
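As a side note on the feature-flag constants above, a minimal sketch of how an LnFeatures set is combined and queried (illustrative only, not part of lnworker.py):

from electrum.lnutil import LnFeatures

# Build a feature set the same way BASE_FEATURES is built above: bitwise OR of flags.
features = (
    LnFeatures(0)
    | LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT
    | LnFeatures.VAR_ONION_OPT
)
# Testing for a feature is a bitwise AND against the flag of interest.
supports_var_onion = bool(features & LnFeatures.VAR_ONION_OPT)  # True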
184

185

186
class LNWorker(Logger, EventListener, NetworkRetryManager[LNPeerAddr]):
5✔
187

188
    def __init__(self, node_keypair, features: LnFeatures, *, config: 'SimpleConfig'):
5✔
189
        Logger.__init__(self)
5✔
190
        NetworkRetryManager.__init__(
5✔
191
            self,
192
            max_retry_delay_normal=3600,
193
            init_retry_delay_normal=600,
194
            max_retry_delay_urgent=300,
195
            init_retry_delay_urgent=4,
196
        )
197
        self.lock = threading.RLock()
5✔
198
        self.node_keypair = node_keypair
5✔
199
        self._peers = {}  # type: Dict[bytes, Peer]  # pubkey -> Peer  # needs self.lock
5✔
200
        self.taskgroup = OldTaskGroup()
5✔
201
        self.listen_server = None  # type: Optional[asyncio.AbstractServer]
5✔
202
        self.features = features
5✔
203
        self.network = None  # type: Optional[Network]
5✔
204
        self.config = config
5✔
205
        self.stopping_soon = False  # whether we are being shut down
5✔
206
        self.register_callbacks()
5✔
207

208
    @property
5✔
209
    def channel_db(self) -> 'ChannelDB':
5✔
210
        return self.network.channel_db if self.network else None
×
211

212
    def uses_trampoline(self) -> bool:
5✔
213
        return not bool(self.channel_db)
×
214

215
    @property
5✔
216
    def peers(self) -> Mapping[bytes, Peer]:
5✔
217
        """Returns a read-only copy of peers."""
218
        with self.lock:
×
219
            return self._peers.copy()
×
220

221
    def channels_for_peer(self, node_id: bytes) -> Dict[bytes, Channel]:
5✔
222
        return {}
×
223

224
    def get_node_alias(self, node_id: bytes) -> Optional[str]:
5✔
225
        """Returns the alias of the node, or None if unknown."""
226
        node_alias = None
×
227
        if not self.uses_trampoline():
×
228
            node_info = self.channel_db.get_node_info_for_node_id(node_id)
×
229
            if node_info:
×
230
                node_alias = node_info.alias
×
231
        else:
232
            for k, v in hardcoded_trampoline_nodes().items():
×
233
                if v.pubkey.startswith(node_id):
×
234
                    node_alias = k
×
235
                    break
×
236
        return node_alias
×
237

238
    async def maybe_listen(self):
5✔
239
        # FIXME: only one LNWorker can listen at a time (single port)
240
        listen_addr = self.config.LIGHTNING_LISTEN
×
241
        if listen_addr:
×
242
            self.logger.info(f'lightning_listen enabled. will try to bind: {listen_addr!r}')
×
243
            try:
×
244
                netaddr = NetAddress.from_string(listen_addr)
×
245
            except Exception as e:
×
246
                self.logger.error(f"failed to parse config key '{self.config.cv.LIGHTNING_LISTEN.key()}'. got: {e!r}")
×
247
                return
×
248
            addr = str(netaddr.host)
×
249

250
            async def cb(reader, writer):
×
251
                transport = LNResponderTransport(self.node_keypair.privkey, reader, writer)
×
252
                try:
×
253
                    node_id = await transport.handshake()
×
254
                except Exception as e:
×
255
                    self.logger.info(f'handshake failure from incoming connection: {e!r}')
×
256
                    return
×
257
                await self._add_peer_from_transport(node_id=node_id, transport=transport)
×
258
            try:
×
259
                self.listen_server = await asyncio.start_server(cb, addr, netaddr.port)
×
260
            except OSError as e:
×
261
                self.logger.error(f"cannot listen for lightning p2p. error: {e!r}")
×
262

263
    async def main_loop(self):
5✔
264
        self.logger.info("starting taskgroup.")
×
265
        try:
×
266
            async with self.taskgroup as group:
×
267
                await group.spawn(asyncio.Event().wait)  # run forever (until cancel)
×
268
        except Exception as e:
×
269
            self.logger.exception("taskgroup died.")
×
270
        finally:
271
            self.logger.info("taskgroup stopped.")
×
272

273
    async def _maintain_connectivity(self):
5✔
274
        while True:
×
275
            await asyncio.sleep(1)
×
276
            if self.stopping_soon:
×
277
                return
×
278
            now = time.time()
×
279
            if len(self._peers) >= NUM_PEERS_TARGET:
×
280
                continue
×
281
            peers = await self._get_next_peers_to_try()
×
282
            for peer in peers:
×
283
                if self._can_retry_addr(peer, now=now):
×
284
                    try:
×
285
                        await self._add_peer(peer.host, peer.port, peer.pubkey)
×
286
                    except ErrorAddingPeer as e:
×
287
                        self.logger.info(f"failed to add peer: {peer}. exc: {e!r}")
×
288

289
    async def _add_peer(self, host: str, port: int, node_id: bytes) -> Peer:
5✔
290
        if node_id in self._peers:
×
291
            return self._peers[node_id]
×
292
        port = int(port)
×
293
        peer_addr = LNPeerAddr(host, port, node_id)
×
294
        self._trying_addr_now(peer_addr)
×
295
        self.logger.info(f"adding peer {peer_addr}")
×
296
        if node_id == self.node_keypair.pubkey or self.is_our_lnwallet(node_id):
×
297
            raise ErrorAddingPeer("cannot connect to self")
×
298
        transport = LNTransport(self.node_keypair.privkey, peer_addr,
×
299
                                e_proxy=ESocksProxy.from_network_settings(self.network))
300
        peer = await self._add_peer_from_transport(node_id=node_id, transport=transport)
×
301
        assert peer
×
302
        return peer
×
303

304
    async def _add_peer_from_transport(self, *, node_id: bytes, transport: LNTransportBase) -> Optional[Peer]:
5✔
305
        with self.lock:
×
306
            existing_peer = self._peers.get(node_id)
×
307
            if existing_peer:
×
308
                # Two instances of the same wallet are attempting to connect simultaneously.
309
                # If we let the new connection replace the existing one, the two instances might
310
                # both keep trying to reconnect, resulting in neither being usable.
311
                if existing_peer.is_initialized():
×
312
                    # give priority to the existing connection
313
                    return
×
314
                else:
315
                    # Use the new connection. (e.g. old peer might be an outgoing connection
316
                    # for an outdated host/port that will never connect)
317
                    existing_peer.close_and_cleanup()
×
318
            peer = Peer(self, node_id, transport)
×
319
            assert node_id not in self._peers
×
320
            self._peers[node_id] = peer
×
321
        await self.taskgroup.spawn(peer.main_loop())
×
322
        return peer
×
323

324
    def peer_closed(self, peer: Peer) -> None:
5✔
325
        with self.lock:
×
326
            peer2 = self._peers.get(peer.pubkey)
×
327
            if peer2 is peer:
×
328
                self._peers.pop(peer.pubkey)
×
329

330
    def num_peers(self) -> int:
5✔
331
        return sum([p.is_initialized() for p in self.peers.values()])
×
332

333
    def is_our_lnwallet(self, node_id: bytes) -> bool:
5✔
334
        """Check if node_id is one of our own wallets"""
335
        wallets = self.network.daemon.get_wallets()
×
336
        for wallet in wallets.values():
×
337
            if wallet.lnworker and wallet.lnworker.node_keypair.pubkey == node_id:
×
338
                return True
×
339
        return False
×
340

341
    def start_network(self, network: 'Network'):
5✔
342
        assert network
×
343
        assert self.network is None, "already started"
×
344
        self.network = network
×
345
        self._add_peers_from_config()
×
346
        asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
×
347

348
    async def stop(self):
5✔
349
        if self.listen_server:
5✔
350
            self.listen_server.close()
×
351
        self.unregister_callbacks()
5✔
352
        await self.taskgroup.cancel_remaining()
5✔
353

354
    def _add_peers_from_config(self):
5✔
355
        peer_list = self.config.LIGHTNING_PEERS or []
×
356
        for host, port, pubkey in peer_list:
×
357
            asyncio.run_coroutine_threadsafe(
×
358
                self._add_peer(host, int(port), bfh(pubkey)),
359
                self.network.asyncio_loop)
360

361
    def is_good_peer(self, peer: LNPeerAddr) -> bool:
5✔
362
        # the purpose of this method is to filter peers that advertise the desired feature bits
363
        # it is disabled for now, because feature bits published in node announcements seem to be unreliable
364
        return True
×
365
        node_id = peer.pubkey
366
        node = self.channel_db._nodes.get(node_id)
367
        if not node:
368
            return False
369
        try:
370
            ln_compare_features(self.features, node.features)
371
        except IncompatibleLightningFeatures:
372
            return False
373
        #self.logger.info(f'is_good {peer.host}')
374
        return True
375

376
    def on_peer_successfully_established(self, peer: Peer) -> None:
5✔
377
        if isinstance(peer.transport, LNTransport):
5✔
378
            peer_addr = peer.transport.peer_addr
×
379
            # reset connection attempt count
380
            self._on_connection_successfully_established(peer_addr)
×
381
            if not self.uses_trampoline():
×
382
                # add into channel db
383
                self.channel_db.add_recent_peer(peer_addr)
×
384
            # save network address into channels we might have with peer
385
            for chan in peer.channels.values():
×
386
                chan.add_or_update_peer_addr(peer_addr)
×
387

388
    async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]:
5✔
389
        now = time.time()
×
390
        await self.channel_db.data_loaded.wait()
×
391
        # first try from recent peers
392
        recent_peers = self.channel_db.get_recent_peers()
×
393
        for peer in recent_peers:
×
394
            if not peer:
×
395
                continue
×
396
            if peer.pubkey in self._peers:
×
397
                continue
×
398
            if not self._can_retry_addr(peer, now=now):
×
399
                continue
×
400
            if not self.is_good_peer(peer):
×
401
                continue
×
402
            return [peer]
×
403
        # try random peer from graph
404
        unconnected_nodes = self.channel_db.get_200_randomly_sorted_nodes_not_in(self.peers.keys())
×
405
        if unconnected_nodes:
×
406
            for node_id in unconnected_nodes:
×
407
                addrs = self.channel_db.get_node_addresses(node_id)
×
408
                if not addrs:
×
409
                    continue
×
410
                host, port, timestamp = self.choose_preferred_address(list(addrs))
×
411
                try:
×
412
                    peer = LNPeerAddr(host, port, node_id)
×
413
                except ValueError:
×
414
                    continue
×
415
                if not self._can_retry_addr(peer, now=now):
×
416
                    continue
×
417
                if not self.is_good_peer(peer):
×
418
                    continue
×
419
                #self.logger.info('taking random ln peer from our channel db')
420
                return [peer]
×
421

422
        # getting desperate... let's try hardcoded fallback list of peers
423
        fallback_list = constants.net.FALLBACK_LN_NODES
×
424
        fallback_list = [peer for peer in fallback_list if self._can_retry_addr(peer, now=now)]
×
425
        if fallback_list:
×
426
            return [random.choice(fallback_list)]
×
427

428
        # last resort: try dns seeds (BOLT-10)
429
        return await run_in_thread(self._get_peers_from_dns_seeds)
×
430

431
    def _get_peers_from_dns_seeds(self) -> Sequence[LNPeerAddr]:
5✔
432
        # NOTE: potentially long blocking call, do not run directly on asyncio event loop.
433
        # Return several peers to reduce the number of dns queries.
434
        if not constants.net.LN_DNS_SEEDS:
×
435
            return []
×
436
        dns_seed = random.choice(constants.net.LN_DNS_SEEDS)
×
437
        self.logger.info('asking dns seed "{}" for ln peers'.format(dns_seed))
×
438
        try:
×
439
            # note: this might block for several seconds
440
            # this will include bech32-encoded-pubkeys and ports
441
            srv_answers = resolve_dns_srv('r{}.{}'.format(
×
442
                constants.net.LN_REALM_BYTE, dns_seed))
443
        except dns.exception.DNSException as e:
×
444
            self.logger.info(f'failed querying (1) dns seed "{dns_seed}" for ln peers: {repr(e)}')
×
445
            return []
×
446
        random.shuffle(srv_answers)
×
447
        num_peers = 2 * NUM_PEERS_TARGET
×
448
        srv_answers = srv_answers[:num_peers]
×
449
        # we now have pubkeys and ports but host is still needed
450
        peers = []
×
451
        for srv_ans in srv_answers:
×
452
            try:
×
453
                # note: this might block for several seconds
454
                answers = dns.resolver.resolve(srv_ans['host'])
×
455
            except dns.exception.DNSException as e:
×
456
                self.logger.info(f'failed querying (2) dns seed "{dns_seed}" for ln peers: {repr(e)}')
×
457
                continue
×
458
            try:
×
459
                ln_host = str(answers[0])
×
460
                port = int(srv_ans['port'])
×
461
                bech32_pubkey = srv_ans['host'].split('.')[0]
×
462
                pubkey = get_compressed_pubkey_from_bech32(bech32_pubkey)
×
463
                peers.append(LNPeerAddr(ln_host, port, pubkey))
×
464
            except Exception as e:
×
465
                self.logger.info(f'error with parsing peer from dns seed: {repr(e)}')
×
466
                continue
×
467
        self.logger.info(f'got {len(peers)} ln peers from dns seed')
×
468
        return peers
×
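For context on the SRV name constructed above, a hypothetical example (the seed hostname and realm byte are assumptions, not taken from constants.net):

LN_REALM_BYTE = 0                  # assumed realm byte
dns_seed = 'lseed.example.org'     # hypothetical BOLT-10 DNS seed
srv_name = 'r{}.{}'.format(LN_REALM_BYTE, dns_seed)
# srv_name == 'r0.lseed.example.org'; the SRV answers carry bech32-encoded
# node pubkeys in their hostname labels plus the peer's port.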
469

470
    @staticmethod
5✔
471
    def choose_preferred_address(addr_list: Sequence[Tuple[str, int, int]]) -> Tuple[str, int, int]:
5✔
472
        assert len(addr_list) >= 1
×
473
        # choose the most recent one that is an IP
474
        for host, port, timestamp in sorted(addr_list, key=lambda a: -a[2]):
×
475
            if is_ip_address(host):
×
476
                return host, port, timestamp
×
477
        # otherwise choose one at random
478
        # TODO maybe filter out onion if not on tor?
479
        choice = random.choice(addr_list)
×
480
        return choice
×
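A brief usage sketch for the helper above (illustrative; the addresses are made up):

from electrum.lnworker import LNWorker

addrs = [
    ('exampleaddress.onion', 9735, 1700000000),  # newest entry, but not an IP
    ('203.0.113.5', 9735, 1690000000),           # older entry, plain IP
]
host, port, ts = LNWorker.choose_preferred_address(addrs)
# The most recent address that is an IP wins, so host == '203.0.113.5' here.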
481

482
    @event_listener
5✔
483
    def on_event_proxy_set(self, *args):
5✔
484
        for peer in self.peers.values():
×
485
            peer.close_and_cleanup()
×
486
        self._clear_addr_retry_times()
×
487

488
    @log_exceptions
5✔
489
    async def add_peer(self, connect_str: str) -> Peer:
5✔
490
        node_id, rest = extract_nodeid(connect_str)
×
491
        peer = self._peers.get(node_id)
×
492
        if not peer:
×
493
            if rest is not None:
×
494
                host, port = split_host_port(rest)
×
495
            else:
496
                if self.uses_trampoline():
×
497
                    addr = trampolines_by_id().get(node_id)
×
498
                    if not addr:
×
499
                        raise ConnStringFormatError(_('Address unknown for node:') + ' ' + node_id.hex())
×
500
                    host, port = addr.host, addr.port
×
501
                else:
502
                    addrs = self.channel_db.get_node_addresses(node_id)
×
503
                    if not addrs:
×
504
                        raise ConnStringFormatError(_('Don\'t know any addresses for node:') + ' ' + node_id.hex())
×
505
                    host, port, timestamp = self.choose_preferred_address(list(addrs))
×
506
            port = int(port)
×
507

508
            if not self.network.proxy:
×
509
                # Try DNS-resolving the host (if needed). This is simply so that
510
                # the caller gets a nice exception if it cannot be resolved.
511
                # (we don't do the DNS lookup if a proxy is set, to avoid a DNS-leak)
512
                if host.endswith('.onion'):
×
513
                    raise ConnStringFormatError(_('.onion address, but no proxy configured'))
×
514
                try:
×
515
                    await asyncio.get_running_loop().getaddrinfo(host, port)
×
516
                except socket.gaierror:
×
517
                    raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
×
518

519
            # add peer
520
            peer = await self._add_peer(host, port, node_id)
×
521
        return peer
×
522

523

524
class LNGossip(LNWorker):
5✔
525
    """The LNGossip class is a separate, unannounced Lightning node with random id that is just querying
526
    gossip from other nodes. The LNGossip node does not satisfy gossip queries; that is done by the
527
    LNWallet class(es). LNWallets are the advertised nodes used for actual payments and only satisfy
528
    peer queries without fetching gossip themselves. This separation is done so that gossip can be queried
529
    independently of the active LNWallets. LNGossip keeps a curated batch of gossip in _forwarding_gossip
530
    that is fetched by the LNWallets for regular forwarding."""
531
    max_age = 14*24*3600
5✔
532
    LOGGING_SHORTCUT = 'g'
5✔
533

534
    def __init__(self, config: 'SimpleConfig'):
5✔
535
        seed = os.urandom(32)
×
536
        node = BIP32Node.from_rootseed(seed, xtype='standard')
×
537
        xprv = node.to_xprv()
×
538
        node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
×
539
        LNWorker.__init__(self, node_keypair, LNGOSSIP_FEATURES, config=config)
×
540
        self.unknown_ids = set()
×
541
        self._forwarding_gossip = []  # type: List[GossipForwardingMessage]
×
542
        self._last_gossip_batch_ts = 0  # type: int
×
543
        self._forwarding_gossip_lock = asyncio.Lock()
×
544
        self.gossip_request_semaphore = asyncio.Semaphore(5)
×
545
        # statistics
546
        self._num_chan_ann = 0
×
547
        self._num_node_ann = 0
×
548
        self._num_chan_upd = 0
×
549
        self._num_chan_upd_good = 0
×
550

551
    def start_network(self, network: 'Network'):
5✔
552
        super().start_network(network)
×
553
        for coro in [
×
554
                self._maintain_connectivity(),
555
                self.maintain_db(),
556
                self._maintain_forwarding_gossip()
557
        ]:
558
            tg_coro = self.taskgroup.spawn(coro)
×
559
            asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
×
560

561
    async def maintain_db(self):
5✔
562
        await self.channel_db.data_loaded.wait()
×
563
        while True:
×
564
            if len(self.unknown_ids) == 0:
×
565
                self.channel_db.prune_old_policies(self.max_age)
×
566
                self.channel_db.prune_orphaned_channels()
×
567
            await asyncio.sleep(120)
×
568

569
    async def _maintain_forwarding_gossip(self):
5✔
570
        await self.channel_db.data_loaded.wait()
×
571
        await self.wait_for_sync()
×
572
        while True:
×
573
            async with self._forwarding_gossip_lock:
×
574
                self._forwarding_gossip = self.channel_db.get_forwarding_gossip_batch()
×
575
                self._last_gossip_batch_ts = int(time.time())
×
576
            self.logger.debug(f"{len(self._forwarding_gossip)} gossip messages available to forward")
×
577
            await asyncio.sleep(60)
×
578

579
    async def get_forwarding_gossip(self) -> tuple[List[GossipForwardingMessage], int]:
5✔
580
        async with self._forwarding_gossip_lock:
×
581
            return self._forwarding_gossip, self._last_gossip_batch_ts
×
582

583
    async def add_new_ids(self, ids: Iterable[bytes]):
5✔
584
        known = self.channel_db.get_channel_ids()
×
585
        new = set(ids) - set(known)
×
586
        self.unknown_ids.update(new)
×
587
        util.trigger_callback('unknown_channels', len(self.unknown_ids))
×
588
        util.trigger_callback('gossip_peers', self.num_peers())
×
589
        util.trigger_callback('ln_gossip_sync_progress')
×
590

591
    def get_ids_to_query(self) -> Sequence[bytes]:
5✔
592
        N = 500
×
593
        l = list(self.unknown_ids)
×
594
        self.unknown_ids = set(l[N:])
×
595
        util.trigger_callback('unknown_channels', len(self.unknown_ids))
×
596
        util.trigger_callback('ln_gossip_sync_progress')
×
597
        return l[0:N]
×
598

599
    def get_sync_progress_estimate(self) -> Tuple[Optional[int], Optional[int], Optional[int]]:
5✔
600
        """Estimates the gossip synchronization process and returns the number
601
        of synchronized channels, the total channels in the network and a
602
        rescaled percentage of the synchronization process."""
603
        if self.num_peers() == 0:
×
604
            return None, None, None
×
605
        nchans_with_0p, nchans_with_1p, nchans_with_2p = self.channel_db.get_num_channels_partitioned_by_policy_count()
×
606
        num_db_channels = nchans_with_0p + nchans_with_1p + nchans_with_2p
×
607
        num_nodes = self.channel_db.num_nodes
×
608
        num_nodes_associated_to_chans = max(len(self.channel_db._channels_for_node.keys()), 1)
×
609
        # some channels will never have two policies (only one is in gossip?...)
610
        # so if we have at least 1 policy for a channel, we consider that channel "complete" here
611
        current_est = num_db_channels - nchans_with_0p
×
612
        total_est = len(self.unknown_ids) + num_db_channels
×
613

614
        progress_chans = current_est / total_est if total_est and current_est else 0
×
615
        # consider that we got at least 10% of the node anns of node ids we know about
616
        progress_nodes = min((num_nodes / num_nodes_associated_to_chans) * 10, 1)
×
617
        progress = (progress_chans * 3 + progress_nodes) / 4  # weigh the channel progress higher
×
618
        # self.logger.debug(f"Sync process chans: {progress_chans} | Progress nodes: {progress_nodes} | "
619
        #                   f"Total progress: {progress} | NUM_NODES: {num_nodes} / {num_nodes_associated_to_chans}")
620
        progress_percent = (1.0 / 0.95 * progress) * 100
×
621
        progress_percent = min(progress_percent, 100)
×
622
        progress_percent = round(progress_percent)
×
623
        # take a minimal number of synchronized channels to get a more accurate
624
        # percentage estimate
625
        if current_est < 200:
×
626
            progress_percent = 0
×
627
        return current_est, total_est, progress_percent
×
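To make the weighting above concrete, a small worked example with hypothetical numbers (not taken from this report):

num_db_channels, nchans_with_0p, num_unknown_ids = 10000, 2000, 1000
current_est = num_db_channels - nchans_with_0p            # 8000
total_est = num_unknown_ids + num_db_channels             # 11000
progress_chans = current_est / total_est                  # ~0.727
progress_nodes = 1                                        # already capped at 1
progress = (progress_chans * 3 + progress_nodes) / 4      # ~0.795
# current_est >= 200, so the estimate is not forced to 0:
progress_percent = round(min((1.0 / 0.95 * progress) * 100, 100))  # 84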
628

629
    async def process_gossip(self, chan_anns, node_anns, chan_upds):
5✔
630
        # note: we run in the originating peer's TaskGroup, so we can safely raise here
631
        #       and disconnect only from that peer
632
        await self.channel_db.data_loaded.wait()
×
633

634
        # channel announcements
635
        def process_chan_anns():
×
636
            for payload in chan_anns:
×
637
                self.channel_db.verify_channel_announcement(payload)
×
638
            self.channel_db.add_channel_announcements(chan_anns)
×
639
        await run_in_thread(process_chan_anns)
×
640

641
        # node announcements
642
        def process_node_anns():
×
643
            for payload in node_anns:
×
644
                self.channel_db.verify_node_announcement(payload)
×
645
            self.channel_db.add_node_announcements(node_anns)
×
646
        await run_in_thread(process_node_anns)
×
647
        # channel updates
648
        categorized_chan_upds = await run_in_thread(partial(
×
649
            self.channel_db.add_channel_updates,
650
            chan_upds,
651
            max_age=self.max_age))
652
        orphaned = categorized_chan_upds.orphaned
×
653
        if orphaned:
×
654
            self.logger.info(f'adding {len(orphaned)} unknown channel ids')
×
655
            orphaned_ids = [c['short_channel_id'] for c in orphaned]
×
656
            await self.add_new_ids(orphaned_ids)
×
657

658
        self._num_chan_ann += len(chan_anns)
×
659
        self._num_node_ann += len(node_anns)
×
660
        self._num_chan_upd += len(chan_upds)
×
661
        self._num_chan_upd_good += len(categorized_chan_upds.good)
×
662

663
    def is_synced(self) -> bool:
5✔
664
        _, _, percentage_synced = self.get_sync_progress_estimate()
×
665
        if percentage_synced is not None and percentage_synced >= 100:
×
666
            return True
×
667
        return False
×
668

669
    async def wait_for_sync(self, times_to_check: int = 3):
5✔
670
        """Check if we have 100% sync progress `times_to_check` times in a row (because the
671
        estimate often jumps back after some seconds when doing initial sync)."""
672
        while True:
×
673
            if self.is_synced():
×
674
                times_to_check -= 1
×
675
                if times_to_check <= 0:
×
676
                    return
×
677
            await asyncio.sleep(10)
×
678
            # flush the gossip queue so we don't forward old gossip after sync is complete
679
            self.channel_db.get_forwarding_gossip_batch()
×
680

681

682
class PaySession(Logger):
5✔
683
    def __init__(
5✔
684
            self,
685
            *,
686
            payment_hash: bytes,
687
            payment_secret: bytes,
688
            initial_trampoline_fee_level: int,
689
            invoice_features: int,
690
            r_tags,
691
            min_final_cltv_delta: int,  # delta for last node (typically from invoice)
692
            amount_to_pay: int,  # total payment amount final receiver will get
693
            invoice_pubkey: bytes,
694
            uses_trampoline: bool,  # whether sender uses trampoline or gossip
695
            use_two_trampolines: bool,  # whether legacy payments will try to use two trampolines
696
    ):
697
        assert payment_hash
5✔
698
        assert payment_secret
5✔
699
        self.payment_hash = payment_hash
5✔
700
        self.payment_secret = payment_secret
5✔
701
        self.payment_key = payment_hash + payment_secret
5✔
702
        Logger.__init__(self)
5✔
703

704
        self.invoice_features = LnFeatures(invoice_features)
5✔
705
        self.r_tags = r_tags
5✔
706
        self.min_final_cltv_delta = min_final_cltv_delta
5✔
707
        self.amount_to_pay = amount_to_pay
5✔
708
        self.invoice_pubkey = invoice_pubkey
5✔
709

710
        self.sent_htlcs_q = asyncio.Queue()  # type: asyncio.Queue[HtlcLog]
5✔
711
        self.start_time = time.time()
5✔
712

713
        self.uses_trampoline = uses_trampoline
5✔
714
        self.trampoline_fee_level = initial_trampoline_fee_level
5✔
715
        self.failed_trampoline_routes = []
5✔
716
        self.use_two_trampolines = use_two_trampolines
5✔
717
        self._sent_buckets = dict()  # psecret_bucket -> (amount_sent, amount_failed)
5✔
718

719
        self._amount_inflight = 0  # what we sent in htlcs (that receiver gets, without fees)
5✔
720
        self._nhtlcs_inflight = 0
5✔
721
        self.is_active = True  # is still trying to send new htlcs?
5✔
722

723
    def diagnostic_name(self):
5✔
724
        pkey = sha256(self.payment_key)
5✔
725
        return f"{self.payment_hash[:4].hex()}-{pkey[:2].hex()}"
5✔
726

727
    def maybe_raise_trampoline_fee(self, htlc_log: HtlcLog):
5✔
728
        if htlc_log.trampoline_fee_level == self.trampoline_fee_level:
5✔
729
            self.trampoline_fee_level += 1
5✔
730
            self.failed_trampoline_routes = []
5✔
731
            self.logger.info(f'raising trampoline fee level {self.trampoline_fee_level}')
5✔
732
        else:
733
            self.logger.info(f'NOT raising trampoline fee level, already at {self.trampoline_fee_level}')
5✔
734

735
    def handle_failed_trampoline_htlc(self, *, htlc_log: HtlcLog, failure_msg: OnionRoutingFailure):
5✔
736
        # FIXME The trampoline nodes in the path are chosen randomly.
737
        #       Some of the errors might depend on how we have chosen them.
738
        #       Having more attempts is currently useful in part because of the randomness,
739
        #       instead we should give feedback to create_routes_for_payment.
740
        # Sometimes the trampoline node fails to send a payment and returns
741
        # TEMPORARY_CHANNEL_FAILURE, while it succeeds with a higher trampoline fee.
742
        if failure_msg.code in (
5✔
743
                OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT,
744
                OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON,
745
                OnionFailureCode.TEMPORARY_CHANNEL_FAILURE):
746
            # TODO: parse the node policy here (not returned by eclair yet)
747
            # TODO: erring node is always the first trampoline even if second
748
            #  trampoline demands more fees, we can't influence this
749
            self.maybe_raise_trampoline_fee(htlc_log)
5✔
750
        elif self.use_two_trampolines:
5✔
751
            self.use_two_trampolines = False
×
752
        elif failure_msg.code in (
5✔
753
                OnionFailureCode.UNKNOWN_NEXT_PEER,
754
                OnionFailureCode.TEMPORARY_NODE_FAILURE):
755
            trampoline_route = htlc_log.route
5✔
756
            r = [hop.end_node.hex() for hop in trampoline_route]
5✔
757
            self.logger.info(f'failed trampoline route: {r}')
5✔
758
            if r not in self.failed_trampoline_routes:
5✔
759
                self.failed_trampoline_routes.append(r)
5✔
760
            else:
761
                pass  # maybe the route was reused between different MPP parts
×
762
        else:
763
            raise PaymentFailure(failure_msg.code_name())
5✔
764

765
    async def wait_for_one_htlc_to_resolve(self) -> HtlcLog:
5✔
766
        self.logger.info(f"waiting... amount_inflight={self._amount_inflight}. nhtlcs_inflight={self._nhtlcs_inflight}")
5✔
767
        htlc_log = await self.sent_htlcs_q.get()
5✔
768
        self._amount_inflight -= htlc_log.amount_msat
5✔
769
        self._nhtlcs_inflight -= 1
5✔
770
        if self._amount_inflight < 0 or self._nhtlcs_inflight < 0:
5✔
771
            raise Exception(f"amount_inflight={self._amount_inflight}, nhtlcs_inflight={self._nhtlcs_inflight}. both should be >= 0 !")
×
772
        return htlc_log
5✔
773

774
    def add_new_htlc(self, sent_htlc_info: SentHtlcInfo):
5✔
775
        self._nhtlcs_inflight += 1
5✔
776
        self._amount_inflight += sent_htlc_info.amount_receiver_msat
5✔
777
        if self._amount_inflight > self.amount_to_pay:  # safety belts
5✔
778
            raise Exception(f"amount_inflight={self._amount_inflight} > amount_to_pay={self.amount_to_pay}")
×
779
        shi = sent_htlc_info
5✔
780
        bkey = shi.payment_secret_bucket
5✔
781
        # if we sent MPP to a trampoline, add item to sent_buckets
782
        if self.uses_trampoline and shi.amount_msat != shi.bucket_msat:
5✔
783
            if bkey not in self._sent_buckets:
5✔
784
                self._sent_buckets[bkey] = (0, 0)
5✔
785
            amount_sent, amount_failed = self._sent_buckets[bkey]
5✔
786
            amount_sent += shi.amount_receiver_msat
5✔
787
            self._sent_buckets[bkey] = amount_sent, amount_failed
5✔
788

789
    def on_htlc_fail_get_fail_amt_to_propagate(self, sent_htlc_info: SentHtlcInfo) -> Optional[int]:
5✔
790
        shi = sent_htlc_info
5✔
791
        # check sent_buckets if we use trampoline
792
        bkey = shi.payment_secret_bucket
5✔
793
        if self.uses_trampoline and bkey in self._sent_buckets:
5✔
794
            amount_sent, amount_failed = self._sent_buckets[bkey]
5✔
795
            amount_failed += shi.amount_receiver_msat
5✔
796
            self._sent_buckets[bkey] = amount_sent, amount_failed
5✔
797
            if amount_sent != amount_failed:
5✔
798
                self.logger.info('bucket still active...')
5✔
799
                return None
5✔
800
            self.logger.info('bucket failed')
5✔
801
            return amount_sent
5✔
802
        # not using trampoline buckets
803
        return shi.amount_receiver_msat
5✔
804

805
    def get_outstanding_amount_to_send(self) -> int:
5✔
806
        return self.amount_to_pay - self._amount_inflight
5✔
807

808
    def can_be_deleted(self) -> bool:
5✔
809
        """Returns True iff finished sending htlcs AND all pending htlcs have resolved."""
810
        if self.is_active:
5✔
811
            return False
5✔
812
        # note: no one is consuming from sent_htlcs_q anymore
813
        nhtlcs_resolved = self.sent_htlcs_q.qsize()
5✔
814
        assert nhtlcs_resolved <= self._nhtlcs_inflight
5✔
815
        return nhtlcs_resolved == self._nhtlcs_inflight
5✔
816

817

818
class LNWallet(LNWorker):
5✔
819

820
    lnwatcher: Optional['LNWatcher']
5✔
821
    MPP_EXPIRY = 120
5✔
822
    TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS = 3  # seconds
5✔
823
    PAYMENT_TIMEOUT = 120
5✔
824
    MPP_SPLIT_PART_FRACTION = 0.2
5✔
825
    MPP_SPLIT_PART_MINAMT_MSAT = 5_000_000
5✔
826

827
    def __init__(self, wallet: 'Abstract_Wallet', xprv):
5✔
828
        self.wallet = wallet
5✔
829
        self.config = wallet.config
5✔
830
        self.db = wallet.db
5✔
831
        self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
5✔
832
        self.backup_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.BACKUP_CIPHER).privkey
5✔
833
        self.static_payment_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.PAYMENT_BASE)
5✔
834
        self.payment_secret_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.PAYMENT_SECRET_KEY).privkey
5✔
835
        self.funding_root_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.FUNDING_ROOT_KEY)
5✔
836
        Logger.__init__(self)
5✔
837
        features = LNWALLET_FEATURES
5✔
838
        if self.config.ENABLE_ANCHOR_CHANNELS:
5✔
839
            features |= LnFeatures.OPTION_ANCHORS_ZERO_FEE_HTLC_OPT
×
840
        if self.config.ACCEPT_ZEROCONF_CHANNELS:
5✔
841
            features |= LnFeatures.OPTION_ZEROCONF_OPT
×
842
        if self.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS and self.config.LIGHTNING_USE_GOSSIP:
5✔
843
            features |= LnFeatures.GOSSIP_QUERIES_OPT  # signal we have gossip to fetch
×
844
        LNWorker.__init__(self, self.node_keypair, features, config=self.config)
5✔
845
        self.lnwatcher = LNWatcher(self)
5✔
846
        self.lnrater: LNRater = None
5✔
847
        self.payment_info = self.db.get_dict('lightning_payments')     # RHASH -> amount, direction, is_paid
5✔
848
        self.preimages = self.db.get_dict('lightning_preimages')   # RHASH -> preimage
5✔
849
        self._bolt11_cache = {}
5✔
850
        # note: this sweep_address is only used as fallback; as it might result in address-reuse
851
        self.logs = defaultdict(list)  # type: Dict[str, List[HtlcLog]]  # key is RHASH  # (not persisted)
5✔
852
        # used in tests
853
        self.enable_htlc_settle = True
5✔
854
        self.enable_htlc_forwarding = True
5✔
855

856
        # note: accessing channels (besides simple lookup) needs self.lock!
857
        self._channels = {}  # type: Dict[bytes, Channel]
5✔
858
        channels = self.db.get_dict("channels")
5✔
859
        for channel_id, c in random_shuffled_copy(channels.items()):
5✔
860
            self._channels[bfh(channel_id)] = chan = Channel(c, lnworker=self)
5✔
861
            self.wallet.set_reserved_addresses_for_chan(chan, reserved=True)
5✔
862

863
        self._channel_backups = {}  # type: Dict[bytes, ChannelBackup]
5✔
864
        # order is important: imported should overwrite onchain
865
        for name in ["onchain_channel_backups", "imported_channel_backups"]:
5✔
866
            channel_backups = self.db.get_dict(name)
5✔
867
            for channel_id, storage in channel_backups.items():
5✔
868
                self._channel_backups[bfh(channel_id)] = cb = ChannelBackup(storage, lnworker=self)
×
869
                self.wallet.set_reserved_addresses_for_chan(cb, reserved=True)
×
870

871
        self._paysessions = dict()                      # type: Dict[bytes, PaySession]
5✔
872
        self.sent_htlcs_info = dict()                   # type: Dict[SentHtlcKey, SentHtlcInfo]
5✔
873
        self.received_mpp_htlcs = self.db.get_dict('received_mpp_htlcs')   # type: Dict[str, ReceivedMPPStatus]  # payment_key -> ReceivedMPPStatus
5✔
874

875
        # detect inflight payments
876
        self.inflight_payments = set()        # (not persisted) keys of invoices that are in PR_INFLIGHT state
5✔
877
        for payment_hash in self.get_payments(status='inflight').keys():
5✔
878
            self.set_invoice_status(payment_hash.hex(), PR_INFLIGHT)
×
879

880
        # payment forwarding
881
        self.active_forwardings = self.db.get_dict('active_forwardings')    # type: Dict[str, List[str]]        # Dict: payment_key -> list of htlc_keys
5✔
882
        self.forwarding_failures = self.db.get_dict('forwarding_failures')  # type: Dict[str, Tuple[str, str]]  # Dict: payment_key -> (error_bytes, error_message)
5✔
883
        self.downstream_to_upstream_htlc = {}                               # type: Dict[str, str]              # Dict: htlc_key -> htlc_key (not persisted)
5✔
884
        self.dont_settle_htlcs = self.db.get_dict('dont_settle_htlcs')      # type: Dict[str, None]             # payment_hashes of htlcs that we should not settle back yet even if we have the preimage
5✔
885

886
        # payment_hash -> callback:
887
        self.hold_invoice_callbacks = {}                # type: Dict[bytes, Callable[[bytes], Awaitable[None]]]
5✔
888
        self.payment_bundles = []                       # lists of hashes. todo:persist
5✔
889

890
        self.nostr_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NOSTR_KEY)
5✔
891
        self.swap_manager = SwapManager(wallet=self.wallet, lnworker=self)
5✔
892
        self.onion_message_manager = OnionMessageManager(self)
5✔
893
        self.subscribe_to_channels()
5✔
894

895
    def subscribe_to_channels(self):
5✔
896
        for chan in self.channels.values():
5✔
897
            self.lnwatcher.add_channel(chan)
5✔
898
        for cb in self.channel_backups.values():
5✔
899
            self.lnwatcher.add_channel(cb)
×
900

901
    def has_deterministic_node_id(self) -> bool:
5✔
902
        return bool(self.db.get('lightning_xprv'))
×
903

904
    def can_have_recoverable_channels(self) -> bool:
5✔
905
        return (self.has_deterministic_node_id()
×
906
                and not self.config.LIGHTNING_LISTEN)
907

908
    def has_recoverable_channels(self) -> bool:
5✔
909
        """Whether *future* channels opened by this wallet would be recoverable
910
        from seed (via putting OP_RETURN outputs into funding txs).
911
        """
912
        return (self.can_have_recoverable_channels()
×
913
                and self.config.LIGHTNING_USE_RECOVERABLE_CHANNELS)
914

915
    def has_anchor_channels(self) -> bool:
5✔
916
        """Returns True if any active channel is an anchor channel."""
917
        return any(chan.has_anchors() and not chan.is_redeemed()
5✔
918
                    for chan in self.channels.values())
919

920
    @property
5✔
921
    def channels(self) -> Mapping[bytes, Channel]:
5✔
922
        """Returns a read-only copy of channels."""
923
        with self.lock:
5✔
924
            return self._channels.copy()
5✔
925

926
    @property
5✔
927
    def channel_backups(self) -> Mapping[bytes, ChannelBackup]:
5✔
928
        """Returns a read-only copy of channels."""
929
        with self.lock:
5✔
930
            return self._channel_backups.copy()
5✔
931

932
    def get_channel_objects(self) -> Mapping[bytes, AbstractChannel]:
5✔
933
        r = self.channel_backups
×
934
        r.update(self.channels)
×
935
        return r
×
936

937
    def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
5✔
938
        return self._channels.get(channel_id, None)
5✔
939

940
    def diagnostic_name(self):
5✔
941
        return self.wallet.diagnostic_name()
5✔
942

943
    @ignore_exceptions
5✔
944
    @log_exceptions
5✔
945
    async def sync_with_remote_watchtower(self):
5✔
946
        self.watchtower_ctns = {}
×
947
        while True:
×
948
            # periodically poll if the user updated 'watchtower_url'
949
            await asyncio.sleep(5)
×
950
            watchtower_url = self.config.WATCHTOWER_CLIENT_URL
×
951
            if not watchtower_url:
×
952
                continue
×
953
            parsed_url = urllib.parse.urlparse(watchtower_url)
×
954
            if not (parsed_url.scheme == 'https' or is_private_netaddress(parsed_url.hostname)):
×
955
                self.logger.warning(f"got watchtower URL for remote tower but we won't use it! "
×
956
                                    f"can only use HTTPS (except if private IP): not using {watchtower_url!r}")
957
                continue
×
958
            # try to sync with the remote watchtower
959
            try:
×
960
                async with make_aiohttp_session(proxy=self.network.proxy) as session:
×
961
                    watchtower = JsonRPCClient(session, watchtower_url)
×
962
                    watchtower.add_method('get_ctn')
×
963
                    watchtower.add_method('add_sweep_tx')
×
964
                    for chan in self.channels.values():
×
965
                        await self.sync_channel_with_watchtower(chan, watchtower)
×
966
            except aiohttp.client_exceptions.ClientConnectorError:
×
967
                self.logger.info(f'could not contact remote watchtower {watchtower_url}')
×
968

969
    def get_watchtower_ctn(self, channel_point):
5✔
970
        return self.watchtower_ctns.get(channel_point)
×
971

972
    async def sync_channel_with_watchtower(self, chan: Channel, watchtower):
5✔
973
        outpoint = chan.funding_outpoint.to_str()
×
974
        addr = chan.get_funding_address()
×
975
        current_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
×
976
        watchtower_ctn = await watchtower.get_ctn(outpoint, addr)
×
977
        for ctn in range(watchtower_ctn + 1, current_ctn):
×
978
            sweeptxs = chan.create_sweeptxs_for_watchtower(ctn)
×
979
            for tx in sweeptxs:
×
980
                await watchtower.add_sweep_tx(outpoint, ctn, tx.inputs()[0].prevout.to_str(), tx.serialize())
×
981
            self.watchtower_ctns[outpoint] = ctn
×
982

983
    def start_network(self, network: 'Network'):
5✔
984
        super().start_network(network)
×
985
        self.lnwatcher.start_network(network)
×
986
        self.swap_manager.start_network(network)
×
987
        self.lnrater = LNRater(self, network)
×
988
        self.onion_message_manager.start_network(network=network)
×
989

990
        for coro in [
×
991
                self.maybe_listen(),
992
                self.lnwatcher.trigger_callbacks(), # shortcut (don't block) if funding tx locked and verified
993
                self.reestablish_peers_and_channels(),
994
                self.sync_with_remote_watchtower(),
995
        ]:
996
            tg_coro = self.taskgroup.spawn(coro)
×
997
            asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
×
998

999
    async def stop(self):
5✔
1000
        self.stopping_soon = True
5✔
1001
        if self.listen_server:  # stop accepting new peers
5✔
1002
            self.listen_server.close()
×
1003
        async with ignore_after(self.TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS):
5✔
1004
            await self.wait_for_received_pending_htlcs_to_get_removed()
5✔
1005
        await LNWorker.stop(self)
5✔
1006
        if self.lnwatcher:
5✔
1007
            self.lnwatcher.stop()
×
1008
            self.lnwatcher = None
×
1009
        if self.swap_manager and self.swap_manager.network:  # may not be present in tests
5✔
1010
            await self.swap_manager.stop()
×
1011
        if self.onion_message_manager:
5✔
1012
            await self.onion_message_manager.stop()
×
1013

1014
    async def wait_for_received_pending_htlcs_to_get_removed(self):
5✔
1015
        assert self.stopping_soon is True
5✔
1016
        # We try to fail pending MPP HTLCs, and wait a bit for them to get removed.
1017
        # Note: even without MPP, if we just failed/fulfilled an HTLC, it is good
1018
        #       to wait a bit for it to become irrevocably removed.
1019
        # Note: we don't wait for *all htlcs* to get removed, only for those
1020
        #       that we can already fail/fulfill. e.g. forwarded htlcs cannot be removed
1021
        async with OldTaskGroup() as group:
5✔
1022
            for peer in self.peers.values():
5✔
1023
                await group.spawn(peer.wait_one_htlc_switch_iteration())
5✔
1024
        while True:
5✔
1025
            if all(not peer.received_htlcs_pending_removal for peer in self.peers.values()):
5✔
1026
                break
5✔
1027
            async with OldTaskGroup(wait=any) as group:
5✔
1028
                for peer in self.peers.values():
5✔
1029
                    await group.spawn(peer.received_htlc_removed_event.wait())
5✔
1030

1031
    def peer_closed(self, peer):
5✔
1032
        for chan in self.channels_for_peer(peer.pubkey).values():
×
1033
            chan.peer_state = PeerState.DISCONNECTED
×
1034
            util.trigger_callback('channel', self.wallet, chan)
×
1035
        super().peer_closed(peer)
×
1036

1037
    def get_payments(self, *, status=None) -> Mapping[bytes, List[HTLCWithStatus]]:
5✔
1038
        out = defaultdict(list)
5✔
1039
        for chan in self.channels.values():
5✔
1040
            d = chan.get_payments(status=status)
5✔
1041
            for payment_hash, plist in d.items():
5✔
1042
                out[payment_hash] += plist
5✔
1043
        return out
5✔
1044

1045
    def get_payment_value(
5✔
1046
            self, info: Optional['PaymentInfo'],
1047
            plist: List[HTLCWithStatus]) -> Tuple[PaymentDirection, int, Optional[int], int]:
1048
        """ fee_msat is included in amount_msat"""
1049
        assert plist
×
1050
        amount_msat = sum(int(x.direction) * x.htlc.amount_msat for x in plist)
×
1051
        if all(x.direction == SENT for x in plist):
×
1052
            direction = PaymentDirection.SENT
×
1053
            fee_msat = (- info.amount_msat - amount_msat) if info else None
×
1054
        elif all(x.direction == RECEIVED for x in plist):
×
1055
            direction = PaymentDirection.RECEIVED
×
1056
            fee_msat = None
×
1057
        elif amount_msat < 0:
×
1058
            direction = PaymentDirection.SELF_PAYMENT
×
1059
            fee_msat = - amount_msat
×
1060
        else:
1061
            direction = PaymentDirection.FORWARDING
×
1062
            fee_msat = - amount_msat
×
1063
        timestamp = min([htlc_with_status.htlc.timestamp for htlc_with_status in plist])
×
1064
        return direction, amount_msat, fee_msat, timestamp
×
1065

1066
    def get_lightning_history(self) -> Dict[str, LightningHistoryItem]:
5✔
1067
        """
1068
        side effect: sets default labels
1069
        note that the result is not ordered
1070
        """
1071
        out = {}
×
1072
        for payment_hash, plist in self.get_payments(status='settled').items():
×
1073
            if len(plist) == 0:
×
1074
                continue
×
1075
            key = payment_hash.hex()
×
1076
            info = self.get_payment_info(payment_hash)
×
1077
            # note: just after successfully paying an invoice using MPP, amount and fee values might be shifted
1078
            #       temporarily: the amount only considers 'settled' htlcs (see plist above), but we might also
1079
            #       have some inflight htlcs still. Until all relevant htlcs settle, the amount will be lower than
1080
            #       expected and the fee higher (the inflight htlcs will be effectively counted as fees).
1081
            direction, amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist)
×
1082
            label = self.wallet.get_label_for_rhash(key)
×
1083
            if not label and direction == PaymentDirection.FORWARDING:
×
1084
                label = _('Forwarding')
×
1085
            preimage = self.get_preimage(payment_hash).hex()
×
1086
            group_id = self.swap_manager.get_group_id_for_payment_hash(payment_hash)
×
1087
            item = LightningHistoryItem(
×
1088
                type = 'payment',
1089
                payment_hash = payment_hash.hex(),
1090
                preimage = preimage,
1091
                amount_msat = amount_msat,
1092
                fee_msat = fee_msat,
1093
                group_id = group_id,
1094
                timestamp = timestamp or 0,
1095
                label=label,
1096
                direction=direction,
1097
            )
1098
            out[payment_hash.hex()] = item
×
1099
        for chan in itertools.chain(self.channels.values(), self.channel_backups.values()):  # type: AbstractChannel
×
1100
            item = chan.get_funding_height()
×
1101
            if item is None:
×
1102
                continue
×
1103
            funding_txid, funding_height, funding_timestamp = item
×
1104
            label = _('Open channel') + ' ' + chan.get_id_for_log()
×
1105
            self.wallet.set_default_label(funding_txid, label)
×
1106
            self.wallet.set_group_label(funding_txid, label)
×
1107
            item = LightningHistoryItem(
×
1108
                type = 'channel_opening',
1109
                label = label,
1110
                group_id = funding_txid,
1111
                timestamp = funding_timestamp,
1112
                amount_msat = chan.balance(LOCAL, ctn=0),
1113
                fee_msat = None,
1114
                payment_hash = None,
1115
                preimage = None,
1116
                direction=None,
1117
            )
1118
            out[funding_txid] = item
×
1119
            item = chan.get_closing_height()
×
1120
            if item is None:
×
1121
                continue
×
1122
            closing_txid, closing_height, closing_timestamp = item
×
1123
            label = _('Close channel') + ' ' + chan.get_id_for_log()
×
1124
            self.wallet.set_default_label(closing_txid, label)
×
1125
            self.wallet.set_group_label(closing_txid, label)
×
1126
            item = LightningHistoryItem(
×
1127
                type = 'channel_closing',
1128
                label = label,
1129
                group_id = closing_txid,
1130
                timestamp = closing_timestamp,
1131
                amount_msat = -chan.balance(LOCAL),
1132
                fee_msat = None,
1133
                payment_hash = None,
1134
                preimage = None,
1135
                direction=None,
1136
            )
1137
            out[closing_txid] = item
×
1138

1139
        # sanity check
1140
        balance_msat = sum([x.amount_msat for x in out.values()])
×
1141
        lb = sum(chan.balance(LOCAL) if not chan.is_closed_or_closing() else 0
×
1142
                for chan in self.channels.values())
1143
        if balance_msat != lb:
×
1144
            # this typically happens when a channel was recently force-closed
1145
            self.logger.info(f'get_lightning_history: balance mismatch {balance_msat - lb}')
×
1146
        return out
×
1147

1148
    def get_groups_for_onchain_history(self) -> Dict[str, str]:
5✔
1149
        """
1150
        returns dict: txid -> group_id
1151
        side effect: sets default labels
1152
        """
1153
        groups = {}
×
1154
        # add funding events
1155
        for chan in itertools.chain(self.channels.values(), self.channel_backups.values()):  # type: AbstractChannel
×
1156
            item = chan.get_funding_height()
×
1157
            if item is None:
×
1158
                continue
×
1159
            funding_txid, funding_height, funding_timestamp = item
×
1160
            groups[funding_txid] = funding_txid
×
1161
            item = chan.get_closing_height()
×
1162
            if item is None:
×
1163
                continue
×
1164
            closing_txid, closing_height, closing_timestamp = item
×
1165
            groups[closing_txid] = closing_txid
×
1166

1167
        d = self.swap_manager.get_groups_for_onchain_history()
×
1168
        for txid, v in d.items():
×
1169
            group_id = v['group_id']
×
1170
            label = v.get('label')
×
1171
            group_label = v.get('group_label') or label
×
1172
            groups[txid] = group_id
×
1173
            if label:
×
1174
                self.wallet.set_default_label(txid, label)
×
1175
            if group_label:
×
1176
                self.wallet.set_group_label(group_id, group_label)
×
1177

1178
        return groups
×
1179

1180
    def channel_peers(self) -> List[bytes]:
5✔
1181
        node_ids = [chan.node_id for chan in self.channels.values() if not chan.is_closed()]
×
1182
        return node_ids
×
1183

1184
    def channels_for_peer(self, node_id):
5✔
1185
        assert type(node_id) is bytes
5✔
1186
        return {chan_id: chan for (chan_id, chan) in self.channels.items()
5✔
1187
                if chan.node_id == node_id}
1188

1189
    def channel_state_changed(self, chan: Channel):
5✔
1190
        if type(chan) is Channel:
×
1191
            self.save_channel(chan)
×
1192
        self.clear_invoices_cache()
×
1193
        util.trigger_callback('channel', self.wallet, chan)
×
1194

1195
    def save_channel(self, chan: Channel):
5✔
1196
        assert type(chan) is Channel
×
1197
        if chan.config[REMOTE].next_per_commitment_point == chan.config[REMOTE].current_per_commitment_point:
×
1198
            raise Exception("Tried to save channel with next_point == current_point, this should not happen")
×
1199
        self.wallet.save_db()
×
1200
        util.trigger_callback('channel', self.wallet, chan)
×
1201

1202
    def channel_by_txo(self, txo: str) -> Optional[AbstractChannel]:
5✔
1203
        for chan in self.channels.values():
5✔
1204
            if chan.funding_outpoint.to_str() == txo:
5✔
1205
                return chan
5✔
1206
        for chan in self.channel_backups.values():
×
1207
            if chan.funding_outpoint.to_str() == txo:
×
1208
                return chan
×
1209

1210
    def handle_onchain_state(self, chan: Channel):
5✔
1211
        if self.network is None:
5✔
1212
            # network not started yet
1213
            return
5✔
1214

1215
        if type(chan) is ChannelBackup:
×
1216
            util.trigger_callback('channel', self.wallet, chan)
×
1217
            return
×
1218

1219
        if (chan.get_state() in (ChannelState.OPEN, ChannelState.SHUTDOWN)
×
1220
                and chan.should_be_closed_due_to_expiring_htlcs(self.wallet.adb.get_local_height())):
1221
            self.logger.info(f"force-closing due to expiring htlcs")
×
1222
            asyncio.ensure_future(self.schedule_force_closing(chan.channel_id))
×
1223

1224
        elif chan.get_state() == ChannelState.FUNDED:
×
1225
            peer = self._peers.get(chan.node_id)
×
1226
            if peer and peer.is_initialized() and chan.peer_state == PeerState.GOOD:
×
1227
                peer.send_channel_ready(chan)
×
1228

1229
        elif chan.get_state() == ChannelState.OPEN:
×
1230
            peer = self._peers.get(chan.node_id)
×
1231
            if peer and peer.is_initialized() and chan.peer_state == PeerState.GOOD:
×
1232
                peer.maybe_update_fee(chan)
×
1233
                peer.maybe_send_announcement_signatures(chan)
×
1234

1235
        elif chan.get_state() == ChannelState.FORCE_CLOSING:
×
1236
            force_close_tx = chan.force_close_tx()
×
1237
            txid = force_close_tx.txid()
×
1238
            height = self.lnwatcher.adb.get_tx_height(txid).height
×
1239
            if height == TX_HEIGHT_LOCAL:
×
1240
                self.logger.info('REBROADCASTING CLOSING TX')
×
1241
                asyncio.ensure_future(self.network.try_broadcasting(force_close_tx, 'force-close'))
×
1242

1243
    def get_peer_by_static_jit_scid_alias(self, scid_alias: bytes) -> Optional[Peer]:
5✔
1244
        for nodeid, peer in self.peers.items():
×
1245
            if scid_alias == self._scid_alias_of_node(nodeid):
×
1246
                return peer
×
1247

1248
    def _scid_alias_of_node(self, nodeid: bytes) -> bytes:
5✔
1249
        # scid alias for just-in-time channels
1250
        return sha256(b'Electrum' + nodeid)[0:8]
×
1251

1252
    def get_static_jit_scid_alias(self) -> bytes:
5✔
1253
        return self._scid_alias_of_node(self.node_keypair.pubkey)
×
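    # Added note: the static alias is simply the first 8 bytes of
    # sha256(b'Electrum' || node_id), so it can be recomputed independently,
    # e.g. (sketch, using the same sha256 helper imported from .crypto):
    #
    #     alias = sha256(b'Electrum' + node_pubkey)[0:8]
    #
    # get_peer_by_static_jit_scid_alias() above just compares an incoming alias
    # against this value for each connected peer.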
1254

1255
    @log_exceptions
5✔
1256
    async def open_channel_just_in_time(
5✔
1257
        self,
1258
        *,
1259
        next_peer: Peer,
1260
        next_amount_msat_htlc: int,
1261
        next_cltv_abs: int,
1262
        payment_hash: bytes,
1263
        next_onion: OnionPacket,
1264
    ) -> str:
1265
        # if an exception is raised during negotiation, we raise an OnionRoutingFailure.
1266
        # this will cancel the incoming HTLC
1267

1268
        # prevent settling the htlc until the channel opening was successful, so we can fail it if needed
1269
        self.dont_settle_htlcs[payment_hash.hex()] = None
×
1270
        try:
×
1271
            funding_sat = 2 * (next_amount_msat_htlc // 1000) # try to fully spend htlcs
×
1272
            password = self.wallet.get_unlocked_password() if self.wallet.has_password() else None
×
1273
            channel_opening_fee = next_amount_msat_htlc // 100
×
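            # Worked example with illustrative numbers (not from the original source):
            # for an incoming htlc of 1_000_000_000 msat (1_000_000 sat), the JIT
            # channel is funded with 2 * 1_000_000 sat and the opening fee is 1% of
            # the htlc, i.e. 10_000_000 msat = 10_000 sat; the check below rejects
            # the htlc if that fee, in sat, is below config.ZEROCONF_MIN_OPENING_FEE.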
1274
            if channel_opening_fee // 1000 < self.config.ZEROCONF_MIN_OPENING_FEE:
×
1275
                self.logger.info(f'rejecting JIT channel: payment too low')
×
1276
                raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS, data=b'payment too low')
×
1277
            self.logger.info(f'channel opening fee (sats): {channel_opening_fee//1000}')
×
1278
            next_chan, funding_tx = await self.open_channel_with_peer(
×
1279
                next_peer, funding_sat,
1280
                push_sat=0,
1281
                zeroconf=True,
1282
                public=False,
1283
                opening_fee=channel_opening_fee,
1284
                password=password,
1285
            )
1286
            async def wait_for_channel():
×
1287
                while not next_chan.is_open():
×
1288
                    await asyncio.sleep(1)
×
1289
            await util.wait_for2(wait_for_channel(), LN_P2P_NETWORK_TIMEOUT)
×
1290
            next_chan.save_remote_scid_alias(self._scid_alias_of_node(next_peer.pubkey))
×
1291
            self.logger.info(f'JIT channel is open')
×
1292
            next_amount_msat_htlc -= channel_opening_fee
×
1293
            # fixme: some checks are missing
1294
            htlc = next_peer.send_htlc(
×
1295
                chan=next_chan,
1296
                payment_hash=payment_hash,
1297
                amount_msat=next_amount_msat_htlc,
1298
                cltv_abs=next_cltv_abs,
1299
                onion=next_onion)
1300
            async def wait_for_preimage():
×
1301
                while self.get_preimage(payment_hash) is None:
×
1302
                    await asyncio.sleep(1)
×
1303
            await util.wait_for2(wait_for_preimage(), LN_P2P_NETWORK_TIMEOUT)
×
1304

1305
            # We have been paid and can broadcast
1306
            # todo: if broadcasting raises an exception, we should try to rebroadcast
1307
            await self.network.broadcast_transaction(funding_tx)
×
1308
        except OnionRoutingFailure:
×
1309
            raise
×
1310
        except Exception:
×
1311
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
×
1312
        finally:
1313
            del self.dont_settle_htlcs[payment_hash.hex()]
×
1314

1315
        htlc_key = serialize_htlc_key(next_chan.get_scid_or_local_alias(), htlc.htlc_id)
×
1316
        return htlc_key
×
1317

1318
    @log_exceptions
5✔
1319
    async def open_channel_with_peer(
5✔
1320
            self, peer, funding_sat, *,
1321
            push_sat: int = 0,
1322
            public: bool = False,
1323
            zeroconf: bool = False,
1324
            opening_fee: int = None,
1325
            password=None):
1326
        if self.config.ENABLE_ANCHOR_CHANNELS:
×
1327
            self.wallet.unlock(password)
×
1328
        coins = self.wallet.get_spendable_coins(None)
×
1329
        node_id = peer.pubkey
×
1330
        fee_policy = FeePolicy(self.config.FEE_POLICY)
×
1331
        funding_tx = self.mktx_for_open_channel(
×
1332
            coins=coins,
1333
            funding_sat=funding_sat,
1334
            node_id=node_id,
1335
            fee_policy=fee_policy)
1336
        chan, funding_tx = await self._open_channel_coroutine(
×
1337
            peer=peer,
1338
            funding_tx=funding_tx,
1339
            funding_sat=funding_sat,
1340
            push_sat=push_sat,
1341
            public=public,
1342
            zeroconf=zeroconf,
1343
            opening_fee=opening_fee,
1344
            password=password)
1345
        return chan, funding_tx
×
1346

1347
    @log_exceptions
5✔
1348
    async def _open_channel_coroutine(
5✔
1349
            self, *,
1350
            peer: Peer,
1351
            funding_tx: PartialTransaction,
1352
            funding_sat: int,
1353
            push_sat: int,
1354
            public: bool,
1355
            zeroconf=False,
1356
            opening_fee=None,
1357
            password: Optional[str],
1358
    ) -> Tuple[Channel, PartialTransaction]:
1359

1360
        if funding_sat > self.config.LIGHTNING_MAX_FUNDING_SAT:
×
1361
            raise Exception(
×
1362
                _("Requested channel capacity is over maximum.")
1363
                + f"\n{funding_sat} sat > {self.config.LIGHTNING_MAX_FUNDING_SAT} sat"
1364
            )
1365
        coro = peer.channel_establishment_flow(
×
1366
            funding_tx=funding_tx,
1367
            funding_sat=funding_sat,
1368
            push_msat=push_sat * 1000,
1369
            public=public,
1370
            zeroconf=zeroconf,
1371
            opening_fee=opening_fee,
1372
            temp_channel_id=os.urandom(32))
1373
        chan, funding_tx = await util.wait_for2(coro, LN_P2P_NETWORK_TIMEOUT)
×
1374
        util.trigger_callback('channels_updated', self.wallet)
×
1375
        self.wallet.adb.add_transaction(funding_tx)  # save tx as local into the wallet
×
1376
        self.wallet.sign_transaction(funding_tx, password)
×
1377
        if funding_tx.is_complete() and not zeroconf:
×
1378
            await self.network.try_broadcasting(funding_tx, 'open_channel')
×
1379
        return chan, funding_tx
×
1380

1381
    def add_channel(self, chan: Channel):
5✔
1382
        with self.lock:
×
1383
            self._channels[chan.channel_id] = chan
×
1384
        self.lnwatcher.add_channel(chan)
×
1385

1386
    def add_new_channel(self, chan: Channel):
5✔
1387
        self.add_channel(chan)
×
1388
        channels_db = self.db.get_dict('channels')
×
1389
        channels_db[chan.channel_id.hex()] = chan.storage
×
1390
        self.wallet.set_reserved_addresses_for_chan(chan, reserved=True)
×
1391
        try:
×
1392
            self.save_channel(chan)
×
1393
        except Exception:
×
1394
            chan.set_state(ChannelState.REDEEMED)
×
1395
            self.remove_channel(chan.channel_id)
×
1396
            raise
×
1397

1398
    def cb_data(self, node_id: bytes) -> bytes:
5✔
1399
        return CB_MAGIC_BYTES + node_id[0:NODE_ID_PREFIX_LEN]
×
1400

1401
    def decrypt_cb_data(self, encrypted_data, funding_address):
5✔
1402
        funding_scripthash = bytes.fromhex(address_to_scripthash(funding_address))
×
1403
        nonce = funding_scripthash[0:12]
×
1404
        return chacha20_decrypt(key=self.backup_key, data=encrypted_data, nonce=nonce)
×
1405

1406
    def encrypt_cb_data(self, data, funding_address):
5✔
1407
        funding_scripthash = bytes.fromhex(address_to_scripthash(funding_address))
×
1408
        nonce = funding_scripthash[0:12]
×
1409
        # note: we are only using chacha20 instead of chacha20+poly1305 to save onchain space
1410
        #       (i.e. to avoid the 16-byte MAC). Otherwise, the latter would be preferable.
1411
        return chacha20_encrypt(key=self.backup_key, data=data, nonce=nonce)
×
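    # Added note, a round-trip sketch for the two helpers above (assuming a valid
    # funding address and an established self.backup_key; names as in this class):
    #
    #     blob = self.encrypt_cb_data(data, funding_address)
    #     assert self.decrypt_cb_data(blob, funding_address) == data
    #
    # Both directions derive the same 12-byte nonce from the funding address'
    # scripthash, so the nonce never needs to be stored next to the ciphertext.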
1412

1413
    def mktx_for_open_channel(
5✔
1414
            self, *,
1415
            coins: Sequence[PartialTxInput],
1416
            funding_sat: int,
1417
            node_id: bytes,
1418
            fee_policy: FeePolicy,
1419
    ) -> PartialTransaction:
1420
        from .wallet import get_locktime_for_new_transaction
×
1421

1422
        outputs = [PartialTxOutput.from_address_and_value(DummyAddress.CHANNEL, funding_sat)]
×
1423
        if self.has_recoverable_channels():
×
1424
            dummy_scriptpubkey = make_op_return(self.cb_data(node_id))
×
1425
            outputs.append(PartialTxOutput(scriptpubkey=dummy_scriptpubkey, value=0))
×
1426
        tx = self.wallet.make_unsigned_transaction(
×
1427
            coins=coins,
1428
            outputs=outputs,
1429
            fee_policy=fee_policy,
1430
            # we do not know yet if peer accepts anchors, just assume they do
1431
            is_anchor_channel_opening=self.config.ENABLE_ANCHOR_CHANNELS,
1432
        )
1433
        tx.set_rbf(False)
×
1434
        # rm randomness from locktime, as we use the locktime as entropy for deriving the funding_privkey
1435
        # (and it would be confusing to get a collision as a consequence of the randomness)
1436
        tx.locktime = get_locktime_for_new_transaction(self.network, include_random_component=False)
×
1437
        return tx
×
1438

1439
    def suggest_funding_amount(self, amount_to_pay, coins) -> Tuple[int, int] | None:
5✔
1440
        """ whether we can pay amount_sat after opening a new channel"""
1441
        num_sats_can_send = int(self.num_sats_can_send())
×
1442
        lightning_needed = amount_to_pay - num_sats_can_send
×
1443
        assert lightning_needed > 0
×
1444
        min_funding_sat = lightning_needed + (lightning_needed // 20) + 1000  # safety margin
×
1445
        min_funding_sat = max(min_funding_sat, MIN_FUNDING_SAT)  # at least MIN_FUNDING_SAT
×
1446
        if min_funding_sat > self.config.LIGHTNING_MAX_FUNDING_SAT:
×
1447
            return
×
1448
        fee_policy = FeePolicy(f'feerate:{FEERATE_FALLBACK_STATIC_FEE}')
×
1449
        try:
×
1450
            self.mktx_for_open_channel(coins=coins, funding_sat=min_funding_sat, node_id=bytes(32), fee_policy=fee_policy)
×
1451
            funding_sat = min_funding_sat
×
1452
        except NotEnoughFunds:
×
1453
            return
×
1454
        # if available, suggest twice that amount:
1455
        if 2 * min_funding_sat <= self.config.LIGHTNING_MAX_FUNDING_SAT:
×
1456
            try:
×
1457
                self.mktx_for_open_channel(coins=coins, funding_sat=2*min_funding_sat, node_id=bytes(32), fee_policy=fee_policy)
×
1458
                funding_sat = 2 * min_funding_sat
×
1459
            except NotEnoughFunds:
×
1460
                pass
×
1461
        return funding_sat, min_funding_sat
×
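    # Worked example for the heuristic above (illustrative numbers only):
    # amount_to_pay = 250_000 sat with num_sats_can_send() = 50_000 sat gives
    #     lightning_needed = 200_000
    #     min_funding_sat  = 200_000 + 200_000 // 20 + 1000 = 211_000
    # i.e. a ~5% + 1000 sat safety margin. If a funding tx for 211_000 sat can be
    # built from `coins`, that amount is suggested; 422_000 sat is preferred when
    # it is also affordable and below LIGHTNING_MAX_FUNDING_SAT.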
1462

1463
    def open_channel(
5✔
1464
            self, *,
1465
            connect_str: str,
1466
            funding_tx: PartialTransaction,
1467
            funding_sat: int,
1468
            push_amt_sat: int,
1469
            public: bool = False,
1470
            password: str = None,
1471
    ) -> Tuple[Channel, PartialTransaction]:
1472

1473
        fut = asyncio.run_coroutine_threadsafe(self.add_peer(connect_str), self.network.asyncio_loop)
×
1474
        try:
×
1475
            peer = fut.result()
×
1476
        except concurrent.futures.TimeoutError:
×
1477
            raise Exception(_("add peer timed out"))
×
1478
        coro = self._open_channel_coroutine(
×
1479
            peer=peer,
1480
            funding_tx=funding_tx,
1481
            funding_sat=funding_sat,
1482
            push_sat=push_amt_sat,
1483
            public=public,
1484
            password=password)
1485
        fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
×
1486
        try:
×
1487
            chan, funding_tx = fut.result()
×
1488
        except concurrent.futures.TimeoutError:
×
1489
            raise Exception(_("open_channel timed out"))
×
1490
        return chan, funding_tx
×
1491

1492
    def get_channel_by_short_id(self, short_channel_id: bytes) -> Optional[Channel]:
5✔
1493
        # First check against *real* SCIDs.
1494
        # This e.g. protects against maliciously chosen SCID aliases, and accidental collisions.
1495
        for chan in self.channels.values():
×
1496
            if chan.short_channel_id == short_channel_id:
×
1497
                return chan
×
1498
        # Now we also consider aliases.
1499
        # TODO we should split this as this search currently ignores the "direction"
1500
        #      of the aliases. We should only look at either the remote OR the local alias,
1501
        #      depending on context.
1502
        for chan in self.channels.values():
×
1503
            if chan.get_remote_scid_alias() == short_channel_id:
×
1504
                return chan
×
1505
            if chan.get_local_scid_alias() == short_channel_id:
×
1506
                return chan
×
1507

1508
    def can_pay_invoice(self, invoice: Invoice) -> bool:
5✔
1509
        assert invoice.is_lightning()
×
1510
        return (invoice.get_amount_sat() or 0) <= self.num_sats_can_send()
×
1511

1512
    @log_exceptions
5✔
1513
    async def pay_invoice(
5✔
1514
            self, invoice: Invoice, *,
1515
            amount_msat: int = None,
1516
            attempts: int = None,  # used only in unit tests
1517
            full_path: LNPaymentPath = None,
1518
            channels: Optional[Sequence[Channel]] = None,
1519
    ) -> Tuple[bool, List[HtlcLog]]:
1520
        bolt11 = invoice.lightning_invoice
5✔
1521
        lnaddr = self._check_bolt11_invoice(bolt11, amount_msat=amount_msat)
5✔
1522
        min_final_cltv_delta = lnaddr.get_min_final_cltv_delta()
5✔
1523
        payment_hash = lnaddr.paymenthash
5✔
1524
        key = payment_hash.hex()
5✔
1525
        payment_secret = lnaddr.payment_secret
5✔
1526
        invoice_pubkey = lnaddr.pubkey.serialize()
5✔
1527
        invoice_features = lnaddr.get_features()
5✔
1528
        r_tags = lnaddr.get_routing_info('r')
5✔
1529
        amount_to_pay = lnaddr.get_amount_msat()
5✔
1530
        status = self.get_payment_status(payment_hash)
5✔
1531
        if status == PR_PAID:
5✔
1532
            raise PaymentFailure(_("This invoice has been paid already"))
×
1533
        if status == PR_INFLIGHT:
5✔
1534
            raise PaymentFailure(_("A payment was already initiated for this invoice"))
×
1535
        if payment_hash in self.get_payments(status='inflight'):
5✔
1536
            raise PaymentFailure(_("A previous attempt to pay this invoice did not clear"))
×
1537
        info = PaymentInfo(payment_hash, amount_to_pay, SENT, PR_UNPAID)
5✔
1538
        self.save_payment_info(info)
5✔
1539
        self.wallet.set_label(key, lnaddr.get_description())
5✔
1540
        self.set_invoice_status(key, PR_INFLIGHT)
5✔
1541
        budget = PaymentFeeBudget.default(invoice_amount_msat=amount_to_pay, config=self.config)
5✔
1542
        if attempts is None and self.uses_trampoline():
5✔
1543
            # we don't expect lots of failed htlcs with trampoline, so we can fail sooner
1544
            attempts = 30
5✔
1545
        success = False
5✔
1546
        try:
5✔
1547
            await self.pay_to_node(
5✔
1548
                node_pubkey=invoice_pubkey,
1549
                payment_hash=payment_hash,
1550
                payment_secret=payment_secret,
1551
                amount_to_pay=amount_to_pay,
1552
                min_final_cltv_delta=min_final_cltv_delta,
1553
                r_tags=r_tags,
1554
                invoice_features=invoice_features,
1555
                attempts=attempts,
1556
                full_path=full_path,
1557
                channels=channels,
1558
                budget=budget,
1559
            )
1560
            success = True
5✔
1561
        except PaymentFailure as e:
5✔
1562
            self.logger.info(f'payment failure: {e!r}')
5✔
1563
            reason = str(e)
5✔
1564
        except ChannelDBNotLoaded as e:
5✔
1565
            self.logger.info(f'payment failure: {e!r}')
×
1566
            reason = str(e)
×
1567
        finally:
1568
            self.logger.info(f"pay_invoice ending session for RHASH={payment_hash.hex()}. {success=}")
5✔
1569
        if success:
5✔
1570
            self.set_invoice_status(key, PR_PAID)
5✔
1571
            util.trigger_callback('payment_succeeded', self.wallet, key)
5✔
1572
        else:
1573
            self.set_invoice_status(key, PR_UNPAID)
5✔
1574
            util.trigger_callback('payment_failed', self.wallet, key, reason)
5✔
1575
        log = self.logs[key]
5✔
1576
        return success, log
5✔
1577

1578
    async def pay_to_node(
5✔
1579
            self, *,
1580
            node_pubkey: bytes,
1581
            payment_hash: bytes,
1582
            payment_secret: bytes,
1583
            amount_to_pay: int,  # in msat
1584
            min_final_cltv_delta: int,
1585
            r_tags,
1586
            invoice_features: int,
1587
            attempts: int = None,
1588
            full_path: LNPaymentPath = None,
1589
            fwd_trampoline_onion: OnionPacket = None,
1590
            budget: PaymentFeeBudget,
1591
            channels: Optional[Sequence[Channel]] = None,
1592
            fw_payment_key: str = None,  # for forwarding
1593
    ) -> None:
1594

1595
        assert budget
5✔
1596
        assert budget.fee_msat >= 0, budget
5✔
1597
        assert budget.cltv >= 0, budget
5✔
1598

1599
        payment_key = payment_hash + payment_secret
5✔
1600
        assert payment_key not in self._paysessions
5✔
1601
        self._paysessions[payment_key] = paysession = PaySession(
5✔
1602
            payment_hash=payment_hash,
1603
            payment_secret=payment_secret,
1604
            initial_trampoline_fee_level=self.config.INITIAL_TRAMPOLINE_FEE_LEVEL,
1605
            invoice_features=invoice_features,
1606
            r_tags=r_tags,
1607
            min_final_cltv_delta=min_final_cltv_delta,
1608
            amount_to_pay=amount_to_pay,
1609
            invoice_pubkey=node_pubkey,
1610
            uses_trampoline=self.uses_trampoline(),
1611
            use_two_trampolines=self.config.LIGHTNING_LEGACY_ADD_TRAMPOLINE,
1612
        )
1613
        self.logs[payment_hash.hex()] = log = []  # TODO incl payment_secret in key (re trampoline forwarding)
5✔
1614

1615
        paysession.logger.info(
5✔
1616
            f"pay_to_node starting session for RHASH={payment_hash.hex()}. "
1617
            f"using_trampoline={self.uses_trampoline()}. "
1618
            f"invoice_features={paysession.invoice_features.get_names()}. "
1619
            f"{amount_to_pay=} msat. {budget=}")
1620
        if not self.uses_trampoline():
5✔
1621
            self.logger.info(
5✔
1622
                f"gossip_db status. sync progress: {self.network.lngossip.get_sync_progress_estimate()}. "
1623
                f"num_nodes={self.channel_db.num_nodes}, "
1624
                f"num_channels={self.channel_db.num_channels}, "
1625
                f"num_policies={self.channel_db.num_policies}.")
1626

1627
        # when encountering trampoline forwarding difficulties in the legacy case, we
1628
        # sometimes need to fall back to a single trampoline forwarder, at the expense
1629
        # of privacy
1630
        try:
5✔
1631
            while True:
5✔
1632
                if (amount_to_send := paysession.get_outstanding_amount_to_send()) > 0:
5✔
1633
                    # 1. create a set of routes for remaining amount.
1634
                    # note: path-finding runs in a separate thread so that we don't block the asyncio loop
1635
                    # graph updates might occur during the computation
1636
                    remaining_fee_budget_msat = (budget.fee_msat * amount_to_send) // amount_to_pay
5✔
1637
                    routes = self.create_routes_for_payment(
5✔
1638
                        paysession=paysession,
1639
                        amount_msat=amount_to_send,
1640
                        full_path=full_path,
1641
                        fwd_trampoline_onion=fwd_trampoline_onion,
1642
                        channels=channels,
1643
                        budget=budget._replace(fee_msat=remaining_fee_budget_msat),
1644
                    )
1645
                    # 2. send htlcs
1646
                    async for sent_htlc_info, cltv_delta, trampoline_onion in routes:
5✔
1647
                        await self.pay_to_route(
5✔
1648
                            paysession=paysession,
1649
                            sent_htlc_info=sent_htlc_info,
1650
                            min_final_cltv_delta=cltv_delta,
1651
                            trampoline_onion=trampoline_onion,
1652
                            fw_payment_key=fw_payment_key,
1653
                        )
1654
                    # invoice_status is triggered in self.set_invoice_status when it actually changes.
1655
                    # It is also triggered here to update progress for a lightning payment in the GUI
1656
                    # (e.g. attempt counter)
1657
                    util.trigger_callback('invoice_status', self.wallet, payment_hash.hex(), PR_INFLIGHT)
5✔
1658
                # 3. wait for one htlc to resolve (success or failure)
1659
                htlc_log = await paysession.wait_for_one_htlc_to_resolve()  # TODO maybe wait a bit, more failures might come
5✔
1660
                log.append(htlc_log)
5✔
1661
                if htlc_log.success:
5✔
1662
                    if self.network.path_finder:
5✔
1663
                        # TODO: report every route to liquidity hints for mpp
1664
                        # in the case of success, we report channels of the
1665
                        # route as being able to send the same amount in the future,
1666
                        # as we assume to not know the capacity
1667
                        self.network.path_finder.update_liquidity_hints(htlc_log.route, htlc_log.amount_msat)
5✔
1668
                        # remove inflight htlcs from liquidity hints
1669
                        self.network.path_finder.update_inflight_htlcs(htlc_log.route, add_htlcs=False)
5✔
1670
                    return
5✔
1671
                # htlc failed
1672
                # if we get a tmp channel failure, it might work to split the amount and try more routes
1673
                # if we get a channel update, we might retry the same route and amount
1674
                route = htlc_log.route
5✔
1675
                sender_idx = htlc_log.sender_idx
5✔
1676
                failure_msg = htlc_log.failure_msg
5✔
1677
                if sender_idx is None:
5✔
1678
                    raise PaymentFailure(failure_msg.code_name())
4✔
1679
                erring_node_id = route[sender_idx].node_id
5✔
1680
                code, data = failure_msg.code, failure_msg.data
5✔
1681
                self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. "
5✔
1682
                                 f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}")
1683
                self.logger.info(f"error reported by {erring_node_id.hex()}")
5✔
1684
                if code == OnionFailureCode.MPP_TIMEOUT:
5✔
1685
                    raise PaymentFailure(failure_msg.code_name())
5✔
1686
                # errors returned by the next trampoline.
1687
                if fwd_trampoline_onion and code in [
5✔
1688
                        OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT,
1689
                        OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON]:
1690
                    raise failure_msg
×
1691
                # trampoline
1692
                if self.uses_trampoline():
5✔
1693
                    paysession.handle_failed_trampoline_htlc(
5✔
1694
                        htlc_log=htlc_log, failure_msg=failure_msg)
1695
                else:
1696
                    self.handle_error_code_from_failed_htlc(
5✔
1697
                        route=route, sender_idx=sender_idx, failure_msg=failure_msg, amount=htlc_log.amount_msat)
1698
                # max attempts or timeout
1699
                if (attempts is not None and len(log) >= attempts) or (attempts is None and time.time() - paysession.start_time > self.PAYMENT_TIMEOUT):
5✔
1700
                    raise PaymentFailure('Giving up after %d attempts'%len(log))
5✔
1701
        finally:
1702
            paysession.is_active = False
5✔
1703
            if paysession.can_be_deleted():
5✔
1704
                self._paysessions.pop(payment_key)
5✔
1705
            paysession.logger.info(f"pay_to_node ending session for RHASH={payment_hash.hex()}")
5✔
1706

1707
    async def pay_to_route(
5✔
1708
            self, *,
1709
            paysession: PaySession,
1710
            sent_htlc_info: SentHtlcInfo,
1711
            min_final_cltv_delta: int,
1712
            trampoline_onion: Optional[OnionPacket] = None,
1713
            fw_payment_key: str = None,
1714
    ) -> None:
1715
        """Sends a single HTLC."""
1716
        shi = sent_htlc_info
5✔
1717
        del sent_htlc_info  # just renamed
5✔
1718
        short_channel_id = shi.route[0].short_channel_id
5✔
1719
        chan = self.get_channel_by_short_id(short_channel_id)
5✔
1720
        assert chan, ShortChannelID(short_channel_id)
5✔
1721
        peer = self._peers.get(shi.route[0].node_id)
5✔
1722
        if not peer:
5✔
1723
            raise PaymentFailure('Dropped peer')
×
1724
        await peer.initialized
5✔
1725
        htlc = peer.pay(
5✔
1726
            route=shi.route,
1727
            chan=chan,
1728
            amount_msat=shi.amount_msat,
1729
            total_msat=shi.bucket_msat,
1730
            payment_hash=paysession.payment_hash,
1731
            min_final_cltv_delta=min_final_cltv_delta,
1732
            payment_secret=shi.payment_secret_bucket,
1733
            trampoline_onion=trampoline_onion)
1734

1735
        key = (paysession.payment_hash, short_channel_id, htlc.htlc_id)
5✔
1736
        self.sent_htlcs_info[key] = shi
5✔
1737
        paysession.add_new_htlc(shi)
5✔
1738
        if fw_payment_key:
5✔
1739
            htlc_key = serialize_htlc_key(short_channel_id, htlc.htlc_id)
5✔
1740
            self.logger.info(f'adding active forwarding {fw_payment_key}')
5✔
1741
            self.active_forwardings[fw_payment_key].append(htlc_key)
5✔
1742
        if self.network.path_finder:
5✔
1743
            # add inflight htlcs to liquidity hints
1744
            self.network.path_finder.update_inflight_htlcs(shi.route, add_htlcs=True)
5✔
1745
        util.trigger_callback('htlc_added', chan, htlc, SENT)
5✔
1746

1747
    def handle_error_code_from_failed_htlc(
5✔
1748
            self,
1749
            *,
1750
            route: LNPaymentRoute,
1751
            sender_idx: int,
1752
            failure_msg: OnionRoutingFailure,
1753
            amount: int) -> None:
1754

1755
        assert self.channel_db  # cannot be in trampoline mode
5✔
1756
        assert self.network.path_finder
5✔
1757

1758
        # remove inflight htlcs from liquidity hints
1759
        self.network.path_finder.update_inflight_htlcs(route, add_htlcs=False)
5✔
1760

1761
        code, data = failure_msg.code, failure_msg.data
5✔
1762
        # TODO can we use lnmsg.OnionWireSerializer here?
1763
        # TODO update onion_wire.csv
1764
        # handle some specific error codes
1765
        failure_codes = {
5✔
1766
            OnionFailureCode.TEMPORARY_CHANNEL_FAILURE: 0,
1767
            OnionFailureCode.AMOUNT_BELOW_MINIMUM: 8,
1768
            OnionFailureCode.FEE_INSUFFICIENT: 8,
1769
            OnionFailureCode.INCORRECT_CLTV_EXPIRY: 4,
1770
            OnionFailureCode.EXPIRY_TOO_SOON: 0,
1771
            OnionFailureCode.CHANNEL_DISABLED: 2,
1772
        }
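        # Added note: these values are byte offsets into failure_msg.data at which
        # the 2-byte channel_update length field starts, following the BOLT #4
        # failure layouts (e.g. fee_insufficient carries a u64 htlc_msat before the
        # embedded update, hence offset 8; channel_disabled a u16 flags field,
        # hence offset 2). The slicing a few lines below relies on these offsets.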
1773
        try:
5✔
1774
            failing_channel = route[sender_idx + 1].short_channel_id
5✔
1775
        except IndexError:
5✔
1776
            raise PaymentFailure(f'payment destination reported error: {failure_msg.code_name()}') from None
5✔
1777

1778
        # TODO: handle unknown next peer?
1779
        # handle failure codes that include a channel update
1780
        if code in failure_codes:
5✔
1781
            offset = failure_codes[code]
5✔
1782
            channel_update_len = int.from_bytes(data[offset:offset+2], byteorder="big")
5✔
1783
            channel_update_as_received = data[offset+2: offset+2+channel_update_len]
5✔
1784
            payload = self._decode_channel_update_msg(channel_update_as_received)
5✔
1785
            if payload is None:
5✔
1786
                self.logger.info(f'could not decode channel_update for failed htlc: '
×
1787
                                 f'{channel_update_as_received.hex()}')
1788
                blacklist = True
×
1789
            elif payload.get('short_channel_id') != failing_channel:
5✔
1790
                self.logger.info(f'short_channel_id in channel_update does not match our route')
×
1791
                blacklist = True
×
1792
            else:
1793
                # apply the channel update or get blacklisted
1794
                blacklist, update = self._handle_chanupd_from_failed_htlc(
5✔
1795
                    payload, route=route, sender_idx=sender_idx, failure_msg=failure_msg)
1796
                # we interpret a temporary channel failure as a liquidity issue
1797
                # in the channel and update our liquidity hints accordingly
1798
                if code == OnionFailureCode.TEMPORARY_CHANNEL_FAILURE:
5✔
1799
                    self.network.path_finder.update_liquidity_hints(
5✔
1800
                        route,
1801
                        amount,
1802
                        failing_channel=ShortChannelID(failing_channel))
1803
                # if we can't decide on some action, we are stuck
1804
                if not (blacklist or update):
5✔
1805
                    raise PaymentFailure(failure_msg.code_name())
×
1806
        # for errors that do not include a channel update
1807
        else:
1808
            blacklist = True
5✔
1809
        if blacklist:
5✔
1810
            self.network.path_finder.add_edge_to_blacklist(short_channel_id=failing_channel)
5✔
1811

1812
    def _handle_chanupd_from_failed_htlc(
5✔
1813
        self, payload, *,
1814
        route: LNPaymentRoute,
1815
        sender_idx: int,
1816
        failure_msg: OnionRoutingFailure,
1817
    ) -> Tuple[bool, bool]:
1818
        blacklist = False
5✔
1819
        update = False
5✔
1820
        try:
5✔
1821
            r = self.channel_db.add_channel_update(payload, verify=True)
5✔
1822
        except InvalidGossipMsg:
×
1823
            return True, False  # blacklist
×
1824
        short_channel_id = ShortChannelID(payload['short_channel_id'])
5✔
1825
        if r == UpdateStatus.GOOD:
5✔
1826
            self.logger.info(f"applied channel update to {short_channel_id}")
×
1827
            # TODO: add test for this
1828
            # FIXME: this does not work for our own unannounced channels.
1829
            for chan in self.channels.values():
×
1830
                if chan.short_channel_id == short_channel_id:
×
1831
                    chan.set_remote_update(payload)
×
1832
            update = True
×
1833
        elif r == UpdateStatus.ORPHANED:
5✔
1834
            # maybe it is a private channel (and data in invoice was outdated)
1835
            self.logger.info(f"Could not find {short_channel_id}. maybe update is for private channel?")
5✔
1836
            start_node_id = route[sender_idx].node_id
5✔
1837
            cache_ttl = None
5✔
1838
            if failure_msg.code == OnionFailureCode.CHANNEL_DISABLED:
5✔
1839
                # eclair sends CHANNEL_DISABLED if its peer is offline. E.g. we might be trying to pay
1840
                # a mobile phone with the app closed. So we cache this with a short TTL.
1841
                cache_ttl = self.channel_db.PRIVATE_CHAN_UPD_CACHE_TTL_SHORT
×
1842
            update = self.channel_db.add_channel_update_for_private_channel(payload, start_node_id, cache_ttl=cache_ttl)
5✔
1843
            blacklist = not update
5✔
1844
        elif r == UpdateStatus.EXPIRED:
×
1845
            blacklist = True
×
1846
        elif r == UpdateStatus.DEPRECATED:
×
1847
            self.logger.info(f'channel update is not newer than the one we already have.')
×
1848
            blacklist = True
×
1849
        elif r == UpdateStatus.UNCHANGED:
×
1850
            blacklist = True
×
1851
        return blacklist, update
5✔
1852

1853
    @classmethod
5✔
1854
    def _decode_channel_update_msg(cls, chan_upd_msg: bytes) -> Optional[Dict[str, Any]]:
5✔
1855
        channel_update_as_received = chan_upd_msg
5✔
1856
        channel_update_typed = (258).to_bytes(length=2, byteorder="big") + channel_update_as_received
5✔
1857
        # note: some nodes put channel updates in error msgs with the leading msg_type already there.
1858
        #       we try decoding both ways here.
1859
        try:
5✔
1860
            message_type, payload = decode_msg(channel_update_typed)
5✔
1861
            if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception()
5✔
1862
            payload['raw'] = channel_update_typed
5✔
1863
            return payload
5✔
1864
        except Exception:  # FIXME: too broad
5✔
1865
            try:
5✔
1866
                message_type, payload = decode_msg(channel_update_as_received)
5✔
1867
                if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception()
5✔
1868
                payload['raw'] = channel_update_as_received
5✔
1869
                return payload
5✔
1870
            except Exception:
5✔
1871
                return None
5✔
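    # Added note: 258 is the gossip message type of `channel_update` (BOLT #7).
    # The helper above therefore first tries to parse the payload as a bare update
    # with the two type bytes prepended, and falls back to parsing it unchanged,
    # since some nodes already include the type prefix in their error messages.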
1872

1873
    def _check_bolt11_invoice(self, bolt11_invoice: str, *, amount_msat: int = None) -> LnAddr:
5✔
1874
        """Parses and validates a bolt11 invoice str into a LnAddr.
1875
        Includes pre-payment checks external to the parser.
1876
        """
1877
        addr = lndecode(bolt11_invoice)
5✔
1878
        if addr.is_expired():
5✔
1879
            raise InvoiceError(_("This invoice has expired"))
×
1880
        # check amount
1881
        if amount_msat:  # replace amt in invoice. main usecase is paying zero amt invoices
5✔
1882
            existing_amt_msat = addr.get_amount_msat()
×
1883
            if existing_amt_msat and amount_msat < existing_amt_msat:
×
1884
                raise Exception("cannot pay lower amt than what is originally in LN invoice")
×
1885
            addr.amount = Decimal(amount_msat) / COIN / 1000
×
1886
        if addr.amount is None:
5✔
1887
            raise InvoiceError(_("Missing amount"))
×
1888
        # check cltv
1889
        if addr.get_min_final_cltv_delta() > NBLOCK_CLTV_DELTA_TOO_FAR_INTO_FUTURE:
5✔
1890
            raise InvoiceError("{}\n{}".format(
5✔
1891
                _("Invoice wants us to risk locking funds for unreasonably long."),
1892
                f"min_final_cltv_delta: {addr.get_min_final_cltv_delta()}"))
1893
        # check features
1894
        addr.validate_and_compare_features(self.features)
5✔
1895
        return addr
5✔
1896

1897
    def is_trampoline_peer(self, node_id: bytes) -> bool:
5✔
1898
        # until trampoline is advertised in lnfeatures, check against hardcoded list
1899
        if is_hardcoded_trampoline(node_id):
5✔
1900
            return True
5✔
1901
        peer = self._peers.get(node_id)
×
1902
        if not peer:
×
1903
            return False
×
1904
        return (peer.their_features.supports(LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT_ECLAIR)\
×
1905
                or peer.their_features.supports(LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT_ELECTRUM))
1906

1907
    def suggest_peer(self) -> Optional[bytes]:
5✔
1908
        if not self.uses_trampoline():
×
1909
            return self.lnrater.suggest_peer()
×
1910
        else:
1911
            return random.choice(list(hardcoded_trampoline_nodes().values())).pubkey
×
1912

1913
    def suggest_splits(
5✔
1914
        self,
1915
        *,
1916
        amount_msat: int,
1917
        final_total_msat: int,
1918
        my_active_channels: Sequence[Channel],
1919
        invoice_features: LnFeatures,
1920
        r_tags,
1921
    ) -> List['SplitConfigRating']:
1922
        channels_with_funds = {
5✔
1923
            (chan.channel_id, chan.node_id): int(chan.available_to_spend(HTLCOwner.LOCAL))
1924
            for chan in my_active_channels
1925
        }
1926
        self.logger.info(f"channels_with_funds: {channels_with_funds}")
5✔
1927
        exclude_single_part_payments = False
5✔
1928
        if self.uses_trampoline():
5✔
1929
            # in the case of a legacy payment, we don't allow splitting via different
1930
            # trampoline nodes, because of https://github.com/ACINQ/eclair/issues/2127
1931
            is_legacy, _ = is_legacy_relay(invoice_features, r_tags)
5✔
1932
            exclude_multinode_payments = is_legacy
5✔
1933
            # we don't split within a channel when sending to a trampoline node,
1934
            # the trampoline node will split for us
1935
            exclude_single_channel_splits = True
5✔
1936
        else:
1937
            exclude_multinode_payments = False
5✔
1938
            exclude_single_channel_splits = False
5✔
1939
            if invoice_features.supports(LnFeatures.BASIC_MPP_OPT) and not self.config.TEST_FORCE_DISABLE_MPP:
5✔
1940
                # if amt is still large compared to total_msat, split it:
1941
                if (amount_msat / final_total_msat > self.MPP_SPLIT_PART_FRACTION
5✔
1942
                        and amount_msat > self.MPP_SPLIT_PART_MINAMT_MSAT):
1943
                    exclude_single_part_payments = True
×
1944

1945
        def get_splits():
5✔
1946
            return suggest_splits(
5✔
1947
                amount_msat,
1948
                channels_with_funds,
1949
                exclude_single_part_payments=exclude_single_part_payments,
1950
                exclude_multinode_payments=exclude_multinode_payments,
1951
                exclude_single_channel_splits=exclude_single_channel_splits
1952
            )
1953

1954
        split_configurations = get_splits()
5✔
1955
        if not split_configurations and exclude_single_part_payments:
5✔
1956
            exclude_single_part_payments = False
×
1957
            split_configurations = get_splits()
×
1958
        self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations')
5✔
1959
        return split_configurations
5✔
1960

1961
    async def create_routes_for_payment(
5✔
1962
            self, *,
1963
            paysession: PaySession,
1964
            amount_msat: int,        # part of payment amount we want routes for now
1965
            fwd_trampoline_onion: OnionPacket = None,
1966
            full_path: LNPaymentPath = None,
1967
            channels: Optional[Sequence[Channel]] = None,
1968
            budget: PaymentFeeBudget,
1969
    ) -> AsyncGenerator[Tuple[SentHtlcInfo, int, Optional[OnionPacket]], None]:
1970

1971
        """Creates multiple routes for splitting a payment over the available
1972
        private channels.
1973

1974
        We first try to conduct the payment over a single channel. If that fails
1975
        and mpp is supported by the receiver, we will split the payment."""
1976
        trampoline_features = LnFeatures.VAR_ONION_OPT
5✔
1977
        local_height = self.wallet.adb.get_local_height()
5✔
1978
        fee_related_error = None  # type: Optional[FeeBudgetExceeded]
5✔
1979
        if channels:
5✔
1980
            my_active_channels = channels
×
1981
        else:
1982
            my_active_channels = [
5✔
1983
                chan for chan in self.channels.values() if
1984
                chan.is_active() and not chan.is_frozen_for_sending()]
1985
        # try random order
1986
        random.shuffle(my_active_channels)
5✔
1987
        split_configurations = self.suggest_splits(
5✔
1988
            amount_msat=amount_msat,
1989
            final_total_msat=paysession.amount_to_pay,
1990
            my_active_channels=my_active_channels,
1991
            invoice_features=paysession.invoice_features,
1992
            r_tags=paysession.r_tags,
1993
        )
1994
        for sc in split_configurations:
5✔
1995
            is_multichan_mpp = len(sc.config.items()) > 1
5✔
1996
            is_mpp = sc.config.number_parts() > 1
5✔
1997
            if is_mpp and not paysession.invoice_features.supports(LnFeatures.BASIC_MPP_OPT):
5✔
1998
                continue
5✔
1999
            if not is_mpp and self.config.TEST_FORCE_MPP:
5✔
2000
                continue
5✔
2001
            if is_mpp and self.config.TEST_FORCE_DISABLE_MPP:
5✔
2002
                continue
×
2003
            self.logger.info(f"trying split configuration: {sc.config.values()} rating: {sc.rating}")
5✔
2004
            routes = []
5✔
2005
            try:
5✔
2006
                if self.uses_trampoline():
5✔
2007
                    per_trampoline_channel_amounts = defaultdict(list)
5✔
2008
                    # categorize by trampoline nodes for trampoline mpp construction
2009
                    for (chan_id, _), part_amounts_msat in sc.config.items():
5✔
2010
                        chan = self.channels[chan_id]
5✔
2011
                        for part_amount_msat in part_amounts_msat:
5✔
2012
                            per_trampoline_channel_amounts[chan.node_id].append((chan_id, part_amount_msat))
5✔
2013
                    # for each trampoline forwarder, construct mpp trampoline
2014
                    for trampoline_node_id, trampoline_parts in per_trampoline_channel_amounts.items():
5✔
2015
                        per_trampoline_amount = sum([x[1] for x in trampoline_parts])
5✔
2016
                        trampoline_route, trampoline_onion, per_trampoline_amount_with_fees, per_trampoline_cltv_delta = create_trampoline_route_and_onion(
5✔
2017
                            amount_msat=per_trampoline_amount,
2018
                            total_msat=paysession.amount_to_pay,
2019
                            min_final_cltv_delta=paysession.min_final_cltv_delta,
2020
                            my_pubkey=self.node_keypair.pubkey,
2021
                            invoice_pubkey=paysession.invoice_pubkey,
2022
                            invoice_features=paysession.invoice_features,
2023
                            node_id=trampoline_node_id,
2024
                            r_tags=paysession.r_tags,
2025
                            payment_hash=paysession.payment_hash,
2026
                            payment_secret=paysession.payment_secret,
2027
                            local_height=local_height,
2028
                            trampoline_fee_level=paysession.trampoline_fee_level,
2029
                            use_two_trampolines=paysession.use_two_trampolines,
2030
                            failed_routes=paysession.failed_trampoline_routes,
2031
                            budget=budget._replace(fee_msat=budget.fee_msat // len(per_trampoline_channel_amounts)),
2032
                        )
2033
                        # node_features is only used to determine is_tlv
2034
                        per_trampoline_secret = os.urandom(32)
5✔
2035
                        per_trampoline_fees = per_trampoline_amount_with_fees - per_trampoline_amount
5✔
2036
                        self.logger.info(f'created route with trampoline fee level={paysession.trampoline_fee_level}')
5✔
2037
                        self.logger.info(f'trampoline hops: {[hop.end_node.hex() for hop in trampoline_route]}')
5✔
2038
                        self.logger.info(f'per trampoline fees: {per_trampoline_fees}')
5✔
2039
                        for chan_id, part_amount_msat in trampoline_parts:
5✔
2040
                            chan = self.channels[chan_id]
5✔
2041
                            margin = chan.available_to_spend(LOCAL, strict=True) - part_amount_msat
5✔
2042
                            delta_fee = min(per_trampoline_fees, margin)
5✔
2043
                            # TODO: distribute trampoline fee over several channels?
2044
                            part_amount_msat_with_fees = part_amount_msat + delta_fee
5✔
2045
                            per_trampoline_fees -= delta_fee
5✔
2046
                            route = [
5✔
2047
                                RouteEdge(
2048
                                    start_node=self.node_keypair.pubkey,
2049
                                    end_node=trampoline_node_id,
2050
                                    short_channel_id=chan.short_channel_id,
2051
                                    fee_base_msat=0,
2052
                                    fee_proportional_millionths=0,
2053
                                    cltv_delta=0,
2054
                                    node_features=trampoline_features)
2055
                            ]
2056
                            self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}')
5✔
2057
                            shi = SentHtlcInfo(
5✔
2058
                                route=route,
2059
                                payment_secret_orig=paysession.payment_secret,
2060
                                payment_secret_bucket=per_trampoline_secret,
2061
                                amount_msat=part_amount_msat_with_fees,
2062
                                bucket_msat=per_trampoline_amount_with_fees,
2063
                                amount_receiver_msat=part_amount_msat,
2064
                                trampoline_fee_level=paysession.trampoline_fee_level,
2065
                                trampoline_route=trampoline_route,
2066
                            )
2067
                            routes.append((shi, per_trampoline_cltv_delta, trampoline_onion))
5✔
2068
                        if per_trampoline_fees != 0:
5✔
2069
                            e = 'not enough margin to pay trampoline fee'
×
2070
                            self.logger.info(e)
×
2071
                            raise FeeBudgetExceeded(e)
×
2072
                else:
2073
                    # We treat each split configuration as atomic: if path finding
2074
                    # fails for any single part, we move on to the next configuration.
2075
                    for (chan_id, _), part_amounts_msat in sc.config.items():
5✔
2076
                        for part_amount_msat in part_amounts_msat:
5✔
2077
                            channel = self.channels[chan_id]
5✔
2078
                            route = await run_in_thread(
5✔
2079
                                partial(
2080
                                    self.create_route_for_single_htlc,
2081
                                    amount_msat=part_amount_msat,
2082
                                    invoice_pubkey=paysession.invoice_pubkey,
2083
                                    min_final_cltv_delta=paysession.min_final_cltv_delta,
2084
                                    r_tags=paysession.r_tags,
2085
                                    invoice_features=paysession.invoice_features,
2086
                                    my_sending_channels=[channel] if is_multichan_mpp else my_active_channels,
2087
                                    full_path=full_path,
2088
                                    budget=budget._replace(fee_msat=budget.fee_msat // sc.config.number_parts()),
2089
                                )
2090
                            )
2091
                            shi = SentHtlcInfo(
5✔
2092
                                route=route,
2093
                                payment_secret_orig=paysession.payment_secret,
2094
                                payment_secret_bucket=paysession.payment_secret,
2095
                                amount_msat=part_amount_msat,
2096
                                bucket_msat=paysession.amount_to_pay,
2097
                                amount_receiver_msat=part_amount_msat,
2098
                                trampoline_fee_level=None,
2099
                                trampoline_route=None,
2100
                            )
2101
                            routes.append((shi, paysession.min_final_cltv_delta, fwd_trampoline_onion))
5✔
2102
            except NoPathFound:
5✔
2103
                continue
5✔
2104
            except FeeBudgetExceeded as e:
5✔
2105
                fee_related_error = e
×
2106
                continue
×
2107
            for route in routes:
5✔
2108
                yield route
5✔
2109
            return
5✔
2110
        if fee_related_error is not None:
5✔
2111
            raise fee_related_error
×
2112
        raise NoPathFound()
5✔
2113

2114
    @profiler
5✔
2115
    def create_route_for_single_htlc(
5✔
2116
            self, *,
2117
            amount_msat: int,  # that final receiver gets
2118
            invoice_pubkey: bytes,
2119
            min_final_cltv_delta: int,
2120
            r_tags,
2121
            invoice_features: int,
2122
            my_sending_channels: List[Channel],
2123
            full_path: Optional[LNPaymentPath],
2124
            budget: PaymentFeeBudget,
2125
    ) -> LNPaymentRoute:
2126

2127
        my_sending_aliases = set(chan.get_local_scid_alias() for chan in my_sending_channels)
5✔
2128
        my_sending_channels = {chan.short_channel_id: chan for chan in my_sending_channels
5✔
2129
            if chan.short_channel_id is not None}
2130
        # Collect all private edges from route hints.
2131
        # Note: if some route hints are multiple edges long, and these paths cross each other,
2132
        #       we allow our path finding to cross the paths; i.e. the route hints are not isolated.
2133
        private_route_edges = {}  # type: Dict[ShortChannelID, RouteEdge]
5✔
2134
        for private_path in r_tags:
5✔
2135
            # we need to shift the node pubkey by one towards the destination:
2136
            private_path_nodes = [edge[0] for edge in private_path][1:] + [invoice_pubkey]
5✔
2137
            private_path_rest = [edge[1:] for edge in private_path]
5✔
2138
            start_node = private_path[0][0]
5✔
2139
            # remove aliases from direct routes
2140
            if len(private_path) == 1 and private_path[0][1] in my_sending_aliases:
5✔
2141
                self.logger.info(f'create_route: skipping alias {ShortChannelID(private_path[0][1])}')
×
2142
                continue
×
2143
            for end_node, edge_rest in zip(private_path_nodes, private_path_rest):
5✔
2144
                short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_delta = edge_rest
5✔
2145
                short_channel_id = ShortChannelID(short_channel_id)
5✔
2146
                if (our_chan := self.get_channel_by_short_id(short_channel_id)) is not None:
5✔
2147
                    # check if the channel is one of our channels and frozen for sending
2148
                    if our_chan.is_frozen_for_sending():
×
2149
                        continue
×
2150
                # if we have a routing policy for this edge in the db, that takes precedence,
2151
                # as it is likely from a previous failure
2152
                channel_policy = self.channel_db.get_policy_for_node(
5✔
2153
                    short_channel_id=short_channel_id,
2154
                    node_id=start_node,
2155
                    my_channels=my_sending_channels)
2156
                if channel_policy:
5✔
2157
                    fee_base_msat = channel_policy.fee_base_msat
5✔
2158
                    fee_proportional_millionths = channel_policy.fee_proportional_millionths
5✔
2159
                    cltv_delta = channel_policy.cltv_delta
5✔
2160
                node_info = self.channel_db.get_node_info_for_node_id(node_id=end_node)
5✔
2161
                route_edge = RouteEdge(
5✔
2162
                        start_node=start_node,
2163
                        end_node=end_node,
2164
                        short_channel_id=short_channel_id,
2165
                        fee_base_msat=fee_base_msat,
2166
                        fee_proportional_millionths=fee_proportional_millionths,
2167
                        cltv_delta=cltv_delta,
2168
                        node_features=node_info.features if node_info else 0)
2169
                private_route_edges[route_edge.short_channel_id] = route_edge
5✔
2170
                start_node = end_node
5✔
2171
        # now find a route, end to end: between us and the recipient
2172
        try:
5✔
2173
            route = self.network.path_finder.find_route(
5✔
2174
                nodeA=self.node_keypair.pubkey,
2175
                nodeB=invoice_pubkey,
2176
                invoice_amount_msat=amount_msat,
2177
                path=full_path,
2178
                my_sending_channels=my_sending_channels,
2179
                private_route_edges=private_route_edges)
2180
        except NoChannelPolicy as e:
5✔
2181
            raise NoPathFound() from e
×
2182
        if not route:
5✔
2183
            raise NoPathFound()
5✔
2184
        if not is_route_within_budget(
5✔
2185
            route, budget=budget, amount_msat_for_dest=amount_msat, cltv_delta_for_dest=min_final_cltv_delta,
2186
        ):
2187
            self.logger.info(f"rejecting route (exceeds budget): {route=}. {budget=}")
×
2188
            raise FeeBudgetExceeded()
×
2189
        assert len(route) > 0
5✔
2190
        if route[-1].end_node != invoice_pubkey:
5✔
2191
            raise LNPathInconsistent("last node_id != invoice pubkey")
5✔
2192
        # add features from invoice
2193
        route[-1].node_features |= invoice_features
5✔
2194
        return route
5✔
2195
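    # Illustrative sketch (editor's note, not part of lnworker.py): a BOLT-11 route hint lists
    # edges as (node_id, scid, fee_base, fee_ppm, cltv_delta), where node_id is the node at the
    # *start* of the edge.  The loop above therefore pairs each edge with the next hint's node_id
    # (or with the invoice pubkey for the last edge) to obtain the edge's end node.  Toy,
    # hypothetical identifiers below, not real pubkeys or short channel ids.
    _invoice_pubkey = b'DEST'
    _private_path = [(b'A', b'scid1', 0, 0, 40), (b'B', b'scid2', 0, 0, 40)]
    _end_nodes = [edge[0] for edge in _private_path][1:] + [_invoice_pubkey]
    _edges = list(zip((e[0] for e in _private_path), _end_nodes, (e[1] for e in _private_path)))
    assert _edges == [(b'A', b'B', b'scid1'), (b'B', b'DEST', b'scid2')]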

2196
    def clear_invoices_cache(self):
5✔
2197
        self._bolt11_cache.clear()
×
2198

2199
    def get_bolt11_invoice(
5✔
2200
            self, *,
2201
            payment_hash: bytes,
2202
            amount_msat: Optional[int],
2203
            message: str,
2204
            expiry: int,  # expiration of invoice (in seconds, relative)
2205
            fallback_address: Optional[str],
2206
            channels: Optional[Sequence[Channel]] = None,
2207
            min_final_cltv_expiry_delta: Optional[int] = None,
2208
    ) -> Tuple[LnAddr, str]:
2209
        assert isinstance(payment_hash, bytes), f"expected bytes, but got {type(payment_hash)}"
×
2210

2211
        pair = self._bolt11_cache.get(payment_hash)
×
2212
        if pair:
×
2213
            lnaddr, invoice = pair
×
2214
            assert lnaddr.get_amount_msat() == amount_msat
×
2215
            return pair
×
2216

2217
        assert amount_msat is None or amount_msat > 0
×
2218
        timestamp = int(time.time())
×
2219
        needs_jit: bool = self.receive_requires_jit_channel(amount_msat)
×
2220
        routing_hints = self.calc_routing_hints_for_invoice(amount_msat, channels=channels, needs_jit=needs_jit)
×
2221
        self.logger.info(f"creating bolt11 invoice with routing_hints: {routing_hints}, jit: {needs_jit}, sat: {(amount_msat or 0) // 1000}")
×
2222
        invoice_features = self.features.for_invoice()
×
2223
        if not self.uses_trampoline():
×
2224
            invoice_features &= ~ LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT_ELECTRUM
×
2225
        if needs_jit:
×
2226
            # jit only works with single htlcs, mpp will cause LSP to open channels for each htlc
2227
            invoice_features &= ~ LnFeatures.BASIC_MPP_OPT & ~ LnFeatures.BASIC_MPP_REQ
×
2228
        payment_secret = self.get_payment_secret(payment_hash)
×
2229
        amount_btc = amount_msat/Decimal(COIN*1000) if amount_msat else None
×
2230
        if expiry == 0:
×
2231
            expiry = LN_EXPIRY_NEVER
×
2232
        if min_final_cltv_expiry_delta is None:
×
2233
            min_final_cltv_expiry_delta = MIN_FINAL_CLTV_DELTA_FOR_INVOICE
×
2234
        lnaddr = LnAddr(
×
2235
            paymenthash=payment_hash,
2236
            amount=amount_btc,
2237
            tags=[
2238
                ('d', message),
2239
                ('c', min_final_cltv_expiry_delta),
2240
                ('x', expiry),
2241
                ('9', invoice_features),
2242
                ('f', fallback_address),
2243
            ] + routing_hints,
2244
            date=timestamp,
2245
            payment_secret=payment_secret)
2246
        invoice = lnencode(lnaddr, self.node_keypair.privkey)
×
2247
        pair = lnaddr, invoice
×
2248
        self._bolt11_cache[payment_hash] = pair
×
2249
        return pair
×
2250
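    # Illustrative sketch (editor's note, not part of lnworker.py): LnAddr expects the invoice
    # amount in BTC, so get_bolt11_invoice converts millisatoshis by dividing by COIN * 1000
    # (COIN == 100_000_000 satoshis per BTC).  Worked example with a hypothetical 150_000 msat
    # (i.e. 150 sat) amount:
    from decimal import Decimal as _Decimal
    _COIN = 100_000_000                        # satoshis per BTC, mirrors electrum.bitcoin.COIN
    _amount_msat = 150_000                     # hypothetical invoice amount
    _amount_btc = _amount_msat / _Decimal(_COIN * 1000)
    assert _amount_btc == _Decimal('0.0000015')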

2251
    def get_payment_secret(self, payment_hash):
5✔
2252
        return sha256(sha256(self.payment_secret_key) + payment_hash)
5✔
2253

2254
    def _get_payment_key(self, payment_hash: bytes) -> bytes:
5✔
2255
        """Return payment bucket key.
2256
        We bucket htlcs based on payment_hash+payment_secret. payment_secret is included
2257
        as it changes over a trampoline path (in the outer onion), and these paths can overlap.
2258
        """
2259
        payment_secret = self.get_payment_secret(payment_hash)
5✔
2260
        return payment_hash + payment_secret
5✔
2261

2262
    def create_payment_info(self, *, amount_msat: Optional[int], write_to_disk=True) -> bytes:
5✔
2263
        payment_preimage = os.urandom(32)
5✔
2264
        payment_hash = sha256(payment_preimage)
5✔
2265
        info = PaymentInfo(payment_hash, amount_msat, RECEIVED, PR_UNPAID)
5✔
2266
        self.save_preimage(payment_hash, payment_preimage, write_to_disk=False)
5✔
2267
        self.save_payment_info(info, write_to_disk=False)
5✔
2268
        if write_to_disk:
5✔
2269
            self.wallet.save_db()
×
2270
        return payment_hash
5✔
2271

2272
    def bundle_payments(self, hash_list):
5✔
2273
        payment_keys = [self._get_payment_key(x) for x in hash_list]
5✔
2274
        self.payment_bundles.append(payment_keys)
5✔
2275

2276
    def get_payment_bundle(self, payment_key: bytes) -> Sequence[bytes]:
5✔
2277
        for key_list in self.payment_bundles:
5✔
2278
            if payment_key in key_list:
5✔
2279
                return key_list
5✔
2280

2281
    def save_preimage(self, payment_hash: bytes, preimage: bytes, *, write_to_disk: bool = True):
5✔
2282
        if sha256(preimage) != payment_hash:
5✔
2283
            raise Exception("tried to save incorrect preimage for payment_hash")
×
2284
        self.preimages[payment_hash.hex()] = preimage.hex()
5✔
2285
        if write_to_disk:
5✔
2286
            self.wallet.save_db()
5✔
2287

2288
    def get_preimage(self, payment_hash: bytes) -> Optional[bytes]:
5✔
2289
        assert isinstance(payment_hash, bytes), f"expected bytes, but got {type(payment_hash)}"
5✔
2290
        preimage_hex = self.preimages.get(payment_hash.hex())
5✔
2291
        if preimage_hex is None:
5✔
2292
            return None
5✔
2293
        preimage_bytes = bytes.fromhex(preimage_hex)
5✔
2294
        if sha256(preimage_bytes) != payment_hash:
5✔
2295
            raise Exception("found incorrect preimage for payment_hash")
×
2296
        return preimage_bytes
5✔
2297

2298
    def get_payment_info(self, payment_hash: bytes) -> Optional[PaymentInfo]:
5✔
2299
        """returns None if payment_hash is a payment we are forwarding"""
2300
        key = payment_hash.hex()
5✔
2301
        with self.lock:
5✔
2302
            if key in self.payment_info:
5✔
2303
                amount_msat, direction, status = self.payment_info[key]
5✔
2304
                return PaymentInfo(payment_hash, amount_msat, direction, status)
5✔
2305

2306
    def add_payment_info_for_hold_invoice(self, payment_hash: bytes, lightning_amount_sat: int):
5✔
2307
        info = PaymentInfo(payment_hash, lightning_amount_sat * 1000, RECEIVED, PR_UNPAID)
×
2308
        self.save_payment_info(info, write_to_disk=False)
×
2309

2310
    def register_hold_invoice(self, payment_hash: bytes, cb: Callable[[bytes], Awaitable[None]]):
5✔
2311
        self.hold_invoice_callbacks[payment_hash] = cb
5✔
2312

2313
    def unregister_hold_invoice(self, payment_hash: bytes):
5✔
2314
        self.hold_invoice_callbacks.pop(payment_hash)
×
2315

2316
    def save_payment_info(self, info: PaymentInfo, *, write_to_disk: bool = True) -> None:
5✔
2317
        key = info.payment_hash.hex()
5✔
2318
        assert info.status in SAVED_PR_STATUS
5✔
2319
        with self.lock:
5✔
2320
            self.payment_info[key] = info.amount_msat, info.direction, info.status
5✔
2321
        if write_to_disk:
5✔
2322
            self.wallet.save_db()
5✔
2323

2324
    def check_mpp_status(
5✔
2325
            self, *,
2326
            payment_secret: bytes,
2327
            short_channel_id: ShortChannelID,
2328
            htlc: UpdateAddHtlc,
2329
            expected_msat: int,
2330
    ) -> RecvMPPResolution:
2331
        """Returns the status of the incoming htlc set the given *htlc* belongs to.
2332

2333
        ACCEPTED simply means the mpp set is complete, and we can proceed with further
2334
        checks before fulfilling (or failing) the htlcs.
2335
        In particular, note that hold-invoice-htlcs typically remain in the ACCEPTED state
2336
        for quite some time -- not in the "WAITING" state (which would refer to the mpp set
2337
        not yet being complete!).
2338
        """
2339
        payment_hash = htlc.payment_hash
5✔
2340
        payment_key = payment_hash + payment_secret
5✔
2341
        self.update_mpp_with_received_htlc(
5✔
2342
            payment_key=payment_key, scid=short_channel_id, htlc=htlc, expected_msat=expected_msat)
2343
        mpp_resolution = self.received_mpp_htlcs[payment_key.hex()].resolution
5✔
2344
        # if still waiting, calc resolution now:
2345
        if mpp_resolution == RecvMPPResolution.WAITING:
5✔
2346
            bundle = self.get_payment_bundle(payment_key)
5✔
2347
            if bundle:
5✔
2348
                payment_keys = bundle
5✔
2349
            else:
2350
                payment_keys = [payment_key]
5✔
2351
            first_timestamp = min([self.get_first_timestamp_of_mpp(pkey) for pkey in payment_keys])
5✔
2352
            if self.get_payment_status(payment_hash) == PR_PAID:
5✔
2353
                mpp_resolution = RecvMPPResolution.ACCEPTED
×
2354
            elif self.stopping_soon:
5✔
2355
                # try to time out pending HTLCs before shutting down
2356
                mpp_resolution = RecvMPPResolution.EXPIRED
5✔
2357
            elif all([self.is_mpp_amount_reached(pkey) for pkey in payment_keys]):
5✔
2358
                mpp_resolution = RecvMPPResolution.ACCEPTED
5✔
2359
            elif time.time() - first_timestamp > self.MPP_EXPIRY:
5✔
2360
                mpp_resolution = RecvMPPResolution.EXPIRED
5✔
2361
            # save resolution, if any.
2362
            if mpp_resolution != RecvMPPResolution.WAITING:
5✔
2363
                for pkey in payment_keys:
5✔
2364
                    if pkey.hex() in self.received_mpp_htlcs:
5✔
2365
                        self.set_mpp_resolution(payment_key=pkey, resolution=mpp_resolution)
5✔
2366

2367
        return mpp_resolution
5✔
2368

2369
    def update_mpp_with_received_htlc(
5✔
2370
        self,
2371
        *,
2372
        payment_key: bytes,
2373
        scid: ShortChannelID,
2374
        htlc: UpdateAddHtlc,
2375
        expected_msat: int,
2376
    ):
2377
        # add new htlc to set
2378
        mpp_status = self.received_mpp_htlcs.get(payment_key.hex())
5✔
2379
        if mpp_status is None:
5✔
2380
            mpp_status = ReceivedMPPStatus(
5✔
2381
                resolution=RecvMPPResolution.WAITING,
2382
                expected_msat=expected_msat,
2383
                htlc_set=set(),
2384
            )
2385
        if expected_msat != mpp_status.expected_msat:
5✔
2386
            self.logger.info(
5✔
2387
                f"marking received mpp as failed. inconsistent total_msats in bucket. {payment_key.hex()=}")
2388
            mpp_status = mpp_status._replace(resolution=RecvMPPResolution.FAILED)
5✔
2389
        key = (scid, htlc)
5✔
2390
        if key not in mpp_status.htlc_set:
5✔
2391
            mpp_status.htlc_set.add(key)  # side-effecting htlc_set
5✔
2392
        self.received_mpp_htlcs[payment_key.hex()] = mpp_status
5✔
2393

2394
    def set_mpp_resolution(self, *, payment_key: bytes, resolution: RecvMPPResolution):
5✔
2395
        mpp_status = self.received_mpp_htlcs[payment_key.hex()]
5✔
2396
        self.logger.info(f'set_mpp_resolution {resolution.name} {len(mpp_status.htlc_set)} {payment_key.hex()}')
5✔
2397
        self.received_mpp_htlcs[payment_key.hex()] = mpp_status._replace(resolution=resolution)
5✔
2398

2399
    def is_mpp_amount_reached(self, payment_key: bytes) -> bool:
5✔
2400
        mpp_status = self.received_mpp_htlcs.get(payment_key.hex())
5✔
2401
        if not mpp_status:
5✔
2402
            return False
5✔
2403
        total = sum([_htlc.amount_msat for scid, _htlc in mpp_status.htlc_set])
5✔
2404
        return total >= mpp_status.expected_msat
5✔
2405

2406
    def is_accepted_mpp(self, payment_hash: bytes) -> bool:
5✔
2407
        payment_key = self._get_payment_key(payment_hash)
×
2408
        status = self.received_mpp_htlcs.get(payment_key.hex())
×
2409
        return status and status.resolution == RecvMPPResolution.ACCEPTED
×
2410

2411
    def get_first_timestamp_of_mpp(self, payment_key: bytes) -> int:
5✔
2412
        mpp_status = self.received_mpp_htlcs.get(payment_key.hex())
5✔
2413
        if not mpp_status:
5✔
2414
            return int(time.time())
5✔
2415
        return min([_htlc.timestamp for scid, _htlc in mpp_status.htlc_set])
5✔
2416

2417
    def maybe_cleanup_mpp(
5✔
2418
            self,
2419
            short_channel_id: ShortChannelID,
2420
            htlc: UpdateAddHtlc,
2421
    ) -> None:
2422

2423
        htlc_key = (short_channel_id, htlc)
5✔
2424
        for payment_key_hex, mpp_status in list(self.received_mpp_htlcs.items()):
5✔
2425
            if htlc_key not in mpp_status.htlc_set:
5✔
2426
                continue
5✔
2427
            assert mpp_status.resolution != RecvMPPResolution.WAITING
5✔
2428
            self.logger.info(f'maybe_cleanup_mpp: removing htlc of MPP {payment_key_hex}')
5✔
2429
            mpp_status.htlc_set.remove(htlc_key)  # side-effecting htlc_set
5✔
2430
            if len(mpp_status.htlc_set) == 0:
5✔
2431
                self.logger.info(f'maybe_cleanup_mpp: removing mpp {payment_key_hex}')
5✔
2432
                self.received_mpp_htlcs.pop(payment_key_hex)
5✔
2433
                self.maybe_cleanup_forwarding(payment_key_hex)
5✔
2434

2435
    def maybe_cleanup_forwarding(self, payment_key_hex: str) -> None:
5✔
2436
        self.active_forwardings.pop(payment_key_hex, None)
5✔
2437
        self.forwarding_failures.pop(payment_key_hex, None)
5✔
2438

2439
    def get_payment_status(self, payment_hash: bytes) -> int:
5✔
2440
        info = self.get_payment_info(payment_hash)
5✔
2441
        return info.status if info else PR_UNPAID
5✔
2442

2443
    def get_invoice_status(self, invoice: BaseInvoice) -> int:
5✔
2444
        invoice_id = invoice.rhash
5✔
2445
        status = self.get_payment_status(bfh(invoice_id))
5✔
2446
        if status == PR_UNPAID and invoice_id in self.inflight_payments:
5✔
2447
            return PR_INFLIGHT
×
2448
        # status may be PR_FAILED
2449
        if status == PR_UNPAID and invoice_id in self.logs:
5✔
2450
            status = PR_FAILED
×
2451
        return status
5✔
2452

2453
    def set_invoice_status(self, key: str, status: int) -> None:
5✔
2454
        if status == PR_INFLIGHT:
5✔
2455
            self.inflight_payments.add(key)
5✔
2456
        elif key in self.inflight_payments:
5✔
2457
            self.inflight_payments.remove(key)
5✔
2458
        if status in SAVED_PR_STATUS:
5✔
2459
            self.set_payment_status(bfh(key), status)
5✔
2460
        util.trigger_callback('invoice_status', self.wallet, key, status)
5✔
2461
        self.logger.info(f"set_invoice_status {key}: {status}")
5✔
2462
        # liquidity changed
2463
        self.clear_invoices_cache()
5✔
2464

2465
    def set_request_status(self, payment_hash: bytes, status: int) -> None:
5✔
2466
        if self.get_payment_status(payment_hash) == status:
5✔
2467
            return
5✔
2468
        self.set_payment_status(payment_hash, status)
5✔
2469
        request_id = payment_hash.hex()
5✔
2470
        req = self.wallet.get_request(request_id)
5✔
2471
        if req is None:
5✔
2472
            return
5✔
2473
        util.trigger_callback('request_status', self.wallet, request_id, status)
5✔
2474

2475
    def set_payment_status(self, payment_hash: bytes, status: int) -> None:
5✔
2476
        info = self.get_payment_info(payment_hash)
5✔
2477
        if info is None:
5✔
2478
            # if we are forwarding
2479
            return
5✔
2480
        info = info._replace(status=status)
5✔
2481
        self.save_payment_info(info)
5✔
2482

2483
    def is_forwarded_htlc(self, htlc_key) -> Optional[str]:
5✔
2484
        """If this HTLC is one we forwarded, return the payment key of that forwarding; otherwise return None."""
2485
        for payment_key, htlcs in self.active_forwardings.items():
5✔
2486
            if htlc_key in htlcs:
5✔
2487
                return payment_key
5✔
2488

2489
    def notify_upstream_peer(self, htlc_key: str) -> None:
5✔
2490
        """Called when an HTLC we offered on chan gets irrevocably fulfilled or failed.
2491
        If we find this was a forwarded HTLC, the upstream peer is notified.
2492
        """
2493
        upstream_key = self.downstream_to_upstream_htlc.pop(htlc_key, None)
5✔
2494
        if not upstream_key:
5✔
2495
            return
4✔
2496
        upstream_chan_scid, _ = deserialize_htlc_key(upstream_key)
5✔
2497
        upstream_chan = self.get_channel_by_short_id(upstream_chan_scid)
5✔
2498
        upstream_peer = self.peers.get(upstream_chan.node_id) if upstream_chan else None
5✔
2499
        if upstream_peer:
5✔
2500
            upstream_peer.downstream_htlc_resolved_event.set()
5✔
2501
            upstream_peer.downstream_htlc_resolved_event.clear()
5✔
2502

2503
    def htlc_fulfilled(self, chan: Channel, payment_hash: bytes, htlc_id: int):
5✔
2504

2505
        util.trigger_callback('htlc_fulfilled', payment_hash, chan, htlc_id)
5✔
2506
        htlc_key = serialize_htlc_key(chan.get_scid_or_local_alias(), htlc_id)
5✔
2507
        fw_key = self.is_forwarded_htlc(htlc_key)
5✔
2508
        if fw_key:
5✔
2509
            fw_htlcs = self.active_forwardings[fw_key]
5✔
2510
            fw_htlcs.remove(htlc_key)
5✔
2511

2512
        shi = self.sent_htlcs_info.get((payment_hash, chan.short_channel_id, htlc_id))
5✔
2513
        if shi and htlc_id in chan.onion_keys:
5✔
2514
            chan.pop_onion_key(htlc_id)
5✔
2515
            payment_key = payment_hash + shi.payment_secret_orig
5✔
2516
            paysession = self._paysessions[payment_key]
5✔
2517
            q = paysession.sent_htlcs_q
5✔
2518
            htlc_log = HtlcLog(
5✔
2519
                success=True,
2520
                route=shi.route,
2521
                amount_msat=shi.amount_receiver_msat,
2522
                trampoline_fee_level=shi.trampoline_fee_level)
2523
            q.put_nowait(htlc_log)
5✔
2524
            if paysession.can_be_deleted():
5✔
2525
                self._paysessions.pop(payment_key)
5✔
2526
                paysession_active = False
5✔
2527
            else:
2528
                paysession_active = True
5✔
2529
        else:
2530
            if fw_key:
5✔
2531
                paysession_active = False
5✔
2532
            else:
2533
                key = payment_hash.hex()
5✔
2534
                self.set_invoice_status(key, PR_PAID)
5✔
2535
                util.trigger_callback('payment_succeeded', self.wallet, key)
5✔
2536

2537
        if fw_key:
5✔
2538
            fw_htlcs = self.active_forwardings[fw_key]
5✔
2539
            if len(fw_htlcs) == 0 and not paysession_active:
5✔
2540
                self.notify_upstream_peer(htlc_key)
5✔
2541

2542
    def htlc_failed(
5✔
2543
            self,
2544
            chan: Channel,
2545
            payment_hash: bytes,
2546
            htlc_id: int,
2547
            error_bytes: Optional[bytes],
2548
            failure_message: Optional['OnionRoutingFailure']):
2549
        # note: this may be called several times for the same htlc
2550

2551
        util.trigger_callback('htlc_failed', payment_hash, chan, htlc_id)
5✔
2552
        htlc_key = serialize_htlc_key(chan.get_scid_or_local_alias(), htlc_id)
5✔
2553
        fw_key = self.is_forwarded_htlc(htlc_key)
5✔
2554
        if fw_key:
5✔
2555
            fw_htlcs = self.active_forwardings[fw_key]
5✔
2556
            fw_htlcs.remove(htlc_key)
5✔
2557

2558
        shi = self.sent_htlcs_info.get((payment_hash, chan.short_channel_id, htlc_id))
5✔
2559
        if shi and htlc_id in chan.onion_keys:
5✔
2560
            onion_key = chan.pop_onion_key(htlc_id)
5✔
2561
            payment_okey = payment_hash + shi.payment_secret_orig
5✔
2562
            paysession = self._paysessions[payment_okey]
5✔
2563
            q = paysession.sent_htlcs_q
5✔
2564
            # detect if it is part of a bucket
2565
            # if yes, wait until the bucket completely failed
2566
            route = shi.route
5✔
2567
            if error_bytes:
5✔
2568
                # TODO "decode_onion_error" might raise, catch and maybe blacklist/penalise someone?
2569
                try:
5✔
2570
                    failure_message, sender_idx = decode_onion_error(
5✔
2571
                        error_bytes,
2572
                        [x.node_id for x in route],
2573
                        onion_key)
2574
                except Exception as e:
4✔
2575
                    sender_idx = None
4✔
2576
                    failure_message = OnionRoutingFailure(OnionFailureCode.INVALID_ONION_PAYLOAD, str(e).encode())
4✔
2577
            else:
2578
                # probably got "update_fail_malformed_htlc". well... who to penalise now?
2579
                assert failure_message is not None
×
2580
                sender_idx = None
×
2581
            self.logger.info(f"htlc_failed {failure_message}")
5✔
2582
            amount_receiver_msat = paysession.on_htlc_fail_get_fail_amt_to_propagate(shi)
5✔
2583
            if amount_receiver_msat is None:
5✔
2584
                return
5✔
2585
            if shi.trampoline_route:
5✔
2586
                route = shi.trampoline_route
5✔
2587
            htlc_log = HtlcLog(
5✔
2588
                success=False,
2589
                route=route,
2590
                amount_msat=amount_receiver_msat,
2591
                error_bytes=error_bytes,
2592
                failure_msg=failure_message,
2593
                sender_idx=sender_idx,
2594
                trampoline_fee_level=shi.trampoline_fee_level)
2595
            q.put_nowait(htlc_log)
5✔
2596
            if paysession.can_be_deleted():
5✔
2597
                self._paysessions.pop(payment_okey)
5✔
2598
                paysession_active = False
5✔
2599
            else:
2600
                paysession_active = True
5✔
2601
        else:
2602
            if fw_key:
5✔
2603
                paysession_active = False
5✔
2604
            else:
2605
                self.logger.info(f"received unknown htlc_failed, probably from previous session (phash={payment_hash.hex()})")
5✔
2606
                key = payment_hash.hex()
5✔
2607
                self.set_invoice_status(key, PR_UNPAID)
5✔
2608
                util.trigger_callback('payment_failed', self.wallet, key, '')
5✔
2609

2610
        if fw_key:
5✔
2611
            fw_htlcs = self.active_forwardings[fw_key]
5✔
2612
            can_forward_failure = (len(fw_htlcs) == 0) and not paysession_active
5✔
2613
            if can_forward_failure:
5✔
2614
                self.logger.info(f'htlc_failed: save_forwarding_failure (phash={payment_hash.hex()})')
5✔
2615
                self.save_forwarding_failure(fw_key, error_bytes=error_bytes, failure_message=failure_message)
5✔
2616
                self.notify_upstream_peer(htlc_key)
5✔
2617
            else:
2618
                self.logger.info(f'htlc_failed: waiting for other htlcs to fail (phash={payment_hash.hex()})')
5✔
2619

2620
    def calc_routing_hints_for_invoice(self, amount_msat: Optional[int], channels=None, needs_jit=False):
5✔
2621
        """calculate routing hints (BOLT-11 'r' field)"""
2622
        routing_hints = []
5✔
2623
        if needs_jit:
5✔
UNCOV
2624
            node_id, rest = extract_nodeid(self.config.ZEROCONF_TRUSTED_NODE)
×
2625
            alias_or_scid = self.get_static_jit_scid_alias()
×
2626
            routing_hints.append(('r', [(node_id, alias_or_scid, 0, 0, 144)]))
×
2627
            # no need for more because we cannot receive enough through the others and mpp is disabled for jit
UNCOV
2628
            channels = []
×
2629
        else:
2630
            if channels is None:
5✔
2631
                channels = list(self.get_channels_for_receiving(amount_msat=amount_msat, include_disconnected=True))
5✔
2632
                random.shuffle(channels)  # let's not leak channel order
5✔
2633
            scid_to_my_channels = {
5✔
2634
                chan.short_channel_id: chan for chan in channels
2635
                if chan.short_channel_id is not None
2636
            }
2637
        for chan in channels:
5✔
2638
            alias_or_scid = chan.get_remote_scid_alias() or chan.short_channel_id
5✔
2639
            assert isinstance(alias_or_scid, bytes), alias_or_scid
5✔
2640
            channel_info = get_mychannel_info(chan.short_channel_id, scid_to_my_channels)
5✔
2641
            # note: as a fallback, if we don't have a channel update for the
2642
            # incoming direction of our private channel, we fill the invoice with garbage.
2643
            # the sender should still be able to pay us, but will incur an extra round trip
2644
            # (they will get the channel update from the onion error)
2645
            # at least, that's the theory. https://github.com/lightningnetwork/lnd/issues/2066
2646
            fee_base_msat = fee_proportional_millionths = 0
5✔
2647
            cltv_delta = 1  # lnd won't even try with zero
5✔
2648
            missing_info = True
5✔
2649
            if channel_info:
5✔
2650
                policy = get_mychannel_policy(channel_info.short_channel_id, chan.node_id, scid_to_my_channels)
5✔
2651
                if policy:
5✔
2652
                    fee_base_msat = policy.fee_base_msat
5✔
2653
                    fee_proportional_millionths = policy.fee_proportional_millionths
5✔
2654
                    cltv_delta = policy.cltv_delta
5✔
2655
                    missing_info = False
5✔
2656
            if missing_info:
5✔
UNCOV
2657
                self.logger.info(
×
2658
                    f"Warning. Missing channel update for our channel {chan.short_channel_id}; "
2659
                    f"filling invoice with incorrect data.")
2660
            routing_hints.append(('r', [(
5✔
2661
                chan.node_id,
2662
                alias_or_scid,
2663
                fee_base_msat,
2664
                fee_proportional_millionths,
2665
                cltv_delta)]))
2666
        return routing_hints
5✔
2667
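    # Illustrative sketch (editor's note, not part of lnworker.py): each routing hint is a
    # BOLT-11 'r' tag carrying a single-hop path, encoded as (node_id, scid_or_alias,
    # fee_base_msat, fee_proportional_millionths, cltv_delta).  Placeholder bytes below,
    # purely hypothetical values.
    _hint = ('r', [(b'\x02' + b'\x11' * 32,    # remote node_id (33-byte pubkey, placeholder)
                    b'\x00' * 8,               # short channel id or remote scid alias (8 bytes)
                    0,                         # fee_base_msat
                    0,                         # fee_proportional_millionths
                    144)])                     # cltv_expiry_delta
    assert _hint[0] == 'r' and len(_hint[1][0]) == 5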

2668
    def delete_payment_info(self, payment_hash_hex: str):
5✔
2669
        # This method is called when an invoice or request is deleted by the user.
2670
        # The GUI only lets the user delete invoices or requests that have not been paid.
2671
        # Once an invoice/request has been paid, it is part of the history,
2672
        # and get_lightning_history assumes that payment_info is there.
UNCOV
2673
        assert self.get_payment_status(bytes.fromhex(payment_hash_hex)) != PR_PAID
×
2674
        with self.lock:
×
2675
            self.payment_info.pop(payment_hash_hex, None)
×
2676

2677
    def get_balance(self, frozen=False):
5✔
UNCOV
2678
        with self.lock:
×
2679
            return Decimal(sum(
×
2680
                chan.balance(LOCAL) if not chan.is_closed() and (chan.is_frozen_for_sending() if frozen else True) else 0
2681
                for chan in self.channels.values())) / 1000
2682

2683
    def get_channels_for_sending(self):
5✔
UNCOV
2684
        for c in self.channels.values():
×
2685
            if c.is_active() and not c.is_frozen_for_sending():
×
2686
                if self.channel_db or self.is_trampoline_peer(c.node_id):
×
2687
                    yield c
×
2688

2689
    def fee_estimate(self, amount_sat):
5✔
2690
        # Here we have to guess a fee, because some callers (submarine swaps)
2691
        # use this method to initiate a payment, which would otherwise fail.
UNCOV
2692
        fee_base_msat = 5000               # FIXME ehh.. there ought to be a better way...
×
2693
        fee_proportional_millionths = 500  # FIXME
×
2694
        # inverse of fee_for_edge_msat
UNCOV
2695
        amount_msat = amount_sat * 1000
×
2696
        amount_minus_fees = (amount_msat - fee_base_msat) * 1_000_000 // (1_000_000 + fee_proportional_millionths)
×
2697
        return Decimal(amount_msat - amount_minus_fees) / 1000
×
2698
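    # Illustrative sketch (editor's note, not part of lnworker.py): fee_estimate inverts the usual
    # edge fee formula (fee = base + amount * ppm / 1e6) so that amount + fee stays within what we
    # can actually send.  Worked example for a hypothetical 100_000 sat payment with the
    # hard-coded guesses above (5000 msat base fee, 500 ppm):
    from decimal import Decimal as _Decimal
    _amount_msat = 100_000 * 1000
    _amount_minus_fees = (_amount_msat - 5000) * 1_000_000 // (1_000_000 + 500)
    _fee_sat = _Decimal(_amount_msat - _amount_minus_fees) / 1000
    assert _fee_sat == _Decimal('54.973')      # roughly 0.055% of the amount plus the base fee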

2699
    def num_sats_can_send(self, deltas=None) -> Decimal:
5✔
2700
        """
2701
        Without trampoline: the total is the sum of what all usable channels can send.
2702
        With trampoline: an MPP payment must go through a single trampoline node, so only the best single node counts.
2703
        """
UNCOV
2704
        if deltas is None:
×
2705
            deltas = {}
×
2706

UNCOV
2707
        def send_capacity(chan):
×
2708
            if chan in deltas:
×
2709
                delta_msat = deltas[chan] * 1000
×
2710
                if delta_msat > chan.available_to_spend(REMOTE):
×
2711
                    delta_msat = 0
×
2712
            else:
UNCOV
2713
                delta_msat = 0
×
2714
            return chan.available_to_spend(LOCAL) + delta_msat
×
2715
        can_send_dict = defaultdict(int)
×
2716
        with self.lock:
×
2717
            for c in self.get_channels_for_sending():
×
2718
                if not self.uses_trampoline():
×
2719
                    can_send_dict[0] += send_capacity(c)
×
2720
                else:
UNCOV
2721
                    can_send_dict[c.node_id] += send_capacity(c)
×
2722
        can_send = max(can_send_dict.values()) if can_send_dict else 0
×
2723
        can_send_sat = Decimal(can_send)/1000
×
2724
        can_send_sat -= self.fee_estimate(can_send_sat)
×
2725
        return max(can_send_sat, 0)
×
2726
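    # Illustrative sketch (editor's note, not part of lnworker.py): without trampoline the
    # spendable amounts of all usable channels go into one bucket; with trampoline they are
    # grouped per trampoline node and only the best single node counts, because an MPP payment
    # must be forwarded by one trampoline.  Hypothetical capacities in msat:
    from collections import defaultdict as _defaultdict
    _chans = [(b'T1', 400_000), (b'T1', 250_000), (b'T2', 500_000)]   # (trampoline node, can-send msat)
    _buckets = _defaultdict(int)
    for _node, _msat in _chans:
        _buckets[_node] += _msat
    assert max(_buckets.values()) == 650_000           # trampoline case: best single node (T1)
    assert sum(_m for _, _m in _chans) == 1_150_000    # non-trampoline case: everything counts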

2727
    def get_channels_for_receiving(
5✔
2728
        self, *, amount_msat: Optional[int] = None, include_disconnected: bool = False,
2729
    ) -> Sequence[Channel]:
2730
        if not amount_msat:  # assume we want to recv a large amt, e.g. finding max.
5✔
UNCOV
2731
            amount_msat = float('inf')
×
2732
        with self.lock:
5✔
2733
            channels = list(self.channels.values())
5✔
2734
            channels = [chan for chan in channels
5✔
2735
                        if chan.is_open() and not chan.is_frozen_for_receiving()]
2736

2737
            if not include_disconnected:
5✔
UNCOV
2738
                channels = [chan for chan in channels if chan.is_active()]
×
2739

2740
            # Filter out nodes that have low receive capacity compared to invoice amt.
2741
            # Even with MPP, below a certain threshold, including these channels probably
2742
            # hurts more than help, as they lead to many failed attempts for the sender.
2743
            channels = sorted(channels, key=lambda chan: -chan.available_to_spend(REMOTE))
5✔
2744
            selected_channels = []
5✔
2745
            running_sum = 0
5✔
2746
            cutoff_factor = 0.2  # heuristic
5✔
2747
            for chan in channels:
5✔
2748
                recv_capacity = chan.available_to_spend(REMOTE)
5✔
2749
                chan_can_handle_payment_as_single_part = recv_capacity >= amount_msat
5✔
2750
                chan_small_compared_to_running_sum = recv_capacity < cutoff_factor * running_sum
5✔
2751
                if not chan_can_handle_payment_as_single_part and chan_small_compared_to_running_sum:
5✔
2752
                    break
5✔
2753
                running_sum += recv_capacity
5✔
2754
                selected_channels.append(chan)
5✔
2755
            channels = selected_channels
5✔
2756
            del selected_channels
5✔
2757
            # cap max channels to include to keep QR code reasonably scannable
2758
            channels = channels[:10]
5✔
2759
            return channels
5✔
2760
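    # Illustrative sketch (editor's note, not part of lnworker.py): the cutoff heuristic above
    # walks the channels in order of decreasing inbound capacity and stops once a channel is both
    # too small to carry the requested amount on its own and smaller than 20% of the capacity
    # already accumulated.  Hypothetical capacities (msat) for a 600_000 msat invoice:
    _amount_msat = 600_000
    _capacities = [500_000, 300_000, 200_000, 5_000]   # inbound capacity, largest first
    _selected, _running_sum = [], 0
    for _cap in _capacities:
        if _cap < _amount_msat and _cap < 0.2 * _running_sum:
            break
        _running_sum += _cap
        _selected.append(_cap)
    assert _selected == [500_000, 300_000, 200_000]    # 5_000 is below 20% of the 1_000_000 gathered, so it is dropped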

2761
    def num_sats_can_receive(self, deltas=None) -> Decimal:
5✔
2762
        """
2763
        We no longer assume the sender to send MPP on different channels,
2764
        because channel liquidities are hard to guess
2765
        """
UNCOV
2766
        if deltas is None:
×
2767
            deltas = {}
×
2768

UNCOV
2769
        def recv_capacity(chan):
×
2770
            if chan in deltas:
×
2771
                delta_msat = deltas[chan] * 1000
×
2772
                if delta_msat > chan.available_to_spend(LOCAL):
×
2773
                    delta_msat = 0
×
2774
            else:
UNCOV
2775
                delta_msat = 0
×
2776
            return chan.available_to_spend(REMOTE) + delta_msat
×
2777
        with self.lock:
×
2778
            recv_channels = self.get_channels_for_receiving()
×
2779
            recv_chan_msats = [recv_capacity(chan) for chan in recv_channels]
×
2780
        if not recv_chan_msats:
×
2781
            return Decimal(0)
×
2782
        can_receive_msat = max(recv_chan_msats)
×
2783
        return Decimal(can_receive_msat) / 1000
×
2784

2785
    def receive_requires_jit_channel(self, amount_msat: Optional[int]) -> bool:
5✔
2786
        """Returns true if we cannot receive the amount and have set up a trusted LSP node.
2787
        This cannot work reliably with zero-amount invoices, as we don't know whether we will be able to receive them.
2788
        """
2789
        # zeroconf provider is configured and connected
UNCOV
2790
        if (self.can_get_zeroconf_channel()
×
2791
                # we cannot receive the amount specified
2792
                and ((amount_msat and self.num_sats_can_receive() < (amount_msat // 1000))
2793
                    # or we cannot receive anything, and it's a 0 amount invoice
2794
                    or (not amount_msat and self.num_sats_can_receive() < 1))):
UNCOV
2795
            return True
×
2796
        return False
×
2797

2798
    def can_get_zeroconf_channel(self) -> bool:
5✔
UNCOV
2799
        if not (self.config.ACCEPT_ZEROCONF_CHANNELS and self.config.ZEROCONF_TRUSTED_NODE):
×
2800
            # check if zeroconf is accepted and client has trusted zeroconf node configured
UNCOV
2801
            return False
×
2802
        try:
×
2803
            node_id = extract_nodeid(self.wallet.config.ZEROCONF_TRUSTED_NODE)[0]
×
2804
        except ConnStringFormatError:
×
2805
            # invalid connection string
UNCOV
2806
            return False
×
2807
        # only return True if we are connected to the zeroconf provider
UNCOV
2808
        return node_id in self.peers
×
2809

2810
    def _suggest_channels_for_rebalance(self, direction, amount_sat) -> Sequence[Tuple[Channel, int]]:
5✔
2811
        """
2812
        Suggest a channel and amount to send/receive with that channel, so that we will be able to receive/send amount_sat
2813
        This is used when suggesting a swap or rebalance in order to receive a payment
2814
        """
UNCOV
2815
        with self.lock:
×
2816
            func = self.num_sats_can_send if direction == SENT else self.num_sats_can_receive
×
2817
            suggestions = []
×
2818
            channels = self.get_channels_for_sending() if direction == SENT else self.get_channels_for_receiving()
×
2819
            for chan in channels:
×
2820
                available_sat = chan.available_to_spend(LOCAL if direction == SENT else REMOTE) // 1000
×
2821
                delta = amount_sat - available_sat
×
2822
                delta += self.fee_estimate(amount_sat)
×
2823
                # add safety margin
UNCOV
2824
                delta += delta // 100 + 1
×
2825
                if func(deltas={chan:delta}) >= amount_sat:
×
2826
                    suggestions.append((chan, delta))
×
2827
                elif direction==RECEIVED and func(deltas={chan:2*delta}) >= amount_sat:
×
2828
                    # the MPP heuristic has a 0.5 slope
UNCOV
2829
                    suggestions.append((chan, 2*delta))
×
2830
        if not suggestions:
×
2831
            raise NotEnoughFunds
×
2832
        return suggestions
×
2833
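    # Illustrative sketch (editor's note, not part of lnworker.py): for each candidate channel the
    # suggested delta is the shortfall versus the target amount, plus the estimated routing fee,
    # plus a ~1% safety margin.  Hypothetical figures in sat, with the fee term stubbed to 55 sat
    # instead of calling self.fee_estimate():
    _amount_sat = 100_000
    _available_sat = 60_000                    # what the channel can currently send/receive
    _delta = _amount_sat - _available_sat      # 40_000 sat shortfall
    _delta += 55                               # stand-in for self.fee_estimate(_amount_sat)
    _delta += _delta // 100 + 1                # safety margin: ~1% plus 1 sat
    assert _delta == 40_456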

2834
    def _suggest_rebalance(self, direction, amount_sat):
5✔
2835
        """
2836
        Suggest a rebalance in order to be able to send or receive amount_sat.
2837
        Returns (from_channel, to_channel, amount to shuffle)
2838
        """
UNCOV
2839
        try:
×
2840
            suggestions = self._suggest_channels_for_rebalance(direction, amount_sat)
×
2841
        except NotEnoughFunds:
×
2842
            return False
×
2843
        for chan2, delta in suggestions:
×
2844
            # margin for fee caused by rebalancing
UNCOV
2845
            delta += self.fee_estimate(amount_sat)
×
2846
            # find other channel or trampoline that can send delta
UNCOV
2847
            for chan1 in self.channels.values():
×
2848
                if chan1.is_frozen_for_sending() or not chan1.is_active():
×
2849
                    continue
×
2850
                if chan1 == chan2:
×
2851
                    continue
×
2852
                if self.uses_trampoline() and chan1.node_id == chan2.node_id:
×
2853
                    continue
×
2854
                if direction == SENT:
×
2855
                    if chan1.can_pay(delta*1000):
×
2856
                        return (chan1, chan2, delta)
×
2857
                else:
UNCOV
2858
                    if chan1.can_receive(delta*1000):
×
2859
                        return (chan2, chan1, delta)
×
2860
            else:
UNCOV
2861
                continue
×
2862
        else:
UNCOV
2863
            return False
×
2864

2865
    def num_sats_can_rebalance(self, chan1, chan2):
5✔
2866
        # TODO: we should be able to spend 'max', with variable fee
UNCOV
2867
        n1 = chan1.available_to_spend(LOCAL)
×
2868
        n1 -= self.fee_estimate(n1)
×
2869
        n2 = chan2.available_to_spend(REMOTE)
×
2870
        amount_sat = min(n1, n2) // 1000
×
2871
        return amount_sat
×
2872

2873
    def suggest_rebalance_to_send(self, amount_sat):
5✔
UNCOV
2874
        return self._suggest_rebalance(SENT, amount_sat)
×
2875

2876
    def suggest_rebalance_to_receive(self, amount_sat):
5✔
UNCOV
2877
        return self._suggest_rebalance(RECEIVED, amount_sat)
×
2878

2879
    def suggest_swap_to_send(self, amount_sat, coins):
5✔
2880
        # fixme: if swap_amount_sat is lower than the minimum swap amount, we need to propose a higher value
UNCOV
2881
        assert amount_sat > self.num_sats_can_send()
×
2882
        try:
×
2883
            suggestions = self._suggest_channels_for_rebalance(SENT, amount_sat)
×
2884
        except NotEnoughFunds:
×
2885
            return
×
2886
        for chan, swap_recv_amount in suggestions:
×
2887
            # check that we can send onchain
UNCOV
2888
            swap_server_mining_fee = 10000 # guessing, because we have not called get_pairs yet
×
2889
            swap_funding_sat = swap_recv_amount + swap_server_mining_fee
×
2890
            swap_output = PartialTxOutput.from_address_and_value(DummyAddress.SWAP, int(swap_funding_sat))
×
2891
            if not self.wallet.can_pay_onchain([swap_output], coins=coins):
×
2892
                continue
×
2893
            return chan, swap_recv_amount
×
2894

2895
    def suggest_swap_to_receive(self, amount_sat):
5✔
UNCOV
2896
        assert amount_sat > self.num_sats_can_receive()
×
2897
        try:
×
2898
            suggestions = self._suggest_channels_for_rebalance(RECEIVED, amount_sat)
×
2899
        except NotEnoughFunds:
×
2900
            return
×
2901
        for chan, swap_recv_amount in suggestions:
×
2902
            return chan, swap_recv_amount
×
2903

2904
    async def rebalance_channels(self, chan1: Channel, chan2: Channel, *, amount_msat: int):
5✔
UNCOV
2905
        if chan1 == chan2:
×
2906
            raise Exception('Rebalance requires two different channels')
×
2907
        if self.uses_trampoline() and chan1.node_id == chan2.node_id:
×
2908
            raise Exception('Rebalance requires channels from different trampolines')
×
2909
        payment_hash = self.create_payment_info(amount_msat=amount_msat)
×
2910
        lnaddr, invoice = self.get_bolt11_invoice(
×
2911
            payment_hash=payment_hash,
2912
            amount_msat=amount_msat,
2913
            message='rebalance',
2914
            expiry=3600,
2915
            fallback_address=None,
2916
            channels=[chan2],
2917
        )
UNCOV
2918
        invoice_obj = Invoice.from_bech32(invoice)
×
2919
        return await self.pay_invoice(invoice_obj, channels=[chan1])
×
2920

2921
    def can_receive_invoice(self, invoice: BaseInvoice) -> bool:
5✔
UNCOV
2922
        assert invoice.is_lightning()
×
2923
        return (invoice.get_amount_sat() or 0) <= self.num_sats_can_receive()
×
2924

2925
    async def close_channel(self, chan_id):
5✔
UNCOV
2926
        chan = self._channels[chan_id]
×
2927
        peer = self._peers[chan.node_id]
×
2928
        return await peer.close_channel(chan_id)
×
2929

2930
    def _force_close_channel(self, chan_id: bytes) -> Transaction:
5✔
2931
        chan = self._channels[chan_id]
5✔
2932
        tx = chan.force_close_tx()
5✔
2933
        # We set the channel state to make sure we won't sign new commitment txs.
2934
        # We expect the caller to try to broadcast this tx, after which it is
2935
        # not safe to keep using the channel even if the broadcast errors (server could be lying).
2936
        # Until the tx is seen in the mempool, there will be automatic rebroadcasts.
2937
        chan.set_state(ChannelState.FORCE_CLOSING)
5✔
2938
        # Add local tx to wallet to also allow manual rebroadcasts.
2939
        try:
5✔
2940
            self.wallet.adb.add_transaction(tx)
5✔
UNCOV
2941
        except UnrelatedTransactionException:
×
2942
            pass  # this can happen if (~all the balance goes to REMOTE)
×
2943
        return tx
5✔
2944

2945
    async def force_close_channel(self, chan_id: bytes) -> str:
5✔
2946
        """Force-close the channel. Network-related exceptions are propagated to the caller.
2947
        (automatic rebroadcasts will be scheduled)
2948
        """
2949
        # note: as we are async, it can take a few event loop iterations between the caller
2950
        #       "calling us" and us getting to run, and we only set the channel state now:
2951
        tx = self._force_close_channel(chan_id)
5✔
2952
        await self.network.broadcast_transaction(tx)
5✔
2953
        return tx.txid()
5✔
2954

2955
    def schedule_force_closing(self, chan_id: bytes) -> 'asyncio.Task[bool]':
5✔
2956
        """Schedules a task to force-close the channel and returns it.
2957
        Network-related exceptions are suppressed.
2958
        (automatic rebroadcasts will be scheduled)
2959
        Note: this method is intentionally not async so that callers have a guarantee
2960
              that the channel state is set immediately.
2961
        """
2962
        tx = self._force_close_channel(chan_id)
5✔
2963
        return asyncio.create_task(self.network.try_broadcasting(tx, 'force-close'))
5✔
2964
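    # Illustrative sketch (editor's note, not part of lnworker.py): the point of keeping
    # schedule_force_closing synchronous is that the channel state change happens before control
    # returns to the caller, while the (fallible) broadcast runs later as a task.  A toy
    # stand-alone version of that pattern, with hypothetical names:
    import asyncio as _asyncio
    _state = {'closing': False}
    async def _broadcast_stub():               # stand-in for network.try_broadcasting(tx, ...)
        await _asyncio.sleep(0)
    def _schedule_force_close():
        _state['closing'] = True               # state is set synchronously, before any await
        return _asyncio.create_task(_broadcast_stub())
    async def _demo():
        _task = _schedule_force_close()
        assert _state['closing']               # guaranteed to be already set here
        await _task
    _asyncio.run(_demo())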

2965
    def remove_channel(self, chan_id):
5✔
UNCOV
2966
        chan = self.channels[chan_id]
×
2967
        assert chan.can_be_deleted()
×
2968
        with self.lock:
×
2969
            self._channels.pop(chan_id)
×
2970
            self.db.get('channels').pop(chan_id.hex())
×
2971
        self.wallet.set_reserved_addresses_for_chan(chan, reserved=False)
×
2972

UNCOV
2973
        util.trigger_callback('channels_updated', self.wallet)
×
2974
        util.trigger_callback('wallet_updated', self.wallet)
×
2975

2976
    @ignore_exceptions
5✔
2977
    @log_exceptions
5✔
2978
    async def reestablish_peer_for_given_channel(self, chan: Channel) -> None:
5✔
UNCOV
2979
        now = time.time()
×
2980
        peer_addresses = []
×
2981
        if self.uses_trampoline():
×
2982
            addr = trampolines_by_id().get(chan.node_id)
×
2983
            if addr:
×
2984
                peer_addresses.append(addr)
×
2985
        else:
2986
            # will try last good address first, from gossip
UNCOV
2987
            last_good_addr = self.channel_db.get_last_good_address(chan.node_id)
×
2988
            if last_good_addr:
×
2989
                peer_addresses.append(last_good_addr)
×
2990
            # will try addresses for node_id from gossip
UNCOV
2991
            addrs_from_gossip = self.channel_db.get_node_addresses(chan.node_id) or []
×
2992
            for host, port, ts in addrs_from_gossip:
×
2993
                peer_addresses.append(LNPeerAddr(host, port, chan.node_id))
×
2994
        # will try addresses stored in channel storage
UNCOV
2995
        peer_addresses += list(chan.get_peer_addresses())
×
2996
        # Done gathering addresses.
2997
        # Now select first one that has not failed recently.
UNCOV
2998
        for peer in peer_addresses:
×
2999
            if self._can_retry_addr(peer, urgent=True, now=now):
×
3000
                await self._add_peer(peer.host, peer.port, peer.pubkey)
×
3001
                return
×
3002

3003
    async def reestablish_peers_and_channels(self):
5✔
UNCOV
3004
        while True:
×
3005
            await asyncio.sleep(1)
×
3006
            if self.stopping_soon:
×
3007
                return
×
3008
            if self.config.ZEROCONF_TRUSTED_NODE:
×
3009
                peer = LNPeerAddr.from_str(self.config.ZEROCONF_TRUSTED_NODE)
×
3010
                if self._can_retry_addr(peer, urgent=True):
×
3011
                    await self._add_peer(peer.host, peer.port, peer.pubkey)
×
3012
            for chan in self.channels.values():
×
3013
                # reestablish
3014
                # note: we delegate filtering out uninteresting chans to this:
UNCOV
3015
                if not chan.should_try_to_reestablish_peer():
×
3016
                    continue
×
3017
                peer = self._peers.get(chan.node_id, None)
×
3018
                if peer:
×
3019
                    await peer.taskgroup.spawn(peer.reestablish_channel(chan))
×
3020
                else:
UNCOV
3021
                    await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan))
×
3022

3023
    def current_target_feerate_per_kw(self) -> Optional[int]:
5✔
3024
        if self.network.fee_estimates.has_data():
5✔
3025
            feerate_per_kvbyte = self.network.fee_estimates.eta_target_to_fee(FEE_LN_ETA_TARGET)
5✔
3026
        else:
UNCOV
3027
            if constants.net is not constants.BitcoinRegtest:
×
3028
                return None
×
3029
            feerate_per_kvbyte = FEERATE_FALLBACK_STATIC_FEE
×
3030
        return max(FEERATE_PER_KW_MIN_RELAY_LIGHTNING, feerate_per_kvbyte // 4)
5✔
3031
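    # Illustrative sketch (editor's note, not part of lnworker.py): commitment-tx feerates are
    # expressed per kiloweight, and one virtual kilobyte is 4000 weight units, hence the division
    # by 4 above.  The floor value here is a hypothetical stand-in for
    # FEERATE_PER_KW_MIN_RELAY_LIGHTNING:
    _min_relay_per_kw = 253                    # hypothetical floor (sat per kw)
    _feerate_per_kvbyte = 10_000               # hypothetical estimate: 10 sat/vbyte
    _feerate_per_kw = max(_min_relay_per_kw, _feerate_per_kvbyte // 4)
    assert _feerate_per_kw == 2_500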

3032
    def current_low_feerate_per_kw(self) -> Optional[int]:
5✔
3033
        if constants.net is constants.BitcoinRegtest:
5✔
UNCOV
3034
            feerate_per_kvbyte = 0
×
3035
        else:
3036
            if not self.network.fee_estimates.has_data():
5✔
UNCOV
3037
                return None
×
3038
            feerate_per_kvbyte = self.network.fee_estimates.eta_target_to_fee(FEE_LN_LOW_ETA_TARGET) or 0
5✔
3039
        low_feerate_per_kw = max(FEERATE_PER_KW_MIN_RELAY_LIGHTNING, feerate_per_kvbyte // 4)
5✔
3040
        # make sure this is never higher than the target feerate:
3041
        current_target_feerate = self.current_target_feerate_per_kw()
5✔
3042
        if not current_target_feerate:
5✔
UNCOV
3043
            return None
×
3044
        low_feerate_per_kw = min(low_feerate_per_kw, current_target_feerate)
5✔
3045
        return low_feerate_per_kw
5✔
3046

3047
    def create_channel_backup(self, channel_id: bytes):
5✔
UNCOV
3048
        chan = self._channels[channel_id]
×
3049
        # do not backup old-style channels
UNCOV
3050
        assert chan.is_static_remotekey_enabled()
×
3051
        peer_addresses = list(chan.get_peer_addresses())
×
3052
        peer_addr = peer_addresses[0]
×
3053
        return ImportedChannelBackupStorage(
×
3054
            node_id = chan.node_id,
3055
            privkey = self.node_keypair.privkey,
3056
            funding_txid = chan.funding_outpoint.txid,
3057
            funding_index = chan.funding_outpoint.output_index,
3058
            funding_address = chan.get_funding_address(),
3059
            host = peer_addr.host,
3060
            port = peer_addr.port,
3061
            is_initiator = chan.constraints.is_initiator,
3062
            channel_seed = chan.config[LOCAL].channel_seed,
3063
            local_delay = chan.config[LOCAL].to_self_delay,
3064
            remote_delay = chan.config[REMOTE].to_self_delay,
3065
            remote_revocation_pubkey = chan.config[REMOTE].revocation_basepoint.pubkey,
3066
            remote_payment_pubkey = chan.config[REMOTE].payment_basepoint.pubkey,
3067
            local_payment_pubkey=chan.config[LOCAL].payment_basepoint.pubkey,
3068
            multisig_funding_privkey=chan.config[LOCAL].multisig_key.privkey,
3069
        )
3070

3071
    def export_channel_backup(self, channel_id):
5✔
UNCOV
3072
        xpub = self.wallet.get_fingerprint()
×
3073
        backup_bytes = self.create_channel_backup(channel_id).to_bytes()
×
3074
        assert backup_bytes == ImportedChannelBackupStorage.from_bytes(backup_bytes).to_bytes(), "roundtrip failed"
×
3075
        encrypted = pw_encode_with_version_and_mac(backup_bytes, xpub)
×
3076
        assert backup_bytes == pw_decode_with_version_and_mac(encrypted, xpub), "encrypt failed"
×
3077
        return 'channel_backup:' + encrypted
×
3078

3079
    async def request_force_close(self, channel_id: bytes, *, connect_str=None) -> None:
5✔
UNCOV
3080
        if channel_id in self.channels:
×
3081
            chan = self.channels[channel_id]
×
3082
            peer = self._peers.get(chan.node_id)
×
3083
            chan.should_request_force_close = True
×
3084
            if peer:
×
3085
                peer.close_and_cleanup()  # to force a reconnect
×
3086
        elif connect_str:
×
3087
            peer = await self.add_peer(connect_str)
×
3088
            await peer.request_force_close(channel_id)
×
3089
        elif channel_id in self.channel_backups:
×
3090
            await self._request_force_close_from_backup(channel_id)
×
3091
        else:
UNCOV
3092
            raise Exception(f'Unknown channel {channel_id.hex()}')
×
3093

3094
    def import_channel_backup(self, data):
5✔
UNCOV
3095
        xpub = self.wallet.get_fingerprint()
×
3096
        cb_storage = ImportedChannelBackupStorage.from_encrypted_str(data, password=xpub)
×
3097
        channel_id = cb_storage.channel_id()
×
3098
        if channel_id.hex() in self.db.get_dict("channels"):
×
3099
            raise Exception('Channel already in wallet')
×
3100
        self.logger.info(f'importing channel backup: {channel_id.hex()}')
×
3101
        d = self.db.get_dict("imported_channel_backups")
×
3102
        d[channel_id.hex()] = cb_storage
×
3103
        with self.lock:
×
3104
            cb = ChannelBackup(cb_storage, lnworker=self)
×
3105
            self._channel_backups[channel_id] = cb
×
3106
        self.wallet.set_reserved_addresses_for_chan(cb, reserved=True)
×
3107
        self.wallet.save_db()
×
3108
        util.trigger_callback('channels_updated', self.wallet)
×
3109
        self.lnwatcher.add_channel(cb)
×
3110

3111
    def has_conflicting_backup_with(self, remote_node_id: bytes):
5✔
3112
        """Returns whether we have an active channel with this node on another device, using the same local node id."""
UNCOV
3113
        channel_backup_peers = [
×
3114
            cb.node_id for cb in self.channel_backups.values()
3115
            if (not cb.is_closed() and cb.get_local_pubkey() == self.node_keypair.pubkey)]
UNCOV
3116
        return any(remote_node_id.startswith(cb_peer_nodeid) for cb_peer_nodeid in channel_backup_peers)
×
3117

3118
    def remove_channel_backup(self, channel_id):
5✔
UNCOV
3119
        chan = self.channel_backups[channel_id]
×
3120
        assert chan.can_be_deleted()
×
3121
        found = False
×
3122
        onchain_backups = self.db.get_dict("onchain_channel_backups")
×
3123
        imported_backups = self.db.get_dict("imported_channel_backups")
×
3124
        if channel_id.hex() in onchain_backups:
×
3125
            onchain_backups.pop(channel_id.hex())
×
3126
            found = True
×
3127
        if channel_id.hex() in imported_backups:
×
3128
            imported_backups.pop(channel_id.hex())
×
3129
            found = True
×
3130
        if not found:
×
3131
            raise Exception('Channel not found')
×
3132
        with self.lock:
×
3133
            self._channel_backups.pop(channel_id)
×
3134
        self.wallet.set_reserved_addresses_for_chan(chan, reserved=False)
×
3135
        self.wallet.save_db()
×
3136
        util.trigger_callback('channels_updated', self.wallet)
×
3137

3138
    @log_exceptions
    async def _request_force_close_from_backup(self, channel_id: bytes):
        cb = self.channel_backups.get(channel_id)
        if not cb:
            raise Exception(f'channel backup not found {self.channel_backups}')
        cb = cb.cb  # storage
        self.logger.info(f'requesting channel force close: {channel_id.hex()}')
        if isinstance(cb, ImportedChannelBackupStorage):
            node_id = cb.node_id
            privkey = cb.privkey
            addresses = [(cb.host, cb.port, 0)]
        else:
            assert isinstance(cb, OnchainChannelBackupStorage)
            privkey = self.node_keypair.privkey
            for pubkey, peer_addr in trampolines_by_id().items():
                if pubkey.startswith(cb.node_id_prefix):
                    node_id = pubkey
                    addresses = [(peer_addr.host, peer_addr.port, 0)]
                    break
            else:
                # we will try with gossip (see below)
                addresses = []

        async def _request_fclose(addresses):
            for host, port, timestamp in addresses:
                peer_addr = LNPeerAddr(host, port, node_id)
                transport = LNTransport(privkey, peer_addr, e_proxy=ESocksProxy.from_network_settings(self.network))
                peer = Peer(self, node_id, transport, is_channel_backup=True)
                try:
                    async with OldTaskGroup(wait=any) as group:
                        await group.spawn(peer._message_loop())
                        await group.spawn(peer.request_force_close(channel_id))
                    return True
                except Exception as e:
                    self.logger.info(f'failed to connect {host} {e}')
                    continue
            else:
                return False
        # try first without gossip db
        success = await _request_fclose(addresses)
        if success:
            return
        # try with gossip db
        if self.uses_trampoline():
            raise Exception(_('Please enable gossip'))
        node_id = self.network.channel_db.get_node_by_prefix(cb.node_id_prefix)
        addresses_from_gossip = self.network.channel_db.get_node_addresses(node_id)
        if not addresses_from_gossip:
            raise Exception('Peer not found in gossip database')
        success = await _request_fclose(addresses_from_gossip)
        if not success:
            raise Exception('failed to connect')

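# --- Illustrative sketch, not part of lnworker.py ----------------------------
# _request_force_close_from_backup() tries peers in two rounds: first the
# address stored in the backup (or a known trampoline matching the node-id
# prefix), then addresses learned from the gossip database.  The helper below
# shows the same "first success wins" pattern in isolation; the callables are
# stand-ins for the per-address connection attempt.
from typing import Awaitable, Callable, Sequence

async def first_success(attempts: Sequence[Callable[[], Awaitable[bool]]]) -> bool:
    for attempt in attempts:
        try:
            if await attempt():
                return True
        except Exception:
            continue        # try the next candidate, as _request_fclose() does
    return False
# ------------------------------------------------------------------------------
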
    def maybe_add_backup_from_tx(self, tx):
        funding_address = None
        node_id_prefix = None
        for i, o in enumerate(tx.outputs()):
            script_type = get_script_type_from_output_script(o.scriptpubkey)
            if script_type == 'p2wsh':
                funding_index = i
                funding_address = o.address
                for o2 in tx.outputs():
                    if o2.scriptpubkey.startswith(bytes([opcodes.OP_RETURN])):
                        encrypted_data = o2.scriptpubkey[2:]
                        data = self.decrypt_cb_data(encrypted_data, funding_address)
                        if data.startswith(CB_MAGIC_BYTES):
                            node_id_prefix = data[len(CB_MAGIC_BYTES):]
        if node_id_prefix is None:
            return
        funding_txid = tx.txid()
        cb_storage = OnchainChannelBackupStorage(
            node_id_prefix=node_id_prefix,
            funding_txid=funding_txid,
            funding_index=funding_index,
            funding_address=funding_address,
            is_initiator=True)
        channel_id = cb_storage.channel_id().hex()
        if channel_id in self.db.get_dict("channels"):
            return
        self.logger.info(f"adding backup from tx")
        d = self.db.get_dict("onchain_channel_backups")
        d[channel_id] = cb_storage
        cb = ChannelBackup(cb_storage, lnworker=self)
        self.wallet.set_reserved_addresses_for_chan(cb, reserved=True)
        self.wallet.save_db()
        with self.lock:
            self._channel_backups[bfh(channel_id)] = cb
        util.trigger_callback('channels_updated', self.wallet)
        self.lnwatcher.add_channel(cb)

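# --- Illustrative sketch, not part of lnworker.py ----------------------------
# maybe_add_backup_from_tx() recovers an on-chain backup from two outputs of a
# funding transaction: the p2wsh funding output and an OP_RETURN output whose
# decrypted payload is CB_MAGIC_BYTES followed by a node-id prefix.  The parser
# below mirrors that layout on an already-decrypted payload; the magic value is
# a placeholder, not the real CB_MAGIC_BYTES constant.
from typing import Optional

_EXAMPLE_MAGIC = b'\x00EXAMPLE'   # stand-in for CB_MAGIC_BYTES

def parse_cb_payload(decrypted: bytes) -> Optional[bytes]:
    """Return the node-id prefix if the payload carries the expected magic, else None."""
    if not decrypted.startswith(_EXAMPLE_MAGIC):
        return None
    return decrypted[len(_EXAMPLE_MAGIC):]
# ------------------------------------------------------------------------------
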
    def save_forwarding_failure(
            self, payment_key: str, *,
            error_bytes: Optional[bytes] = None,
            failure_message: Optional['OnionRoutingFailure'] = None):
        error_hex = error_bytes.hex() if error_bytes else None
        failure_hex = failure_message.to_bytes().hex() if failure_message else None
        self.forwarding_failures[payment_key] = (error_hex, failure_hex)

    def get_forwarding_failure(self, payment_key: str) -> Tuple[Optional[bytes], Optional['OnionRoutingFailure']]:
        error_hex, failure_hex = self.forwarding_failures.get(payment_key, (None, None))
        error_bytes = bytes.fromhex(error_hex) if error_hex else None
        failure_message = OnionRoutingFailure.from_bytes(bytes.fromhex(failure_hex)) if failure_hex else None
        return error_bytes, failure_message
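
# --- Illustrative sketch, not part of lnworker.py ----------------------------
# save_forwarding_failure() and get_forwarding_failure() round-trip a failure
# through hex strings keyed by payment_key.  Sketch of the intended usage,
# assuming `lnworker` is an initialized LNWallet and `payment_key` identifies a
# forwarded HTLC; the 32 zero bytes are a dummy error blob.
def example_forwarding_failure_roundtrip(lnworker, payment_key: str) -> None:
    lnworker.save_forwarding_failure(payment_key, error_bytes=b'\x00' * 32)
    error_bytes, failure_message = lnworker.get_forwarding_failure(payment_key)
    assert error_bytes == b'\x00' * 32
    assert failure_message is None   # no OnionRoutingFailure was stored
# ------------------------------------------------------------------------------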