• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5175397198856192

13 Aug 2025 01:48PM UTC coverage: 60.887% (-0.2%) from 61.069%
5175397198856192

Pull #10123

CirrusCI

accumulator
hww: fix crash when disabling keystore for hww
Pull Request #10123: wizard: add script and derivation to keystorewizard flow. fixes #10063

5 of 9 new or added lines in 1 file covered. (55.56%)

57 existing lines in 3 files now uncovered.

22538 of 37016 relevant lines covered (60.89%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.89
/electrum/lnworker.py
1
# Copyright (C) 2018 The Electrum developers
2
# Distributed under the MIT software license, see the accompanying
3
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
4

5
import asyncio
1✔
6
import os
1✔
7
from decimal import Decimal
1✔
8
import random
1✔
9
import time
1✔
10
from enum import IntEnum
1✔
11
from typing import (
1✔
12
    Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING, NamedTuple, Mapping, Any, Iterable, AsyncGenerator,
13
    Callable, Awaitable
14
)
15
import threading
1✔
16
import socket
1✔
17
from functools import partial
1✔
18
from collections import defaultdict
1✔
19
import concurrent
1✔
20
from concurrent import futures
1✔
21
import urllib.parse
1✔
22
import itertools
1✔
23

24
import aiohttp
1✔
25
import dns.asyncresolver
1✔
26
import dns.exception
1✔
27
from aiorpcx import run_in_thread, NetAddress, ignore_after
1✔
28

29
from .logging import Logger
1✔
30
from .i18n import _
1✔
31
from .json_db import stored_in
1✔
32
from .channel_db import UpdateStatus, ChannelDBNotLoaded, get_mychannel_info, get_mychannel_policy
1✔
33

34
from . import constants, util
1✔
35
from .util import (
1✔
36
    profiler, OldTaskGroup, ESocksProxy, NetworkRetryManager, JsonRPCClient, NotEnoughFunds, EventListener,
37
    event_listener, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions, ignore_exceptions,
38
    make_aiohttp_session, random_shuffled_copy, is_private_netaddress,
39
    UnrelatedTransactionException, LightningHistoryItem
40
)
41
from .fee_policy import (
1✔
42
    FeePolicy, FEERATE_FALLBACK_STATIC_FEE, FEE_LN_ETA_TARGET, FEE_LN_LOW_ETA_TARGET,
43
    FEERATE_PER_KW_MIN_RELAY_LIGHTNING, FEE_LN_MINIMUM_ETA_TARGET
44
)
45
from .invoices import Invoice, PR_UNPAID, PR_PAID, PR_INFLIGHT, PR_FAILED, LN_EXPIRY_NEVER, BaseInvoice
1✔
46
from .bitcoin import COIN, opcodes, make_op_return, address_to_scripthash, DummyAddress
1✔
47
from .bip32 import BIP32Node
1✔
48
from .address_synchronizer import TX_HEIGHT_LOCAL
1✔
49
from .transaction import (
1✔
50
    Transaction, get_script_type_from_output_script, PartialTxOutput, PartialTransaction, PartialTxInput
51
)
52
from .crypto import (
1✔
53
    sha256, chacha20_encrypt, chacha20_decrypt, pw_encode_with_version_and_mac, pw_decode_with_version_and_mac
54
)
55

56
from .onion_message import OnionMessageManager
1✔
57
from .lntransport import (
1✔
58
    LNTransport, LNResponderTransport, LNTransportBase, LNPeerAddr, split_host_port, extract_nodeid,
59
    ConnStringFormatError
60
)
61
from .lnpeer import Peer, LN_P2P_NETWORK_TIMEOUT
1✔
62
from .lnaddr import lnencode, LnAddr, lndecode
1✔
63
from .lnchannel import Channel, AbstractChannel, ChannelState, PeerState, HTLCWithStatus, ChannelBackup
1✔
64
from .lnrater import LNRater
1✔
65
from .lnutil import (
1✔
66
    get_compressed_pubkey_from_bech32, serialize_htlc_key, deserialize_htlc_key, PaymentFailure, generate_keypair,
67
    LnKeyFamily, LOCAL, REMOTE, MIN_FINAL_CLTV_DELTA_FOR_INVOICE, SENT, RECEIVED, HTLCOwner, UpdateAddHtlc, LnFeatures,
68
    ShortChannelID, HtlcLog, NoPathFound, InvalidGossipMsg, FeeBudgetExceeded, ImportedChannelBackupStorage,
69
    OnchainChannelBackupStorage, ln_compare_features, IncompatibleLightningFeatures, PaymentFeeBudget,
70
    NBLOCK_CLTV_DELTA_TOO_FAR_INTO_FUTURE, GossipForwardingMessage, MIN_FUNDING_SAT
71
)
72
from .lnonion import decode_onion_error, OnionFailureCode, OnionRoutingFailure, OnionPacket
1✔
73
from .lnmsg import decode_msg
1✔
74
from .lnrouter import (
1✔
75
    RouteEdge, LNPaymentRoute, LNPaymentPath, is_route_within_budget, NoChannelPolicy, LNPathInconsistent
76
)
77
from .lnwatcher import LNWatcher
1✔
78
from .submarine_swaps import SwapManager
1✔
79
from .mpp_split import suggest_splits, SplitConfigRating
1✔
80
from .trampoline import (
1✔
81
    create_trampoline_route_and_onion, is_legacy_relay, trampolines_by_id, hardcoded_trampoline_nodes,
82
    is_hardcoded_trampoline
83
)
84

85
if TYPE_CHECKING:
1✔
86
    from .network import Network
×
87
    from .wallet import Abstract_Wallet
×
88
    from .channel_db import ChannelDB
×
89
    from .simple_config import SimpleConfig
×
90

91

92
SAVED_PR_STATUS = [PR_PAID, PR_UNPAID]  # status that are persisted
1✔
93

94
NUM_PEERS_TARGET = 4
1✔
95

96
# onchain channel backup data
97
CB_VERSION = 0
1✔
98
CB_MAGIC_BYTES = bytes([0, 0, 0, CB_VERSION])
1✔
99
NODE_ID_PREFIX_LEN = 16
1✔
100

101

102
class PaymentDirection(IntEnum):
1✔
103
    SENT = 0
1✔
104
    RECEIVED = 1
1✔
105
    SELF_PAYMENT = 2
1✔
106
    FORWARDING = 3
1✔
107

108

109
class PaymentInfo(NamedTuple):
1✔
110
    payment_hash: bytes
1✔
111
    amount_msat: Optional[int]
1✔
112
    direction: int
1✔
113
    status: int
1✔
114

115

116
# Note: these states are persisted in the wallet file.
117
# Do not modify them without performing a wallet db upgrade
118
class RecvMPPResolution(IntEnum):
1✔
119
    WAITING = 0
1✔
120
    EXPIRED = 1
1✔
121
    ACCEPTED = 2
1✔
122
    FAILED = 3
1✔
123

124

125
class ReceivedMPPStatus(NamedTuple):
1✔
126
    resolution: RecvMPPResolution
1✔
127
    expected_msat: int
1✔
128
    htlc_set: Set[Tuple[ShortChannelID, UpdateAddHtlc]]
1✔
129

130
    @stored_in('received_mpp_htlcs', tuple)
1✔
131
    def from_tuple(resolution, expected_msat, htlc_list) -> 'ReceivedMPPStatus':
1✔
132
        htlc_set = set([(ShortChannelID(bytes.fromhex(scid)), UpdateAddHtlc.from_tuple(*x)) for (scid, x) in htlc_list])
×
133
        return ReceivedMPPStatus(
×
134
            resolution=RecvMPPResolution(resolution),
135
            expected_msat=expected_msat,
136
            htlc_set=htlc_set)
137

138

139
SentHtlcKey = Tuple[bytes, ShortChannelID, int]  # RHASH, scid, htlc_id
1✔
140

141

142
class SentHtlcInfo(NamedTuple):
1✔
143
    route: LNPaymentRoute
1✔
144
    payment_secret_orig: bytes
1✔
145
    payment_secret_bucket: bytes
1✔
146
    amount_msat: int
1✔
147
    bucket_msat: int
1✔
148
    amount_receiver_msat: int
1✔
149
    trampoline_fee_level: Optional[int]
1✔
150
    trampoline_route: Optional[LNPaymentRoute]
1✔
151

152

153
class ErrorAddingPeer(Exception): pass
1✔
154

155

156
# set some feature flags as baseline for both LNWallet and LNGossip
157
# note that e.g. DATA_LOSS_PROTECT is needed for LNGossip as many peers require it
158
BASE_FEATURES = (
1✔
159
    LnFeatures(0)
160
    | LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT
161
    | LnFeatures.OPTION_STATIC_REMOTEKEY_OPT
162
    | LnFeatures.VAR_ONION_OPT
163
    | LnFeatures.PAYMENT_SECRET_OPT
164
    | LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT
165
)
166

167
# we do not want to receive unrequested gossip (see lnpeer.maybe_save_remote_update)
168
LNWALLET_FEATURES = (
1✔
169
    BASE_FEATURES
170
    | LnFeatures.OPTION_DATA_LOSS_PROTECT_REQ
171
    | LnFeatures.OPTION_STATIC_REMOTEKEY_REQ
172
    | LnFeatures.VAR_ONION_REQ
173
    | LnFeatures.PAYMENT_SECRET_REQ
174
    | LnFeatures.BASIC_MPP_OPT
175
    | LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT_ELECTRUM
176
    | LnFeatures.OPTION_SHUTDOWN_ANYSEGWIT_OPT
177
    | LnFeatures.OPTION_CHANNEL_TYPE_OPT
178
    | LnFeatures.OPTION_SCID_ALIAS_OPT
179
    | LnFeatures.OPTION_SUPPORT_LARGE_CHANNEL_OPT
180
)
181

182
LNGOSSIP_FEATURES = (
1✔
183
    BASE_FEATURES
184
    # LNGossip doesn't serve gossip but weirdly have to signal so
185
    # that peers satisfy our queries
186
    | LnFeatures.GOSSIP_QUERIES_REQ
187
    | LnFeatures.GOSSIP_QUERIES_OPT
188
)
189

190

191
class LNWorker(Logger, EventListener, NetworkRetryManager[LNPeerAddr]):
1✔
192

193
    def __init__(self, node_keypair, features: LnFeatures, *, config: 'SimpleConfig'):
1✔
194
        Logger.__init__(self)
1✔
195
        NetworkRetryManager.__init__(
1✔
196
            self,
197
            max_retry_delay_normal=3600,
198
            init_retry_delay_normal=600,
199
            max_retry_delay_urgent=300,
200
            init_retry_delay_urgent=4,
201
        )
202
        self.lock = threading.RLock()
1✔
203
        self.node_keypair = node_keypair
1✔
204
        self._peers = {}  # type: Dict[bytes, Peer]  # pubkey -> Peer  # needs self.lock
1✔
205
        self.taskgroup = OldTaskGroup()
1✔
206
        self.listen_server = None  # type: Optional[asyncio.AbstractServer]
1✔
207
        self.features = features
1✔
208
        self.network = None  # type: Optional[Network]
1✔
209
        self.config = config
1✔
210
        self.stopping_soon = False  # whether we are being shut down
1✔
211
        self.register_callbacks()
1✔
212

213
    @property
1✔
214
    def channel_db(self) -> 'ChannelDB':
1✔
215
        return self.network.channel_db if self.network else None
1✔
216

217
    def uses_trampoline(self) -> bool:
1✔
218
        return not bool(self.channel_db)
1✔
219

220
    @property
1✔
221
    def peers(self) -> Mapping[bytes, Peer]:
1✔
222
        """Returns a read-only copy of peers."""
223
        with self.lock:
×
224
            return self._peers.copy()
×
225

226
    def channels_for_peer(self, node_id: bytes) -> Dict[bytes, Channel]:
1✔
227
        return {}
×
228

229
    def get_node_alias(self, node_id: bytes) -> Optional[str]:
1✔
230
        """Returns the alias of the node, or None if unknown."""
231
        node_alias = None
×
232
        if not self.uses_trampoline():
×
233
            node_info = self.channel_db.get_node_info_for_node_id(node_id)
×
234
            if node_info:
×
235
                node_alias = node_info.alias
×
236
        else:
237
            for k, v in hardcoded_trampoline_nodes().items():
×
238
                if v.pubkey.startswith(node_id):
×
239
                    node_alias = k
×
240
                    break
×
241
        return node_alias
×
242

243
    async def maybe_listen(self):
1✔
244
        # FIXME: only one LNWorker can listen at a time (single port)
245
        listen_addr = self.config.LIGHTNING_LISTEN
×
246
        if listen_addr:
×
247
            self.logger.info(f'lightning_listen enabled. will try to bind: {listen_addr!r}')
×
248
            try:
×
249
                netaddr = NetAddress.from_string(listen_addr)
×
250
            except Exception as e:
×
251
                self.logger.error(f"failed to parse config key '{self.config.cv.LIGHTNING_LISTEN.key()}'. got: {e!r}")
×
252
                return
×
253
            addr = str(netaddr.host)
×
254

255
            async def cb(reader, writer):
×
256
                transport = LNResponderTransport(self.node_keypair.privkey, reader, writer)
×
257
                try:
×
258
                    node_id = await transport.handshake()
×
259
                except Exception as e:
×
260
                    self.logger.info(f'handshake failure from incoming connection: {e!r}')
×
261
                    return
×
262
                await self._add_peer_from_transport(node_id=node_id, transport=transport)
×
263
            try:
×
264
                self.listen_server = await asyncio.start_server(cb, addr, netaddr.port)
×
265
            except OSError as e:
×
266
                self.logger.error(f"cannot listen for lightning p2p. error: {e!r}")
×
267

268
    async def main_loop(self):
1✔
269
        self.logger.info("starting taskgroup.")
×
270
        try:
×
271
            async with self.taskgroup as group:
×
272
                await group.spawn(asyncio.Event().wait)  # run forever (until cancel)
×
273
        except Exception as e:
×
274
            self.logger.exception("taskgroup died.")
×
275
        finally:
276
            self.logger.info("taskgroup stopped.")
×
277

278
    async def _maintain_connectivity(self):
1✔
279
        while True:
×
280
            await asyncio.sleep(1)
×
281
            if self.stopping_soon:
×
282
                return
×
283
            now = time.time()
×
284
            if len(self._peers) >= NUM_PEERS_TARGET:
×
285
                continue
×
286
            peers = await self._get_next_peers_to_try()
×
287
            for peer in peers:
×
288
                if self._can_retry_addr(peer, now=now):
×
289
                    try:
×
290
                        await self._add_peer(peer.host, peer.port, peer.pubkey)
×
291
                    except ErrorAddingPeer as e:
×
292
                        self.logger.info(f"failed to add peer: {peer}. exc: {e!r}")
×
293

294
    async def _add_peer(self, host: str, port: int, node_id: bytes) -> Peer:
1✔
295
        if node_id in self._peers:
×
296
            return self._peers[node_id]
×
297
        port = int(port)
×
298
        peer_addr = LNPeerAddr(host, port, node_id)
×
299
        self._trying_addr_now(peer_addr)
×
300
        self.logger.info(f"adding peer {peer_addr}")
×
301
        if node_id == self.node_keypair.pubkey or self.is_our_lnwallet(node_id):
×
302
            raise ErrorAddingPeer("cannot connect to self")
×
303
        transport = LNTransport(self.node_keypair.privkey, peer_addr,
×
304
                                e_proxy=ESocksProxy.from_network_settings(self.network))
305
        peer = await self._add_peer_from_transport(node_id=node_id, transport=transport)
×
306
        assert peer
×
307
        return peer
×
308

309
    async def _add_peer_from_transport(self, *, node_id: bytes, transport: LNTransportBase) -> Optional[Peer]:
1✔
310
        with self.lock:
×
311
            existing_peer = self._peers.get(node_id)
×
312
            if existing_peer:
×
313
                # Two instances of the same wallet are attempting to connect simultaneously.
314
                # If we let the new connection replace the existing one, the two instances might
315
                # both keep trying to reconnect, resulting in neither being usable.
316
                if existing_peer.is_initialized():
×
317
                    # give priority to the existing connection
318
                    return
×
319
                else:
320
                    # Use the new connection. (e.g. old peer might be an outgoing connection
321
                    # for an outdated host/port that will never connect)
322
                    existing_peer.close_and_cleanup()
×
323
            peer = Peer(self, node_id, transport)
×
324
            assert node_id not in self._peers
×
325
            self._peers[node_id] = peer
×
326
        await self.taskgroup.spawn(peer.main_loop())
×
327
        return peer
×
328

329
    def peer_closed(self, peer: Peer) -> None:
1✔
330
        with self.lock:
×
331
            peer2 = self._peers.get(peer.pubkey)
×
332
            if peer2 is peer:
×
333
                self._peers.pop(peer.pubkey)
×
334

335
    def num_peers(self) -> int:
1✔
336
        return sum([p.is_initialized() for p in self.peers.values()])
×
337

338
    def is_our_lnwallet(self, node_id: bytes) -> bool:
1✔
339
        """Check if node_id is one of our own wallets"""
340
        wallets = self.network.daemon.get_wallets()
×
341
        for wallet in wallets.values():
×
342
            if wallet.lnworker and wallet.lnworker.node_keypair.pubkey == node_id:
×
343
                return True
×
344
        return False
×
345

346
    def start_network(self, network: 'Network'):
1✔
347
        assert network
×
348
        assert self.network is None, "already started"
×
349
        self.network = network
×
350
        self._add_peers_from_config()
×
351
        asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
×
352

353
    async def stop(self):
1✔
354
        if self.listen_server:
1✔
355
            self.listen_server.close()
×
356
        self.unregister_callbacks()
1✔
357
        await self.taskgroup.cancel_remaining()
1✔
358

359
    def _add_peers_from_config(self):
1✔
360
        peer_list = self.config.LIGHTNING_PEERS or []
×
361
        for host, port, pubkey in peer_list:
×
362
            asyncio.run_coroutine_threadsafe(
×
363
                self._add_peer(host, int(port), bfh(pubkey)),
364
                self.network.asyncio_loop)
365

366
    def is_good_peer(self, peer: LNPeerAddr) -> bool:
1✔
367
        # the purpose of this method is to filter peers that advertise the desired feature bits
368
        # it is disabled for now, because feature bits published in node announcements seem to be unreliable
369
        return True
×
370
        node_id = peer.pubkey
371
        node = self.channel_db._nodes.get(node_id)
372
        if not node:
373
            return False
374
        try:
375
            ln_compare_features(self.features, node.features)
376
        except IncompatibleLightningFeatures:
377
            return False
378
        #self.logger.info(f'is_good {peer.host}')
379
        return True
380

381
    def on_peer_successfully_established(self, peer: Peer) -> None:
1✔
382
        if isinstance(peer.transport, LNTransport):
1✔
383
            peer_addr = peer.transport.peer_addr
×
384
            # reset connection attempt count
385
            self._on_connection_successfully_established(peer_addr)
×
386
            if not self.uses_trampoline():
×
387
                # add into channel db
388
                self.channel_db.add_recent_peer(peer_addr)
×
389
            # save network address into channels we might have with peer
390
            for chan in peer.channels.values():
×
391
                chan.add_or_update_peer_addr(peer_addr)
×
392

393
    async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]:
1✔
394
        now = time.time()
×
395
        await self.channel_db.data_loaded.wait()
×
396
        # first try from recent peers
397
        recent_peers = self.channel_db.get_recent_peers()
×
398
        for peer in recent_peers:
×
399
            if not peer:
×
400
                continue
×
401
            if peer.pubkey in self._peers:
×
402
                continue
×
403
            if not self._can_retry_addr(peer, now=now):
×
404
                continue
×
405
            if not self.is_good_peer(peer):
×
406
                continue
×
407
            return [peer]
×
408
        # try random peer from graph
409
        unconnected_nodes = self.channel_db.get_200_randomly_sorted_nodes_not_in(self.peers.keys())
×
410
        if unconnected_nodes:
×
411
            for node_id in unconnected_nodes:
×
412
                addrs = self.channel_db.get_node_addresses(node_id)
×
413
                if not addrs:
×
414
                    continue
×
415
                host, port, timestamp = self.choose_preferred_address(list(addrs))
×
416
                try:
×
417
                    peer = LNPeerAddr(host, port, node_id)
×
418
                except ValueError:
×
419
                    continue
×
420
                if not self._can_retry_addr(peer, now=now):
×
421
                    continue
×
422
                if not self.is_good_peer(peer):
×
423
                    continue
×
424
                #self.logger.info('taking random ln peer from our channel db')
425
                return [peer]
×
426

427
        # getting desperate... let's try hardcoded fallback list of peers
428
        fallback_list = constants.net.FALLBACK_LN_NODES
×
429
        fallback_list = [peer for peer in fallback_list if self._can_retry_addr(peer, now=now)]
×
430
        if fallback_list:
×
431
            return [random.choice(fallback_list)]
×
432

433
        # last resort: try dns seeds (BOLT-10)
434
        return await self._get_peers_from_dns_seeds()
×
435

436
    async def _get_peers_from_dns_seeds(self) -> Sequence[LNPeerAddr]:
1✔
437
        # Return several peers to reduce the number of dns queries.
438
        if not constants.net.LN_DNS_SEEDS:
×
439
            return []
×
440
        dns_seed = random.choice(constants.net.LN_DNS_SEEDS)
×
441
        self.logger.info('asking dns seed "{}" for ln peers'.format(dns_seed))
×
442
        try:
×
443
            # note: this might block for several seconds
444
            # this will include bech32-encoded-pubkeys and ports
445
            srv_answers = await resolve_dns_srv('r{}.{}'.format(
×
446
                constants.net.LN_REALM_BYTE, dns_seed))
447
        except dns.exception.DNSException as e:
×
448
            self.logger.info(f'failed querying (1) dns seed "{dns_seed}" for ln peers: {repr(e)}')
×
449
            return []
×
450
        random.shuffle(srv_answers)
×
451
        num_peers = 2 * NUM_PEERS_TARGET
×
452
        srv_answers = srv_answers[:num_peers]
×
453
        # we now have pubkeys and ports but host is still needed
454
        peers = []
×
455
        for srv_ans in srv_answers:
×
456
            try:
×
457
                # note: this might take several seconds
458
                answers = await dns.asyncresolver.resolve(srv_ans['host'])
×
459
            except dns.exception.DNSException as e:
×
460
                self.logger.info(f'failed querying (2) dns seed "{dns_seed}" for ln peers: {repr(e)}')
×
461
                continue
×
462
            try:
×
463
                ln_host = str(answers[0])
×
464
                port = int(srv_ans['port'])
×
465
                bech32_pubkey = srv_ans['host'].split('.')[0]
×
466
                pubkey = get_compressed_pubkey_from_bech32(bech32_pubkey)
×
467
                peers.append(LNPeerAddr(ln_host, port, pubkey))
×
468
            except Exception as e:
×
469
                self.logger.info(f'error with parsing peer from dns seed: {repr(e)}')
×
470
                continue
×
471
        self.logger.info(f'got {len(peers)} ln peers from dns seed')
×
472
        return peers
×
473

474
    @staticmethod
1✔
475
    def choose_preferred_address(addr_list: Sequence[Tuple[str, int, int]]) -> Tuple[str, int, int]:
1✔
476
        assert len(addr_list) >= 1
×
477
        # choose the most recent one that is an IP
478
        for host, port, timestamp in sorted(addr_list, key=lambda a: -a[2]):
×
479
            if is_ip_address(host):
×
480
                return host, port, timestamp
×
481
        # otherwise choose one at random
482
        # TODO maybe filter out onion if not on tor?
483
        choice = random.choice(addr_list)
×
484
        return choice
×
485

486
    @event_listener
1✔
487
    def on_event_proxy_set(self, *args):
1✔
488
        for peer in self.peers.values():
×
489
            peer.close_and_cleanup()
×
490
        self._clear_addr_retry_times()
×
491

492
    @log_exceptions
1✔
493
    async def add_peer(self, connect_str: str) -> Peer:
1✔
494
        node_id, rest = extract_nodeid(connect_str)
×
495
        peer = self._peers.get(node_id)
×
496
        if not peer:
×
497
            if rest is not None:
×
498
                host, port = split_host_port(rest)
×
499
            else:
500
                if self.uses_trampoline():
×
501
                    addr = trampolines_by_id().get(node_id)
×
502
                    if not addr:
×
503
                        raise ConnStringFormatError(_('Address unknown for node:') + ' ' + node_id.hex())
×
504
                    host, port = addr.host, addr.port
×
505
                else:
506
                    addrs = self.channel_db.get_node_addresses(node_id)
×
507
                    if not addrs:
×
508
                        raise ConnStringFormatError(_('Don\'t know any addresses for node:') + ' ' + node_id.hex())
×
509
                    host, port, timestamp = self.choose_preferred_address(list(addrs))
×
510
            port = int(port)
×
511

512
            if not self.network.proxy:
×
513
                # Try DNS-resolving the host (if needed). This is simply so that
514
                # the caller gets a nice exception if it cannot be resolved.
515
                # (we don't do the DNS lookup if a proxy is set, to avoid a DNS-leak)
516
                if host.endswith('.onion'):
×
517
                    raise ConnStringFormatError(_('.onion address, but no proxy configured'))
×
518
                try:
×
519
                    await asyncio.get_running_loop().getaddrinfo(host, port)
×
520
                except socket.gaierror:
×
521
                    raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
×
522

523
            # add peer
524
            peer = await self._add_peer(host, port, node_id)
×
525
        return peer
×
526

527

528
class LNGossip(LNWorker):
1✔
529
    """The LNGossip class is a separate, unannounced Lightning node with random id that is just querying
530
    gossip from other nodes. The LNGossip node does not satisfy gossip queries, this is done by the
531
    LNWallet class(es). LNWallets are the advertised nodes used for actual payments and only satisfy
532
    peer queries without fetching gossip themselves. This separation is done so that gossip can be queried
533
    independently of the active LNWallets. LNGossip keeps a curated batch of gossip in _forwarding_gossip
534
    that is fetched by the LNWallets for regular forwarding."""
535
    max_age = 14*24*3600
1✔
536

537
    def __init__(self, config: 'SimpleConfig'):
1✔
538
        seed = os.urandom(32)
×
539
        node = BIP32Node.from_rootseed(seed, xtype='standard')
×
540
        xprv = node.to_xprv()
×
541
        node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
×
542
        LNWorker.__init__(self, node_keypair, LNGOSSIP_FEATURES, config=config)
×
543
        self.unknown_ids = set()
×
544
        self._forwarding_gossip = []  # type: List[GossipForwardingMessage]
×
545
        self._last_gossip_batch_ts = 0  # type: int
×
546
        self._forwarding_gossip_lock = asyncio.Lock()
×
547
        self.gossip_request_semaphore = asyncio.Semaphore(5)
×
548
        # statistics
549
        self._num_chan_ann = 0
×
550
        self._num_node_ann = 0
×
551
        self._num_chan_upd = 0
×
552
        self._num_chan_upd_good = 0
×
553

554
    def start_network(self, network: 'Network'):
1✔
555
        super().start_network(network)
×
556
        for coro in [
×
557
                self._maintain_connectivity(),
558
                self.maintain_db(),
559
                self._maintain_forwarding_gossip()
560
        ]:
561
            tg_coro = self.taskgroup.spawn(coro)
×
562
            asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
×
563

564
    async def maintain_db(self):
1✔
565
        await self.channel_db.data_loaded.wait()
×
566
        while True:
×
567
            if len(self.unknown_ids) == 0:
×
568
                self.channel_db.prune_old_policies(self.max_age)
×
569
                self.channel_db.prune_orphaned_channels()
×
570
            await asyncio.sleep(120)
×
571

572
    async def _maintain_forwarding_gossip(self):
1✔
573
        await self.channel_db.data_loaded.wait()
×
574
        await self.wait_for_sync()
×
575
        while True:
×
576
            async with self._forwarding_gossip_lock:
×
577
                self._forwarding_gossip = self.channel_db.get_forwarding_gossip_batch()
×
578
                self._last_gossip_batch_ts = int(time.time())
×
579
            self.logger.debug(f"{len(self._forwarding_gossip)} gossip messages available to forward")
×
580
            await asyncio.sleep(60)
×
581

582
    async def get_forwarding_gossip(self) -> tuple[List[GossipForwardingMessage], int]:
1✔
583
        async with self._forwarding_gossip_lock:
×
584
            return self._forwarding_gossip, self._last_gossip_batch_ts
×
585

586
    async def add_new_ids(self, ids: Iterable[bytes]):
1✔
587
        known = self.channel_db.get_channel_ids()
×
588
        new = set(ids) - set(known)
×
589
        self.unknown_ids.update(new)
×
590
        util.trigger_callback('unknown_channels', len(self.unknown_ids))
×
591
        util.trigger_callback('gossip_peers', self.num_peers())
×
592
        util.trigger_callback('ln_gossip_sync_progress')
×
593

594
    def get_ids_to_query(self) -> Sequence[bytes]:
1✔
595
        N = 500
×
596
        l = list(self.unknown_ids)
×
597
        self.unknown_ids = set(l[N:])
×
598
        util.trigger_callback('unknown_channels', len(self.unknown_ids))
×
599
        util.trigger_callback('ln_gossip_sync_progress')
×
600
        return l[0:N]
×
601

602
    def get_sync_progress_estimate(self) -> Tuple[Optional[int], Optional[int], Optional[int]]:
1✔
603
        """Estimates the gossip synchronization process and returns the number
604
        of synchronized channels, the total channels in the network and a
605
        rescaled percentage of the synchronization process."""
606
        if self.num_peers() == 0:
×
607
            return None, None, None
×
608
        nchans_with_0p, nchans_with_1p, nchans_with_2p = self.channel_db.get_num_channels_partitioned_by_policy_count()
×
609
        num_db_channels = nchans_with_0p + nchans_with_1p + nchans_with_2p
×
610
        num_nodes = self.channel_db.num_nodes
×
611
        num_nodes_associated_to_chans = max(len(self.channel_db._channels_for_node.keys()), 1)
×
612
        # some channels will never have two policies (only one is in gossip?...)
613
        # so if we have at least 1 policy for a channel, we consider that channel "complete" here
614
        current_est = num_db_channels - nchans_with_0p
×
615
        total_est = len(self.unknown_ids) + num_db_channels
×
616

617
        progress_chans = current_est / total_est if total_est and current_est else 0
×
618
        # consider that we got at least 10% of the node anns of node ids we know about
619
        progress_nodes = min((num_nodes / num_nodes_associated_to_chans) * 10, 1)
×
620
        progress = (progress_chans * 3 + progress_nodes) / 4  # weigh the channel progress higher
×
621
        # self.logger.debug(f"Sync process chans: {progress_chans} | Progress nodes: {progress_nodes} | "
622
        #                   f"Total progress: {progress} | NUM_NODES: {num_nodes} / {num_nodes_associated_to_chans}")
623
        progress_percent = (1.0 / 0.95 * progress) * 100
×
624
        progress_percent = min(progress_percent, 100)
×
625
        progress_percent = round(progress_percent)
×
626
        # take a minimal number of synchronized channels to get a more accurate
627
        # percentage estimate
628
        if current_est < 200:
×
629
            progress_percent = 0
×
630
        return current_est, total_est, progress_percent
×
631

632
    async def process_gossip(self, chan_anns, node_anns, chan_upds):
1✔
633
        # note: we run in the originating peer's TaskGroup, so we can safely raise here
634
        #       and disconnect only from that peer
635
        await self.channel_db.data_loaded.wait()
×
636

637
        # channel announcements
638
        def process_chan_anns():
×
639
            for payload in chan_anns:
×
640
                self.channel_db.verify_channel_announcement(payload)
×
641
            self.channel_db.add_channel_announcements(chan_anns)
×
642
        await run_in_thread(process_chan_anns)
×
643

644
        # node announcements
645
        def process_node_anns():
×
646
            for payload in node_anns:
×
647
                self.channel_db.verify_node_announcement(payload)
×
648
            self.channel_db.add_node_announcements(node_anns)
×
649
        await run_in_thread(process_node_anns)
×
650
        # channel updates
651
        categorized_chan_upds = await run_in_thread(partial(
×
652
            self.channel_db.add_channel_updates,
653
            chan_upds,
654
            max_age=self.max_age))
655
        orphaned = categorized_chan_upds.orphaned
×
656
        if orphaned:
×
657
            self.logger.info(f'adding {len(orphaned)} unknown channel ids')
×
658
            orphaned_ids = [c['short_channel_id'] for c in orphaned]
×
659
            await self.add_new_ids(orphaned_ids)
×
660

661
        self._num_chan_ann += len(chan_anns)
×
662
        self._num_node_ann += len(node_anns)
×
663
        self._num_chan_upd += len(chan_upds)
×
664
        self._num_chan_upd_good += len(categorized_chan_upds.good)
×
665

666
    def is_synced(self) -> bool:
1✔
667
        _, _, percentage_synced = self.get_sync_progress_estimate()
×
668
        if percentage_synced is not None and percentage_synced >= 100:
×
669
            return True
×
670
        return False
×
671

672
    async def wait_for_sync(self, times_to_check: int = 3):
1✔
673
        """Check if we have 100% sync progress `times_to_check` times in a row (because the
674
        estimate often jumps back after some seconds when doing initial sync)."""
675
        while True:
×
676
            if self.is_synced():
×
677
                times_to_check -= 1
×
678
                if times_to_check <= 0:
×
679
                    return
×
680
            await asyncio.sleep(10)
×
681
            # flush the gossip queue so we don't forward old gossip after sync is complete
682
            self.channel_db.get_forwarding_gossip_batch()
×
683

684

685
class PaySession(Logger):
1✔
686
    def __init__(
1✔
687
            self,
688
            *,
689
            payment_hash: bytes,
690
            payment_secret: bytes,
691
            initial_trampoline_fee_level: int,
692
            invoice_features: int,
693
            r_tags,
694
            min_final_cltv_delta: int,  # delta for last node (typically from invoice)
695
            amount_to_pay: int,  # total payment amount final receiver will get
696
            invoice_pubkey: bytes,
697
            uses_trampoline: bool,  # whether sender uses trampoline or gossip
698
            use_two_trampolines: bool,  # whether legacy payments will try to use two trampolines
699
    ):
700
        assert payment_hash
1✔
701
        assert payment_secret
1✔
702
        self.payment_hash = payment_hash
1✔
703
        self.payment_secret = payment_secret
1✔
704
        self.payment_key = payment_hash + payment_secret
1✔
705
        Logger.__init__(self)
1✔
706

707
        self.invoice_features = LnFeatures(invoice_features)
1✔
708
        self.r_tags = r_tags
1✔
709
        self.min_final_cltv_delta = min_final_cltv_delta
1✔
710
        self.amount_to_pay = amount_to_pay
1✔
711
        self.invoice_pubkey = invoice_pubkey
1✔
712

713
        self.sent_htlcs_q = asyncio.Queue()  # type: asyncio.Queue[HtlcLog]
1✔
714
        self.start_time = time.time()
1✔
715

716
        self.uses_trampoline = uses_trampoline
1✔
717
        self.trampoline_fee_level = initial_trampoline_fee_level
1✔
718
        self.failed_trampoline_routes = []
1✔
719
        self.use_two_trampolines = use_two_trampolines
1✔
720
        self._sent_buckets = dict()  # psecret_bucket -> (amount_sent, amount_failed)
1✔
721

722
        self._amount_inflight = 0  # what we sent in htlcs (that receiver gets, without fees)
1✔
723
        self._nhtlcs_inflight = 0
1✔
724
        self.is_active = True  # is still trying to send new htlcs?
1✔
725

726
    def diagnostic_name(self):
1✔
727
        pkey = sha256(self.payment_key)
1✔
728
        return f"{self.payment_hash[:4].hex()}-{pkey[:2].hex()}"
1✔
729

730
    def maybe_raise_trampoline_fee(self, htlc_log: HtlcLog):
1✔
731
        if htlc_log.trampoline_fee_level == self.trampoline_fee_level:
1✔
732
            self.trampoline_fee_level += 1
1✔
733
            self.failed_trampoline_routes = []
1✔
734
            self.logger.info(f'raising trampoline fee level {self.trampoline_fee_level}')
1✔
735
        else:
736
            self.logger.info(f'NOT raising trampoline fee level, already at {self.trampoline_fee_level}')
1✔
737

738
    def handle_failed_trampoline_htlc(self, *, htlc_log: HtlcLog, failure_msg: OnionRoutingFailure):
1✔
739
        # FIXME The trampoline nodes in the path are chosen randomly.
740
        #       Some of the errors might depend on how we have chosen them.
741
        #       Having more attempts is currently useful in part because of the randomness,
742
        #       instead we should give feedback to create_routes_for_payment.
743
        # Sometimes the trampoline node fails to send a payment and returns
744
        # TEMPORARY_CHANNEL_FAILURE, while it succeeds with a higher trampoline fee.
745
        if failure_msg.code in (
1✔
746
                OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT,
747
                OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON,
748
                OnionFailureCode.TEMPORARY_CHANNEL_FAILURE):
749
            # TODO: parse the node policy here (not returned by eclair yet)
750
            # TODO: erring node is always the first trampoline even if second
751
            #  trampoline demands more fees, we can't influence this
752
            self.maybe_raise_trampoline_fee(htlc_log)
1✔
753
        elif self.use_two_trampolines:
1✔
754
            self.use_two_trampolines = False
×
755
        elif failure_msg.code in (
1✔
756
                OnionFailureCode.UNKNOWN_NEXT_PEER,
757
                OnionFailureCode.TEMPORARY_NODE_FAILURE):
758
            trampoline_route = htlc_log.route
1✔
759
            r = [hop.end_node.hex() for hop in trampoline_route]
1✔
760
            self.logger.info(f'failed trampoline route: {r}')
1✔
761
            if r not in self.failed_trampoline_routes:
1✔
762
                self.failed_trampoline_routes.append(r)
1✔
763
            else:
764
                pass  # maybe the route was reused between different MPP parts
×
765
        else:
766
            raise PaymentFailure(failure_msg.code_name())
1✔
767

768
    async def wait_for_one_htlc_to_resolve(self) -> HtlcLog:
1✔
769
        self.logger.info(f"waiting... amount_inflight={self._amount_inflight}. nhtlcs_inflight={self._nhtlcs_inflight}")
1✔
770
        htlc_log = await self.sent_htlcs_q.get()
1✔
771
        self._amount_inflight -= htlc_log.amount_msat
1✔
772
        self._nhtlcs_inflight -= 1
1✔
773
        if self._amount_inflight < 0 or self._nhtlcs_inflight < 0:
1✔
774
            raise Exception(f"amount_inflight={self._amount_inflight}, nhtlcs_inflight={self._nhtlcs_inflight}. both should be >= 0 !")
×
775
        return htlc_log
1✔
776

777
    def add_new_htlc(self, sent_htlc_info: SentHtlcInfo):
1✔
778
        self._nhtlcs_inflight += 1
1✔
779
        self._amount_inflight += sent_htlc_info.amount_receiver_msat
1✔
780
        if self._amount_inflight > self.amount_to_pay:  # safety belts
1✔
781
            raise Exception(f"amount_inflight={self._amount_inflight} > amount_to_pay={self.amount_to_pay}")
×
782
        shi = sent_htlc_info
1✔
783
        bkey = shi.payment_secret_bucket
1✔
784
        # if we sent MPP to a trampoline, add item to sent_buckets
785
        if self.uses_trampoline and shi.amount_msat != shi.bucket_msat:
1✔
786
            if bkey not in self._sent_buckets:
1✔
787
                self._sent_buckets[bkey] = (0, 0)
1✔
788
            amount_sent, amount_failed = self._sent_buckets[bkey]
1✔
789
            amount_sent += shi.amount_receiver_msat
1✔
790
            self._sent_buckets[bkey] = amount_sent, amount_failed
1✔
791

792
    def on_htlc_fail_get_fail_amt_to_propagate(self, sent_htlc_info: SentHtlcInfo) -> Optional[int]:
1✔
793
        shi = sent_htlc_info
1✔
794
        # check sent_buckets if we use trampoline
795
        bkey = shi.payment_secret_bucket
1✔
796
        if self.uses_trampoline and bkey in self._sent_buckets:
1✔
797
            amount_sent, amount_failed = self._sent_buckets[bkey]
1✔
798
            amount_failed += shi.amount_receiver_msat
1✔
799
            self._sent_buckets[bkey] = amount_sent, amount_failed
1✔
800
            if amount_sent != amount_failed:
1✔
801
                self.logger.info('bucket still active...')
1✔
802
                return None
1✔
803
            self.logger.info('bucket failed')
1✔
804
            return amount_sent
1✔
805
        # not using trampoline buckets
806
        return shi.amount_receiver_msat
1✔
807

808
    def get_outstanding_amount_to_send(self) -> int:
1✔
809
        return self.amount_to_pay - self._amount_inflight
1✔
810

811
    def can_be_deleted(self) -> bool:
1✔
812
        """Returns True iff finished sending htlcs AND all pending htlcs have resolved."""
813
        if self.is_active:
1✔
814
            return False
1✔
815
        # note: no one is consuming from sent_htlcs_q anymore
816
        nhtlcs_resolved = self.sent_htlcs_q.qsize()
1✔
817
        assert nhtlcs_resolved <= self._nhtlcs_inflight
1✔
818
        return nhtlcs_resolved == self._nhtlcs_inflight
1✔
819

820

821
class LNWallet(LNWorker):
1✔
822

823
    lnwatcher: Optional['LNWatcher']
1✔
824
    MPP_EXPIRY = 120
1✔
825
    TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS = 3  # seconds
1✔
826
    PAYMENT_TIMEOUT = 120
1✔
827
    MPP_SPLIT_PART_FRACTION = 0.2
1✔
828
    MPP_SPLIT_PART_MINAMT_MSAT = 5_000_000
1✔
829

830
    def __init__(self, wallet: 'Abstract_Wallet', xprv):
1✔
831
        self.wallet = wallet
1✔
832
        self.config = wallet.config
1✔
833
        self.db = wallet.db
1✔
834
        self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
1✔
835
        self.backup_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.BACKUP_CIPHER).privkey
1✔
836
        self.static_payment_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.PAYMENT_BASE)
1✔
837
        self.payment_secret_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.PAYMENT_SECRET_KEY).privkey
1✔
838
        self.funding_root_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.FUNDING_ROOT_KEY)
1✔
839
        Logger.__init__(self)
1✔
840
        features = LNWALLET_FEATURES
1✔
841
        if self.config.ENABLE_ANCHOR_CHANNELS:
1✔
842
            features |= LnFeatures.OPTION_ANCHORS_ZERO_FEE_HTLC_OPT
1✔
843
        if self.config.ACCEPT_ZEROCONF_CHANNELS:
1✔
844
            features |= LnFeatures.OPTION_ZEROCONF_OPT
×
845
        if self.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS and self.config.LIGHTNING_USE_GOSSIP:
1✔
846
            features |= LnFeatures.GOSSIP_QUERIES_OPT  # signal we have gossip to fetch
×
847
        LNWorker.__init__(self, self.node_keypair, features, config=self.config)
1✔
848
        self.lnwatcher = LNWatcher(self)
1✔
849
        self.lnrater: LNRater = None
1✔
850
        self.payment_info = self.db.get_dict('lightning_payments')  # RHASH -> amount, direction, is_paid
1✔
851
        self._preimages = self.db.get_dict('lightning_preimages')   # RHASH -> preimage
1✔
852
        self._bolt11_cache = {}
1✔
853
        # note: this sweep_address is only used as fallback; as it might result in address-reuse
854
        self.logs = defaultdict(list)  # type: Dict[str, List[HtlcLog]]  # key is RHASH  # (not persisted)
1✔
855
        # used in tests
856
        self.enable_htlc_settle = True
1✔
857
        self.enable_htlc_forwarding = True
1✔
858

859
        # note: accessing channels (besides simple lookup) needs self.lock!
860
        self._channels = {}  # type: Dict[bytes, Channel]
1✔
861
        channels = self.db.get_dict("channels")
1✔
862
        for channel_id, c in random_shuffled_copy(channels.items()):
1✔
863
            self._channels[bfh(channel_id)] = chan = Channel(c, lnworker=self)
1✔
864
            self.wallet.set_reserved_addresses_for_chan(chan, reserved=True)
1✔
865

866
        self._channel_backups = {}  # type: Dict[bytes, ChannelBackup]
1✔
867
        # order is important: imported should overwrite onchain
868
        for name in ["onchain_channel_backups", "imported_channel_backups"]:
1✔
869
            channel_backups = self.db.get_dict(name)
1✔
870
            for channel_id, storage in channel_backups.items():
1✔
871
                self._channel_backups[bfh(channel_id)] = cb = ChannelBackup(storage, lnworker=self)
×
872
                self.wallet.set_reserved_addresses_for_chan(cb, reserved=True)
×
873

874
        self._paysessions = dict()                      # type: Dict[bytes, PaySession]
1✔
875
        self.sent_htlcs_info = dict()                   # type: Dict[SentHtlcKey, SentHtlcInfo]
1✔
876
        self.received_mpp_htlcs = self.db.get_dict('received_mpp_htlcs')   # type: Dict[str, ReceivedMPPStatus]  # payment_key -> ReceivedMPPStatus
1✔
877

878
        # detect inflight payments
879
        self.inflight_payments = set()        # (not persisted) keys of invoices that are in PR_INFLIGHT state
1✔
880
        for payment_hash in self.get_payments(status='inflight').keys():
1✔
881
            self.set_invoice_status(payment_hash.hex(), PR_INFLIGHT)
×
882

883
        # payment forwarding
884
        self.active_forwardings = self.db.get_dict('active_forwardings')    # type: Dict[str, List[str]]        # Dict: payment_key -> list of htlc_keys
1✔
885
        self.forwarding_failures = self.db.get_dict('forwarding_failures')  # type: Dict[str, Tuple[str, str]]  # Dict: payment_key -> (error_bytes, error_message)
1✔
886
        self.downstream_to_upstream_htlc = {}                               # type: Dict[str, str]              # Dict: htlc_key -> htlc_key (not persisted)
1✔
887
        self.dont_settle_htlcs = self.db.get_dict('dont_settle_htlcs')      # type: Dict[str, None]             # payment_hashes of htlcs that we should not settle back yet even if we have the preimage
1✔
888

889
        # payment_hash -> callback:
890
        self.hold_invoice_callbacks = {}                # type: Dict[bytes, Callable[[bytes], Awaitable[None]]]
1✔
891
        self.payment_bundles = []                       # lists of hashes. todo:persist
1✔
892

893
        self.nostr_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NOSTR_KEY)
1✔
894
        self.swap_manager = SwapManager(wallet=self.wallet, lnworker=self)
1✔
895
        self.onion_message_manager = OnionMessageManager(self)
1✔
896
        self.subscribe_to_channels()
1✔
897

898
    def subscribe_to_channels(self):
1✔
899
        for chan in self.channels.values():
1✔
900
            self.lnwatcher.add_channel(chan)
1✔
901
        for cb in self.channel_backups.values():
1✔
902
            self.lnwatcher.add_channel(cb)
×
903

904
    def has_deterministic_node_id(self) -> bool:
1✔
905
        return bool(self.db.get('lightning_xprv'))
×
906

907
    def can_have_recoverable_channels(self) -> bool:
1✔
908
        return (self.has_deterministic_node_id()
×
909
                and not self.config.LIGHTNING_LISTEN)
910

911
    def has_recoverable_channels(self) -> bool:
1✔
912
        """Whether *future* channels opened by this wallet would be recoverable
913
        from seed (via putting OP_RETURN outputs into funding txs).
914
        """
915
        return (self.can_have_recoverable_channels()
×
916
                and self.config.LIGHTNING_USE_RECOVERABLE_CHANNELS)
917

918
    def has_anchor_channels(self) -> bool:
1✔
919
        """Returns True if any active channel is an anchor channel."""
920
        return any(chan.has_anchors() and not chan.is_redeemed()
1✔
921
                   for chan in self.channels.values())
922

923
    @property
1✔
924
    def channels(self) -> Mapping[bytes, Channel]:
1✔
925
        """Returns a read-only copy of channels."""
926
        with self.lock:
1✔
927
            return self._channels.copy()
1✔
928

929
    @property
1✔
930
    def channel_backups(self) -> Mapping[bytes, ChannelBackup]:
1✔
931
        """Returns a read-only copy of channels."""
932
        with self.lock:
1✔
933
            return self._channel_backups.copy()
1✔
934

935
    def get_channel_objects(self) -> Mapping[bytes, AbstractChannel]:
1✔
936
        r = self.channel_backups
×
937
        r.update(self.channels)
×
938
        return r
×
939

940
    def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
1✔
941
        return self._channels.get(channel_id, None)
1✔
942

943
    def diagnostic_name(self):
1✔
944
        return self.wallet.diagnostic_name()
1✔
945

946
    @ignore_exceptions
1✔
947
    @log_exceptions
1✔
948
    async def sync_with_remote_watchtower(self):
1✔
949
        self.watchtower_ctns = {}
×
950
        while True:
×
951
            # periodically poll if the user updated 'watchtower_url'
952
            await asyncio.sleep(5)
×
953
            watchtower_url = self.config.WATCHTOWER_CLIENT_URL
×
954
            if not watchtower_url:
×
955
                continue
×
956
            parsed_url = urllib.parse.urlparse(watchtower_url)
×
957
            if not (parsed_url.scheme == 'https' or is_private_netaddress(parsed_url.hostname)):
×
958
                self.logger.warning(f"got watchtower URL for remote tower but we won't use it! "
×
959
                                    f"can only use HTTPS (except if private IP): not using {watchtower_url!r}")
960
                continue
×
961
            # try to sync with the remote watchtower
962
            try:
×
963
                async with make_aiohttp_session(proxy=self.network.proxy) as session:
×
964
                    watchtower = JsonRPCClient(session, watchtower_url)
×
965
                    watchtower.add_method('get_ctn')
×
966
                    watchtower.add_method('add_sweep_tx')
×
967
                    for chan in self.channels.values():
×
968
                        await self.sync_channel_with_watchtower(chan, watchtower)
×
969
            except aiohttp.client_exceptions.ClientConnectorError:
×
970
                self.logger.info(f'could not contact remote watchtower {watchtower_url}')
×
971

972
    def get_watchtower_ctn(self, channel_point):
1✔
973
        return self.watchtower_ctns.get(channel_point)
×
974

975
    async def sync_channel_with_watchtower(self, chan: Channel, watchtower):
1✔
976
        outpoint = chan.funding_outpoint.to_str()
×
977
        addr = chan.get_funding_address()
×
978
        current_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
×
979
        watchtower_ctn = await watchtower.get_ctn(outpoint, addr)
×
980
        for ctn in range(watchtower_ctn + 1, current_ctn):
×
981
            sweeptxs = chan.create_sweeptxs_for_watchtower(ctn)
×
982
            for tx in sweeptxs:
×
983
                await watchtower.add_sweep_tx(outpoint, ctn, tx.inputs()[0].prevout.to_str(), tx.serialize())
×
984
            self.watchtower_ctns[outpoint] = ctn
×
985

986
    def start_network(self, network: 'Network'):
1✔
987
        super().start_network(network)
×
988
        self.lnwatcher.start_network(network)
×
989
        self.swap_manager.start_network(network)
×
990
        self.lnrater = LNRater(self, network)
×
991
        self.onion_message_manager.start_network(network=network)
×
992

993
        for coro in [
×
994
                self.maybe_listen(),
995
                self.lnwatcher.trigger_callbacks(),  # shortcut (don't block) if funding tx locked and verified
996
                self.reestablish_peers_and_channels(),
997
                self.sync_with_remote_watchtower(),
998
        ]:
999
            tg_coro = self.taskgroup.spawn(coro)
×
1000
            asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
×
1001

1002
    async def stop(self):
1✔
1003
        self.stopping_soon = True
1✔
1004
        if self.listen_server:  # stop accepting new peers
1✔
1005
            self.listen_server.close()
×
1006
        async with ignore_after(self.TIMEOUT_SHUTDOWN_FAIL_PENDING_HTLCS):
1✔
1007
            await self.wait_for_received_pending_htlcs_to_get_removed()
1✔
1008
        await LNWorker.stop(self)
1✔
1009
        if self.lnwatcher:
1✔
1010
            self.lnwatcher.stop()
×
1011
            self.lnwatcher = None
×
1012
        if self.swap_manager and self.swap_manager.network:  # may not be present in tests
1✔
1013
            await self.swap_manager.stop()
×
1014
        if self.onion_message_manager:
1✔
1015
            await self.onion_message_manager.stop()
×
1016

1017
    async def wait_for_received_pending_htlcs_to_get_removed(self):
1✔
1018
        assert self.stopping_soon is True
1✔
1019
        # We try to fail pending MPP HTLCs, and wait a bit for them to get removed.
1020
        # Note: even without MPP, if we just failed/fulfilled an HTLC, it is good
1021
        #       to wait a bit for it to become irrevocably removed.
1022
        # Note: we don't wait for *all htlcs* to get removed, only for those
1023
        #       that we can already fail/fulfill. e.g. forwarded htlcs cannot be removed
1024
        async with OldTaskGroup() as group:
1✔
1025
            for peer in self.peers.values():
1✔
1026
                await group.spawn(peer.wait_one_htlc_switch_iteration())
1✔
1027
        while True:
1✔
1028
            if all(not peer.received_htlcs_pending_removal for peer in self.peers.values()):
1✔
1029
                break
1✔
1030
            async with OldTaskGroup(wait=any) as group:
1✔
1031
                for peer in self.peers.values():
1✔
1032
                    await group.spawn(peer.received_htlc_removed_event.wait())
1✔
1033

1034
    def peer_closed(self, peer):
1✔
1035
        for chan in self.channels_for_peer(peer.pubkey).values():
×
1036
            chan.peer_state = PeerState.DISCONNECTED
×
1037
            util.trigger_callback('channel', self.wallet, chan)
×
1038
        super().peer_closed(peer)
×
1039

1040
    def get_payments(self, *, status=None) -> Mapping[bytes, List[HTLCWithStatus]]:
1✔
1041
        out = defaultdict(list)
1✔
1042
        for chan in self.channels.values():
1✔
1043
            d = chan.get_payments(status=status)
1✔
1044
            for payment_hash, plist in d.items():
1✔
1045
                out[payment_hash] += plist
1✔
1046
        return out
1✔
1047

1048
    def get_payment_value(
1✔
1049
            self, info: Optional['PaymentInfo'],
1050
            plist: List[HTLCWithStatus]
1051
    ) -> Tuple[PaymentDirection, int, Optional[int], int]:
1052
        """ fee_msat is included in amount_msat"""
1053
        assert plist
×
1054
        amount_msat = sum(int(x.direction) * x.htlc.amount_msat for x in plist)
×
1055
        if all(x.direction == SENT for x in plist):
×
1056
            direction = PaymentDirection.SENT
×
1057
            fee_msat = (- info.amount_msat - amount_msat) if info else None
×
1058
        elif all(x.direction == RECEIVED for x in plist):
×
1059
            direction = PaymentDirection.RECEIVED
×
1060
            fee_msat = None
×
1061
        elif amount_msat < 0:
×
1062
            direction = PaymentDirection.SELF_PAYMENT
×
1063
            fee_msat = - amount_msat
×
1064
        else:
1065
            direction = PaymentDirection.FORWARDING
×
1066
            fee_msat = - amount_msat
×
1067
        timestamp = min([htlc_with_status.htlc.timestamp for htlc_with_status in plist])
×
1068
        return direction, amount_msat, fee_msat, timestamp
×
1069

1070
    def get_lightning_history(self) -> Dict[str, LightningHistoryItem]:
1✔
1071
        """
1072
        side effect: sets defaults labels
1073
        note that the result is not ordered
1074
        """
1075
        out = {}
×
1076
        for payment_hash, plist in self.get_payments(status='settled').items():
×
1077
            if len(plist) == 0:
×
1078
                continue
×
1079
            key = payment_hash.hex()
×
1080
            info = self.get_payment_info(payment_hash)
×
1081
            # note: just after successfully paying an invoice using MPP, amount and fee values might be shifted
1082
            #       temporarily: the amount only considers 'settled' htlcs (see plist above), but we might also
1083
            #       have some inflight htlcs still. Until all relevant htlcs settle, the amount will be lower than
1084
            #       expected and the fee higher (the inflight htlcs will be effectively counted as fees).
1085
            direction, amount_msat, fee_msat, timestamp = self.get_payment_value(info, plist)
×
1086
            label = self.wallet.get_label_for_rhash(key)
×
1087
            if not label and direction == PaymentDirection.FORWARDING:
×
1088
                label = _('Forwarding')
×
1089
            preimage = self.get_preimage(payment_hash).hex()
×
1090
            group_id = self.swap_manager.get_group_id_for_payment_hash(payment_hash)
×
1091
            item = LightningHistoryItem(
×
1092
                type='payment',
1093
                payment_hash=payment_hash.hex(),
1094
                preimage=preimage,
1095
                amount_msat=amount_msat,
1096
                fee_msat=fee_msat,
1097
                group_id=group_id,
1098
                timestamp=timestamp or 0,
1099
                label=label,
1100
                direction=direction,
1101
            )
1102
            out[payment_hash.hex()] = item
×
1103
        now = int(time.time())
×
1104
        for chan in itertools.chain(self.channels.values(), self.channel_backups.values()):  # type: AbstractChannel
×
1105
            item = chan.get_funding_height()
×
1106
            if item is None:
×
1107
                continue
×
1108
            funding_txid, funding_height, funding_timestamp = item
×
1109
            label = _('Open channel') + ' ' + chan.get_id_for_log()
×
1110
            self.wallet.set_default_label(funding_txid, label)
×
1111
            self.wallet.set_group_label(funding_txid, label)
×
1112
            item = LightningHistoryItem(
×
1113
                type='channel_opening',
1114
                label=label,
1115
                group_id=funding_txid,
1116
                timestamp=funding_timestamp or now,
1117
                amount_msat=chan.balance(LOCAL, ctn=0),
1118
                fee_msat=None,
1119
                payment_hash=None,
1120
                preimage=None,
1121
                direction=None,
1122
            )
1123
            out[funding_txid] = item
×
1124
            item = chan.get_closing_height()
×
1125
            if item is None:
×
1126
                continue
×
1127
            closing_txid, closing_height, closing_timestamp = item
×
1128
            label = _('Close channel') + ' ' + chan.get_id_for_log()
×
1129
            self.wallet.set_default_label(closing_txid, label)
×
1130
            self.wallet.set_group_label(closing_txid, label)
×
1131
            item = LightningHistoryItem(
×
1132
                type='channel_closing',
1133
                label=label,
1134
                group_id=closing_txid,
1135
                timestamp=closing_timestamp or now,
1136
                amount_msat=-chan.balance(LOCAL),
1137
                fee_msat=None,
1138
                payment_hash=None,
1139
                preimage=None,
1140
                direction=None,
1141
            )
1142
            out[closing_txid] = item
×
1143

1144
        # sanity check
1145
        balance_msat = sum([x.amount_msat for x in out.values()])
×
1146
        lb = sum(chan.balance(LOCAL) if not chan.is_closed_or_closing() else 0
×
1147
                 for chan in self.channels.values())
1148
        if balance_msat != lb:
×
1149
            # this typically happens when a channel is recently force closed
1150
            self.logger.info(f'get_lightning_history: balance mismatch {balance_msat - lb}')
×
1151
        return out
×
1152

1153
    def get_groups_for_onchain_history(self) -> Dict[str, str]:
1✔
1154
        """
1155
        returns dict: txid -> group_id
1156
        side effect: sets default labels
1157
        """
1158
        groups = {}
×
1159
        # add funding events
1160
        for chan in itertools.chain(self.channels.values(), self.channel_backups.values()):  # type: AbstractChannel
×
1161
            item = chan.get_funding_height()
×
1162
            if item is None:
×
1163
                continue
×
1164
            funding_txid, funding_height, funding_timestamp = item
×
1165
            groups[funding_txid] = funding_txid
×
1166
            item = chan.get_closing_height()
×
1167
            if item is None:
×
1168
                continue
×
1169
            closing_txid, closing_height, closing_timestamp = item
×
1170
            groups[closing_txid] = closing_txid
×
1171

1172
        d = self.swap_manager.get_groups_for_onchain_history()
×
1173
        for txid, v in d.items():
×
1174
            group_id = v['group_id']
×
1175
            label = v.get('label')
×
1176
            group_label = v.get('group_label') or label
×
1177
            groups[txid] = group_id
×
1178
            if label:
×
1179
                self.wallet.set_default_label(txid, label)
×
1180
            if group_label:
×
1181
                self.wallet.set_group_label(group_id, group_label)
×
1182

1183
        return groups
×
1184

1185
    def channel_peers(self) -> List[bytes]:
1✔
1186
        node_ids = [chan.node_id for chan in self.channels.values() if not chan.is_closed()]
×
1187
        return node_ids
×
1188

1189
    def channels_for_peer(self, node_id):
1✔
1190
        assert type(node_id) is bytes
1✔
1191
        return {chan_id: chan for (chan_id, chan) in self.channels.items()
1✔
1192
                if chan.node_id == node_id}
1193

1194
    def channel_state_changed(self, chan: Channel):
1✔
1195
        if type(chan) is Channel:
×
1196
            self.save_channel(chan)
×
1197
        self.clear_invoices_cache()
×
1198
        util.trigger_callback('channel', self.wallet, chan)
×
1199

1200
    def save_channel(self, chan: Channel):
1✔
1201
        assert type(chan) is Channel
×
1202
        if chan.config[REMOTE].next_per_commitment_point == chan.config[REMOTE].current_per_commitment_point:
×
1203
            raise Exception("Tried to save channel with next_point == current_point, this should not happen")
×
1204
        self.wallet.save_db()
×
1205
        util.trigger_callback('channel', self.wallet, chan)
×
1206

1207
    def channel_by_txo(self, txo: str) -> Optional[AbstractChannel]:
1✔
1208
        for chan in self.channels.values():
×
1209
            if chan.funding_outpoint.to_str() == txo:
×
1210
                return chan
×
1211
        for chan in self.channel_backups.values():
×
1212
            if chan.funding_outpoint.to_str() == txo:
×
1213
                return chan
×
1214
        return None
×
1215

1216
    async def handle_onchain_state(self, chan: Channel):
1✔
1217
        if self.network is None:
×
1218
            # network not started yet
1219
            return
×
1220

1221
        if type(chan) is ChannelBackup:
×
1222
            util.trigger_callback('channel', self.wallet, chan)
×
1223
            return
×
1224

1225
        if (chan.get_state() in (ChannelState.OPEN, ChannelState.SHUTDOWN)
×
1226
                and chan.should_be_closed_due_to_expiring_htlcs(self.wallet.adb.get_local_height())):
1227
            self.logger.info(f"force-closing due to expiring htlcs")
×
1228
            await self.schedule_force_closing(chan.channel_id)
×
1229

1230
        elif chan.get_state() == ChannelState.FUNDED:
×
1231
            peer = self._peers.get(chan.node_id)
×
1232
            if peer and peer.is_initialized() and chan.peer_state == PeerState.GOOD:
×
1233
                peer.send_channel_ready(chan)
×
1234

1235
        elif chan.get_state() == ChannelState.OPEN:
×
1236
            peer = self._peers.get(chan.node_id)
×
1237
            if peer and peer.is_initialized() and chan.peer_state == PeerState.GOOD:
×
1238
                peer.maybe_update_fee(chan)
×
1239
                peer.maybe_send_announcement_signatures(chan)
×
1240

1241
        elif chan.get_state() == ChannelState.FORCE_CLOSING:
×
1242
            force_close_tx = chan.force_close_tx()
×
1243
            txid = force_close_tx.txid()
×
1244
            height = self.lnwatcher.adb.get_tx_height(txid).height
×
1245
            if height == TX_HEIGHT_LOCAL:
×
1246
                self.logger.info('REBROADCASTING CLOSING TX')
×
1247
                await self.network.try_broadcasting(force_close_tx, 'force-close')
×
1248

1249
    def get_peer_by_static_jit_scid_alias(self, scid_alias: bytes) -> Optional[Peer]:
1✔
1250
        for nodeid, peer in self.peers.items():
×
1251
            if scid_alias == self._scid_alias_of_node(nodeid):
×
1252
                return peer
×
1253

1254
    def _scid_alias_of_node(self, nodeid: bytes) -> bytes:
1✔
1255
        # scid alias for just-in-time channels
1256
        return sha256(b'Electrum' + nodeid)[0:8]
×
1257

1258
    def get_static_jit_scid_alias(self) -> bytes:
1✔
1259
        return self._scid_alias_of_node(self.node_keypair.pubkey)
×
1260

1261
    @log_exceptions
1✔
1262
    async def open_channel_just_in_time(
1✔
1263
        self,
1264
        *,
1265
        next_peer: Peer,
1266
        next_amount_msat_htlc: int,
1267
        next_cltv_abs: int,
1268
        payment_hash: bytes,
1269
        next_onion: OnionPacket,
1270
    ) -> str:
1271
        # if an exception is raised during negotiation, we raise an OnionRoutingFailure.
1272
        # this will cancel the incoming HTLC
1273

1274
        # prevent settling the htlc until the channel opening was successful so we can fail it if needed
1275
        self.dont_settle_htlcs[payment_hash.hex()] = None
×
1276
        try:
×
1277
            funding_sat = 2 * (next_amount_msat_htlc // 1000) # try to fully spend htlcs
×
1278
            password = self.wallet.get_unlocked_password() if self.wallet.has_password() else None
×
1279
            channel_opening_fee = next_amount_msat_htlc // 100
×
1280
            if channel_opening_fee // 1000 < self.config.ZEROCONF_MIN_OPENING_FEE:
×
1281
                self.logger.info(f'rejecting JIT channel: payment too low')
×
1282
                raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS, data=b'payment too low')
×
1283
            self.logger.info(f'channel opening fee (sats): {channel_opening_fee//1000}')
×
1284
            next_chan, funding_tx = await self.open_channel_with_peer(
×
1285
                next_peer, funding_sat,
1286
                push_sat=0,
1287
                zeroconf=True,
1288
                public=False,
1289
                opening_fee=channel_opening_fee,
1290
                password=password,
1291
            )
1292
            async def wait_for_channel():
×
1293
                while not next_chan.is_open():
×
1294
                    await asyncio.sleep(1)
×
1295
            await util.wait_for2(wait_for_channel(), LN_P2P_NETWORK_TIMEOUT)
×
1296
            next_chan.save_remote_scid_alias(self._scid_alias_of_node(next_peer.pubkey))
×
1297
            self.logger.info(f'JIT channel is open')
×
1298
            next_amount_msat_htlc -= channel_opening_fee
×
1299
            # fixme: some checks are missing
1300
            htlc = next_peer.send_htlc(
×
1301
                chan=next_chan,
1302
                payment_hash=payment_hash,
1303
                amount_msat=next_amount_msat_htlc,
1304
                cltv_abs=next_cltv_abs,
1305
                onion=next_onion)
1306
            async def wait_for_preimage():
×
1307
                while self.get_preimage(payment_hash) is None:
×
1308
                    await asyncio.sleep(1)
×
1309
            await util.wait_for2(wait_for_preimage(), LN_P2P_NETWORK_TIMEOUT)
×
1310

1311
            # We have been paid and can broadcast
1312
            # todo: if broadcasting raise an exception, we should try to rebroadcast
1313
            await self.network.broadcast_transaction(funding_tx)
×
1314
        except OnionRoutingFailure:
×
1315
            raise
×
1316
        except Exception:
×
1317
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
×
1318
        finally:
1319
            del self.dont_settle_htlcs[payment_hash.hex()]
×
1320

1321
        htlc_key = serialize_htlc_key(next_chan.get_scid_or_local_alias(), htlc.htlc_id)
×
1322
        return htlc_key
×
1323

1324
    @log_exceptions
1✔
1325
    async def open_channel_with_peer(
1✔
1326
            self, peer, funding_sat, *,
1327
            push_sat: int = 0,
1328
            public: bool = False,
1329
            zeroconf: bool = False,
1330
            opening_fee: int = None,
1331
            password=None):
1332
        if self.config.ENABLE_ANCHOR_CHANNELS:
×
1333
            self.wallet.unlock(password)
×
1334
        coins = self.wallet.get_spendable_coins(None)
×
1335
        node_id = peer.pubkey
×
1336
        fee_policy = FeePolicy(self.config.FEE_POLICY)
×
1337
        funding_tx = self.mktx_for_open_channel(
×
1338
            coins=coins,
1339
            funding_sat=funding_sat,
1340
            node_id=node_id,
1341
            fee_policy=fee_policy)
1342
        chan, funding_tx = await self._open_channel_coroutine(
×
1343
            peer=peer,
1344
            funding_tx=funding_tx,
1345
            funding_sat=funding_sat,
1346
            push_sat=push_sat,
1347
            public=public,
1348
            zeroconf=zeroconf,
1349
            opening_fee=opening_fee,
1350
            password=password)
1351
        return chan, funding_tx
×
1352

1353
    @log_exceptions
1✔
1354
    async def _open_channel_coroutine(
1✔
1355
            self, *,
1356
            peer: Peer,
1357
            funding_tx: PartialTransaction,
1358
            funding_sat: int,
1359
            push_sat: int,
1360
            public: bool,
1361
            zeroconf=False,
1362
            opening_fee=None,
1363
            password: Optional[str],
1364
    ) -> Tuple[Channel, PartialTransaction]:
1365

1366
        if funding_sat > self.config.LIGHTNING_MAX_FUNDING_SAT:
×
1367
            raise Exception(
×
1368
                _("Requested channel capacity is over maximum.")
1369
                + f"\n{funding_sat} sat > {self.config.LIGHTNING_MAX_FUNDING_SAT} sat"
1370
            )
1371
        coro = peer.channel_establishment_flow(
×
1372
            funding_tx=funding_tx,
1373
            funding_sat=funding_sat,
1374
            push_msat=push_sat * 1000,
1375
            public=public,
1376
            zeroconf=zeroconf,
1377
            opening_fee=opening_fee,
1378
            temp_channel_id=os.urandom(32))
1379
        chan, funding_tx = await util.wait_for2(coro, LN_P2P_NETWORK_TIMEOUT)
×
1380
        util.trigger_callback('channels_updated', self.wallet)
×
1381
        self.wallet.adb.add_transaction(funding_tx)  # save tx as local into the wallet
×
1382
        self.wallet.sign_transaction(funding_tx, password)
×
1383
        if funding_tx.is_complete() and not zeroconf:
×
1384
            await self.network.try_broadcasting(funding_tx, 'open_channel')
×
1385
        return chan, funding_tx
×
1386

1387
    def add_channel(self, chan: Channel):
1✔
1388
        with self.lock:
×
1389
            self._channels[chan.channel_id] = chan
×
1390
        self.lnwatcher.add_channel(chan)
×
1391

1392
    def add_new_channel(self, chan: Channel):
1✔
1393
        self.add_channel(chan)
×
1394
        channels_db = self.db.get_dict('channels')
×
1395
        channels_db[chan.channel_id.hex()] = chan.storage
×
1396
        self.wallet.set_reserved_addresses_for_chan(chan, reserved=True)
×
1397
        try:
×
1398
            self.save_channel(chan)
×
1399
        except Exception:
×
1400
            chan.set_state(ChannelState.REDEEMED)
×
1401
            self.remove_channel(chan.channel_id)
×
1402
            raise
×
1403

1404
    def cb_data(self, node_id: bytes) -> bytes:
1✔
1405
        return CB_MAGIC_BYTES + node_id[0:NODE_ID_PREFIX_LEN]
×
1406

1407
    def decrypt_cb_data(self, encrypted_data, funding_address):
1✔
1408
        funding_scripthash = bytes.fromhex(address_to_scripthash(funding_address))
×
1409
        nonce = funding_scripthash[0:12]
×
1410
        return chacha20_decrypt(key=self.backup_key, data=encrypted_data, nonce=nonce)
×
1411

1412
    def encrypt_cb_data(self, data, funding_address):
1✔
1413
        funding_scripthash = bytes.fromhex(address_to_scripthash(funding_address))
×
1414
        nonce = funding_scripthash[0:12]
×
1415
        # note: we are only using chacha20 instead of chacha20+poly1305 to save onchain space
1416
        #       (not have the 16 byte MAC). Otherwise, the latter would be preferable.
1417
        return chacha20_encrypt(key=self.backup_key, data=data, nonce=nonce)
×
1418

1419
    def mktx_for_open_channel(
1✔
1420
            self, *,
1421
            coins: Sequence[PartialTxInput],
1422
            funding_sat: int,
1423
            node_id: bytes,
1424
            fee_policy: FeePolicy,
1425
    ) -> PartialTransaction:
1426
        from .wallet import get_locktime_for_new_transaction
×
1427

1428
        outputs = [PartialTxOutput.from_address_and_value(DummyAddress.CHANNEL, funding_sat)]
×
1429
        if self.has_recoverable_channels():
×
1430
            dummy_scriptpubkey = make_op_return(self.cb_data(node_id))
×
1431
            outputs.append(PartialTxOutput(scriptpubkey=dummy_scriptpubkey, value=0))
×
1432
        tx = self.wallet.make_unsigned_transaction(
×
1433
            coins=coins,
1434
            outputs=outputs,
1435
            fee_policy=fee_policy,
1436
            # we do not know yet if peer accepts anchors, just assume they do
1437
            is_anchor_channel_opening=self.config.ENABLE_ANCHOR_CHANNELS,
1438
        )
1439
        tx.set_rbf(False)
×
1440
        # rm randomness from locktime, as we use the locktime as entropy for deriving the funding_privkey
1441
        # (and it would be confusing to get a collision as a consequence of the randomness)
1442
        tx.locktime = get_locktime_for_new_transaction(self.network, include_random_component=False)
×
1443
        return tx
×
1444

1445
    def suggest_funding_amount(self, amount_to_pay, coins) -> Tuple[int, int] | None:
1✔
1446
        """ whether we can pay amount_sat after opening a new channel"""
1447
        num_sats_can_send = int(self.num_sats_can_send())
×
1448
        lightning_needed = amount_to_pay - num_sats_can_send
×
1449
        assert lightning_needed > 0
×
1450
        min_funding_sat = lightning_needed + (lightning_needed // 20) + 1000  # safety margin
×
1451
        min_funding_sat = max(min_funding_sat, MIN_FUNDING_SAT)  # at least MIN_FUNDING_SAT
×
1452
        if min_funding_sat > self.config.LIGHTNING_MAX_FUNDING_SAT:
×
1453
            return
×
1454
        fee_policy = FeePolicy(f'feerate:{FEERATE_FALLBACK_STATIC_FEE}')
×
1455
        try:
×
1456
            self.mktx_for_open_channel(
×
1457
                coins=coins, funding_sat=min_funding_sat, node_id=bytes(32), fee_policy=fee_policy)
1458
            funding_sat = min_funding_sat
×
1459
        except NotEnoughFunds:
×
1460
            return
×
1461
        # if available, suggest twice that amount:
1462
        if 2 * min_funding_sat <= self.config.LIGHTNING_MAX_FUNDING_SAT:
×
1463
            try:
×
1464
                self.mktx_for_open_channel(
×
1465
                    coins=coins, funding_sat=2*min_funding_sat, node_id=bytes(32), fee_policy=fee_policy)
1466
                funding_sat = 2 * min_funding_sat
×
1467
            except NotEnoughFunds:
×
1468
                pass
×
1469
        return funding_sat, min_funding_sat
×
1470

1471
    def open_channel(
1✔
1472
            self, *,
1473
            connect_str: str,
1474
            funding_tx: PartialTransaction,
1475
            funding_sat: int,
1476
            push_amt_sat: int,
1477
            public: bool = False,
1478
            password: str = None,
1479
    ) -> Tuple[Channel, PartialTransaction]:
1480

1481
        fut = asyncio.run_coroutine_threadsafe(self.add_peer(connect_str), self.network.asyncio_loop)
×
1482
        try:
×
1483
            peer = fut.result()
×
1484
        except concurrent.futures.TimeoutError:
×
1485
            raise Exception(_("add peer timed out"))
×
1486
        coro = self._open_channel_coroutine(
×
1487
            peer=peer,
1488
            funding_tx=funding_tx,
1489
            funding_sat=funding_sat,
1490
            push_sat=push_amt_sat,
1491
            public=public,
1492
            password=password)
1493
        fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
×
1494
        try:
×
1495
            chan, funding_tx = fut.result()
×
1496
        except concurrent.futures.TimeoutError:
×
1497
            raise Exception(_("open_channel timed out"))
×
1498
        return chan, funding_tx
×
1499

1500
    def get_channel_by_short_id(self, short_channel_id: bytes) -> Optional[Channel]:
1✔
1501
        # First check against *real* SCIDs.
1502
        # This e.g. protects against maliciously chosen SCID aliases, and accidental collisions.
1503
        for chan in self.channels.values():
×
1504
            if chan.short_channel_id == short_channel_id:
×
1505
                return chan
×
1506
        # Now we also consider aliases.
1507
        # TODO we should split this as this search currently ignores the "direction"
1508
        #      of the aliases. We should only look at either the remote OR the local alias,
1509
        #      depending on context.
1510
        for chan in self.channels.values():
×
1511
            if chan.get_remote_scid_alias() == short_channel_id:
×
1512
                return chan
×
1513
            if chan.get_local_scid_alias() == short_channel_id:
×
1514
                return chan
×
1515
        return None
×
1516

1517
    def can_pay_invoice(self, invoice: Invoice) -> bool:
1✔
1518
        assert invoice.is_lightning()
×
1519
        return (invoice.get_amount_sat() or 0) <= self.num_sats_can_send()
×
1520

1521
    @log_exceptions
1✔
1522
    async def pay_invoice(
1✔
1523
            self, invoice: Invoice, *,
1524
            amount_msat: int = None,
1525
            attempts: int = None,  # used only in unit tests
1526
            full_path: LNPaymentPath = None,
1527
            channels: Optional[Sequence[Channel]] = None,
1528
            budget: Optional[PaymentFeeBudget] = None,
1529
    ) -> Tuple[bool, List[HtlcLog]]:
1530
        bolt11 = invoice.lightning_invoice
1✔
1531
        lnaddr = self._check_bolt11_invoice(bolt11, amount_msat=amount_msat)
1✔
1532
        min_final_cltv_delta = lnaddr.get_min_final_cltv_delta()
1✔
1533
        payment_hash = lnaddr.paymenthash
1✔
1534
        key = payment_hash.hex()
1✔
1535
        payment_secret = lnaddr.payment_secret
1✔
1536
        invoice_pubkey = lnaddr.pubkey.serialize()
1✔
1537
        invoice_features = lnaddr.get_features()
1✔
1538
        r_tags = lnaddr.get_routing_info('r')
1✔
1539
        amount_to_pay = lnaddr.get_amount_msat()
1✔
1540
        status = self.get_payment_status(payment_hash)
1✔
1541
        if status == PR_PAID:
1✔
1542
            raise PaymentFailure(_("This invoice has been paid already"))
×
1543
        if status == PR_INFLIGHT:
1✔
1544
            raise PaymentFailure(_("A payment was already initiated for this invoice"))
×
1545
        if payment_hash in self.get_payments(status='inflight'):
1✔
1546
            raise PaymentFailure(_("A previous attempt to pay this invoice did not clear"))
×
1547
        info = PaymentInfo(payment_hash, amount_to_pay, SENT, PR_UNPAID)
1✔
1548
        self.save_payment_info(info)
1✔
1549
        self.wallet.set_label(key, lnaddr.get_description())
1✔
1550
        self.set_invoice_status(key, PR_INFLIGHT)
1✔
1551
        if budget is None:
1✔
1552
            budget = PaymentFeeBudget.from_invoice_amount(invoice_amount_msat=amount_to_pay, config=self.config)
1✔
1553
        if attempts is None and self.uses_trampoline():
1✔
1554
            # we don't expect lots of failed htlcs with trampoline, so we can fail sooner
1555
            attempts = 30
1✔
1556
        success = False
1✔
1557
        try:
1✔
1558
            await self.pay_to_node(
1✔
1559
                node_pubkey=invoice_pubkey,
1560
                payment_hash=payment_hash,
1561
                payment_secret=payment_secret,
1562
                amount_to_pay=amount_to_pay,
1563
                min_final_cltv_delta=min_final_cltv_delta,
1564
                r_tags=r_tags,
1565
                invoice_features=invoice_features,
1566
                attempts=attempts,
1567
                full_path=full_path,
1568
                channels=channels,
1569
                budget=budget,
1570
            )
1571
            success = True
1✔
1572
        except PaymentFailure as e:
1✔
1573
            self.logger.info(f'payment failure: {e!r}')
1✔
1574
            reason = str(e)
1✔
1575
        except ChannelDBNotLoaded as e:
1✔
1576
            self.logger.info(f'payment failure: {e!r}')
×
1577
            reason = str(e)
×
1578
        finally:
1579
            self.logger.info(f"pay_invoice ending session for RHASH={payment_hash.hex()}. {success=}")
1✔
1580
        if success:
1✔
1581
            self.set_invoice_status(key, PR_PAID)
1✔
1582
            util.trigger_callback('payment_succeeded', self.wallet, key)
1✔
1583
        else:
1584
            self.set_invoice_status(key, PR_UNPAID)
1✔
1585
            util.trigger_callback('payment_failed', self.wallet, key, reason)
1✔
1586
        log = self.logs[key]
1✔
1587
        return success, log
1✔
1588

1589
    async def pay_to_node(
1✔
1590
            self, *,
1591
            node_pubkey: bytes,
1592
            payment_hash: bytes,
1593
            payment_secret: bytes,
1594
            amount_to_pay: int,  # in msat
1595
            min_final_cltv_delta: int,
1596
            r_tags,
1597
            invoice_features: int,
1598
            attempts: int = None,
1599
            full_path: LNPaymentPath = None,
1600
            fwd_trampoline_onion: OnionPacket = None,
1601
            budget: PaymentFeeBudget,
1602
            channels: Optional[Sequence[Channel]] = None,
1603
            fw_payment_key: str = None,  # for forwarding
1604
    ) -> None:
1605

1606
        assert budget
1✔
1607
        assert budget.fee_msat >= 0, budget
1✔
1608
        assert budget.cltv >= 0, budget
1✔
1609

1610
        payment_key = payment_hash + payment_secret
1✔
1611
        assert payment_key not in self._paysessions
1✔
1612
        self._paysessions[payment_key] = paysession = PaySession(
1✔
1613
            payment_hash=payment_hash,
1614
            payment_secret=payment_secret,
1615
            initial_trampoline_fee_level=self.config.INITIAL_TRAMPOLINE_FEE_LEVEL,
1616
            invoice_features=invoice_features,
1617
            r_tags=r_tags,
1618
            min_final_cltv_delta=min_final_cltv_delta,
1619
            amount_to_pay=amount_to_pay,
1620
            invoice_pubkey=node_pubkey,
1621
            uses_trampoline=self.uses_trampoline(),
1622
            # the config option to use two trampoline hops for legacy payments has been removed as
1623
            # the trampoline onion is too small (400 bytes) to accommodate two trampoline hops and
1624
            # routing hints, making the functionality unusable for payments that require routing hints.
1625
            # TODO: if you read this, the year is 2027 and there is no use for the second trampoline
1626
            # hop code anymore remove the code completely.
1627
            use_two_trampolines=False,
1628
        )
1629
        self.logs[payment_hash.hex()] = log = []  # TODO incl payment_secret in key (re trampoline forwarding)
1✔
1630

1631
        paysession.logger.info(
1✔
1632
            f"pay_to_node starting session for RHASH={payment_hash.hex()}. "
1633
            f"using_trampoline={self.uses_trampoline()}. "
1634
            f"invoice_features={paysession.invoice_features.get_names()}. "
1635
            f"{amount_to_pay=} msat. {budget=}")
1636
        if not self.uses_trampoline():
1✔
1637
            self.logger.info(
1✔
1638
                f"gossip_db status. sync progress: {self.network.lngossip.get_sync_progress_estimate()}. "
1639
                f"num_nodes={self.channel_db.num_nodes}, "
1640
                f"num_channels={self.channel_db.num_channels}, "
1641
                f"num_policies={self.channel_db.num_policies}.")
1642

1643
        # when encountering trampoline forwarding difficulties in the legacy case, we
1644
        # sometimes need to fall back to a single trampoline forwarder, at the expense
1645
        # of privacy
1646
        try:
1✔
1647
            while True:
1✔
1648
                if (amount_to_send := paysession.get_outstanding_amount_to_send()) > 0:
1✔
1649
                    # 1. create a set of routes for remaining amount.
1650
                    # note: path-finding runs in a separate thread so that we don't block the asyncio loop
1651
                    # graph updates might occur during the computation
1652
                    remaining_fee_budget_msat = (budget.fee_msat * amount_to_send) // amount_to_pay
1✔
1653
                    routes = self.create_routes_for_payment(
1✔
1654
                        paysession=paysession,
1655
                        amount_msat=amount_to_send,
1656
                        full_path=full_path,
1657
                        fwd_trampoline_onion=fwd_trampoline_onion,
1658
                        channels=channels,
1659
                        budget=budget._replace(fee_msat=remaining_fee_budget_msat),
1660
                    )
1661
                    # 2. send htlcs
1662
                    async for sent_htlc_info, cltv_delta, trampoline_onion in routes:
1✔
1663
                        await self.pay_to_route(
1✔
1664
                            paysession=paysession,
1665
                            sent_htlc_info=sent_htlc_info,
1666
                            min_final_cltv_delta=cltv_delta,
1667
                            trampoline_onion=trampoline_onion,
1668
                            fw_payment_key=fw_payment_key,
1669
                        )
1670
                    # invoice_status is triggered in self.set_invoice_status when it actually changes.
1671
                    # It is also triggered here to update progress for a lightning payment in the GUI
1672
                    # (e.g. attempt counter)
1673
                    util.trigger_callback('invoice_status', self.wallet, payment_hash.hex(), PR_INFLIGHT)
1✔
1674
                # 3. await a queue
1675
                htlc_log = await paysession.wait_for_one_htlc_to_resolve()  # TODO maybe wait a bit, more failures might come
1✔
1676
                log.append(htlc_log)
1✔
1677
                if htlc_log.success:
1✔
1678
                    if self.network.path_finder:
1✔
1679
                        # TODO: report every route to liquidity hints for mpp
1680
                        # in the case of success, we report channels of the
1681
                        # route as being able to send the same amount in the future,
1682
                        # as we assume to not know the capacity
1683
                        self.network.path_finder.update_liquidity_hints(htlc_log.route, htlc_log.amount_msat)
1✔
1684
                        # remove inflight htlcs from liquidity hints
1685
                        self.network.path_finder.update_inflight_htlcs(htlc_log.route, add_htlcs=False)
1✔
1686
                    return
1✔
1687
                # htlc failed
1688
                # if we get a tmp channel failure, it might work to split the amount and try more routes
1689
                # if we get a channel update, we might retry the same route and amount
1690
                route = htlc_log.route
1✔
1691
                sender_idx = htlc_log.sender_idx
1✔
1692
                failure_msg = htlc_log.failure_msg
1✔
1693
                if sender_idx is None:
1✔
UNCOV
1694
                    raise PaymentFailure(failure_msg.code_name())
×
1695
                erring_node_id = route[sender_idx].node_id
1✔
1696
                code, data = failure_msg.code, failure_msg.data
1✔
1697
                self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. "
1✔
1698
                                 f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}")
1699
                self.logger.info(f"error reported by {erring_node_id.hex()}")
1✔
1700
                if code == OnionFailureCode.MPP_TIMEOUT:
1✔
1701
                    raise PaymentFailure(failure_msg.code_name())
1✔
1702
                # errors returned by the next trampoline.
1703
                if fwd_trampoline_onion and code in [
1✔
1704
                        OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT,
1705
                        OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON]:
1706
                    raise failure_msg
×
1707
                # trampoline
1708
                if self.uses_trampoline():
1✔
1709
                    paysession.handle_failed_trampoline_htlc(
1✔
1710
                        htlc_log=htlc_log, failure_msg=failure_msg)
1711
                else:
1712
                    self.handle_error_code_from_failed_htlc(
1✔
1713
                        route=route, sender_idx=sender_idx, failure_msg=failure_msg, amount=htlc_log.amount_msat)
1714
                # max attempts or timeout
1715
                if (attempts is not None and len(log) >= attempts) or (attempts is None and time.time() - paysession.start_time > self.PAYMENT_TIMEOUT):
1✔
1716
                    raise PaymentFailure('Giving up after %d attempts'%len(log))
1✔
1717
        finally:
1718
            paysession.is_active = False
1✔
1719
            if paysession.can_be_deleted():
1✔
1720
                self._paysessions.pop(payment_key)
1✔
1721
            paysession.logger.info(f"pay_to_node ending session for RHASH={payment_hash.hex()}")
1✔
1722

1723
    async def pay_to_route(
1✔
1724
            self, *,
1725
            paysession: PaySession,
1726
            sent_htlc_info: SentHtlcInfo,
1727
            min_final_cltv_delta: int,
1728
            trampoline_onion: Optional[OnionPacket] = None,
1729
            fw_payment_key: str = None,
1730
    ) -> None:
1731
        """Sends a single HTLC."""
1732
        shi = sent_htlc_info
1✔
1733
        del sent_htlc_info  # just renamed
1✔
1734
        short_channel_id = shi.route[0].short_channel_id
1✔
1735
        chan = self.get_channel_by_short_id(short_channel_id)
1✔
1736
        assert chan, ShortChannelID(short_channel_id)
1✔
1737
        peer = self._peers.get(shi.route[0].node_id)
1✔
1738
        if not peer:
1✔
1739
            raise PaymentFailure('Dropped peer')
×
1740
        await peer.initialized
1✔
1741
        htlc = peer.pay(
1✔
1742
            route=shi.route,
1743
            chan=chan,
1744
            amount_msat=shi.amount_msat,
1745
            total_msat=shi.bucket_msat,
1746
            payment_hash=paysession.payment_hash,
1747
            min_final_cltv_delta=min_final_cltv_delta,
1748
            payment_secret=shi.payment_secret_bucket,
1749
            trampoline_onion=trampoline_onion)
1750

1751
        key = (paysession.payment_hash, short_channel_id, htlc.htlc_id)
1✔
1752
        self.sent_htlcs_info[key] = shi
1✔
1753
        paysession.add_new_htlc(shi)
1✔
1754
        if fw_payment_key:
1✔
1755
            htlc_key = serialize_htlc_key(short_channel_id, htlc.htlc_id)
1✔
1756
            self.logger.info(f'adding active forwarding {fw_payment_key}')
1✔
1757
            self.active_forwardings[fw_payment_key].append(htlc_key)
1✔
1758
        if self.network.path_finder:
1✔
1759
            # add inflight htlcs to liquidity hints
1760
            self.network.path_finder.update_inflight_htlcs(shi.route, add_htlcs=True)
1✔
1761
        util.trigger_callback('htlc_added', chan, htlc, SENT)
1✔
1762

1763
    def handle_error_code_from_failed_htlc(
1✔
1764
            self,
1765
            *,
1766
            route: LNPaymentRoute,
1767
            sender_idx: int,
1768
            failure_msg: OnionRoutingFailure,
1769
            amount: int) -> None:
1770

1771
        assert self.channel_db  # cannot be in trampoline mode
1✔
1772
        assert self.network.path_finder
1✔
1773

1774
        # remove inflight htlcs from liquidity hints
1775
        self.network.path_finder.update_inflight_htlcs(route, add_htlcs=False)
1✔
1776

1777
        code, data = failure_msg.code, failure_msg.data
1✔
1778
        # TODO can we use lnmsg.OnionWireSerializer here?
1779
        # TODO update onion_wire.csv
1780
        # handle some specific error codes
1781
        failure_codes = {
1✔
1782
            OnionFailureCode.TEMPORARY_CHANNEL_FAILURE: 0,
1783
            OnionFailureCode.AMOUNT_BELOW_MINIMUM: 8,
1784
            OnionFailureCode.FEE_INSUFFICIENT: 8,
1785
            OnionFailureCode.INCORRECT_CLTV_EXPIRY: 4,
1786
            OnionFailureCode.EXPIRY_TOO_SOON: 0,
1787
            OnionFailureCode.CHANNEL_DISABLED: 2,
1788
        }
1789
        try:
1✔
1790
            failing_channel = route[sender_idx + 1].short_channel_id
1✔
1791
        except IndexError:
1✔
1792
            raise PaymentFailure(f'payment destination reported error: {failure_msg.code_name()}') from None
1✔
1793

1794
        # TODO: handle unknown next peer?
1795
        # handle failure codes that include a channel update
1796
        if code in failure_codes:
1✔
1797
            offset = failure_codes[code]
1✔
1798
            channel_update_len = int.from_bytes(data[offset:offset+2], byteorder="big")
1✔
1799
            channel_update_as_received = data[offset+2: offset+2+channel_update_len]
1✔
1800
            payload = self._decode_channel_update_msg(channel_update_as_received)
1✔
1801
            if payload is None:
1✔
1802
                self.logger.info(f'could not decode channel_update for failed htlc: '
×
1803
                                 f'{channel_update_as_received.hex()}')
1804
                blacklist = True
×
1805
            elif payload.get('short_channel_id') != failing_channel:
1✔
1806
                self.logger.info(f'short_channel_id in channel_update does not match our route')
×
1807
                blacklist = True
×
1808
            else:
1809
                # apply the channel update or get blacklisted
1810
                blacklist, update = self._handle_chanupd_from_failed_htlc(
1✔
1811
                    payload, route=route, sender_idx=sender_idx, failure_msg=failure_msg)
1812
                # we interpret a temporary channel failure as a liquidity issue
1813
                # in the channel and update our liquidity hints accordingly
1814
                if code == OnionFailureCode.TEMPORARY_CHANNEL_FAILURE:
1✔
1815
                    self.network.path_finder.update_liquidity_hints(
1✔
1816
                        route,
1817
                        amount,
1818
                        failing_channel=ShortChannelID(failing_channel))
1819
                # if we can't decide on some action, we are stuck
1820
                if not (blacklist or update):
1✔
1821
                    raise PaymentFailure(failure_msg.code_name())
×
1822
        # for errors that do not include a channel update
1823
        else:
1824
            blacklist = True
1✔
1825
        if blacklist:
1✔
1826
            self.network.path_finder.add_edge_to_blacklist(short_channel_id=failing_channel)
1✔
1827

1828
    def _handle_chanupd_from_failed_htlc(
1✔
1829
        self, payload, *,
1830
        route: LNPaymentRoute,
1831
        sender_idx: int,
1832
        failure_msg: OnionRoutingFailure,
1833
    ) -> Tuple[bool, bool]:
1834
        blacklist = False
1✔
1835
        update = False
1✔
1836
        try:
1✔
1837
            r = self.channel_db.add_channel_update(payload, verify=True)
1✔
1838
        except InvalidGossipMsg:
×
1839
            return True, False  # blacklist
×
1840
        short_channel_id = ShortChannelID(payload['short_channel_id'])
1✔
1841
        if r == UpdateStatus.GOOD:
1✔
1842
            self.logger.info(f"applied channel update to {short_channel_id}")
×
1843
            # TODO: add test for this
1844
            # FIXME: this does not work for our own unannounced channels.
1845
            for chan in self.channels.values():
×
1846
                if chan.short_channel_id == short_channel_id:
×
1847
                    chan.set_remote_update(payload)
×
1848
            update = True
×
1849
        elif r == UpdateStatus.ORPHANED:
1✔
1850
            # maybe it is a private channel (and data in invoice was outdated)
1851
            self.logger.info(f"Could not find {short_channel_id}. maybe update is for private channel?")
1✔
1852
            start_node_id = route[sender_idx].node_id
1✔
1853
            cache_ttl = None
1✔
1854
            if failure_msg.code == OnionFailureCode.CHANNEL_DISABLED:
1✔
1855
                # eclair sends CHANNEL_DISABLED if its peer is offline. E.g. we might be trying to pay
1856
                # a mobile phone with the app closed. So we cache this with a short TTL.
1857
                cache_ttl = self.channel_db.PRIVATE_CHAN_UPD_CACHE_TTL_SHORT
×
1858
            update = self.channel_db.add_channel_update_for_private_channel(payload, start_node_id, cache_ttl=cache_ttl)
1✔
1859
            blacklist = not update
1✔
1860
        elif r == UpdateStatus.EXPIRED:
×
1861
            blacklist = True
×
1862
        elif r == UpdateStatus.DEPRECATED:
×
1863
            self.logger.info(f'channel update is not more recent.')
×
1864
            blacklist = True
×
1865
        elif r == UpdateStatus.UNCHANGED:
×
1866
            blacklist = True
×
1867
        return blacklist, update
1✔
1868

1869
    @classmethod
1✔
1870
    def _decode_channel_update_msg(cls, chan_upd_msg: bytes) -> Optional[Dict[str, Any]]:
1✔
1871
        channel_update_as_received = chan_upd_msg
1✔
1872
        channel_update_typed = (258).to_bytes(length=2, byteorder="big") + channel_update_as_received
1✔
1873
        # note: some nodes put channel updates in error msgs with the leading msg_type already there.
1874
        #       we try decoding both ways here.
1875
        try:
1✔
1876
            message_type, payload = decode_msg(channel_update_typed)
1✔
1877
            if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception()
1✔
1878
            payload['raw'] = channel_update_typed
1✔
1879
            return payload
1✔
1880
        except Exception:  # FIXME: too broad
1✔
1881
            try:
1✔
1882
                message_type, payload = decode_msg(channel_update_as_received)
1✔
1883
                if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception()
1✔
1884
                payload['raw'] = channel_update_as_received
1✔
1885
                return payload
1✔
1886
            except Exception:
1✔
1887
                return None
1✔
1888

1889
    def _check_bolt11_invoice(self, bolt11_invoice: str, *, amount_msat: int = None) -> LnAddr:
1✔
1890
        """Parses and validates a bolt11 invoice str into a LnAddr.
1891
        Includes pre-payment checks external to the parser.
1892
        """
1893
        addr = lndecode(bolt11_invoice)
1✔
1894
        if addr.is_expired():
1✔
1895
            raise InvoiceError(_("This invoice has expired"))
×
1896
        # check amount
1897
        if amount_msat:  # replace amt in invoice. main usecase is paying zero amt invoices
1✔
1898
            existing_amt_msat = addr.get_amount_msat()
×
1899
            if existing_amt_msat and amount_msat < existing_amt_msat:
×
1900
                raise Exception("cannot pay lower amt than what is originally in LN invoice")
×
1901
            addr.amount = Decimal(amount_msat) / COIN / 1000
×
1902
        if addr.amount is None:
1✔
1903
            raise InvoiceError(_("Missing amount"))
×
1904
        # check cltv
1905
        if addr.get_min_final_cltv_delta() > NBLOCK_CLTV_DELTA_TOO_FAR_INTO_FUTURE:
1✔
1906
            raise InvoiceError("{}\n{}".format(
1✔
1907
                _("Invoice wants us to risk locking funds for unreasonably long."),
1908
                f"min_final_cltv_delta: {addr.get_min_final_cltv_delta()}"))
1909
        # check features
1910
        addr.validate_and_compare_features(self.features)
1✔
1911
        return addr
1✔
1912

1913
    def is_trampoline_peer(self, node_id: bytes) -> bool:
1✔
1914
        # until trampoline is advertised in lnfeatures, check against hardcoded list
1915
        if is_hardcoded_trampoline(node_id):
1✔
1916
            return True
1✔
1917
        peer = self._peers.get(node_id)
×
1918
        if not peer:
×
1919
            return False
×
1920
        return (peer.their_features.supports(LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT_ECLAIR)
×
1921
                or peer.their_features.supports(LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT_ELECTRUM))
1922

1923
    def suggest_peer(self) -> Optional[bytes]:
1✔
1924
        if not self.uses_trampoline():
×
1925
            return self.lnrater.suggest_peer()
×
1926
        else:
1927
            return random.choice(list(hardcoded_trampoline_nodes().values())).pubkey
×
1928

1929
    def suggest_payment_splits(
1✔
1930
        self,
1931
        *,
1932
        amount_msat: int,
1933
        final_total_msat: int,
1934
        my_active_channels: Sequence[Channel],
1935
        invoice_features: LnFeatures,
1936
        r_tags: Sequence[Sequence[Sequence[Any]]],
1937
        receiver_pubkey: bytes,
1938
    ) -> List['SplitConfigRating']:
1939
        channels_with_funds = {
1✔
1940
            (chan.channel_id, chan.node_id): ( int(chan.available_to_spend(HTLCOwner.LOCAL)), chan.htlc_slots_left(HTLCOwner.LOCAL))
1941
            for chan in my_active_channels
1942
        }
1943
        # if we have a direct channel it's preferable to send a single part directly through this
1944
        # channel, so this bool will disable excluding single part payments
1945
        have_direct_channel = any(chan.node_id == receiver_pubkey for chan in my_active_channels)
1✔
1946
        self.logger.info(f"channels_with_funds: {channels_with_funds}, {have_direct_channel=}")
1✔
1947
        exclude_single_part_payments = False
1✔
1948
        if self.uses_trampoline():
1✔
1949
            # in the case of a legacy payment, we don't allow splitting via different
1950
            # trampoline nodes, because of https://github.com/ACINQ/eclair/issues/2127
1951
            is_legacy, _ = is_legacy_relay(invoice_features, r_tags)
1✔
1952
            exclude_multinode_payments = is_legacy
1✔
1953
            # we don't split within a channel when sending to a trampoline node,
1954
            # the trampoline node will split for us
1955
            exclude_single_channel_splits = not self.config.TEST_FORCE_MPP
1✔
1956
        else:
1957
            exclude_multinode_payments = False
1✔
1958
            exclude_single_channel_splits = False
1✔
1959
            if invoice_features.supports(LnFeatures.BASIC_MPP_OPT) and not self.config.TEST_FORCE_DISABLE_MPP:
1✔
1960
                # if amt is still large compared to total_msat, split it:
1961
                if (amount_msat / final_total_msat > self.MPP_SPLIT_PART_FRACTION
1✔
1962
                        and amount_msat > self.MPP_SPLIT_PART_MINAMT_MSAT
1963
                        and not have_direct_channel):
1964
                    exclude_single_part_payments = True
×
1965

1966
        split_configurations = suggest_splits(
1✔
1967
            amount_msat,
1968
            channels_with_funds,
1969
            exclude_single_part_payments=exclude_single_part_payments,
1970
            exclude_multinode_payments=exclude_multinode_payments,
1971
            exclude_single_channel_splits=exclude_single_channel_splits
1972
        )
1973

1974
        self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations')
1✔
1975
        return split_configurations
1✔
1976

1977
    async def create_routes_for_payment(
1✔
1978
            self, *,
1979
            paysession: PaySession,
1980
            amount_msat: int,        # part of payment amount we want routes for now
1981
            fwd_trampoline_onion: OnionPacket = None,
1982
            full_path: LNPaymentPath = None,
1983
            channels: Optional[Sequence[Channel]] = None,
1984
            budget: PaymentFeeBudget,
1985
    ) -> AsyncGenerator[Tuple[SentHtlcInfo, int, Optional[OnionPacket]], None]:
1986

1987
        """Creates multiple routes for splitting a payment over the available
1988
        private channels.
1989

1990
        We first try to conduct the payment over a single channel. If that fails
1991
        and mpp is supported by the receiver, we will split the payment."""
1992
        trampoline_features = LnFeatures.VAR_ONION_OPT
1✔
1993
        local_height = self.wallet.adb.get_local_height()
1✔
1994
        fee_related_error = None  # type: Optional[FeeBudgetExceeded]
1✔
1995
        if channels:
1✔
1996
            my_active_channels = channels
×
1997
        else:
1998
            my_active_channels = [
1✔
1999
                chan for chan in self.channels.values() if
2000
                chan.is_active() and not chan.is_frozen_for_sending()]
2001
        # try random order
2002
        random.shuffle(my_active_channels)
1✔
2003
        split_configurations = self.suggest_payment_splits(
1✔
2004
            amount_msat=amount_msat,
2005
            final_total_msat=paysession.amount_to_pay,
2006
            my_active_channels=my_active_channels,
2007
            invoice_features=paysession.invoice_features,
2008
            r_tags=paysession.r_tags,
2009
            receiver_pubkey=paysession.invoice_pubkey,
2010
        )
2011
        for sc in split_configurations:
1✔
2012
            is_multichan_mpp = len(sc.config.items()) > 1
1✔
2013
            is_mpp = sc.config.number_parts() > 1
1✔
2014
            if is_mpp and not paysession.invoice_features.supports(LnFeatures.BASIC_MPP_OPT):
1✔
2015
                continue
1✔
2016
            if not is_mpp and self.config.TEST_FORCE_MPP:
1✔
2017
                continue
1✔
2018
            if is_mpp and self.config.TEST_FORCE_DISABLE_MPP:
1✔
2019
                continue
×
2020
            self.logger.info(f"trying split configuration: {sc.config.values()} rating: {sc.rating}")
1✔
2021
            routes = []
1✔
2022
            try:
1✔
2023
                if self.uses_trampoline():
1✔
2024
                    per_trampoline_channel_amounts = defaultdict(list)
1✔
2025
                    # categorize by trampoline nodes for trampoline mpp construction
2026
                    for (chan_id, _), part_amounts_msat in sc.config.items():
1✔
2027
                        chan = self.channels[chan_id]
1✔
2028
                        for part_amount_msat in part_amounts_msat:
1✔
2029
                            per_trampoline_channel_amounts[chan.node_id].append((chan_id, part_amount_msat))
1✔
2030
                    # for each trampoline forwarder, construct mpp trampoline
2031
                    for trampoline_node_id, trampoline_parts in per_trampoline_channel_amounts.items():
1✔
2032
                        per_trampoline_amount = sum([x[1] for x in trampoline_parts])
1✔
2033
                        trampoline_route, trampoline_onion, per_trampoline_amount_with_fees, per_trampoline_cltv_delta = create_trampoline_route_and_onion(
1✔
2034
                            amount_msat=per_trampoline_amount,
2035
                            total_msat=paysession.amount_to_pay,
2036
                            min_final_cltv_delta=paysession.min_final_cltv_delta,
2037
                            my_pubkey=self.node_keypair.pubkey,
2038
                            invoice_pubkey=paysession.invoice_pubkey,
2039
                            invoice_features=paysession.invoice_features,
2040
                            node_id=trampoline_node_id,
2041
                            r_tags=paysession.r_tags,
2042
                            payment_hash=paysession.payment_hash,
2043
                            payment_secret=paysession.payment_secret,
2044
                            local_height=local_height,
2045
                            trampoline_fee_level=paysession.trampoline_fee_level,
2046
                            use_two_trampolines=paysession.use_two_trampolines,
2047
                            failed_routes=paysession.failed_trampoline_routes,
2048
                            budget=budget._replace(fee_msat=budget.fee_msat // len(per_trampoline_channel_amounts)),
2049
                        )
2050
                        # node_features is only used to determine is_tlv
2051
                        per_trampoline_secret = os.urandom(32)
1✔
2052
                        per_trampoline_fees = per_trampoline_amount_with_fees - per_trampoline_amount
1✔
2053
                        self.logger.info(f'created route with trampoline fee level={paysession.trampoline_fee_level}')
1✔
2054
                        self.logger.info(f'trampoline hops: {[hop.end_node.hex() for hop in trampoline_route]}')
1✔
2055
                        self.logger.info(f'per trampoline fees: {per_trampoline_fees}')
1✔
2056
                        for chan_id, part_amount_msat in trampoline_parts:
1✔
2057
                            chan = self.channels[chan_id]
1✔
2058
                            margin = chan.available_to_spend(LOCAL) - part_amount_msat
1✔
2059
                            delta_fee = min(per_trampoline_fees, margin)
1✔
2060
                            # TODO: distribute trampoline fee over several channels?
2061
                            part_amount_msat_with_fees = part_amount_msat + delta_fee
1✔
2062
                            per_trampoline_fees -= delta_fee
1✔
2063
                            route = [
1✔
2064
                                RouteEdge(
2065
                                    start_node=self.node_keypair.pubkey,
2066
                                    end_node=trampoline_node_id,
2067
                                    short_channel_id=chan.short_channel_id,
2068
                                    fee_base_msat=0,
2069
                                    fee_proportional_millionths=0,
2070
                                    cltv_delta=0,
2071
                                    node_features=trampoline_features)
2072
                            ]
2073
                            self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}')
1✔
2074
                            shi = SentHtlcInfo(
1✔
2075
                                route=route,
2076
                                payment_secret_orig=paysession.payment_secret,
2077
                                payment_secret_bucket=per_trampoline_secret,
2078
                                amount_msat=part_amount_msat_with_fees,
2079
                                bucket_msat=per_trampoline_amount_with_fees,
2080
                                amount_receiver_msat=part_amount_msat,
2081
                                trampoline_fee_level=paysession.trampoline_fee_level,
2082
                                trampoline_route=trampoline_route,
2083
                            )
2084
                            routes.append((shi, per_trampoline_cltv_delta, trampoline_onion))
1✔
2085
                        if per_trampoline_fees != 0:
1✔
2086
                            e = 'not enough margin to pay trampoline fee'
×
2087
                            self.logger.info(e)
×
2088
                            raise FeeBudgetExceeded(e)
×
2089
                else:
2090
                    # We atomically loop through a split configuration. If there was
2091
                    # a failure to find a path for a single part, we try the next configuration
2092
                    for (chan_id, _), part_amounts_msat in sc.config.items():
1✔
2093
                        for part_amount_msat in part_amounts_msat:
1✔
2094
                            channel = self.channels[chan_id]
1✔
2095
                            route = await run_in_thread(
1✔
2096
                                partial(
2097
                                    self.create_route_for_single_htlc,
2098
                                    amount_msat=part_amount_msat,
2099
                                    invoice_pubkey=paysession.invoice_pubkey,
2100
                                    min_final_cltv_delta=paysession.min_final_cltv_delta,
2101
                                    r_tags=paysession.r_tags,
2102
                                    invoice_features=paysession.invoice_features,
2103
                                    my_sending_channels=[channel] if is_multichan_mpp else my_active_channels,
2104
                                    full_path=full_path,
2105
                                    budget=budget._replace(fee_msat=budget.fee_msat // sc.config.number_parts()),
2106
                                )
2107
                            )
2108
                            shi = SentHtlcInfo(
1✔
2109
                                route=route,
2110
                                payment_secret_orig=paysession.payment_secret,
2111
                                payment_secret_bucket=paysession.payment_secret,
2112
                                amount_msat=part_amount_msat,
2113
                                bucket_msat=paysession.amount_to_pay,
2114
                                amount_receiver_msat=part_amount_msat,
2115
                                trampoline_fee_level=None,
2116
                                trampoline_route=None,
2117
                            )
2118
                            routes.append((shi, paysession.min_final_cltv_delta, fwd_trampoline_onion))
1✔
2119
            except NoPathFound:
1✔
2120
                continue
1✔
2121
            except FeeBudgetExceeded as e:
1✔
2122
                fee_related_error = e
×
2123
                continue
×
2124
            for route in routes:
1✔
2125
                yield route
1✔
2126
            return
1✔
2127
        if fee_related_error is not None:
1✔
2128
            raise fee_related_error
×
2129
        raise NoPathFound()
1✔
2130

2131
    @profiler
1✔
2132
    def create_route_for_single_htlc(
1✔
2133
            self, *,
2134
            amount_msat: int,  # that final receiver gets
2135
            invoice_pubkey: bytes,
2136
            min_final_cltv_delta: int,
2137
            r_tags,
2138
            invoice_features: int,
2139
            my_sending_channels: List[Channel],
2140
            full_path: Optional[LNPaymentPath],
2141
            budget: PaymentFeeBudget,
2142
    ) -> LNPaymentRoute:
2143

2144
        my_sending_aliases = set(chan.get_local_scid_alias() for chan in my_sending_channels)
1✔
2145
        my_sending_channels = {chan.short_channel_id: chan for chan in my_sending_channels
1✔
2146
            if chan.short_channel_id is not None}
2147
        # Collect all private edges from route hints.
2148
        # Note: if some route hints are multiple edges long, and these paths cross each other,
2149
        #       we allow our path finding to cross the paths; i.e. the route hints are not isolated.
2150
        private_route_edges = {}  # type: Dict[ShortChannelID, RouteEdge]
1✔
2151
        for private_path in r_tags:
1✔
2152
            # we need to shift the node pubkey by one towards the destination:
2153
            private_path_nodes = [edge[0] for edge in private_path][1:] + [invoice_pubkey]
1✔
2154
            private_path_rest = [edge[1:] for edge in private_path]
1✔
2155
            start_node = private_path[0][0]
1✔
2156
            # remove aliases from direct routes
2157
            if len(private_path) == 1 and private_path[0][1] in my_sending_aliases:
1✔
2158
                self.logger.info(f'create_route: skipping alias {ShortChannelID(private_path[0][1])}')
×
2159
                continue
×
2160
            for end_node, edge_rest in zip(private_path_nodes, private_path_rest):
1✔
2161
                short_channel_id, fee_base_msat, fee_proportional_millionths, cltv_delta = edge_rest
1✔
2162
                short_channel_id = ShortChannelID(short_channel_id)
1✔
2163
                if (our_chan := self.get_channel_by_short_id(short_channel_id)) is not None:
1✔
2164
                    # check if the channel is one of our channels and frozen for sending
2165
                    if our_chan.is_frozen_for_sending():
1✔
2166
                        continue
×
2167
                # if we have a routing policy for this edge in the db, that takes precedence,
2168
                # as it is likely from a previous failure
2169
                channel_policy = self.channel_db.get_policy_for_node(
1✔
2170
                    short_channel_id=short_channel_id,
2171
                    node_id=start_node,
2172
                    my_channels=my_sending_channels)
2173
                if channel_policy:
1✔
2174
                    fee_base_msat = channel_policy.fee_base_msat
1✔
2175
                    fee_proportional_millionths = channel_policy.fee_proportional_millionths
1✔
2176
                    cltv_delta = channel_policy.cltv_delta
1✔
2177
                node_info = self.channel_db.get_node_info_for_node_id(node_id=end_node)
1✔
2178
                route_edge = RouteEdge(
1✔
2179
                        start_node=start_node,
2180
                        end_node=end_node,
2181
                        short_channel_id=short_channel_id,
2182
                        fee_base_msat=fee_base_msat,
2183
                        fee_proportional_millionths=fee_proportional_millionths,
2184
                        cltv_delta=cltv_delta,
2185
                        node_features=node_info.features if node_info else 0)
2186
                private_route_edges[route_edge.short_channel_id] = route_edge
1✔
2187
                start_node = end_node
1✔
2188
        # now find a route, end to end: between us and the recipient
2189
        try:
1✔
2190
            route = self.network.path_finder.find_route(
1✔
2191
                nodeA=self.node_keypair.pubkey,
2192
                nodeB=invoice_pubkey,
2193
                invoice_amount_msat=amount_msat,
2194
                path=full_path,
2195
                my_sending_channels=my_sending_channels,
2196
                private_route_edges=private_route_edges)
2197
        except NoChannelPolicy as e:
1✔
2198
            raise NoPathFound() from e
×
2199
        if not route:
1✔
2200
            raise NoPathFound()
1✔
2201
        if not is_route_within_budget(
1✔
2202
            route, budget=budget, amount_msat_for_dest=amount_msat, cltv_delta_for_dest=min_final_cltv_delta,
2203
        ):
2204
            self.logger.info(f"rejecting route (exceeds budget): {route=}. {budget=}")
×
2205
            raise FeeBudgetExceeded()
×
2206
        assert len(route) > 0
1✔
2207
        if route[-1].end_node != invoice_pubkey:
1✔
2208
            raise LNPathInconsistent("last node_id != invoice pubkey")
1✔
2209
        # add features from invoice
2210
        route[-1].node_features |= invoice_features
1✔
2211
        return route
1✔
2212

2213
    def clear_invoices_cache(self):
1✔
2214
        self._bolt11_cache.clear()
×
2215

2216
    def get_bolt11_invoice(
1✔
2217
            self, *,
2218
            payment_hash: bytes,
2219
            amount_msat: Optional[int],
2220
            message: str,
2221
            expiry: int,  # expiration of invoice (in seconds, relative)
2222
            fallback_address: Optional[str],
2223
            channels: Optional[Sequence[Channel]] = None,
2224
            min_final_cltv_expiry_delta: Optional[int] = None,
2225
    ) -> Tuple[LnAddr, str]:
2226
        assert isinstance(payment_hash, bytes), f"expected bytes, but got {type(payment_hash)}"
1✔
2227

2228
        pair = self._bolt11_cache.get(payment_hash)
1✔
2229
        if pair:
1✔
2230
            lnaddr, invoice = pair
×
2231
            assert lnaddr.get_amount_msat() == amount_msat
×
2232
            return pair
×
2233

2234
        assert amount_msat is None or amount_msat > 0
1✔
2235
        timestamp = int(time.time())
1✔
2236
        needs_jit: bool = self.receive_requires_jit_channel(amount_msat)
1✔
2237
        routing_hints = self.calc_routing_hints_for_invoice(amount_msat, channels=channels, needs_jit=needs_jit)
1✔
2238
        self.logger.info(f"creating bolt11 invoice with routing_hints: {routing_hints}, jit: {needs_jit}, sat: {amount_msat or 0 // 1000}")
1✔
2239
        invoice_features = self.features.for_invoice()
1✔
2240
        if not self.uses_trampoline():
1✔
2241
            invoice_features &= ~ LnFeatures.OPTION_TRAMPOLINE_ROUTING_OPT_ELECTRUM
×
2242
        if needs_jit:
1✔
2243
            # jit only works with single htlcs, mpp will cause LSP to open channels for each htlc
2244
            invoice_features &= ~ LnFeatures.BASIC_MPP_OPT & ~ LnFeatures.BASIC_MPP_REQ
×
2245
        payment_secret = self.get_payment_secret(payment_hash)
1✔
2246
        amount_btc = amount_msat/Decimal(COIN*1000) if amount_msat else None
1✔
2247
        if expiry == 0:
1✔
2248
            expiry = LN_EXPIRY_NEVER
×
2249
        if min_final_cltv_expiry_delta is None:
1✔
2250
            min_final_cltv_expiry_delta = MIN_FINAL_CLTV_DELTA_FOR_INVOICE
×
2251
        lnaddr = LnAddr(
1✔
2252
            paymenthash=payment_hash,
2253
            amount=amount_btc,
2254
            tags=[
2255
                ('d', message),
2256
                ('c', min_final_cltv_expiry_delta),
2257
                ('x', expiry),
2258
                ('9', invoice_features),
2259
                ('f', fallback_address),
2260
            ] + routing_hints,
2261
            date=timestamp,
2262
            payment_secret=payment_secret)
2263
        invoice = lnencode(lnaddr, self.node_keypair.privkey)
1✔
2264
        pair = lnaddr, invoice
1✔
2265
        self._bolt11_cache[payment_hash] = pair
1✔
2266
        return pair
1✔
2267

2268
    def get_payment_secret(self, payment_hash):
1✔
2269
        return sha256(sha256(self.payment_secret_key) + payment_hash)
1✔
2270

2271
    def _get_payment_key(self, payment_hash: bytes) -> bytes:
1✔
2272
        """Return payment bucket key.
2273
        We bucket htlcs based on payment_hash+payment_secret. payment_secret is included
2274
        as it changes over a trampoline path (in the outer onion), and these paths can overlap.
2275
        """
2276
        payment_secret = self.get_payment_secret(payment_hash)
1✔
2277
        return payment_hash + payment_secret
1✔
2278

2279
    def create_payment_info(self, *, amount_msat: Optional[int], write_to_disk=True) -> bytes:
1✔
2280
        payment_preimage = os.urandom(32)
1✔
2281
        payment_hash = sha256(payment_preimage)
1✔
2282
        info = PaymentInfo(payment_hash, amount_msat, RECEIVED, PR_UNPAID)
1✔
2283
        self.save_preimage(payment_hash, payment_preimage, write_to_disk=False)
1✔
2284
        self.save_payment_info(info, write_to_disk=False)
1✔
2285
        if write_to_disk:
1✔
2286
            self.wallet.save_db()
×
2287
        return payment_hash
1✔
2288

2289
    def bundle_payments(self, hash_list):
1✔
2290
        payment_keys = [self._get_payment_key(x) for x in hash_list]
1✔
2291
        self.payment_bundles.append(payment_keys)
1✔
2292

2293
    def get_payment_bundle(self, payment_key: bytes) -> Sequence[bytes]:
1✔
2294
        for key_list in self.payment_bundles:
1✔
2295
            if payment_key in key_list:
1✔
2296
                return key_list
1✔
2297

2298
    def save_preimage(self, payment_hash: bytes, preimage: bytes, *, write_to_disk: bool = True):
1✔
2299
        if sha256(preimage) != payment_hash:
1✔
2300
            raise Exception("tried to save incorrect preimage for payment_hash")
×
2301
        if self._preimages.get(payment_hash.hex()) is not None:
1✔
2302
            return  # we already have this preimage
1✔
2303
        self.logger.debug(f"saving preimage for {payment_hash.hex()}")
1✔
2304
        self._preimages[payment_hash.hex()] = preimage.hex()
1✔
2305
        if write_to_disk:
1✔
2306
            self.wallet.save_db()
1✔
2307

2308
    def get_preimage(self, payment_hash: bytes) -> Optional[bytes]:
1✔
2309
        assert isinstance(payment_hash, bytes), f"expected bytes, but got {type(payment_hash)}"
1✔
2310
        preimage_hex = self._preimages.get(payment_hash.hex())
1✔
2311
        if preimage_hex is None:
1✔
2312
            return None
1✔
2313
        preimage_bytes = bytes.fromhex(preimage_hex)
1✔
2314
        if sha256(preimage_bytes) != payment_hash:
1✔
2315
            raise Exception("found incorrect preimage for payment_hash")
×
2316
        return preimage_bytes
1✔
2317

2318
    def get_preimage_hex(self, payment_hash: str) -> Optional[str]:
1✔
2319
        preimage_bytes = self.get_preimage(bytes.fromhex(payment_hash)) or b""
1✔
2320
        return preimage_bytes.hex() or None
1✔
2321

2322
    def get_payment_info(self, payment_hash: bytes) -> Optional[PaymentInfo]:
1✔
2323
        """returns None if payment_hash is a payment we are forwarding"""
2324
        key = payment_hash.hex()
1✔
2325
        with self.lock:
1✔
2326
            if key in self.payment_info:
1✔
2327
                amount_msat, direction, status = self.payment_info[key]
1✔
2328
                return PaymentInfo(payment_hash, amount_msat, direction, status)
1✔
2329
            return None
1✔
2330

2331
    def add_payment_info_for_hold_invoice(self, payment_hash: bytes, lightning_amount_sat: Optional[int]):
1✔
2332
        amount = lightning_amount_sat * 1000 if lightning_amount_sat else None
1✔
2333
        info = PaymentInfo(payment_hash, amount, RECEIVED, PR_UNPAID)
1✔
2334
        self.save_payment_info(info, write_to_disk=False)
1✔
2335

2336
    def register_hold_invoice(self, payment_hash: bytes, cb: Callable[[bytes], Awaitable[None]]):
1✔
2337
        self.hold_invoice_callbacks[payment_hash] = cb
1✔
2338

2339
    def unregister_hold_invoice(self, payment_hash: bytes):
1✔
2340
        self.hold_invoice_callbacks.pop(payment_hash)
×
2341

2342
    def save_payment_info(self, info: PaymentInfo, *, write_to_disk: bool = True) -> None:
1✔
2343
        key = info.payment_hash.hex()
1✔
2344
        assert info.status in SAVED_PR_STATUS
1✔
2345
        with self.lock:
1✔
2346
            self.payment_info[key] = info.amount_msat, info.direction, info.status
1✔
2347
        if write_to_disk:
1✔
2348
            self.wallet.save_db()
1✔
2349

2350
    def check_mpp_status(
1✔
2351
            self, *,
2352
            payment_secret: bytes,
2353
            short_channel_id: ShortChannelID,
2354
            htlc: UpdateAddHtlc,
2355
            expected_msat: int,
2356
    ) -> RecvMPPResolution:
2357
        """Returns the status of the incoming htlc set the given *htlc* belongs to.
2358

2359
        ACCEPTED simply means the mpp set is complete, and we can proceed with further
2360
        checks before fulfilling (or failing) the htlcs.
2361
        In particular, note that hold-invoice-htlcs typically remain in the ACCEPTED state
2362
        for quite some time -- not in the "WAITING" state (which would refer to the mpp set
2363
        not yet being complete!).
2364
        """
2365
        payment_hash = htlc.payment_hash
1✔
2366
        payment_key = payment_hash + payment_secret
1✔
2367
        self.update_mpp_with_received_htlc(
1✔
2368
            payment_key=payment_key, scid=short_channel_id, htlc=htlc, expected_msat=expected_msat)
2369
        mpp_resolution = self.received_mpp_htlcs[payment_key.hex()].resolution
1✔
2370
        # if still waiting, calc resolution now:
2371
        if mpp_resolution == RecvMPPResolution.WAITING:
1✔
2372
            bundle = self.get_payment_bundle(payment_key)
1✔
2373
            if bundle:
1✔
2374
                payment_keys = bundle
1✔
2375
            else:
2376
                payment_keys = [payment_key]
1✔
2377
            first_timestamp = min([self.get_first_timestamp_of_mpp(pkey) for pkey in payment_keys])
1✔
2378
            if self.get_payment_status(payment_hash) == PR_PAID:
1✔
2379
                mpp_resolution = RecvMPPResolution.ACCEPTED
×
2380
            elif self.stopping_soon:
1✔
2381
                # try to time out pending HTLCs before shutting down
2382
                mpp_resolution = RecvMPPResolution.EXPIRED
1✔
2383
            elif all([self.is_mpp_amount_reached(pkey) for pkey in payment_keys]):
1✔
2384
                mpp_resolution = RecvMPPResolution.ACCEPTED
1✔
2385
            elif time.time() - first_timestamp > self.MPP_EXPIRY:
1✔
2386
                mpp_resolution = RecvMPPResolution.EXPIRED
1✔
2387
            # save resolution, if any.
2388
            if mpp_resolution != RecvMPPResolution.WAITING:
1✔
2389
                for pkey in payment_keys:
1✔
2390
                    if pkey.hex() in self.received_mpp_htlcs:
1✔
2391
                        self.set_mpp_resolution(payment_key=pkey, resolution=mpp_resolution)
1✔
2392

2393
        return mpp_resolution
1✔
2394

2395
    def update_mpp_with_received_htlc(
1✔
2396
        self,
2397
        *,
2398
        payment_key: bytes,
2399
        scid: ShortChannelID,
2400
        htlc: UpdateAddHtlc,
2401
        expected_msat: int,
2402
    ):
2403
        # add new htlc to set
2404
        mpp_status = self.received_mpp_htlcs.get(payment_key.hex())
1✔
2405
        if mpp_status is None:
1✔
2406
            mpp_status = ReceivedMPPStatus(
1✔
2407
                resolution=RecvMPPResolution.WAITING,
2408
                expected_msat=expected_msat,
2409
                htlc_set=set(),
2410
            )
2411
        if expected_msat != mpp_status.expected_msat:
1✔
2412
            self.logger.info(
1✔
2413
                f"marking received mpp as failed. inconsistent total_msats in bucket. {payment_key.hex()=}")
2414
            mpp_status = mpp_status._replace(resolution=RecvMPPResolution.FAILED)
1✔
2415
        key = (scid, htlc)
1✔
2416
        if key not in mpp_status.htlc_set:
1✔
2417
            mpp_status.htlc_set.add(key)  # side-effecting htlc_set
1✔
2418
        self.received_mpp_htlcs[payment_key.hex()] = mpp_status
1✔
2419

2420
    def set_mpp_resolution(self, *, payment_key: bytes, resolution: RecvMPPResolution):
1✔
2421
        mpp_status = self.received_mpp_htlcs[payment_key.hex()]
1✔
2422
        self.logger.info(f'set_mpp_resolution {resolution.name} {len(mpp_status.htlc_set)} {payment_key.hex()}')
1✔
2423
        self.received_mpp_htlcs[payment_key.hex()] = mpp_status._replace(resolution=resolution)
1✔
2424

2425
    def is_mpp_amount_reached(self, payment_key: bytes) -> bool:
1✔
2426
        amounts = self.get_mpp_amounts(payment_key)
1✔
2427
        if amounts is None:
1✔
2428
            return False
1✔
2429
        total, expected = amounts
1✔
2430
        return total >= expected
1✔
2431

2432
    def is_accepted_mpp(self, payment_hash: bytes) -> bool:
1✔
2433
        payment_key = self._get_payment_key(payment_hash)
1✔
2434
        status = self.received_mpp_htlcs.get(payment_key.hex())
1✔
2435
        return status and status.resolution == RecvMPPResolution.ACCEPTED
1✔
2436

2437
    def get_payment_mpp_amount_msat(self, payment_hash: bytes) -> Optional[int]:
1✔
2438
        """Returns the received mpp amount for given payment hash."""
2439
        payment_key = self._get_payment_key(payment_hash)
1✔
2440
        amounts = self.get_mpp_amounts(payment_key)
1✔
2441
        if not amounts:
1✔
2442
            return None
1✔
2443
        total_msat, _ = amounts
1✔
2444
        return total_msat
1✔
2445

2446
    def get_mpp_amounts(self, payment_key: bytes) -> Optional[Tuple[int, int]]:
1✔
2447
        """Returns (total received amount, expected amount) or None."""
2448
        mpp_status = self.received_mpp_htlcs.get(payment_key.hex())
1✔
2449
        if not mpp_status:
1✔
2450
            return None
1✔
2451
        total = sum([_htlc.amount_msat for scid, _htlc in mpp_status.htlc_set])
1✔
2452
        return total, mpp_status.expected_msat
1✔
2453

2454
    def get_first_timestamp_of_mpp(self, payment_key: bytes) -> int:
1✔
2455
        mpp_status = self.received_mpp_htlcs.get(payment_key.hex())
1✔
2456
        if not mpp_status:
1✔
2457
            return int(time.time())
1✔
2458
        return min([_htlc.timestamp for scid, _htlc in mpp_status.htlc_set])
1✔
2459

2460
    def maybe_cleanup_mpp(
1✔
2461
            self,
2462
            short_channel_id: ShortChannelID,
2463
            htlc: UpdateAddHtlc,
2464
    ) -> None:
2465

2466
        htlc_key = (short_channel_id, htlc)
1✔
2467
        for payment_key_hex, mpp_status in list(self.received_mpp_htlcs.items()):
1✔
2468
            if htlc_key not in mpp_status.htlc_set:
1✔
2469
                continue
1✔
2470
            assert mpp_status.resolution != RecvMPPResolution.WAITING
1✔
2471
            self.logger.info(f'maybe_cleanup_mpp: removing htlc of MPP {payment_key_hex}')
1✔
2472
            mpp_status.htlc_set.remove(htlc_key)  # side-effecting htlc_set
1✔
2473
            if len(mpp_status.htlc_set) == 0:
1✔
2474
                self.logger.info(f'maybe_cleanup_mpp: removing mpp {payment_key_hex}')
1✔
2475
                self.received_mpp_htlcs.pop(payment_key_hex)
1✔
2476
                self.maybe_cleanup_forwarding(payment_key_hex)
1✔
2477

2478
    def maybe_cleanup_forwarding(self, payment_key_hex: str) -> None:
1✔
2479
        self.active_forwardings.pop(payment_key_hex, None)
1✔
2480
        self.forwarding_failures.pop(payment_key_hex, None)
1✔
2481

2482
    def get_payment_status(self, payment_hash: bytes) -> int:
1✔
2483
        info = self.get_payment_info(payment_hash)
1✔
2484
        return info.status if info else PR_UNPAID
1✔
2485

2486
    def get_invoice_status(self, invoice: BaseInvoice) -> int:
1✔
2487
        invoice_id = invoice.rhash
1✔
2488
        status = self.get_payment_status(bfh(invoice_id))
1✔
2489
        if status == PR_UNPAID and invoice_id in self.inflight_payments:
1✔
2490
            return PR_INFLIGHT
×
2491
        # status may be PR_FAILED
2492
        if status == PR_UNPAID and invoice_id in self.logs:
1✔
2493
            status = PR_FAILED
×
2494
        return status
1✔
2495

2496
    def set_invoice_status(self, key: str, status: int) -> None:
1✔
2497
        if status == PR_INFLIGHT:
1✔
2498
            self.inflight_payments.add(key)
1✔
2499
        elif key in self.inflight_payments:
1✔
2500
            self.inflight_payments.remove(key)
1✔
2501
        if status in SAVED_PR_STATUS:
1✔
2502
            self.set_payment_status(bfh(key), status)
1✔
2503
        util.trigger_callback('invoice_status', self.wallet, key, status)
1✔
2504
        self.logger.info(f"set_invoice_status {key}: {status}")
1✔
2505
        # liquidity changed
2506
        self.clear_invoices_cache()
1✔
2507

2508
    def set_request_status(self, payment_hash: bytes, status: int) -> None:
1✔
2509
        if self.get_payment_status(payment_hash) == status:
1✔
2510
            return
1✔
2511
        self.set_payment_status(payment_hash, status)
1✔
2512
        request_id = payment_hash.hex()
1✔
2513
        req = self.wallet.get_request(request_id)
1✔
2514
        if req is None:
1✔
2515
            return
1✔
2516
        util.trigger_callback('request_status', self.wallet, request_id, status)
1✔
2517

2518
    def set_payment_status(self, payment_hash: bytes, status: int) -> None:
1✔
2519
        info = self.get_payment_info(payment_hash)
1✔
2520
        if info is None:
1✔
2521
            # if we are forwarding
2522
            return
1✔
2523
        info = info._replace(status=status)
1✔
2524
        self.save_payment_info(info)
1✔
2525

2526
    def is_forwarded_htlc(self, htlc_key) -> Optional[str]:
1✔
2527
        """Returns whether this was a forwarded HTLC."""
2528
        for payment_key, htlcs in self.active_forwardings.items():
1✔
2529
            if htlc_key in htlcs:
1✔
2530
                return payment_key
1✔
2531

2532
    def notify_upstream_peer(self, htlc_key: str) -> None:
1✔
2533
        """Called when an HTLC we offered on chan gets irrevocably fulfilled or failed.
2534
        If we find this was a forwarded HTLC, the upstream peer is notified.
2535
        """
2536
        upstream_key = self.downstream_to_upstream_htlc.pop(htlc_key, None)
1✔
2537
        if not upstream_key:
1✔
UNCOV
2538
            return
×
2539
        upstream_chan_scid, _ = deserialize_htlc_key(upstream_key)
1✔
2540
        upstream_chan = self.get_channel_by_short_id(upstream_chan_scid)
1✔
2541
        upstream_peer = self.peers.get(upstream_chan.node_id) if upstream_chan else None
1✔
2542
        if upstream_peer:
1✔
2543
            upstream_peer.downstream_htlc_resolved_event.set()
1✔
2544
            upstream_peer.downstream_htlc_resolved_event.clear()
1✔
2545

2546
    def htlc_fulfilled(self, chan: Channel, payment_hash: bytes, htlc_id: int):
1✔
2547

2548
        util.trigger_callback('htlc_fulfilled', payment_hash, chan, htlc_id)
1✔
2549
        htlc_key = serialize_htlc_key(chan.get_scid_or_local_alias(), htlc_id)
1✔
2550
        fw_key = self.is_forwarded_htlc(htlc_key)
1✔
2551
        if fw_key:
1✔
2552
            fw_htlcs = self.active_forwardings[fw_key]
1✔
2553
            fw_htlcs.remove(htlc_key)
1✔
2554

2555
        shi = self.sent_htlcs_info.get((payment_hash, chan.short_channel_id, htlc_id))
1✔
2556
        if shi and htlc_id in chan.onion_keys:
1✔
2557
            chan.pop_onion_key(htlc_id)
1✔
2558
            payment_key = payment_hash + shi.payment_secret_orig
1✔
2559
            paysession = self._paysessions[payment_key]
1✔
2560
            q = paysession.sent_htlcs_q
1✔
2561
            htlc_log = HtlcLog(
1✔
2562
                success=True,
2563
                route=shi.route,
2564
                amount_msat=shi.amount_receiver_msat,
2565
                trampoline_fee_level=shi.trampoline_fee_level)
2566
            q.put_nowait(htlc_log)
1✔
2567
            if paysession.can_be_deleted():
1✔
2568
                self._paysessions.pop(payment_key)
1✔
2569
                paysession_active = False
1✔
2570
            else:
2571
                paysession_active = True
1✔
2572
        else:
2573
            if fw_key:
1✔
2574
                paysession_active = False
1✔
2575
            else:
2576
                key = payment_hash.hex()
1✔
2577
                self.set_invoice_status(key, PR_PAID)
1✔
2578
                util.trigger_callback('payment_succeeded', self.wallet, key)
1✔
2579

2580
        if fw_key:
1✔
2581
            fw_htlcs = self.active_forwardings[fw_key]
1✔
2582
            if len(fw_htlcs) == 0 and not paysession_active:
1✔
2583
                self.notify_upstream_peer(htlc_key)
1✔
2584

2585
    def htlc_failed(
1✔
2586
            self,
2587
            chan: Channel,
2588
            payment_hash: bytes,
2589
            htlc_id: int,
2590
            error_bytes: Optional[bytes],
2591
            failure_message: Optional['OnionRoutingFailure']):
2592
        # note: this may be called several times for the same htlc
2593

2594
        util.trigger_callback('htlc_failed', payment_hash, chan, htlc_id)
1✔
2595
        htlc_key = serialize_htlc_key(chan.get_scid_or_local_alias(), htlc_id)
1✔
2596
        fw_key = self.is_forwarded_htlc(htlc_key)
1✔
2597
        if fw_key:
1✔
2598
            fw_htlcs = self.active_forwardings[fw_key]
1✔
2599
            fw_htlcs.remove(htlc_key)
1✔
2600

2601
        shi = self.sent_htlcs_info.get((payment_hash, chan.short_channel_id, htlc_id))
1✔
2602
        if shi and htlc_id in chan.onion_keys:
1✔
2603
            onion_key = chan.pop_onion_key(htlc_id)
1✔
2604
            payment_okey = payment_hash + shi.payment_secret_orig
1✔
2605
            paysession = self._paysessions[payment_okey]
1✔
2606
            q = paysession.sent_htlcs_q
1✔
2607
            # detect if it is part of a bucket
2608
            # if yes, wait until the bucket completely failed
2609
            route = shi.route
1✔
2610
            if error_bytes:
1✔
2611
                # TODO "decode_onion_error" might raise, catch and maybe blacklist/penalise someone?
2612
                try:
1✔
2613
                    failure_message, sender_idx = decode_onion_error(
1✔
2614
                        error_bytes,
2615
                        [x.node_id for x in route],
2616
                        onion_key)
UNCOV
2617
                except Exception as e:
×
UNCOV
2618
                    sender_idx = None
×
UNCOV
2619
                    failure_message = OnionRoutingFailure(OnionFailureCode.INVALID_ONION_PAYLOAD, str(e).encode())
×
2620
            else:
2621
                # probably got "update_fail_malformed_htlc". well... who to penalise now?
2622
                assert failure_message is not None
×
2623
                sender_idx = None
×
2624
            self.logger.info(f"htlc_failed {failure_message}")
1✔
2625
            amount_receiver_msat = paysession.on_htlc_fail_get_fail_amt_to_propagate(shi)
1✔
2626
            if amount_receiver_msat is None:
1✔
2627
                return
1✔
2628
            if shi.trampoline_route:
1✔
2629
                route = shi.trampoline_route
1✔
2630
            htlc_log = HtlcLog(
1✔
2631
                success=False,
2632
                route=route,
2633
                amount_msat=amount_receiver_msat,
2634
                error_bytes=error_bytes,
2635
                failure_msg=failure_message,
2636
                sender_idx=sender_idx,
2637
                trampoline_fee_level=shi.trampoline_fee_level)
2638
            q.put_nowait(htlc_log)
1✔
2639
            if paysession.can_be_deleted():
1✔
2640
                self._paysessions.pop(payment_okey)
1✔
2641
                paysession_active = False
1✔
2642
            else:
2643
                paysession_active = True
1✔
2644
        else:
2645
            if fw_key:
1✔
2646
                paysession_active = False
1✔
2647
            else:
2648
                self.logger.info(f"received unknown htlc_failed, probably from previous session (phash={payment_hash.hex()})")
1✔
2649
                key = payment_hash.hex()
1✔
2650
                invoice = self.wallet.get_invoice(key)
1✔
2651
                if invoice and self.get_invoice_status(invoice) != PR_UNPAID:
1✔
2652
                    self.set_invoice_status(key, PR_UNPAID)
×
2653
                    util.trigger_callback('payment_failed', self.wallet, key, '')
×
2654

2655
        if fw_key:
1✔
2656
            fw_htlcs = self.active_forwardings[fw_key]
1✔
2657
            can_forward_failure = (len(fw_htlcs) == 0) and not paysession_active
1✔
2658
            if can_forward_failure:
1✔
2659
                self.logger.info(f'htlc_failed: save_forwarding_failure (phash={payment_hash.hex()})')
1✔
2660
                self.save_forwarding_failure(fw_key, error_bytes=error_bytes, failure_message=failure_message)
1✔
2661
                self.notify_upstream_peer(htlc_key)
1✔
2662
            else:
2663
                self.logger.info(f'htlc_failed: waiting for other htlcs to fail (phash={payment_hash.hex()})')
1✔
2664

2665
    def calc_routing_hints_for_invoice(self, amount_msat: Optional[int], channels=None, needs_jit=False):
1✔
2666
        """calculate routing hints (BOLT-11 'r' field)"""
2667
        routing_hints = []
1✔
2668
        if needs_jit:
1✔
2669
            node_id, rest = extract_nodeid(self.config.ZEROCONF_TRUSTED_NODE)
×
2670
            alias_or_scid = self.get_static_jit_scid_alias()
×
2671
            routing_hints.append(('r', [(node_id, alias_or_scid, 0, 0, 144)]))
×
2672
            # no need for more because we cannot receive enough through the others and mpp is disabled for jit
2673
            channels = []
×
2674
        else:
2675
            if channels is None:
1✔
2676
                channels = list(self.get_channels_for_receiving(amount_msat=amount_msat, include_disconnected=True))
1✔
2677
                random.shuffle(channels)  # let's not leak channel order
1✔
2678
            scid_to_my_channels = {
1✔
2679
                chan.short_channel_id: chan for chan in channels
2680
                if chan.short_channel_id is not None
2681
            }
2682
        for chan in channels:
1✔
2683
            alias_or_scid = chan.get_remote_scid_alias() or chan.short_channel_id
1✔
2684
            assert isinstance(alias_or_scid, bytes), alias_or_scid
1✔
2685
            channel_info = get_mychannel_info(chan.short_channel_id, scid_to_my_channels)
1✔
2686
            # note: as a fallback, if we don't have a channel update for the
2687
            # incoming direction of our private channel, we fill the invoice with garbage.
2688
            # the sender should still be able to pay us, but will incur an extra round trip
2689
            # (they will get the channel update from the onion error)
2690
            # at least, that's the theory. https://github.com/lightningnetwork/lnd/issues/2066
2691
            fee_base_msat = fee_proportional_millionths = 0
1✔
2692
            cltv_delta = 1  # lnd won't even try with zero
1✔
2693
            missing_info = True
1✔
2694
            if channel_info:
1✔
2695
                policy = get_mychannel_policy(channel_info.short_channel_id, chan.node_id, scid_to_my_channels)
1✔
2696
                if policy:
1✔
2697
                    fee_base_msat = policy.fee_base_msat
1✔
2698
                    fee_proportional_millionths = policy.fee_proportional_millionths
1✔
2699
                    cltv_delta = policy.cltv_delta
1✔
2700
                    missing_info = False
1✔
2701
            if missing_info:
1✔
2702
                self.logger.info(
×
2703
                    f"Warning. Missing channel update for our channel {chan.short_channel_id}; "
2704
                    f"filling invoice with incorrect data.")
2705
            routing_hints.append(('r', [(
1✔
2706
                chan.node_id,
2707
                alias_or_scid,
2708
                fee_base_msat,
2709
                fee_proportional_millionths,
2710
                cltv_delta)]))
2711
        return routing_hints
1✔
2712

2713
    def delete_payment_info(self, payment_hash_hex: str):
1✔
2714
        # This method is called when an invoice or request is deleted by the user.
2715
        # The GUI only lets the user delete invoices or requests that have not been paid.
2716
        # Once an invoice/request has been paid, it is part of the history,
2717
        # and get_lightning_history assumes that payment_info is there.
2718
        assert self.get_payment_status(bytes.fromhex(payment_hash_hex)) != PR_PAID
1✔
2719
        with self.lock:
1✔
2720
            self.payment_info.pop(payment_hash_hex, None)
1✔
2721

2722
    def get_balance(self, *, frozen=False) -> Decimal:
1✔
2723
        with self.lock:
×
2724
            return Decimal(sum(
×
2725
                chan.balance(LOCAL) if not chan.is_closed() and (chan.is_frozen_for_sending() if frozen else True) else 0
2726
                for chan in self.channels.values())) / 1000
2727

2728
    def get_channels_for_sending(self):
1✔
2729
        for c in self.channels.values():
×
2730
            if c.is_active() and not c.is_frozen_for_sending():
×
2731
                if self.channel_db or self.is_trampoline_peer(c.node_id):
×
2732
                    yield c
×
2733

2734
    def fee_estimate(self, amount_sat):
1✔
2735
        # Here we have to guess a fee, because some callers (submarine swaps)
2736
        # use this method to initiate a payment, which would otherwise fail.
2737
        fee_base_msat = 5000               # FIXME ehh.. there ought to be a better way...
×
2738
        fee_proportional_millionths = 500  # FIXME
×
2739
        # inverse of fee_for_edge_msat
2740
        amount_msat = amount_sat * 1000
×
2741
        amount_minus_fees = (amount_msat - fee_base_msat) * 1_000_000 // ( 1_000_000 + fee_proportional_millionths)
×
2742
        return Decimal(amount_msat - amount_minus_fees) / 1000
×
2743

2744
    def num_sats_can_send(self, deltas=None) -> Decimal:
1✔
2745
        """
2746
        without trampoline, sum of all channel capacity
2747
        with trampoline, MPP must use a single trampoline
2748
        """
2749
        if deltas is None:
×
2750
            deltas = {}
×
2751

2752
        def send_capacity(chan):
×
2753
            if chan in deltas:
×
2754
                delta_msat = deltas[chan] * 1000
×
2755
                if delta_msat > chan.available_to_spend(REMOTE):
×
2756
                    delta_msat = 0
×
2757
            else:
2758
                delta_msat = 0
×
2759
            return chan.available_to_spend(LOCAL) + delta_msat
×
2760
        can_send_dict = defaultdict(int)
×
2761
        with self.lock:
×
2762
            for c in self.get_channels_for_sending():
×
2763
                if not self.uses_trampoline():
×
2764
                    can_send_dict[0] += send_capacity(c)
×
2765
                else:
2766
                    can_send_dict[c.node_id] += send_capacity(c)
×
2767
        can_send = max(can_send_dict.values()) if can_send_dict else 0
×
2768
        can_send_sat = Decimal(can_send)/1000
×
2769
        can_send_sat -= self.fee_estimate(can_send_sat)
×
2770
        return max(can_send_sat, 0)
×
2771

2772
    def get_channels_for_receiving(
1✔
2773
        self, *, amount_msat: Optional[int] = None, include_disconnected: bool = False,
2774
    ) -> Sequence[Channel]:
2775
        if not amount_msat:  # assume we want to recv a large amt, e.g. finding max.
1✔
2776
            amount_msat = float('inf')
×
2777
        with self.lock:
1✔
2778
            channels = list(self.channels.values())
1✔
2779
            channels = [chan for chan in channels
1✔
2780
                        if chan.is_open() and not chan.is_frozen_for_receiving()]
2781

2782
            if not include_disconnected:
1✔
2783
                channels = [chan for chan in channels if chan.is_active()]
×
2784

2785
            # Filter out nodes that have low receive capacity compared to invoice amt.
2786
            # Even with MPP, below a certain threshold, including these channels probably
2787
            # hurts more than help, as they lead to many failed attempts for the sender.
2788
            channels = sorted(channels, key=lambda chan: -chan.available_to_spend(REMOTE))
1✔
2789
            selected_channels = []
1✔
2790
            running_sum = 0
1✔
2791
            cutoff_factor = 0.2  # heuristic
1✔
2792
            for chan in channels:
1✔
2793
                recv_capacity = chan.available_to_spend(REMOTE)
1✔
2794
                chan_can_handle_payment_as_single_part = recv_capacity >= amount_msat
1✔
2795
                chan_small_compared_to_running_sum = recv_capacity < cutoff_factor * running_sum
1✔
2796
                if not chan_can_handle_payment_as_single_part and chan_small_compared_to_running_sum:
1✔
2797
                    break
1✔
2798
                running_sum += recv_capacity
1✔
2799
                selected_channels.append(chan)
1✔
2800
            channels = selected_channels
1✔
2801
            del selected_channels
1✔
2802
            # cap max channels to include to keep QR code reasonably scannable
2803
            channels = channels[:10]
1✔
2804
            return channels
1✔
2805

2806
    def num_sats_can_receive(self, deltas=None) -> Decimal:
1✔
2807
        """
2808
        We no longer assume the sender to send MPP on different channels,
2809
        because channel liquidities are hard to guess
2810
        """
2811
        if deltas is None:
×
2812
            deltas = {}
×
2813

2814
        def recv_capacity(chan):
×
2815
            if chan in deltas:
×
2816
                delta_msat = deltas[chan] * 1000
×
2817
                if delta_msat > chan.available_to_spend(LOCAL):
×
2818
                    delta_msat = 0
×
2819
            else:
2820
                delta_msat = 0
×
2821
            return chan.available_to_spend(REMOTE) + delta_msat
×
2822
        with self.lock:
×
2823
            recv_channels = self.get_channels_for_receiving()
×
2824
            recv_chan_msats = [recv_capacity(chan) for chan in recv_channels]
×
2825
        if not recv_chan_msats:
×
2826
            return Decimal(0)
×
2827
        can_receive_msat = max(recv_chan_msats)
×
2828
        return Decimal(can_receive_msat) / 1000
×
2829

2830
    def receive_requires_jit_channel(self, amount_msat: Optional[int]) -> bool:
1✔
2831
        """Returns true if we cannot receive the amount and have set up a trusted LSP node.
2832
        Cannot work reliably with 0 amount invoices as we don't know if we are able to receive it.
2833
        """
2834
        # zeroconf provider is configured and connected
2835
        if (self.can_get_zeroconf_channel()
1✔
2836
                # we cannot receive the amount specified
2837
                and ((amount_msat and self.num_sats_can_receive() < (amount_msat // 1000))
2838
                        # or we cannot receive anything, and it's a 0 amount invoice
2839
                        or (not amount_msat and self.num_sats_can_receive() < 1))):
2840
            return True
×
2841
        return False
1✔
2842

2843
    def can_get_zeroconf_channel(self) -> bool:
1✔
2844
        if not self.config.ACCEPT_ZEROCONF_CHANNELS and self.config.ZEROCONF_TRUSTED_NODE:
1✔
2845
            # check if zeroconf is accepted and client has trusted zeroconf node configured
2846
            return False
×
2847
        try:
1✔
2848
            node_id = extract_nodeid(self.wallet.config.ZEROCONF_TRUSTED_NODE)[0]
1✔
2849
        except ConnStringFormatError:
1✔
2850
            # invalid connection string
2851
            return False
1✔
2852
        # only return True if we are connected to the zeroconf provider
2853
        return node_id in self.peers
×
2854

2855
    def _suggest_channels_for_rebalance(self, direction, amount_sat) -> Sequence[Tuple[Channel, int]]:
1✔
2856
        """
2857
        Suggest a channel and amount to send/receive with that channel, so that we will be able to receive/send amount_sat
2858
        This is used when suggesting a swap or rebalance in order to receive a payment
2859
        """
2860
        with self.lock:
×
2861
            func = self.num_sats_can_send if direction == SENT else self.num_sats_can_receive
×
2862
            suggestions = []
×
2863
            channels = self.get_channels_for_sending() if direction == SENT else self.get_channels_for_receiving()
×
2864
            for chan in channels:
×
2865
                available_sat = chan.available_to_spend(LOCAL if direction == SENT else REMOTE) // 1000
×
2866
                delta = amount_sat - available_sat
×
2867
                delta += self.fee_estimate(amount_sat)
×
2868
                # add safety margin
2869
                delta += delta // 100 + 1
×
2870
                if func(deltas={chan:delta}) >= amount_sat:
×
2871
                    suggestions.append((chan, delta))
×
2872
                elif direction == RECEIVED and func(deltas={chan:2*delta}) >= amount_sat:
×
2873
                    # MPP heuristics has a 0.5 slope
2874
                    suggestions.append((chan, 2*delta))
×
2875
        if not suggestions:
×
2876
            raise NotEnoughFunds
×
2877
        return suggestions
×
2878

2879
    def _suggest_rebalance(self, direction, amount_sat):
1✔
2880
        """
2881
        Suggest a rebalance in order to be able to send or receive amount_sat.
2882
        Returns (from_channel, to_channel, amount to shuffle)
2883
        """
2884
        try:
×
2885
            suggestions = self._suggest_channels_for_rebalance(direction, amount_sat)
×
2886
        except NotEnoughFunds:
×
2887
            return False
×
2888
        for chan2, delta in suggestions:
×
2889
            # margin for fee caused by rebalancing
2890
            delta += self.fee_estimate(amount_sat)
×
2891
            # find other channel or trampoline that can send delta
2892
            for chan1 in self.channels.values():
×
2893
                if chan1.is_frozen_for_sending() or not chan1.is_active():
×
2894
                    continue
×
2895
                if chan1 == chan2:
×
2896
                    continue
×
2897
                if self.uses_trampoline() and chan1.node_id == chan2.node_id:
×
2898
                    continue
×
2899
                if direction == SENT:
×
2900
                    if chan1.can_pay(delta*1000):
×
2901
                        return chan1, chan2, delta
×
2902
                else:
2903
                    if chan1.can_receive(delta*1000):
×
2904
                        return chan2, chan1, delta
×
2905
            else:
2906
                continue
×
2907
        else:
2908
            return False
×
2909

2910
    def num_sats_can_rebalance(self, chan1, chan2):
1✔
2911
        # TODO: we should be able to spend 'max', with variable fee
2912
        n1 = chan1.available_to_spend(LOCAL)
×
2913
        n1 -= self.fee_estimate(n1)
×
2914
        n2 = chan2.available_to_spend(REMOTE)
×
2915
        amount_sat = min(n1, n2) // 1000
×
2916
        return amount_sat
×
2917

2918
    def suggest_rebalance_to_send(self, amount_sat):
1✔
2919
        return self._suggest_rebalance(SENT, amount_sat)
×
2920

2921
    def suggest_rebalance_to_receive(self, amount_sat):
1✔
2922
        return self._suggest_rebalance(RECEIVED, amount_sat)
×
2923

2924
    def suggest_swap_to_send(self, amount_sat, coins):
1✔
2925
        # fixme: if swap_amount_sat is lower than the minimum swap amount, we need to propose a higher value
2926
        assert amount_sat > self.num_sats_can_send()
×
2927
        try:
×
2928
            suggestions = self._suggest_channels_for_rebalance(SENT, amount_sat)
×
2929
        except NotEnoughFunds:
×
2930
            return None
×
2931
        for chan, swap_recv_amount in suggestions:
×
2932
            # check that we can send onchain
2933
            swap_server_mining_fee = 10000 # guessing, because we have not called get_pairs yet
×
2934
            swap_funding_sat = swap_recv_amount + swap_server_mining_fee
×
2935
            swap_output = PartialTxOutput.from_address_and_value(DummyAddress.SWAP, int(swap_funding_sat))
×
2936
            try:
×
2937
                # check if we have enough onchain funds
2938
                self.wallet.make_unsigned_transaction(
×
2939
                    coins=coins,
2940
                    outputs=[swap_output],
2941
                    fee_policy=FeePolicy(self.config.FEE_POLICY_SWAPS),
2942
                )
2943
            except NotEnoughFunds:
×
2944
                continue
×
2945
            return chan, swap_recv_amount
×
2946
        return None
×
2947

2948
    def suggest_swap_to_receive(self, amount_sat):
1✔
2949
        assert amount_sat > self.num_sats_can_receive()
×
2950
        try:
×
2951
            suggestions = self._suggest_channels_for_rebalance(RECEIVED, amount_sat)
×
2952
        except NotEnoughFunds:
×
2953
            return
×
2954
        for chan, swap_recv_amount in suggestions:
×
2955
            return chan, swap_recv_amount
×
2956

2957
    async def rebalance_channels(self, chan1: Channel, chan2: Channel, *, amount_msat: int):
1✔
2958
        if chan1 == chan2:
×
2959
            raise Exception('Rebalance requires two different channels')
×
2960
        if self.uses_trampoline() and chan1.node_id == chan2.node_id:
×
2961
            raise Exception('Rebalance requires channels from different trampolines')
×
2962
        payment_hash = self.create_payment_info(amount_msat=amount_msat)
×
2963
        lnaddr, invoice = self.get_bolt11_invoice(
×
2964
            payment_hash=payment_hash,
2965
            amount_msat=amount_msat,
2966
            message='rebalance',
2967
            expiry=3600,
2968
            fallback_address=None,
2969
            channels=[chan2],
2970
        )
2971
        invoice_obj = Invoice.from_bech32(invoice)
×
2972
        return await self.pay_invoice(invoice_obj, channels=[chan1])
×
2973

2974
    def can_receive_invoice(self, invoice: BaseInvoice) -> bool:
1✔
2975
        assert invoice.is_lightning()
×
2976
        return (invoice.get_amount_sat() or 0) <= self.num_sats_can_receive()
×
2977

2978
    async def close_channel(self, chan_id):
1✔
2979
        chan = self._channels[chan_id]
×
2980
        peer = self._peers[chan.node_id]
×
2981
        return await peer.close_channel(chan_id)
×
2982

2983
    def _force_close_channel(self, chan_id: bytes) -> Transaction:
1✔
2984
        chan = self._channels[chan_id]
1✔
2985
        tx = chan.force_close_tx()
1✔
2986
        # We set the channel state to make sure we won't sign new commitment txs.
2987
        # We expect the caller to try to broadcast this tx, after which it is
2988
        # not safe to keep using the channel even if the broadcast errors (server could be lying).
2989
        # Until the tx is seen in the mempool, there will be automatic rebroadcasts.
2990
        chan.set_state(ChannelState.FORCE_CLOSING)
1✔
2991
        # Add local tx to wallet to also allow manual rebroadcasts.
2992
        try:
1✔
2993
            self.wallet.adb.add_transaction(tx)
1✔
2994
        except UnrelatedTransactionException:
×
2995
            pass  # this can happen if (~all the balance goes to REMOTE)
×
2996
        return tx
1✔
2997

2998
    async def force_close_channel(self, chan_id: bytes) -> str:
1✔
2999
        """Force-close the channel. Network-related exceptions are propagated to the caller.
3000
        (automatic rebroadcasts will be scheduled)
3001
        """
3002
        # note: as we are async, it can take a few event loop iterations between the caller
3003
        #       "calling us" and us getting to run, and we only set the channel state now:
3004
        tx = self._force_close_channel(chan_id)
1✔
3005
        await self.network.broadcast_transaction(tx)
1✔
3006
        return tx.txid()
1✔
3007

3008
    def schedule_force_closing(self, chan_id: bytes) -> 'asyncio.Task[bool]':
1✔
3009
        """Schedules a task to force-close the channel and returns it.
3010
        Network-related exceptions are suppressed.
3011
        (automatic rebroadcasts will be scheduled)
3012
        Note: this method is intentionally not async so that callers have a guarantee
3013
              that the channel state is set immediately.
3014
        """
3015
        tx = self._force_close_channel(chan_id)
1✔
3016
        return asyncio.create_task(self.network.try_broadcasting(tx, 'force-close'))
1✔
3017

3018
    def remove_channel(self, chan_id):
1✔
3019
        chan = self.channels[chan_id]
×
3020
        assert chan.can_be_deleted()
×
3021
        with self.lock:
×
3022
            self._channels.pop(chan_id)
×
3023
            self.db.get('channels').pop(chan_id.hex())
×
3024
        self.wallet.set_reserved_addresses_for_chan(chan, reserved=False)
×
3025

3026
        util.trigger_callback('channels_updated', self.wallet)
×
3027
        util.trigger_callback('wallet_updated', self.wallet)
×
3028

3029
    @ignore_exceptions
1✔
3030
    @log_exceptions
1✔
3031
    async def reestablish_peer_for_given_channel(self, chan: Channel) -> None:
1✔
3032
        now = time.time()
×
3033
        peer_addresses = []
×
3034
        if self.uses_trampoline():
×
3035
            addr = trampolines_by_id().get(chan.node_id)
×
3036
            if addr:
×
3037
                peer_addresses.append(addr)
×
3038
        else:
3039
            # will try last good address first, from gossip
3040
            last_good_addr = self.channel_db.get_last_good_address(chan.node_id)
×
3041
            if last_good_addr:
×
3042
                peer_addresses.append(last_good_addr)
×
3043
            # will try addresses for node_id from gossip
3044
            addrs_from_gossip = self.channel_db.get_node_addresses(chan.node_id) or []
×
3045
            for host, port, ts in addrs_from_gossip:
×
3046
                peer_addresses.append(LNPeerAddr(host, port, chan.node_id))
×
3047
        # will try addresses stored in channel storage
3048
        peer_addresses += list(chan.get_peer_addresses())
×
3049
        # Done gathering addresses.
3050
        # Now select first one that has not failed recently.
3051
        for peer in peer_addresses:
×
3052
            if self._can_retry_addr(peer, urgent=True, now=now):
×
3053
                await self._add_peer(peer.host, peer.port, peer.pubkey)
×
3054
                return
×
3055

3056
    async def reestablish_peers_and_channels(self):
1✔
3057
        while True:
×
3058
            await asyncio.sleep(1)
×
3059
            if self.stopping_soon:
×
3060
                return
×
3061
            if self.config.ZEROCONF_TRUSTED_NODE:
×
3062
                peer = LNPeerAddr.from_str(self.config.ZEROCONF_TRUSTED_NODE)
×
3063
                if self._can_retry_addr(peer, urgent=True):
×
3064
                    await self._add_peer(peer.host, peer.port, peer.pubkey)
×
3065
            for chan in self.channels.values():
×
3066
                # reestablish
3067
                # note: we delegate filtering out uninteresting chans to this:
3068
                if not chan.should_try_to_reestablish_peer():
×
3069
                    continue
×
3070
                peer = self._peers.get(chan.node_id, None)
×
3071
                if peer:
×
3072
                    await peer.taskgroup.spawn(peer.reestablish_channel(chan))
×
3073
                else:
3074
                    await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan))
×
3075

3076
    def current_target_feerate_per_kw(self, *, has_anchors: bool) -> Optional[int]:
1✔
3077
        target: int = FEE_LN_MINIMUM_ETA_TARGET if has_anchors else FEE_LN_ETA_TARGET
1✔
3078
        feerate_per_kvbyte = self.network.fee_estimates.eta_target_to_fee(target)
1✔
3079
        if feerate_per_kvbyte is None:
1✔
3080
            return None
×
3081
        if has_anchors:
1✔
3082
            # set a floor of 5 sat/vb to have some safety margin in case the mempool
3083
            # grows quickly
3084
            feerate_per_kvbyte = max(feerate_per_kvbyte, 5000)
×
3085
        return max(FEERATE_PER_KW_MIN_RELAY_LIGHTNING, feerate_per_kvbyte // 4)
1✔
3086

3087
    def current_low_feerate_per_kw_srk_channel(self) -> Optional[int]:
1✔
3088
        """Gets low feerate for static remote key channels."""
3089
        if constants.net is constants.BitcoinRegtest:
1✔
3090
            feerate_per_kvbyte = 0
×
3091
        else:
3092
            feerate_per_kvbyte = self.network.fee_estimates.eta_target_to_fee(FEE_LN_LOW_ETA_TARGET)
1✔
3093
            if feerate_per_kvbyte is None:
1✔
3094
                return None
×
3095
        low_feerate_per_kw = max(FEERATE_PER_KW_MIN_RELAY_LIGHTNING, feerate_per_kvbyte // 4)
1✔
3096
        # make sure this is never higher than the target feerate:
3097
        current_target_feerate = self.current_target_feerate_per_kw(has_anchors=False)
1✔
3098
        if not current_target_feerate:
1✔
3099
            return None
×
3100
        low_feerate_per_kw = min(low_feerate_per_kw, current_target_feerate)
1✔
3101
        return low_feerate_per_kw
1✔
3102

3103
    def create_channel_backup(self, channel_id: bytes):
1✔
3104
        chan = self._channels[channel_id]
×
3105
        # do not backup old-style channels
3106
        assert chan.is_static_remotekey_enabled()
×
3107
        peer_addresses = list(chan.get_peer_addresses())
×
3108
        peer_addr = peer_addresses[0]
×
3109
        return ImportedChannelBackupStorage(
×
3110
            node_id=chan.node_id,
3111
            privkey=self.node_keypair.privkey,
3112
            funding_txid=chan.funding_outpoint.txid,
3113
            funding_index=chan.funding_outpoint.output_index,
3114
            funding_address=chan.get_funding_address(),
3115
            host=peer_addr.host,
3116
            port=peer_addr.port,
3117
            is_initiator=chan.constraints.is_initiator,
3118
            channel_seed=chan.config[LOCAL].channel_seed,
3119
            local_delay=chan.config[LOCAL].to_self_delay,
3120
            remote_delay=chan.config[REMOTE].to_self_delay,
3121
            remote_revocation_pubkey=chan.config[REMOTE].revocation_basepoint.pubkey,
3122
            remote_payment_pubkey=chan.config[REMOTE].payment_basepoint.pubkey,
3123
            local_payment_pubkey=chan.config[LOCAL].payment_basepoint.pubkey,
3124
            multisig_funding_privkey=chan.config[LOCAL].multisig_key.privkey,
3125
        )
3126

3127
    def export_channel_backup(self, channel_id):
1✔
3128
        xpub = self.wallet.get_fingerprint()
×
3129
        backup_bytes = self.create_channel_backup(channel_id).to_bytes()
×
3130
        assert backup_bytes == ImportedChannelBackupStorage.from_bytes(backup_bytes).to_bytes(), "roundtrip failed"
×
3131
        encrypted = pw_encode_with_version_and_mac(backup_bytes, xpub)
×
3132
        assert backup_bytes == pw_decode_with_version_and_mac(encrypted, xpub), "encrypt failed"
×
3133
        return 'channel_backup:' + encrypted
×
3134

3135
    async def request_force_close(self, channel_id: bytes, *, connect_str=None) -> None:
1✔
3136
        if channel_id in self.channels:
×
3137
            chan = self.channels[channel_id]
×
3138
            peer = self._peers.get(chan.node_id)
×
3139
            chan.should_request_force_close = True
×
3140
            if peer:
×
3141
                peer.close_and_cleanup()  # to force a reconnect
×
3142
        elif connect_str:
×
3143
            peer = await self.add_peer(connect_str)
×
3144
            await peer.request_force_close(channel_id)
×
3145
        elif channel_id in self.channel_backups:
×
3146
            await self._request_force_close_from_backup(channel_id)
×
3147
        else:
3148
            raise Exception(f'Unknown channel {channel_id.hex()}')
×
3149

3150
    def import_channel_backup(self, data):
1✔
3151
        xpub = self.wallet.get_fingerprint()
×
3152
        cb_storage = ImportedChannelBackupStorage.from_encrypted_str(data, password=xpub)
×
3153
        channel_id = cb_storage.channel_id()
×
3154
        if channel_id.hex() in self.db.get_dict("channels"):
×
3155
            raise Exception('Channel already in wallet')
×
3156
        self.logger.info(f'importing channel backup: {channel_id.hex()}')
×
3157
        d = self.db.get_dict("imported_channel_backups")
×
3158
        d[channel_id.hex()] = cb_storage
×
3159
        with self.lock:
×
3160
            cb = ChannelBackup(cb_storage, lnworker=self)
×
3161
            self._channel_backups[channel_id] = cb
×
3162
        self.wallet.set_reserved_addresses_for_chan(cb, reserved=True)
×
3163
        self.wallet.save_db()
×
3164
        util.trigger_callback('channels_updated', self.wallet)
×
3165
        self.lnwatcher.add_channel(cb)
×
3166

3167
    def has_conflicting_backup_with(self, remote_node_id: bytes):
1✔
3168
        """ Returns whether we have an active channel with this node on another device, using same local node id. """
3169
        channel_backup_peers = [
×
3170
            cb.node_id for cb in self.channel_backups.values()
3171
            if (not cb.is_closed() and cb.get_local_pubkey() == self.node_keypair.pubkey)]
3172
        return any(remote_node_id.startswith(cb_peer_nodeid) for cb_peer_nodeid in channel_backup_peers)
×
3173

3174
    def remove_channel_backup(self, channel_id):
1✔
3175
        chan = self.channel_backups[channel_id]
×
3176
        assert chan.can_be_deleted()
×
3177
        found = False
×
3178
        onchain_backups = self.db.get_dict("onchain_channel_backups")
×
3179
        imported_backups = self.db.get_dict("imported_channel_backups")
×
3180
        if channel_id.hex() in onchain_backups:
×
3181
            onchain_backups.pop(channel_id.hex())
×
3182
            found = True
×
3183
        if channel_id.hex() in imported_backups:
×
3184
            imported_backups.pop(channel_id.hex())
×
3185
            found = True
×
3186
        if not found:
×
3187
            raise Exception('Channel not found')
×
3188
        with self.lock:
×
3189
            self._channel_backups.pop(channel_id)
×
3190
        self.wallet.set_reserved_addresses_for_chan(chan, reserved=False)
×
3191
        self.wallet.save_db()
×
3192
        util.trigger_callback('channels_updated', self.wallet)
×
3193

3194
    @log_exceptions
1✔
3195
    async def _request_force_close_from_backup(self, channel_id: bytes):
1✔
3196
        cb = self.channel_backups.get(channel_id)
×
3197
        if not cb:
×
3198
            raise Exception(f'channel backup not found {self.channel_backups}')
×
3199
        cb = cb.cb # storage
×
3200
        self.logger.info(f'requesting channel force close: {channel_id.hex()}')
×
3201
        if isinstance(cb, ImportedChannelBackupStorage):
×
3202
            node_id = cb.node_id
×
3203
            privkey = cb.privkey
×
3204
            addresses = [(cb.host, cb.port, 0)]
×
3205
        else:
3206
            assert isinstance(cb, OnchainChannelBackupStorage)
×
3207
            privkey = self.node_keypair.privkey
×
3208
            for pubkey, peer_addr in trampolines_by_id().items():
×
3209
                if pubkey.startswith(cb.node_id_prefix):
×
3210
                    node_id = pubkey
×
3211
                    addresses = [(peer_addr.host, peer_addr.port, 0)]
×
3212
                    break
×
3213
            else:
3214
                # we will try with gossip (see below)
3215
                addresses = []
×
3216

3217
        async def _request_fclose(addresses):
×
3218
            for host, port, timestamp in addresses:
×
3219
                peer_addr = LNPeerAddr(host, port, node_id)
×
3220
                transport = LNTransport(privkey, peer_addr, e_proxy=ESocksProxy.from_network_settings(self.network))
×
3221
                peer = Peer(self, node_id, transport, is_channel_backup=True)
×
3222
                try:
×
3223
                    async with OldTaskGroup(wait=any) as group:
×
3224
                        await group.spawn(peer._message_loop())
×
3225
                        await group.spawn(peer.request_force_close(channel_id))
×
3226
                    return True
×
3227
                except Exception as e:
×
3228
                    self.logger.info(f'failed to connect {host} {e}')
×
3229
                    continue
×
3230
            else:
3231
                return False
×
3232
        # try first without gossip db
3233
        success = await _request_fclose(addresses)
×
3234
        if success:
×
3235
            return
×
3236
        # try with gossip db
3237
        if self.uses_trampoline():
×
3238
            raise Exception(_('Please enable gossip'))
×
3239
        node_id = self.network.channel_db.get_node_by_prefix(cb.node_id_prefix)
×
3240
        addresses_from_gossip = self.network.channel_db.get_node_addresses(node_id)
×
3241
        if not addresses_from_gossip:
×
3242
            raise Exception('Peer not found in gossip database')
×
3243
        success = await _request_fclose(addresses_from_gossip)
×
3244
        if not success:
×
3245
            raise Exception('failed to connect')
×
3246

3247
    def maybe_add_backup_from_tx(self, tx):
1✔
3248
        funding_address = None
1✔
3249
        node_id_prefix = None
1✔
3250
        for i, o in enumerate(tx.outputs()):
1✔
3251
            script_type = get_script_type_from_output_script(o.scriptpubkey)
1✔
3252
            if script_type == 'p2wsh':
1✔
3253
                funding_index = i
×
3254
                funding_address = o.address
×
3255
                for o2 in tx.outputs():
×
3256
                    if o2.scriptpubkey.startswith(bytes([opcodes.OP_RETURN])):
×
3257
                        encrypted_data = o2.scriptpubkey[2:]
×
3258
                        data = self.decrypt_cb_data(encrypted_data, funding_address)
×
3259
                        if data.startswith(CB_MAGIC_BYTES):
×
3260
                            node_id_prefix = data[len(CB_MAGIC_BYTES):]
×
3261
        if node_id_prefix is None:
1✔
3262
            return
1✔
3263
        funding_txid = tx.txid()
×
3264
        cb_storage = OnchainChannelBackupStorage(
×
3265
            node_id_prefix=node_id_prefix,
3266
            funding_txid=funding_txid,
3267
            funding_index=funding_index,
3268
            funding_address=funding_address,
3269
            is_initiator=True)
3270
        channel_id = cb_storage.channel_id().hex()
×
3271
        if channel_id in self.db.get_dict("channels"):
×
3272
            return
×
3273
        self.logger.info(f"adding backup from tx")
×
3274
        d = self.db.get_dict("onchain_channel_backups")
×
3275
        d[channel_id] = cb_storage
×
3276
        cb = ChannelBackup(cb_storage, lnworker=self)
×
3277
        self.wallet.set_reserved_addresses_for_chan(cb, reserved=True)
×
3278
        self.wallet.save_db()
×
3279
        with self.lock:
×
3280
            self._channel_backups[bfh(channel_id)] = cb
×
3281
        util.trigger_callback('channels_updated', self.wallet)
×
3282
        self.lnwatcher.add_channel(cb)
×
3283

3284
    def save_forwarding_failure(
1✔
3285
            self,
3286
            payment_key: str,
3287
            *,
3288
            error_bytes: Optional[bytes] = None,
3289
            failure_message: Optional['OnionRoutingFailure'] = None
3290
    ) -> None:
3291
        error_hex = error_bytes.hex() if error_bytes else None
1✔
3292
        failure_hex = failure_message.to_bytes().hex() if failure_message else None
1✔
3293
        self.forwarding_failures[payment_key] = (error_hex, failure_hex)
1✔
3294

3295
    def get_forwarding_failure(self, payment_key: str) -> Tuple[Optional[bytes], Optional['OnionRoutingFailure']]:
1✔
3296
        error_hex, failure_hex = self.forwarding_failures.get(payment_key, (None, None))
1✔
3297
        error_bytes = bytes.fromhex(error_hex) if error_hex else None
1✔
3298
        failure_message = OnionRoutingFailure.from_bytes(bytes.fromhex(failure_hex)) if failure_hex else None
1✔
3299
        return error_bytes, failure_message
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc