spesmilo / electrum / 6606673856430080

15 Jul 2025 10:35PM UTC coverage: 59.8% (-0.003%) from 59.803%
Build 6606673856430080 (push, CirrusCI)

SomberNight:
verifier: fix off-by-one for max_checkpoint

If a wallet had a tx mined in the max_checkpoint block, in certain cases we would
leave it in the "unverified" state forever, and the wallet would remain stuck in "synchronizing...".
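
The fix described above is a boundary condition at the checkpoint height. As a rough
illustration only (this is not the actual Electrum patch; the helper names, parameters,
and the direction of the comparison are assumptions), an off-by-one of this kind comes
down to a strict versus an inclusive comparison against max_checkpoint:

    def tx_in_checkpointed_region(tx_height: int, max_checkpoint: int) -> bool:
        # Hypothetical predicate: does this tx fall in the checkpointed part
        # of the chain that the verifier still needs to handle?
        # Buggy, strict comparison: a tx mined exactly in the max_checkpoint
        # block is excluded, so it is never processed and stays "unverified".
        return tx_height < max_checkpoint

    def tx_in_checkpointed_region_fixed(tx_height: int, max_checkpoint: int) -> bool:
        # Inclusive comparison also covers the checkpoint block itself.
        return tx_height <= max_checkpoint

    # The boundary case the commit message describes:
    assert not tx_in_checkpointed_region(800_000, 800_000)    # skipped forever
    assert tx_in_checkpointed_region_fixed(800_000, 800_000)  # now handled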

0 of 3 new or added lines in 2 files covered. (0.0%)

10 existing lines in 5 files now uncovered.

21985 of 36764 relevant lines covered (59.8%)

2.99 hits per line
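
For reference, 21985 / 36764 ≈ 0.598, matching the reported 59.8% overall line coverage.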

Source File: /electrum/lnpeer.py (57.77% covered)

1
#!/usr/bin/env python3
2
#
3
# Copyright (C) 2018 The Electrum developers
4
# Distributed under the MIT software license, see the accompanying
5
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
6

7
import zlib
5✔
8
from collections import OrderedDict, defaultdict
5✔
9
import asyncio
5✔
10
import os
5✔
11
import time
5✔
12
from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set, Callable, Awaitable, List
5✔
13
from datetime import datetime
5✔
14
import functools
5✔
15

16
import electrum_ecc as ecc
5✔
17
from electrum_ecc import ecdsa_sig64_from_r_and_s, ecdsa_der_sig_from_ecdsa_sig64, ECPubkey
5✔
18

19
import aiorpcx
5✔
20
from aiorpcx import ignore_after
5✔
21

22
from .crypto import sha256, sha256d, privkey_to_pubkey
5✔
23
from . import bitcoin, util
5✔
24
from . import constants
5✔
25
from .util import (bfh, log_exceptions, ignore_exceptions, chunks, OldTaskGroup,
5✔
26
                   UnrelatedTransactionException, error_text_bytes_to_safe_str, AsyncHangDetector,
27
                   NoDynamicFeeEstimates, event_listener, EventListener)
28
from . import transaction
5✔
29
from .bitcoin import make_op_return, DummyAddress
5✔
30
from .transaction import PartialTxOutput, match_script_against_template, Sighash
5✔
31
from .logging import Logger
5✔
32
from .lnrouter import RouteEdge
5✔
33
from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment, process_onion_packet,
5✔
34
                      OnionPacket, construct_onion_error, obfuscate_onion_error, OnionRoutingFailure,
35
                      ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey,
36
                      OnionFailureCodeMetaFlag)
37
from .lnchannel import Channel, RevokeAndAck, RemoteCtnTooFarInFuture, ChannelState, PeerState, ChanCloseOption, CF_ANNOUNCE_CHANNEL
5✔
38
from . import lnutil
5✔
39
from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc, ChannelConfig,
5✔
40
                     RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore,
41
                     funding_output_script, get_per_commitment_secret_from_seed,
42
                     secret_to_pubkey, PaymentFailure, LnFeatures,
43
                     LOCAL, REMOTE, HTLCOwner,
44
                     ln_compare_features, MIN_FINAL_CLTV_DELTA_ACCEPTED,
45
                     RemoteMisbehaving, ShortChannelID,
46
                     IncompatibleLightningFeatures, derive_payment_secret_from_payment_preimage,
47
                     ChannelType, LNProtocolWarning, validate_features,
48
                     IncompatibleOrInsaneFeatures, FeeBudgetExceeded,
49
                     GossipForwardingMessage, GossipTimestampFilter)
50
from .lnutil import FeeUpdate, channel_id_from_funding_tx, PaymentFeeBudget
5✔
51
from .lnutil import serialize_htlc_key, Keypair
5✔
52
from .lntransport import LNTransport, LNTransportBase, LightningPeerConnectionClosed, HandshakeFailed
5✔
53
from .lnmsg import encode_msg, decode_msg, UnknownOptionalMsgType, FailedToParseMsg
5✔
54
from .interface import GracefulDisconnect
5✔
55
from .lnrouter import fee_for_edge_msat
5✔
56
from .json_db import StoredDict
5✔
57
from .invoices import PR_PAID
5✔
58
from .fee_policy import FEE_LN_ETA_TARGET, FEE_LN_MINIMUM_ETA_TARGET, FEERATE_PER_KW_MIN_RELAY_LIGHTNING
5✔
59
from .trampoline import decode_routing_info
5✔
60

61
if TYPE_CHECKING:
5✔
62
    from .lnworker import LNGossip, LNWallet
×
63
    from .lnrouter import LNPaymentRoute
×
64
    from .transaction import PartialTransaction
×
65

66

67
LN_P2P_NETWORK_TIMEOUT = 20
5✔
68

69

70
class Peer(Logger, EventListener):
5✔
71
    # note: in general this class is NOT thread-safe. Most methods are assumed to be running on the asyncio thread.
72

73
    ORDERED_MESSAGES = (
5✔
74
        'accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'closing_signed')
75
    SPAMMY_MESSAGES = (
5✔
76
        'ping', 'pong', 'channel_announcement', 'node_announcement', 'channel_update',
77
        'gossip_timestamp_filter', 'reply_channel_range', 'query_channel_range',
78
        'query_short_channel_ids', 'reply_short_channel_ids', 'reply_short_channel_ids_end')
79

80
    DELAY_INC_MSG_PROCESSING_SLEEP = 0.01
5✔
81

82
    def __init__(
5✔
83
            self,
84
            lnworker: Union['LNGossip', 'LNWallet'],
85
            pubkey: bytes,
86
            transport: LNTransportBase,
87
            *, is_channel_backup=False):
88

89
        self.lnworker = lnworker
5✔
90
        self.network = lnworker.network
5✔
91
        self.asyncio_loop = self.network.asyncio_loop
5✔
92
        self.is_channel_backup = is_channel_backup
5✔
93
        self._sent_init = False  # type: bool
5✔
94
        self._received_init = False  # type: bool
5✔
95
        self.initialized = self.asyncio_loop.create_future()
5✔
96
        self.got_disconnected = asyncio.Event()
5✔
97
        self.querying = asyncio.Event()
5✔
98
        self.transport = transport
5✔
99
        self.pubkey = pubkey  # remote pubkey
5✔
100
        self.privkey = self.transport.privkey  # local privkey
5✔
101
        self.features = self.lnworker.features  # type: LnFeatures
5✔
102
        self.their_features = LnFeatures(0)  # type: LnFeatures
5✔
103
        self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
5✔
104
        assert self.node_ids[0] != self.node_ids[1]
5✔
105
        self.last_message_time = 0
5✔
106
        self.pong_event = asyncio.Event()
5✔
107
        self.reply_channel_range = asyncio.Queue()
5✔
108
        # gossip uses a single queue to preserve message order
109
        self.gossip_queue = asyncio.Queue()
5✔
110
        self.gossip_timestamp_filter = None  # type: Optional[GossipTimestampFilter]
5✔
111
        self.outgoing_gossip_reply = False # type: bool
5✔
112
        self.ordered_message_queues = defaultdict(asyncio.Queue)  # type: Dict[bytes, asyncio.Queue] # for messages that are ordered
5✔
113
        self.temp_id_to_id = {}  # type: Dict[bytes, Optional[bytes]]   # to forward error messages
5✔
114
        self.funding_created_sent = set() # for channels in PREOPENING
5✔
115
        self.funding_signed_sent = set()  # for channels in PREOPENING
5✔
116
        self.shutdown_received = {} # chan_id -> asyncio.Future()
5✔
117
        self.channel_reestablish_msg = defaultdict(self.asyncio_loop.create_future)  # type: Dict[bytes, asyncio.Future]
5✔
118
        self._chan_reest_finished = defaultdict(asyncio.Event)  # type: Dict[bytes, asyncio.Event]
5✔
119
        self.orphan_channel_updates = OrderedDict()  # type: OrderedDict[ShortChannelID, dict]
5✔
120
        Logger.__init__(self)
5✔
121
        self.taskgroup = OldTaskGroup()
5✔
122
        # HTLCs offered by REMOTE, that we started removing but are still active:
123
        self.received_htlcs_pending_removal = set()  # type: Set[Tuple[Channel, int]]
5✔
124
        self.received_htlc_removed_event = asyncio.Event()
5✔
125
        self._htlc_switch_iterstart_event = asyncio.Event()
5✔
126
        self._htlc_switch_iterdone_event = asyncio.Event()
5✔
127
        self._received_revack_event = asyncio.Event()
5✔
128
        self.received_commitsig_event = asyncio.Event()
5✔
129
        self.downstream_htlc_resolved_event = asyncio.Event()
5✔
130
        self.register_callbacks()
5✔
131
        self._num_gossip_messages_forwarded = 0
5✔
132

133
    def send_message(self, message_name: str, **kwargs):
5✔
134
        assert util.get_running_loop() == util.get_asyncio_loop(), "this must be run on the asyncio thread!"
5✔
135
        assert type(message_name) is str
5✔
136
        if message_name not in self.SPAMMY_MESSAGES:
5✔
137
            self.logger.debug(f"Sending {message_name.upper()}")
5✔
138
        if message_name.upper() != "INIT" and not self.is_initialized():
5✔
139
            raise Exception("tried to send message before we are initialized")
×
140
        raw_msg = encode_msg(message_name, **kwargs)
5✔
141
        self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id"))
5✔
142
        self.transport.send_bytes(raw_msg)
5✔
143

144
    def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]):
5✔
145
        is_commitment_signed = message_name == "commitment_signed"
5✔
146
        if not (message_name.startswith("update_") or is_commitment_signed):
5✔
147
            return
5✔
148
        assert channel_id
5✔
149
        chan = self.get_channel_by_id(channel_id)
5✔
150
        if not chan:
5✔
151
            raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}")
×
152
        chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed)
5✔
153
        if is_commitment_signed:
5✔
154
            # saving now, to ensure replaying updates works (in case of channel reestablishment)
155
            self.lnworker.save_channel(chan)
5✔
156

157
    def maybe_set_initialized(self):
5✔
158
        if self.initialized.done():
5✔
159
            return
5✔
160
        if self._sent_init and self._received_init:
5✔
161
            self.initialized.set_result(True)
5✔
162

163
    def is_initialized(self) -> bool:
5✔
164
        return (self.initialized.done()
5✔
165
                and not self.initialized.cancelled()
166
                and self.initialized.exception() is None
167
                and self.initialized.result() is True)
168

169
    async def initialize(self):
5✔
170
        # If outgoing transport, do handshake now. For incoming, it has already been done.
171
        if isinstance(self.transport, LNTransport):
5✔
172
            await self.transport.handshake()
×
173
        self.logger.info(f"handshake done for {self.transport.peer_addr or self.pubkey.hex()}")
5✔
174
        features = self.features.for_init_message()
5✔
175
        flen = features.min_len()
5✔
176
        self.send_message(
5✔
177
            "init", gflen=0, flen=flen,
178
            features=features,
179
            init_tlvs={
180
                'networks':
181
                {'chains': constants.net.rev_genesis_bytes()}
182
            })
183
        self._sent_init = True
5✔
184
        self.maybe_set_initialized()
5✔
185

186
    @property
5✔
187
    def channels(self) -> Dict[bytes, Channel]:
5✔
188
        return self.lnworker.channels_for_peer(self.pubkey)
5✔
189

190
    def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
5✔
191
        # note: this is faster than self.channels.get(channel_id)
192
        chan = self.lnworker.get_channel_by_id(channel_id)
5✔
193
        if not chan:
5✔
194
            return None
×
195
        if chan.node_id != self.pubkey:
5✔
196
            return None
×
197
        return chan
5✔
198

199
    def diagnostic_name(self):
5✔
200
        return self.lnworker.__class__.__name__ + ', ' + self.transport.name()
5✔
201

202
    async def ping_if_required(self):
5✔
203
        if time.time() - self.last_message_time > 30:
5✔
204
            self.send_message('ping', num_pong_bytes=4, byteslen=4)
×
205
            self.pong_event.clear()
×
206
            await self.pong_event.wait()
×
207

208
    async def _process_message(self, message: bytes) -> None:
5✔
209
        try:
5✔
210
            message_type, payload = decode_msg(message)
5✔
211
        except UnknownOptionalMsgType as e:
5✔
212
            self.logger.info(f"received unknown message from peer. ignoring: {e!r}")
5✔
213
            return
5✔
214
        except FailedToParseMsg as e:
5✔
215
            self.logger.info(
5✔
216
                f"failed to parse message from peer. disconnecting. "
217
                f"msg_type={e.msg_type_name}({e.msg_type_int}). exc={e!r}")
218
            #self.logger.info(f"failed to parse message: message(SECRET?)={message.hex()}")
219
            raise GracefulDisconnect() from e
5✔
220
        self.last_message_time = time.time()
5✔
221
        if message_type not in self.SPAMMY_MESSAGES:
5✔
222
            self.logger.debug(f"Received {message_type.upper()}")
5✔
223
        # only process INIT if we are a backup
224
        if self.is_channel_backup is True and message_type != 'init':
5✔
225
            return
×
226
        if message_type in self.ORDERED_MESSAGES:
5✔
227
            chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
5✔
228
            self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
5✔
229
        else:
230
            if message_type not in ('error', 'warning') and 'channel_id' in payload:
5✔
231
                chan = self.get_channel_by_id(payload['channel_id'])
5✔
232
                if chan is None:
5✔
233
                    self.logger.info(f"Received {message_type} for unknown channel {payload['channel_id'].hex()}")
×
234
                    return
×
235
                args = (chan, payload)
5✔
236
            else:
237
                args = (payload,)
5✔
238
            try:
5✔
239
                f = getattr(self, 'on_' + message_type)
5✔
240
            except AttributeError:
×
241
                #self.logger.info("Received '%s'" % message_type.upper(), payload)
242
                return
×
243
            # raw message is needed to check signature
244
            if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
5✔
245
                payload['raw'] = message
5✔
246
                payload['sender_node_id'] = self.pubkey
5✔
247
            # note: the message handler might be async or non-async. In either case, by default,
248
            #       we wait for it to complete before we return, i.e. before the next message is processed.
249
            if asyncio.iscoroutinefunction(f):
5✔
250
                async with AsyncHangDetector(
5✔
251
                    message=f"message handler still running for {message_type.upper()}",
252
                    logger=self.logger,
253
                ):
254
                    await f(*args)
5✔
255
            else:
256
                f(*args)
5✔
257

258
    def non_blocking_msg_handler(func):
5✔
259
        """Makes a message handler non-blocking: while processing the message,
260
        the message_loop keeps processing subsequent incoming messages asynchronously.
261
        """
262
        assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'
5✔
263
        @functools.wraps(func)
5✔
264
        async def wrapper(self: 'Peer', *args, **kwargs):
5✔
265
            return await self.taskgroup.spawn(func(self, *args, **kwargs))
5✔
266
        return wrapper
5✔
267

268
    def on_warning(self, payload):
5✔
269
        chan_id = payload.get("channel_id")
5✔
270
        err_bytes = payload['data']
5✔
271
        is_known_chan_id = (chan_id in self.channels) or (chan_id in self.temp_id_to_id)
5✔
272
        self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: "
5✔
273
                         f"{error_text_bytes_to_safe_str(err_bytes, max_len=None)}. chan_id={chan_id.hex()}. "
274
                         f"{is_known_chan_id=}")
275

276
    def on_error(self, payload):
5✔
277
        chan_id = payload.get("channel_id")
5✔
278
        err_bytes = payload['data']
5✔
279
        is_known_chan_id = (chan_id in self.channels) or (chan_id in self.temp_id_to_id)
5✔
280
        self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: "
5✔
281
                         f"{error_text_bytes_to_safe_str(err_bytes, max_len=None)}. chan_id={chan_id.hex()}. "
282
                         f"{is_known_chan_id=}")
283
        if chan := self.channels.get(chan_id):
5✔
284
            self.schedule_force_closing(chan_id)
5✔
285
            self.ordered_message_queues[chan_id].put_nowait((None, {'error': err_bytes}))
5✔
286
            chan.save_remote_peer_sent_error(err_bytes)
5✔
287
        elif chan_id in self.temp_id_to_id:
×
288
            chan_id = self.temp_id_to_id[chan_id] or chan_id
×
289
            self.ordered_message_queues[chan_id].put_nowait((None, {'error': err_bytes}))
×
290
        elif chan_id == bytes(32):
×
291
            # if channel_id is all zero:
292
            # - MUST fail all channels with the sending node.
293
            for cid in self.channels:
×
294
                self.schedule_force_closing(cid)
×
295
                self.ordered_message_queues[cid].put_nowait((None, {'error': err_bytes}))
×
296
        else:
297
            # if no existing channel is referred to by channel_id:
298
            # - MUST ignore the message.
299
            return
×
300
        raise GracefulDisconnect
5✔
301

302
    def send_warning(self, channel_id: bytes, message: str = None, *, close_connection=False):
5✔
303
        """Sends a warning and disconnects if close_connection.
304

305
        Note:
306
        * channel_id is the temporary channel id when the channel id is not yet available
307

308
        A sending node:
309
        MAY set channel_id to all zero if the warning is not related to a specific channel.
310

311
        when failure was caused by an invalid signature check:
312
        * SHOULD include the raw, hex-encoded transaction in reply to a funding_created,
313
          funding_signed, closing_signed, or commitment_signed message.
314
        """
315
        assert isinstance(channel_id, bytes)
5✔
316
        encoded_data = b'' if not message else message.encode('ascii')
5✔
317
        self.send_message('warning', channel_id=channel_id, data=encoded_data, len=len(encoded_data))
5✔
318
        if close_connection:
5✔
319
            raise GracefulDisconnect
5✔
320

321
    def send_error(self, channel_id: bytes, message: str = None, *, force_close_channel=False):
5✔
322
        """Sends an error message and force closes the channel.
323

324
        Note:
325
        * channel_id is the temporary channel id when the channel id is not yet available
326

327
        A sending node:
328
        * SHOULD send error for protocol violations or internal errors that make channels
329
          unusable or that make further communication unusable.
330
        * SHOULD send error with the unknown channel_id in reply to messages of type
331
          32-255 related to unknown channels.
332
        * MUST fail the channel(s) referred to by the error message.
333
        * MAY set channel_id to all zero to indicate all channels.
334

335
        when failure was caused by an invalid signature check:
336
        * SHOULD include the raw, hex-encoded transaction in reply to a funding_created,
337
          funding_signed, closing_signed, or commitment_signed message.
338
        """
339
        assert isinstance(channel_id, bytes)
5✔
340
        encoded_data = b'' if not message else message.encode('ascii')
5✔
341
        self.send_message('error', channel_id=channel_id, data=encoded_data, len=len(encoded_data))
5✔
342
        # MUST fail the channel(s) referred to by the error message:
343
        #  we may violate this with force_close_channel
344
        if force_close_channel:
5✔
345
            if channel_id in self.channels:
5✔
346
                self.schedule_force_closing(channel_id)
5✔
347
            elif channel_id == bytes(32):
×
348
                for cid in self.channels:
×
349
                    self.schedule_force_closing(cid)
×
350
        raise GracefulDisconnect
5✔
351

352
    def on_ping(self, payload):
5✔
353
        l = payload['num_pong_bytes']
5✔
354
        self.send_message('pong', byteslen=l)
5✔
355

356
    def on_pong(self, payload):
5✔
357
        self.pong_event.set()
5✔
358

359
    async def wait_for_message(self, expected_name: str, channel_id: bytes):
5✔
360
        q = self.ordered_message_queues[channel_id]
5✔
361
        name, payload = await util.wait_for2(q.get(), LN_P2P_NETWORK_TIMEOUT)
5✔
362
        # raise exceptions for errors, so that the caller sees them
363
        if (err_bytes := payload.get("error")) is not None:
5✔
364
            err_text = error_text_bytes_to_safe_str(err_bytes)
×
365
            raise GracefulDisconnect(
×
366
                f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {err_text}")
367
        if name != expected_name:
5✔
368
            raise Exception(f"Received unexpected '{name}'")
×
369
        return payload
5✔
370

371
    def on_init(self, payload):
5✔
372
        if self._received_init:
5✔
373
            self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
5✔
374
            return
5✔
375
        _their_features = int.from_bytes(payload['features'], byteorder="big")
5✔
376
        _their_features |= int.from_bytes(payload['globalfeatures'], byteorder="big")
5✔
377
        try:
5✔
378
            self.their_features = validate_features(_their_features)
5✔
379
        except IncompatibleOrInsaneFeatures as e:
×
380
            raise GracefulDisconnect(f"remote sent insane features: {repr(e)}")
×
381
        # check if features are compatible, and set self.features to what we negotiated
382
        try:
5✔
383
            self.features = ln_compare_features(self.features, self.their_features)
5✔
384
        except IncompatibleLightningFeatures as e:
×
385
            self.initialized.set_exception(e)
×
386
            raise GracefulDisconnect(f"{str(e)}")
×
387
        self.logger.info(
5✔
388
            f"received INIT with features={str(self.their_features.get_names())}. "
389
            f"negotiated={str(self.features)}")
390
        # check that they are on the same chain as us, if provided
391
        their_networks = payload["init_tlvs"].get("networks")
5✔
392
        if their_networks:
5✔
393
            their_chains = list(chunks(their_networks["chains"], 32))
5✔
394
            if constants.net.rev_genesis_bytes() not in their_chains:
5✔
395
                raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})")
×
396
        # all checks passed
397
        self.lnworker.on_peer_successfully_established(self)
5✔
398
        self._received_init = True
5✔
399
        self.maybe_set_initialized()
5✔
400

401
    def on_node_announcement(self, payload):
5✔
402
        if not self.lnworker.uses_trampoline():
×
403
            self.gossip_queue.put_nowait(('node_announcement', payload))
×
404

405
    def on_channel_announcement(self, payload):
5✔
406
        if not self.lnworker.uses_trampoline():
×
407
            self.gossip_queue.put_nowait(('channel_announcement', payload))
×
408

409
    def on_channel_update(self, payload):
5✔
410
        self.maybe_save_remote_update(payload)
5✔
411
        if not self.lnworker.uses_trampoline():
5✔
412
            self.gossip_queue.put_nowait(('channel_update', payload))
5✔
413

414
    def on_query_channel_range(self, payload):
5✔
415
        if self.lnworker == self.lnworker.network.lngossip or not self._should_forward_gossip():
×
416
            return
×
417
        if not self._is_valid_channel_range_query(payload):
×
418
            return self.send_warning(bytes(32), "received invalid query_channel_range")
×
419
        if self.outgoing_gossip_reply:
×
420
            return self.send_warning(bytes(32), "received multiple queries at the same time")
×
421
        self.outgoing_gossip_reply = True
×
422
        self.gossip_queue.put_nowait(('query_channel_range', payload))
×
423

424
    def on_query_short_channel_ids(self, payload):
5✔
425
        if self.lnworker == self.lnworker.network.lngossip or not self._should_forward_gossip():
×
426
            return
×
427
        if self.outgoing_gossip_reply:
×
428
            return self.send_warning(bytes(32), "received multiple queries at the same time")
×
429
        if not self._is_valid_short_channel_id_query(payload):
×
430
            return self.send_warning(bytes(32), "invalid query_short_channel_ids")
×
431
        self.outgoing_gossip_reply = True
×
432
        self.gossip_queue.put_nowait(('query_short_channel_ids', payload))
×
433

434
    def on_gossip_timestamp_filter(self, payload):
5✔
435
        if self._should_forward_gossip():
×
436
            self.set_gossip_timestamp_filter(payload)
×
437

438
    def set_gossip_timestamp_filter(self, payload: dict) -> None:
5✔
439
        """Set the gossip_timestamp_filter for this peer. If the peer requested historical gossip,
440
        the request is put on the queue; otherwise only the forwarding loop will check the filter."""
441
        if payload.get('chain_hash') != constants.net.rev_genesis_bytes():
×
442
            return
×
443
        filter = GossipTimestampFilter.from_payload(payload)
×
444
        self.gossip_timestamp_filter = filter
×
445
        self.logger.debug(f"got gossip_ts_filter from peer {self.pubkey.hex()}: "
×
446
                          f"{str(self.gossip_timestamp_filter)}")
447
        if filter and not filter.only_forwarding:
×
448
            self.gossip_queue.put_nowait(('gossip_timestamp_filter', None))
×
449

450
    def maybe_save_remote_update(self, payload):
5✔
451
        if not self.channels:
5✔
452
            return
×
453
        for chan in self.channels.values():
5✔
454
            if payload['short_channel_id'] in [chan.short_channel_id, chan.get_local_scid_alias()]:
5✔
455
                chan.set_remote_update(payload)
5✔
456
                self.logger.info(f"saved remote channel_update gossip msg for chan {chan.get_id_for_log()}")
5✔
457
                break
5✔
458
        else:
459
            # Save (some bounded number of) orphan channel updates for later
460
            # as it might be for our own direct channel with this peer
461
            # (and we might not yet know the short channel id for that)
462
            # Background: this code is here to deal with a bug in LND,
463
            # see https://github.com/lightningnetwork/lnd/issues/3651 (closed 2022-08-13, lnd-v0.15.1)
464
            # and https://github.com/lightningnetwork/lightning-rfc/pull/657
465
            # This code assumes gossip_queries is set. BOLT7: "if the
466
            # gossip_queries feature is negotiated, [a node] MUST NOT
467
            # send gossip it did not generate itself"
468
            # NOTE: The definition of gossip_queries changed
469
            # https://github.com/lightning/bolts/commit/fce8bab931674a81a9ea895c9e9162e559e48a65
470
            short_channel_id = ShortChannelID(payload['short_channel_id'])
×
471
            self.logger.debug(f'received orphan channel update {short_channel_id}')
×
472
            self.orphan_channel_updates[short_channel_id] = payload
×
473
            while len(self.orphan_channel_updates) > 25:
×
474
                self.orphan_channel_updates.popitem(last=False)
×
475

476
    def on_announcement_signatures(self, chan: Channel, payload):
5✔
477
        h = chan.get_channel_announcement_hash()
×
478
        node_signature = payload["node_signature"]
×
479
        bitcoin_signature = payload["bitcoin_signature"]
×
480
        if not ECPubkey(chan.config[REMOTE].multisig_key.pubkey).ecdsa_verify(bitcoin_signature, h):
×
481
            raise Exception("bitcoin_sig invalid in announcement_signatures")
×
482
        if not ECPubkey(self.pubkey).ecdsa_verify(node_signature, h):
×
483
            raise Exception("node_sig invalid in announcement_signatures")
×
484
        chan.config[REMOTE].announcement_node_sig = node_signature
×
485
        chan.config[REMOTE].announcement_bitcoin_sig = bitcoin_signature
×
486
        self.lnworker.save_channel(chan)
×
487
        self.maybe_send_announcement_signatures(chan, is_reply=True)
×
488

489
    def handle_disconnect(func):
5✔
490
        @functools.wraps(func)
5✔
491
        async def wrapper_func(self, *args, **kwargs):
5✔
492
            try:
×
493
                return await func(self, *args, **kwargs)
×
494
            except GracefulDisconnect as e:
×
495
                self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
×
496
            except (LightningPeerConnectionClosed, IncompatibleLightningFeatures,
×
497
                    aiorpcx.socks.SOCKSError) as e:
498
                self.logger.info(f"Disconnecting: {repr(e)}")
×
499
            finally:
500
                self.close_and_cleanup()
×
501
        return wrapper_func
5✔
502

503
    @ignore_exceptions  # do not kill outer taskgroup
5✔
504
    @log_exceptions
5✔
505
    @handle_disconnect
5✔
506
    async def main_loop(self):
5✔
507
        async with self.taskgroup as group:
×
508
            await group.spawn(self.htlc_switch())
×
509
            await group.spawn(self._message_loop())
×
510
            await group.spawn(self._query_gossip())
×
511
            await group.spawn(self._process_gossip())
×
512
            await group.spawn(self._send_own_gossip())
×
513
            await group.spawn(self._forward_gossip())
×
514

515
    async def _process_gossip(self):
5✔
516
        while True:
×
517
            await asyncio.sleep(5)
×
518
            if not self.network.lngossip:
×
519
                continue
×
520
            chan_anns = []
×
521
            chan_upds = []
×
522
            node_anns = []
×
523
            while True:
×
524
                name, payload = await self.gossip_queue.get()
×
525
                if name == 'channel_announcement':
×
526
                    chan_anns.append(payload)
×
527
                elif name == 'channel_update':
×
528
                    chan_upds.append(payload)
×
529
                elif name == 'node_announcement':
×
530
                    node_anns.append(payload)
×
531
                elif name == 'query_channel_range':
×
532
                    await self.taskgroup.spawn(self._send_reply_channel_range(payload))
×
533
                elif name == 'query_short_channel_ids':
×
534
                    await self.taskgroup.spawn(self._send_reply_short_channel_ids(payload))
×
535
                elif name == 'gossip_timestamp_filter':
×
536
                    await self.taskgroup.spawn(self._handle_historical_gossip_request())
×
537
                else:
538
                    raise Exception('unknown message')
×
539
                if self.gossip_queue.empty():
×
540
                    break
×
541
            if self.network.lngossip:
×
542
                await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds)
×
543

544
    async def _send_own_gossip(self):
5✔
545
        if self.lnworker == self.lnworker.network.lngossip:
×
546
            return
×
547
        await asyncio.sleep(10)
×
548
        while True:
×
549
            public_channels = [chan for chan in self.lnworker.channels.values() if chan.is_public()]
×
550
            if public_channels:
×
551
                alias = self.lnworker.config.LIGHTNING_NODE_ALIAS
×
552
                color = self.lnworker.config.LIGHTNING_NODE_COLOR_RGB
×
553
                self.send_node_announcement(alias, color)
×
554
                for chan in public_channels:
×
555
                    if chan.is_open() and chan.peer_state == PeerState.GOOD:
×
556
                        self.maybe_send_channel_announcement(chan)
×
557
            await asyncio.sleep(600)
×
558

559
    def _should_forward_gossip(self) -> bool:
5✔
560
        if (self.network.lngossip != self.lnworker
×
561
                and not self.lnworker.uses_trampoline()
562
                and self.features.supports(LnFeatures.GOSSIP_QUERIES_REQ)):
563
            return True
×
564
        return False
×
565

566
    async def _forward_gossip(self):
5✔
567
        if not self._should_forward_gossip():
×
568
            return
×
569

570
        async def send_new_gossip_with_semaphore(gossip: List[GossipForwardingMessage]):
×
571
            async with self.network.lngossip.gossip_request_semaphore:
×
572
                sent = await self._send_gossip_messages(gossip)
×
573
            if sent > 0:
×
574
                self.logger.debug(f"forwarded {sent} gossip messages to {self.pubkey.hex()}")
×
575

576
        lngossip = self.network.lngossip
×
577
        last_gossip_batch_ts = 0
×
578
        while True:
×
579
            await asyncio.sleep(10)
×
580
            if not self.gossip_timestamp_filter:
×
581
                continue  # peer didn't request gossip
×
582

583
            new_gossip, last_lngossip_refresh_ts = await lngossip.get_forwarding_gossip()
×
584
            if not last_lngossip_refresh_ts > last_gossip_batch_ts:
×
585
                continue  # no new batch available
×
586
            last_gossip_batch_ts = last_lngossip_refresh_ts
×
587

588
            await self.taskgroup.spawn(send_new_gossip_with_semaphore(new_gossip))
×
589

590
    async def _handle_historical_gossip_request(self):
5✔
591
        """Called when a peer requests historical gossip with a gossip_timestamp_filter query."""
592
        filter = self.gossip_timestamp_filter
×
593
        if not self._should_forward_gossip() or not filter or filter.only_forwarding:
×
594
            return
×
595
        async with self.network.lngossip.gossip_request_semaphore:
×
596
            requested_gossip = self.lnworker.channel_db.get_gossip_in_timespan(filter)
×
597
            filter.only_forwarding = True
×
598
            sent = await self._send_gossip_messages(requested_gossip)
×
599
            if sent > 0:
×
600
                self._num_gossip_messages_forwarded += sent
×
601
                #self.logger.debug(f"forwarded {sent} historical gossip messages to {self.pubkey.hex()}")
602

603
    async def _send_gossip_messages(self, messages: List[GossipForwardingMessage]) -> int:
5✔
604
        amount_sent = 0
×
605
        for msg in messages:
×
606
            if self.gossip_timestamp_filter.in_range(msg.timestamp) \
×
607
                and self.pubkey != msg.sender_node_id:
608
                await self.transport.send_bytes_and_drain(msg.msg)
×
609
                amount_sent += 1
×
610
                if amount_sent % 250 == 0:
×
611
                    # this can be a lot of messages, completely blocking the event loop
612
                    await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
×
613
        return amount_sent
×
614

615
    async def _query_gossip(self):
5✔
616
        try:
×
617
            await util.wait_for2(self.initialized, LN_P2P_NETWORK_TIMEOUT)
×
618
        except Exception as e:
×
619
            raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e
×
620
        if self.lnworker == self.lnworker.network.lngossip:
×
621
            if not self.their_features.supports(LnFeatures.GOSSIP_QUERIES_OPT):
×
622
                raise GracefulDisconnect("remote does not support gossip_queries, which we need")
×
623
            try:
×
624
                ids, complete = await util.wait_for2(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT)
×
625
            except asyncio.TimeoutError as e:
×
626
                raise GracefulDisconnect("query_channel_range timed out") from e
×
627
            self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
×
628
            await self.lnworker.add_new_ids(ids)
×
629
            self.request_gossip(int(time.time()))
×
630
            while True:
×
631
                todo = self.lnworker.get_ids_to_query()
×
632
                if not todo:
×
633
                    await asyncio.sleep(1)
×
634
                    continue
×
635
                await self.get_short_channel_ids(todo)
×
636

637
    @staticmethod
5✔
638
    def _is_valid_channel_range_query(payload: dict) -> bool:
5✔
639
        if payload.get('chain_hash') != constants.net.rev_genesis_bytes():
×
640
            return False
×
641
        if payload.get('first_blocknum', -1) < constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS:
×
642
            return False
×
643
        if payload.get('number_of_blocks', 0) < 1:
×
644
            return False
×
645
        return True
×
646

647
    def _is_valid_short_channel_id_query(self, payload: dict) -> bool:
5✔
648
        if payload.get('chain_hash') != constants.net.rev_genesis_bytes():
×
649
            return False
×
650
        enc_short_ids = payload['encoded_short_ids']
×
651
        if enc_short_ids[0] != 0:
×
652
            self.logger.debug(f"got query_short_channel_ids with invalid encoding: {repr(enc_short_ids[0])}")
×
653
            return False
×
654
        if (len(enc_short_ids) - 1) % 8 != 0:
×
655
            self.logger.debug(f"got query_short_channel_ids with invalid length")
×
656
            return False
×
657
        return True
×
658

659
    async def _send_reply_channel_range(self, payload: dict):
5✔
660
        """https://github.com/lightning/bolts/blob/acd383145dd8c3fecd69ce94e4a789767b984ac0/07-routing-gossip.md#requirements-5"""
661
        first_blockheight: int = payload['first_blocknum']
×
662

663
        async with self.network.lngossip.gossip_request_semaphore:
×
664
            sorted_scids: List[ShortChannelID] = self.lnworker.channel_db.get_channels_in_range(
×
665
                first_blockheight,
666
                payload['number_of_blocks']
667
            )
668
            self.logger.debug(f"reply_channel_range to request "
×
669
                              f"first_height={first_blockheight}, "
670
                              f"num_blocks={payload['number_of_blocks']}, "
671
                              f"sending {len(sorted_scids)} scids")
672

673
            complete: bool = False
×
674
            while not complete:
×
675
                # create a 64800 byte chunk of scids, split off the remaining scids
676
                encoded_scids, sorted_scids = b''.join(sorted_scids[:8100]), sorted_scids[8100:]
×
677
                complete = len(sorted_scids) == 0  # if there are no scids remaining we are done
×
678
                # number of blocks covered by the scids in this chunk
679
                if complete:
×
680
                    # LAST MESSAGE MUST have first_blocknum plus number_of_blocks equal or greater than
681
                    # the query_channel_range first_blocknum plus number_of_blocks.
682
                    number_of_blocks = ((payload['first_blocknum'] + payload['number_of_blocks'])
×
683
                                        - first_blockheight)
684
                else:
685
                    # we cover the range until the height of the first scid in the next chunk
686
                    number_of_blocks = sorted_scids[0].block_height - first_blockheight
×
687
                self.send_message('reply_channel_range',
×
688
                    chain_hash=constants.net.rev_genesis_bytes(),
689
                    first_blocknum=first_blockheight,
690
                    number_of_blocks=number_of_blocks,
691
                    sync_complete=complete,
692
                    len=1+len(encoded_scids),
693
                    encoded_short_ids=b'\x00' + encoded_scids)
694
                if not complete:
×
695
                    first_blockheight = sorted_scids[0].block_height
×
696
                    await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
×
697
            self.outgoing_gossip_reply = False
×
698

699
    async def get_channel_range(self):
5✔
700
        first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
×
701
        num_blocks = self.lnworker.network.get_local_height() - first_block
×
702
        self.query_channel_range(first_block, num_blocks)
×
703
        intervals = []
×
704
        ids = set()
×
705
        # note: implementations behave differently...
706
        # "sane implementation that follows BOLT-07" example:
707
        #   query_channel_range. <<< first_block 497000, num_blocks 79038
708
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True
709
        #   on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True
710
        #   on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True
711
        #   on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True
712
        # lnd example:
713
        #   query_channel_range. <<< first_block 497000, num_blocks 79038
714
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
715
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
716
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
717
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
718
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True
719
        # ADDENDUM (01/2025): now it's 'MUST set sync_complete to false if this is not the final reply_channel_range.'
720
        while True:
×
721
            index, num, complete, _ids = await self.reply_channel_range.get()
×
722
            ids.update(_ids)
×
723
            intervals.append((index, index+num))
×
724
            intervals.sort()
×
725
            while len(intervals) > 1:
×
726
                a,b = intervals[0]
×
727
                c,d = intervals[1]
×
728
                if not (a <= c and a <= b and c <= d):
×
729
                    raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}")
×
730
                if b >= c:
×
731
                    intervals = [(a,d)] + intervals[2:]
×
732
                else:
733
                    break
×
734
            if len(intervals) == 1 and complete:
×
735
                a, b = intervals[0]
×
736
                if a <= first_block and b >= first_block + num_blocks:
×
737
                    break
×
738
        return ids, complete
×
739

740
    def request_gossip(self, timestamp=0):
5✔
741
        if timestamp == 0:
×
742
            self.logger.info('requesting whole channel graph')
×
743
        else:
744
            self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).isoformat()}')
×
745
        self.send_message(
×
746
            'gossip_timestamp_filter',
747
            chain_hash=constants.net.rev_genesis_bytes(),
748
            first_timestamp=timestamp,
749
            timestamp_range=b'\xff'*4)
750

751
    def query_channel_range(self, first_block, num_blocks):
5✔
752
        self.logger.info(f'query channel range {first_block} {num_blocks}')
×
753
        self.send_message(
×
754
            'query_channel_range',
755
            chain_hash=constants.net.rev_genesis_bytes(),
756
            first_blocknum=first_block,
757
            number_of_blocks=num_blocks)
758

759
    def decode_short_ids(self, encoded):
5✔
760
        if encoded[0] == 0:
×
761
            decoded = encoded[1:]
×
762
        elif encoded[0] == 1:
×
763
            decoded = zlib.decompress(encoded[1:])
×
764
        else:
765
            raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}')
×
766
        ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)]
×
767
        return ids
×
768

769
    def on_reply_channel_range(self, payload):
5✔
770
        first = payload['first_blocknum']
×
771
        num = payload['number_of_blocks']
×
772
        complete = bool(int.from_bytes(payload['sync_complete'], 'big'))
×
773
        encoded = payload['encoded_short_ids']
×
774
        ids = self.decode_short_ids(encoded)
×
775
        # self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, "
776
        #                  f"num_ids {len(ids)}, complete {complete}")
777
        self.reply_channel_range.put_nowait((first, num, complete, ids))
×
778

779
    async def _send_reply_short_channel_ids(self, payload: dict):
5✔
780
        async with self.network.lngossip.gossip_request_semaphore:
×
781
            requested_scids = payload['encoded_short_ids']
×
782
            decoded_scids = [ShortChannelID.normalize(scid)
×
783
                             for scid in self.decode_short_ids(requested_scids)]
784
            self.logger.debug(f"serving query_short_channel_ids request: "
×
785
                              f"requested {len(decoded_scids)} scids")
786
            chan_db = self.lnworker.channel_db
×
787
            response: Set[bytes] = set()
×
788
            for scid in decoded_scids:
×
789
                requested_msgs = chan_db.get_gossip_for_scid_request(scid)
×
790
                response.update(requested_msgs)
×
791
            self.logger.debug(f"found {len(response)} gossip messages to serve scid request")
×
792
            for index, msg in enumerate(response):
×
793
                await self.transport.send_bytes_and_drain(msg)
×
794
                if index % 250 == 0:
×
795
                    await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
×
796
            self.send_message(
×
797
                'reply_short_channel_ids_end',
798
                chain_hash=constants.net.rev_genesis_bytes(),
799
                full_information=self.network.lngossip.is_synced()
800
            )
801
            self.outgoing_gossip_reply = False
×
802

803
    async def get_short_channel_ids(self, ids):
5✔
804
        #self.logger.info(f'Querying {len(ids)} short_channel_ids')
805
        assert not self.querying.is_set()
×
806
        self.query_short_channel_ids(ids)
×
807
        await self.querying.wait()
×
808
        self.querying.clear()
×
809

810
    def query_short_channel_ids(self, ids):
5✔
811
        # compression MUST NOT be used according to updated bolt
812
        # (https://github.com/lightning/bolts/pull/981)
813
        ids = sorted(ids)
×
814
        s = b''.join(ids)
×
815
        prefix = b'\x00'  # uncompressed
×
816
        self.send_message(
×
817
            'query_short_channel_ids',
818
            chain_hash=constants.net.rev_genesis_bytes(),
819
            len=1+len(s),
820
            encoded_short_ids=prefix+s)
821

822
    async def _message_loop(self):
5✔
823
        try:
5✔
824
            await util.wait_for2(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
5✔
825
        except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
×
826
            raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
×
827
        async for msg in self.transport.read_messages():
5✔
828
            await self._process_message(msg)
5✔
829
            if self.DELAY_INC_MSG_PROCESSING_SLEEP:
5✔
830
                # rate-limit message-processing a bit, to make it harder
831
                # for a single peer to bog down the event loop / cpu:
832
                await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
5✔
833

834
    def on_reply_short_channel_ids_end(self, payload):
5✔
835
        self.querying.set()
×
836

837
    def close_and_cleanup(self):
5✔
838
        # note: This method might get called multiple times!
839
        #       E.g. if you call close_and_cleanup() to cause a disconnection from the peer,
840
        #       it will get called a second time in handle_disconnect().
841
        self.unregister_callbacks()
×
842
        try:
×
843
            if self.transport:
×
844
                self.transport.close()
×
845
        except Exception:
×
846
            pass
×
847
        self.lnworker.peer_closed(self)
×
848
        self.got_disconnected.set()
×
849

850
    def is_shutdown_anysegwit(self):
5✔
851
        return self.features.supports(LnFeatures.OPTION_SHUTDOWN_ANYSEGWIT_OPT)
5✔
852

853
    def is_channel_type(self):
5✔
854
        return self.features.supports(LnFeatures.OPTION_CHANNEL_TYPE_OPT)
×
855

856
    def accepts_zeroconf(self):
5✔
857
        return self.features.supports(LnFeatures.OPTION_ZEROCONF_OPT)
5✔
858

859
    def is_upfront_shutdown_script(self):
5✔
860
        return self.features.supports(LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT)
5✔
861

862
    def use_anchors(self) -> bool:
5✔
863
        return self.features.supports(LnFeatures.OPTION_ANCHORS_ZERO_FEE_HTLC_OPT)
×
864

865
    def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]:
5✔
866
        if msg_identifier not in ['accept', 'open']:
×
867
            raise ValueError("msg_identifier must be either 'accept' or 'open'")
×
868

869
        uss_tlv = payload[msg_identifier + '_channel_tlvs'].get(
×
870
            'upfront_shutdown_script')
871

872
        if uss_tlv and self.is_upfront_shutdown_script():
×
873
            upfront_shutdown_script = uss_tlv['shutdown_scriptpubkey']
×
874
        else:
875
            upfront_shutdown_script = b''
×
876
        self.logger.info(f"upfront shutdown script received: {upfront_shutdown_script}")
×
877
        return upfront_shutdown_script
×
878

879
    def make_local_config(
5✔
880
        self,
881
        *,
882
        funding_sat: int,
883
        push_msat: int,
884
        initiator: HTLCOwner,
885
        channel_type: ChannelType,
886
        multisig_funding_keypair: Optional[Keypair],  # if None, will get derived from channel_seed
887
    ) -> LocalConfig:
888
        channel_seed = os.urandom(32)
×
889
        initial_msat = funding_sat * 1000 - push_msat if initiator == LOCAL else push_msat
×
890

891
        # sending empty bytes as the upfront_shutdown_script will give us the
892
        # flexibility to decide an address at closing time
893
        upfront_shutdown_script = b''
×
894

895
        if self.use_anchors():
×
896
            static_payment_key = self.lnworker.static_payment_key
×
897
            static_remotekey = None
×
898
        else:
899
            assert channel_type & channel_type.OPTION_STATIC_REMOTEKEY
×
900
            wallet = self.lnworker.wallet
×
901
            assert wallet.txin_type == 'p2wpkh'
×
902
            addr = wallet.get_new_sweep_address_for_channel()
×
903
            static_payment_key = None
×
904
            static_remotekey = bytes.fromhex(wallet.get_public_key(addr))
×
905

906
        if multisig_funding_keypair:
×
907
            for chan in self.lnworker.channels.values():  # check against all chans of lnworker, for sanity
×
908
                if multisig_funding_keypair.pubkey == chan.config[LOCAL].multisig_key.pubkey:
×
909
                    raise Exception(
×
910
                        "Refusing to reuse multisig_funding_keypair for new channel. "
911
                        "Wait one block before opening another channel with this peer."
912
                    )
913

914
        dust_limit_sat = bitcoin.DUST_LIMIT_P2PKH
×
915
        reserve_sat = max(funding_sat // 100, dust_limit_sat)
×
916
        # for comparison of defaults, see
917
        # https://github.com/ACINQ/eclair/blob/afa378fbb73c265da44856b4ad0f2128a88ae6c6/eclair-core/src/main/resources/reference.conf#L66
918
        # https://github.com/ElementsProject/lightning/blob/0056dd75572a8857cff36fcbdb1a2295a1ac9253/lightningd/options.c#L657
919
        # https://github.com/lightningnetwork/lnd/blob/56b61078c5b2be007d318673a5f3b40c6346883a/config.go#L81
920
        max_htlc_value_in_flight_msat = self.network.config.LIGHTNING_MAX_HTLC_VALUE_IN_FLIGHT_MSAT or funding_sat * 1000
×
921
        local_config = LocalConfig.from_seed(
×
922
            channel_seed=channel_seed,
923
            static_remotekey=static_remotekey,
924
            static_payment_key=static_payment_key,
925
            multisig_key=multisig_funding_keypair,
926
            upfront_shutdown_script=upfront_shutdown_script,
927
            to_self_delay=self.network.config.LIGHTNING_TO_SELF_DELAY_CSV,
928
            dust_limit_sat=dust_limit_sat,
929
            max_htlc_value_in_flight_msat=max_htlc_value_in_flight_msat,
930
            max_accepted_htlcs=30,
931
            initial_msat=initial_msat,
932
            reserve_sat=reserve_sat,
933
            funding_locked_received=False,
934
            current_commitment_signature=None,
935
            current_htlc_signatures=b'',
936
            htlc_minimum_msat=1,
937
            announcement_node_sig=b'',
938
            announcement_bitcoin_sig=b'',
939
        )
940
        local_config.validate_params(funding_sat=funding_sat, config=self.network.config, peer_features=self.features)
×
941
        return local_config
×
942

943
    def temporarily_reserve_funding_tx_change_address(func):
5✔
944
        # During the channel open flow, if we initiated, we might have used a change address
945
        # of ours in the funding tx. The funding tx is not part of the wallet history
946
        # at that point yet, but we should already consider this change address as 'used'.
947
        @functools.wraps(func)
5✔
948
        async def wrapper(self: 'Peer', *args, **kwargs):
5✔
949
            funding_tx = kwargs['funding_tx']  # type: PartialTransaction
×
950
            wallet = self.lnworker.wallet
×
951
            change_addresses = [txout.address for txout in funding_tx.outputs()
×
952
                                if wallet.is_change(txout.address)]
953
            for addr in change_addresses:
×
954
                wallet.set_reserved_state_of_address(addr, reserved=True)
×
955
            try:
×
956
                return await func(self, *args, **kwargs)
×
957
            finally:
958
                for addr in change_addresses:
×
959
                    self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False)
×
960
        return wrapper
5✔
961

962
    @temporarily_reserve_funding_tx_change_address
5✔
963
    async def channel_establishment_flow(
5✔
964
            self, *,
965
            funding_tx: 'PartialTransaction',
966
            funding_sat: int,
967
            push_msat: int,
968
            public: bool,
969
            zeroconf: bool = False,
970
            temp_channel_id: bytes,
971
            opening_fee: int = None,
972
    ) -> Tuple[Channel, 'PartialTransaction']:
973
        """Implements the channel opening flow.
974

975
        -> open_channel message
976
        <- accept_channel message
977
        -> funding_created message
978
        <- funding_signed message
979

980
        Channel configurations are initialized in this method.
981
        """
982

983
        if public and not self.lnworker.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS:
×
984
            raise Exception('Cannot create public channels')
×
985

986
        if not self.lnworker.wallet.can_have_lightning():
×
987
            # old wallet that cannot have lightning anymore
988
            raise Exception('This wallet cannot create new channels')
×
989

990
        # will raise if init fails
991
        await util.wait_for2(self.initialized, LN_P2P_NETWORK_TIMEOUT)
×
992
        # trampoline is not yet in features
993
        if self.lnworker.uses_trampoline() and not self.lnworker.is_trampoline_peer(self.pubkey):
×
994
            raise Exception('Not a trampoline node: ' + str(self.their_features))
×
995

996
        channel_flags = CF_ANNOUNCE_CHANNEL if public else 0
×
997
        feerate: Optional[int] = self.lnworker.current_target_feerate_per_kw(
×
998
            has_anchors=self.use_anchors()
999
        )
1000
        if feerate is None:
×
1001
            raise NoDynamicFeeEstimates()
×
1002
        # we set a channel type for internal bookkeeping
1003
        open_channel_tlvs = {}
×
1004
        assert self.their_features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT)
×
1005
        our_channel_type = ChannelType(ChannelType.OPTION_STATIC_REMOTEKEY)
×
1006
        if self.use_anchors():
×
1007
            our_channel_type |= ChannelType(ChannelType.OPTION_ANCHORS_ZERO_FEE_HTLC_TX)
×
1008
        if zeroconf:
×
1009
            our_channel_type |= ChannelType(ChannelType.OPTION_ZEROCONF)
×
1010
        # We do not set the option_scid_alias bit in channel_type because LND rejects it.
1011
        # Eclair accepts channel_type with that bit, but does not require it.
1012

1013
        # if option_channel_type is negotiated: MUST set channel_type
1014
        if self.is_channel_type():
×
1015
            # if it includes channel_type: MUST set it to a defined type representing the type it wants.
1016
            open_channel_tlvs['channel_type'] = {
×
1017
                'type': our_channel_type.to_bytes_minimal()
1018
            }
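            # For reference (hedged, per BOLT-2/BOLT-9): channel_type is a feature
            # bit vector, e.g. a static_remotekey-only channel sets bit 12, while
            # anchors_zero_fee_htlc_tx + static_remotekey sets bits 12 and 22.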
1019

1020
        if self.use_anchors():
×
1021
            multisig_funding_keypair = lnutil.derive_multisig_funding_key_if_we_opened(
×
1022
                funding_root_secret=self.lnworker.funding_root_keypair.privkey,
1023
                remote_node_id_or_prefix=self.pubkey,
1024
                nlocktime=funding_tx.locktime,
1025
            )
1026
        else:
1027
            multisig_funding_keypair = None
×
1028
        local_config = self.make_local_config(
×
1029
            funding_sat=funding_sat,
1030
            push_msat=push_msat,
1031
            initiator=LOCAL,
1032
            channel_type=our_channel_type,
1033
            multisig_funding_keypair=multisig_funding_keypair,
1034
        )
1035
        # if it includes open_channel_tlvs: MUST include upfront_shutdown_script.
1036
        open_channel_tlvs['upfront_shutdown_script'] = {
×
1037
            'shutdown_scriptpubkey': local_config.upfront_shutdown_script
1038
        }
1039
        if opening_fee:
×
1040
            # todo: maybe add payment hash
1041
            open_channel_tlvs['channel_opening_fee'] = {
×
1042
                'channel_opening_fee': opening_fee
1043
            }
1044
        # for the first commitment transaction
1045
        per_commitment_secret_first = get_per_commitment_secret_from_seed(
×
1046
            local_config.per_commitment_secret_seed,
1047
            RevocationStore.START_INDEX
1048
        )
1049
        per_commitment_point_first = secret_to_pubkey(
×
1050
            int.from_bytes(per_commitment_secret_first, 'big'))
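        # Per-commitment indices count down from RevocationStore.START_INDEX
        # (2**48 - 1, per BOLT-3), i.e. commitment n uses index START_INDEX - n.
        # Hedged sketch of deriving the n-th point:
        #     secret_n = get_per_commitment_secret_from_seed(seed, RevocationStore.START_INDEX - n)
        #     point_n = secret_to_pubkey(int.from_bytes(secret_n, 'big'))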
1051

1052
        # store the temp id now, so that it is recognized for e.g. 'error' messages
1053
        # TODO: this is never cleaned up; the dict grows unbounded until disconnect
1054
        self.temp_id_to_id[temp_channel_id] = None
×
1055
        self.send_message(
×
1056
            "open_channel",
1057
            temporary_channel_id=temp_channel_id,
1058
            chain_hash=constants.net.rev_genesis_bytes(),
1059
            funding_satoshis=funding_sat,
1060
            push_msat=push_msat,
1061
            dust_limit_satoshis=local_config.dust_limit_sat,
1062
            feerate_per_kw=feerate,
1063
            max_accepted_htlcs=local_config.max_accepted_htlcs,
1064
            funding_pubkey=local_config.multisig_key.pubkey,
1065
            revocation_basepoint=local_config.revocation_basepoint.pubkey,
1066
            htlc_basepoint=local_config.htlc_basepoint.pubkey,
1067
            payment_basepoint=local_config.payment_basepoint.pubkey,
1068
            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
1069
            first_per_commitment_point=per_commitment_point_first,
1070
            to_self_delay=local_config.to_self_delay,
1071
            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
1072
            channel_flags=channel_flags,
1073
            channel_reserve_satoshis=local_config.reserve_sat,
1074
            htlc_minimum_msat=local_config.htlc_minimum_msat,
1075
            open_channel_tlvs=open_channel_tlvs,
1076
        )
1077

1078
        # <- accept_channel
1079
        payload = await self.wait_for_message('accept_channel', temp_channel_id)
×
1080
        self.logger.debug(f"received accept_channel for temp_channel_id={temp_channel_id.hex()}. {payload=}")
×
1081
        remote_per_commitment_point = payload['first_per_commitment_point']
×
1082
        funding_txn_minimum_depth = payload['minimum_depth']
×
1083
        if not zeroconf and funding_txn_minimum_depth <= 0:
×
1084
            raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}")
×
1085
        if funding_txn_minimum_depth > 30:
×
1086
            raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}")
×
1087

1088
        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
×
1089
            payload, 'accept')
1090

1091
        accept_channel_tlvs = payload.get('accept_channel_tlvs')
×
1092
        their_channel_type = accept_channel_tlvs.get('channel_type') if accept_channel_tlvs else None
×
1093
        if their_channel_type:
×
1094
            their_channel_type = ChannelType.from_bytes(their_channel_type['type'], byteorder='big').discard_unknown_and_check()
×
1095
            # if channel_type is set, and channel_type was set in open_channel,
1096
            # and they are not equal types: MUST reject the channel.
1097
            if open_channel_tlvs.get('channel_type') is not None and their_channel_type != our_channel_type:
×
1098
                raise Exception("Channel type is not the one that we sent.")
×
1099

1100
        remote_config = RemoteConfig(
×
1101
            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
1102
            multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]),
1103
            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
1104
            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
1105
            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
1106
            to_self_delay=payload['to_self_delay'],
1107
            dust_limit_sat=payload['dust_limit_satoshis'],
1108
            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
1109
            max_accepted_htlcs=payload["max_accepted_htlcs"],
1110
            initial_msat=push_msat,
1111
            reserve_sat=payload["channel_reserve_satoshis"],
1112
            htlc_minimum_msat=payload['htlc_minimum_msat'],
1113
            next_per_commitment_point=remote_per_commitment_point,
1114
            current_per_commitment_point=None,
1115
            upfront_shutdown_script=upfront_shutdown_script,
1116
            announcement_node_sig=b'',
1117
            announcement_bitcoin_sig=b'',
1118
        )
1119
        ChannelConfig.cross_validate_params(
×
1120
            local_config=local_config,
1121
            remote_config=remote_config,
1122
            funding_sat=funding_sat,
1123
            is_local_initiator=True,
1124
            initial_feerate_per_kw=feerate,
1125
            config=self.network.config,
1126
            peer_features=self.features,
1127
            has_anchors=self.use_anchors(),
1128
        )
1129

1130
        # -> funding created
1131
        # replace dummy output in funding tx
1132
        redeem_script = funding_output_script(local_config, remote_config)
×
1133
        funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script)
×
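        # The funding output is a P2WSH over the 2-of-2 multisig of both funding
        # pubkeys (lexicographically sorted), per BOLT-3 -- hedged sketch:
        #     witness_script = 2 <pubkey1> <pubkey2> 2 OP_CHECKMULTISIG
        #     scriptPubKey   = OP_0 <sha256(witness_script)>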
1134
        funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat)
×
1135
        funding_tx.replace_output_address(DummyAddress.CHANNEL, funding_address)
×
1136
        # find and encrypt op_return data associated to funding_address
1137
        has_onchain_backup = self.lnworker and self.lnworker.has_recoverable_channels()
×
1138
        if has_onchain_backup:
×
1139
            backup_data = self.lnworker.cb_data(self.pubkey)
×
1140
            dummy_scriptpubkey = make_op_return(backup_data)
×
1141
            for o in funding_tx.outputs():
×
1142
                if o.scriptpubkey == dummy_scriptpubkey:
×
1143
                    encrypted_data = self.lnworker.encrypt_cb_data(backup_data, funding_address)
×
1144
                    assert len(encrypted_data) == len(backup_data)
×
1145
                    o.scriptpubkey = make_op_return(encrypted_data)
×
1146
                    break
×
1147
            else:
1148
                raise Exception('op_return output not found in funding tx')
×
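        # (hedged note on the for/else above: the `else` branch only runs if the
        #  loop completed without `break`, i.e. no matching op_return output exists)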
1149
        # must not be malleable
1150
        funding_tx.set_rbf(False)
×
1151
        if not funding_tx.is_segwit():
×
1152
            raise Exception('Funding transaction is not segwit')
×
1153
        funding_txid = funding_tx.txid()
×
1154
        assert funding_txid
×
1155
        funding_index = funding_tx.outputs().index(funding_output)
×
1156
        # build remote commitment transaction
1157
        channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index)
×
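        # Per BOLT-2, the channel_id is the raw funding tx hash with the funding
        # output index XORed into its last two bytes -- roughly (hedged sketch):
        #     raw_hash = bytes.fromhex(funding_txid)[::-1]
        #     chan_id = (int.from_bytes(raw_hash, 'big') ^ funding_index).to_bytes(32, 'big')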
1158
        outpoint = Outpoint(funding_txid, funding_index)
×
1159
        constraints = ChannelConstraints(
×
1160
            flags=channel_flags,
1161
            capacity=funding_sat,
1162
            is_initiator=True,
1163
            funding_txn_minimum_depth=funding_txn_minimum_depth
1164
        )
1165
        storage = self.create_channel_storage(
×
1166
            channel_id, outpoint, local_config, remote_config, constraints, our_channel_type)
1167
        chan = Channel(
×
1168
            storage,
1169
            lnworker=self.lnworker,
1170
            initial_feerate=feerate
1171
        )
1172
        chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()]
×
1173
        chan.storage['has_onchain_backup'] = has_onchain_backup
×
1174
        chan.storage['init_timestamp'] = int(time.time())
×
1175
        if isinstance(self.transport, LNTransport):
×
1176
            chan.add_or_update_peer_addr(self.transport.peer_addr)
×
1177
        sig_64, _ = chan.sign_next_commitment()
×
1178
        self.temp_id_to_id[temp_channel_id] = channel_id
×
1179

1180
        self.send_message("funding_created",
×
1181
            temporary_channel_id=temp_channel_id,
1182
            funding_txid=funding_txid_bytes,
1183
            funding_output_index=funding_index,
1184
            signature=sig_64)
1185
        self.funding_created_sent.add(channel_id)
×
1186

1187
        # <- funding signed
1188
        payload = await self.wait_for_message('funding_signed', channel_id)
×
1189
        self.logger.info('received funding_signed')
×
1190
        remote_sig = payload['signature']
×
1191
        try:
×
1192
            chan.receive_new_commitment(remote_sig, [])
×
1193
        except LNProtocolWarning as e:
×
1194
            self.send_warning(channel_id, message=str(e), close_connection=True)
×
1195
        chan.open_with_first_pcp(remote_per_commitment_point, remote_sig)
×
1196
        chan.set_state(ChannelState.OPENING)
×
1197
        if zeroconf:
×
1198
            chan.set_state(ChannelState.FUNDED)
×
1199
            self.send_channel_ready(chan)
×
1200
        self.lnworker.add_new_channel(chan)
×
1201
        return chan, funding_tx
×
1202

1203
    def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints, channel_type):
5✔
1204
        chan_dict = {
×
1205
            "node_id": self.pubkey.hex(),
1206
            "channel_id": channel_id.hex(),
1207
            "short_channel_id": None,
1208
            "funding_outpoint": outpoint,
1209
            "remote_config": remote_config,
1210
            "local_config": local_config,
1211
            "constraints": constraints,
1212
            "remote_update": None,
1213
            "state": ChannelState.PREOPENING.name,
1214
            'onion_keys': {},
1215
            'data_loss_protect_remote_pcp': {},
1216
            "log": {},
1217
            "unfulfilled_htlcs": {},
1218
            "revocation_store": {},
1219
            "channel_type": channel_type,
1220
        }
1221
        # set db to None, because we do not want to write updates until channel is saved
1222
        return StoredDict(chan_dict, None, [])
×
1223

1224
    @non_blocking_msg_handler
5✔
1225
    async def on_open_channel(self, payload):
5✔
1226
        """Implements the channel acceptance flow.
1227

1228
        <- open_channel message
1229
        -> accept_channel message
1230
        <- funding_created message
1231
        -> funding_signed message
1232

1233
        Channel configurations are initialized in this method.
1234
        """
1235

1236
        # <- open_channel
1237
        if payload['chain_hash'] != constants.net.rev_genesis_bytes():
×
1238
            raise Exception('wrong chain_hash')
×
1239

1240
        open_channel_tlvs = payload.get('open_channel_tlvs')
×
1241
        channel_type = open_channel_tlvs.get('channel_type') if open_channel_tlvs else None
×
1242
        # The receiving node MAY fail the channel if:
1243
        # option_channel_type was negotiated but the message doesn't include a channel_type
1244
        if self.is_channel_type() and channel_type is None:
×
1245
            raise Exception("sender has advertised option_channel_type, but hasn't sent the channel type")
×
1246
        # MUST fail the channel if it supports channel_type,
1247
        # channel_type was set, and the type is not suitable.
1248
        elif self.is_channel_type() and channel_type is not None:
×
1249
            channel_type = ChannelType.from_bytes(channel_type['type'], byteorder='big').discard_unknown_and_check()
×
1250
            if not channel_type.complies_with_features(self.features):
×
1251
                raise Exception("sender has sent a channel type we don't support")
×
1252

1253
        if self.is_channel_type():
×
1254
            is_zeroconf = bool(channel_type & ChannelType.OPTION_ZEROCONF)
×
1255
            if is_zeroconf and not self.network.config.ZEROCONF_TRUSTED_NODE.startswith(self.pubkey.hex()):
×
1256
                raise Exception(f"not accepting zeroconf from node {self.pubkey}")
×
1257
        else:
1258
            is_zeroconf = False
×
1259

1260
        if self.lnworker.has_recoverable_channels() and not is_zeroconf:
×
1261
            # FIXME: we might want to keep the connection open
1262
            raise Exception('not accepting channels')
×
1263

1264
        if not self.lnworker.wallet.can_have_lightning():
×
1265
            # old wallet that cannot have lightning anymore
1266
            raise Exception('This wallet does not accept new channels')
×
1267

1268
        funding_sat = payload['funding_satoshis']
×
1269
        push_msat = payload['push_msat']
×
1270
        feerate = payload['feerate_per_kw']  # note: we are not validating this
×
1271
        temp_chan_id = payload['temporary_channel_id']
×
1272
        # store the temp id now, so that it is recognized for e.g. 'error' messages
1273
        # TODO: this is never cleaned up; the dict grows unbounded until disconnect
1274
        self.temp_id_to_id[temp_chan_id] = None
×
1275
        channel_opening_fee = open_channel_tlvs.get('channel_opening_fee') if open_channel_tlvs else None
×
1276
        if channel_opening_fee:
×
1277
            # todo check that the fee is reasonable
1278
            pass
×
1279

1280
        if self.use_anchors():
×
1281
            multisig_funding_keypair = lnutil.derive_multisig_funding_key_if_they_opened(
×
1282
                funding_root_secret=self.lnworker.funding_root_keypair.privkey,
1283
                remote_node_id_or_prefix=self.pubkey,
1284
                remote_funding_pubkey=payload['funding_pubkey'],
1285
            )
1286
        else:
1287
            multisig_funding_keypair = None
×
1288
        local_config = self.make_local_config(
×
1289
            funding_sat=funding_sat,
1290
            push_msat=push_msat,
1291
            initiator=REMOTE,
1292
            channel_type=channel_type,
1293
            multisig_funding_keypair=multisig_funding_keypair,
1294
        )
1295

1296
        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
×
1297
            payload, 'open')
1298

1299
        remote_config = RemoteConfig(
×
1300
            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
1301
            multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']),
1302
            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
1303
            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
1304
            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
1305
            to_self_delay=payload['to_self_delay'],
1306
            dust_limit_sat=payload['dust_limit_satoshis'],
1307
            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
1308
            max_accepted_htlcs=payload['max_accepted_htlcs'],
1309
            initial_msat=funding_sat * 1000 - push_msat,
1310
            reserve_sat=payload['channel_reserve_satoshis'],
1311
            htlc_minimum_msat=payload['htlc_minimum_msat'],
1312
            next_per_commitment_point=payload['first_per_commitment_point'],
1313
            current_per_commitment_point=None,
1314
            upfront_shutdown_script=upfront_shutdown_script,
1315
            announcement_node_sig=b'',
1316
            announcement_bitcoin_sig=b'',
1317
        )
1318
        ChannelConfig.cross_validate_params(
×
1319
            local_config=local_config,
1320
            remote_config=remote_config,
1321
            funding_sat=funding_sat,
1322
            is_local_initiator=False,
1323
            initial_feerate_per_kw=feerate,
1324
            config=self.network.config,
1325
            peer_features=self.features,
1326
            has_anchors=self.use_anchors(),
1327
        )
1328

1329
        channel_flags = ord(payload['channel_flags'])
×
1330

1331
        # -> accept channel
1332
        # for the first commitment transaction
1333
        per_commitment_secret_first = get_per_commitment_secret_from_seed(
×
1334
            local_config.per_commitment_secret_seed,
1335
            RevocationStore.START_INDEX
1336
        )
1337
        per_commitment_point_first = secret_to_pubkey(
×
1338
            int.from_bytes(per_commitment_secret_first, 'big'))
1339

1340
        min_depth = 0 if is_zeroconf else 3
×
1341

1342
        accept_channel_tlvs = {
×
1343
            'upfront_shutdown_script': {
1344
                'shutdown_scriptpubkey': local_config.upfront_shutdown_script
1345
            },
1346
        }
1347
        # The sender: if it sets channel_type: MUST set it to the channel_type from open_channel
1348
        if self.is_channel_type():
×
1349
            accept_channel_tlvs['channel_type'] = {
×
1350
                'type': channel_type.to_bytes_minimal()
1351
            }
1352

1353
        self.send_message(
×
1354
            'accept_channel',
1355
            temporary_channel_id=temp_chan_id,
1356
            dust_limit_satoshis=local_config.dust_limit_sat,
1357
            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
1358
            channel_reserve_satoshis=local_config.reserve_sat,
1359
            htlc_minimum_msat=local_config.htlc_minimum_msat,
1360
            minimum_depth=min_depth,
1361
            to_self_delay=local_config.to_self_delay,
1362
            max_accepted_htlcs=local_config.max_accepted_htlcs,
1363
            funding_pubkey=local_config.multisig_key.pubkey,
1364
            revocation_basepoint=local_config.revocation_basepoint.pubkey,
1365
            payment_basepoint=local_config.payment_basepoint.pubkey,
1366
            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
1367
            htlc_basepoint=local_config.htlc_basepoint.pubkey,
1368
            first_per_commitment_point=per_commitment_point_first,
1369
            accept_channel_tlvs=accept_channel_tlvs,
1370
        )
1371

1372
        # <- funding created
1373
        funding_created = await self.wait_for_message('funding_created', temp_chan_id)
×
1374

1375
        # -> funding signed
1376
        funding_idx = funding_created['funding_output_index']
×
1377
        funding_txid = funding_created['funding_txid'][::-1].hex()
×
1378
        channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx)
×
1379
        constraints = ChannelConstraints(
×
1380
            flags=channel_flags,
1381
            capacity=funding_sat,
1382
            is_initiator=False,
1383
            funding_txn_minimum_depth=min_depth,
1384
        )
1385
        outpoint = Outpoint(funding_txid, funding_idx)
×
1386
        chan_dict = self.create_channel_storage(
×
1387
            channel_id, outpoint, local_config, remote_config, constraints, channel_type)
1388
        chan = Channel(
×
1389
            chan_dict,
1390
            lnworker=self.lnworker,
1391
            initial_feerate=feerate,
1392
            opening_fee=channel_opening_fee,
1393
        )
1394
        chan.storage['init_timestamp'] = int(time.time())
×
1395
        if isinstance(self.transport, LNTransport):
×
1396
            chan.add_or_update_peer_addr(self.transport.peer_addr)
×
1397
        remote_sig = funding_created['signature']
×
1398
        try:
×
1399
            chan.receive_new_commitment(remote_sig, [])
×
1400
        except LNProtocolWarning as e:
×
1401
            self.send_warning(channel_id, message=str(e), close_connection=True)
×
1402
        sig_64, _ = chan.sign_next_commitment()
×
1403
        self.send_message('funding_signed',
×
1404
            channel_id=channel_id,
1405
            signature=sig_64,
1406
        )
1407
        self.temp_id_to_id[temp_chan_id] = channel_id
×
1408
        self.funding_signed_sent.add(chan.channel_id)
×
1409
        chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
×
1410
        chan.set_state(ChannelState.OPENING)
×
1411
        if is_zeroconf:
×
1412
            chan.set_state(ChannelState.FUNDED)
×
1413
            self.send_channel_ready(chan)
×
1414
        self.lnworker.add_new_channel(chan)
×
1415

1416
    async def request_force_close(self, channel_id: bytes):
5✔
1417
        """Try to trigger the remote peer to force-close."""
1418
        await self.initialized
5✔
1419
        self.logger.info(f"trying to get remote peer to force-close chan {channel_id.hex()}")
5✔
1420
        # First, we intentionally send a "channel_reestablish" msg with an old state.
1421
        # Many nodes (but not all) automatically force-close when seeing this.
1422
        latest_point = secret_to_pubkey(42) # we need a valid point (BOLT2)
5✔
1423
        self.send_message(
5✔
1424
            "channel_reestablish",
1425
            channel_id=channel_id,
1426
            next_commitment_number=0,
1427
            next_revocation_number=0,
1428
            your_last_per_commitment_secret=0,
1429
            my_current_per_commitment_point=latest_point)
1430
        # Newish nodes that have lightning/bolts/pull/950 force-close upon receiving an "error" msg,
1431
        # so send that too. E.g. old "channel_reestablish" is not enough for eclair 0.7+,
1432
        # but "error" is. see https://github.com/ACINQ/eclair/pull/2036
1433
        # The receiving node:
1434
        #   - upon receiving `error`:
1435
        #     - MUST fail the channel referred to by `channel_id`, if that channel is with the sending node.
1436
        self.send_message("error", channel_id=channel_id, data=b"", len=0)
5✔
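        # Illustrative usage (hedged): when restoring from a static channel backup,
        # one would typically call `await peer.request_force_close(channel_id)` and
        # then watch the chain for the remote's commitment tx to sweep.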
1437

1438
    def schedule_force_closing(self, channel_id: bytes):
5✔
1439
        """ wrapper of lnworker's method, that raises if channel is not with this peer """
1440
        channels_with_peer = list(self.channels.keys())
5✔
1441
        channels_with_peer.extend(self.temp_id_to_id.values())
5✔
1442
        if channel_id not in channels_with_peer:
5✔
1443
            raise ValueError(f"channel {channel_id.hex()} does not belong to this peer")
×
1444
        chan = self.channels.get(channel_id)
5✔
1445
        if not chan:
5✔
1446
            self.logger.warning(f"tried to force-close channel {channel_id.hex()} but it is not in self.channels yet")
            return
×
1447
        if ChanCloseOption.LOCAL_FCLOSE in chan.get_close_options():
5✔
1448
            self.lnworker.schedule_force_closing(channel_id)
5✔
1449
        else:
1450
            self.logger.info(f"tried to force-close channel {chan.get_id_for_log()} "
5✔
1451
                             f"but close option is not allowed. {chan.get_state()=!r}")
1452

1453
    async def on_channel_reestablish(self, chan: Channel, msg):
5✔
1454
        # Note: it is critical for this message handler to block processing of further messages,
1455
        #       until this msg is processed. If we are behind (lost state), and send chan_reest to the remote,
1456
        #       when the remote realizes we are behind, they might send an "error" message - but the spec mandates
1457
        #       they send chan_reest first. If we processed the error first, we might force-close and lose money!
1458
        their_next_local_ctn = msg["next_commitment_number"]
5✔
1459
        their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"]
5✔
1460
        their_local_pcp = msg.get("my_current_per_commitment_point")
5✔
1461
        their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret")
5✔
1462
        self.logger.info(
5✔
1463
            f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with '
1464
            f'(their_next_local_ctn={their_next_local_ctn}, '
1465
            f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})')
1466
        if chan.get_state() >= ChannelState.CLOSED:
5✔
1467
            self.logger.warning(
×
1468
                f"on_channel_reestablish. dropping message. illegal action. "
1469
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
1470
            return
×
1471
        # sanity checks of received values
1472
        if their_next_local_ctn < 0:
5✔
1473
            raise RemoteMisbehaving(f"channel reestablish: their_next_local_ctn < 0")
×
1474
        if their_oldest_unrevoked_remote_ctn < 0:
5✔
1475
            raise RemoteMisbehaving(f"channel reestablish: their_oldest_unrevoked_remote_ctn < 0")
×
1476
        # ctns
1477
        oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
5✔
1478
        latest_local_ctn = chan.get_latest_ctn(LOCAL)
5✔
1479
        next_local_ctn = chan.get_next_ctn(LOCAL)
5✔
1480
        oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
5✔
1481
        latest_remote_ctn = chan.get_latest_ctn(REMOTE)
5✔
1482
        next_remote_ctn = chan.get_next_ctn(REMOTE)
5✔
1483
        # compare remote ctns
1484
        we_are_ahead = False
5✔
1485
        they_are_ahead = False
5✔
1486
        we_must_resend_revoke_and_ack = False
5✔
1487
        if next_remote_ctn != their_next_local_ctn:
5✔
1488
            if their_next_local_ctn == latest_remote_ctn and chan.hm.is_revack_pending(REMOTE):
5✔
1489
                # We will replay the local updates (see reestablish_channel), which should contain a commitment_signed
1490
                # (due to is_revack_pending being true), and this should remedy this situation.
1491
                pass
5✔
1492
            else:
1493
                self.logger.warning(
5✔
1494
                    f"channel_reestablish ({chan.get_id_for_log()}): "
1495
                    f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}")
1496
                if their_next_local_ctn < next_remote_ctn:
5✔
1497
                    we_are_ahead = True
5✔
1498
                else:
1499
                    they_are_ahead = True
5✔
1500
        # compare local ctns
1501
        if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn:
5✔
1502
            if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn:
5✔
1503
                # A node:
1504
                #    if next_revocation_number is equal to the commitment number of the last revoke_and_ack
1505
                #    the receiving node sent, AND the receiving node hasn't already received a closing_signed:
1506
                #        MUST re-send the revoke_and_ack.
1507
                we_must_resend_revoke_and_ack = True
5✔
1508
            else:
1509
                self.logger.warning(
5✔
1510
                    f"channel_reestablish ({chan.get_id_for_log()}): "
1511
                    f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}")
1512
                if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn:
5✔
1513
                    we_are_ahead = True
5✔
1514
                else:
1515
                    they_are_ahead = True
5✔
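        # Worked example (hedged): if our next_remote_ctn is 5 but the peer reports
        # next_commitment_number=7, they_are_ahead is set -> we are the ones who
        # lost state and must not broadcast our (now toxic) commitment.  If they
        # report 3 instead, we_are_ahead is set -> the peer lost state and we will
        # try to force-close below.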
1516
        # option_data_loss_protect
1517
        assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT)
5✔
1518
        def are_datalossprotect_fields_valid() -> bool:
5✔
1519
            if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None:
5✔
1520
                return False
×
1521
            if their_oldest_unrevoked_remote_ctn > 0:
5✔
1522
                our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1)
5✔
1523
            else:
1524
                assert their_oldest_unrevoked_remote_ctn == 0
5✔
1525
                our_pcs = bytes(32)
5✔
1526
            if our_pcs != their_claim_of_our_last_per_commitment_secret:
5✔
1527
                self.logger.error(
×
1528
                    f"channel_reestablish ({chan.get_id_for_log()}): "
1529
                    f"(DLP) local PCS mismatch: {our_pcs.hex()} != {their_claim_of_our_last_per_commitment_secret.hex()}")
1530
                return False
×
1531
            assert chan.is_static_remotekey_enabled()
5✔
1532
            return True
5✔
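        # (hedged note, per BOLT-2 option_data_loss_protect: the peer proves it saw
        #  our last revoke_and_ack by echoing that per-commitment secret back to us;
        #  a mismatch means one side is working from stale state)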
1533
        if not are_datalossprotect_fields_valid():
5✔
1534
            raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid")
×
1535
        fut = self.channel_reestablish_msg[chan.channel_id]
5✔
1536
        if they_are_ahead:
5✔
1537
            self.logger.warning(
5✔
1538
                f"channel_reestablish ({chan.get_id_for_log()}): "
1539
                f"remote is ahead of us! They should force-close. Remote PCP: {their_local_pcp.hex()}")
1540
            # data_loss_protect_remote_pcp is used in lnsweep
1541
            chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp)
5✔
1542
            chan.set_state(ChannelState.WE_ARE_TOXIC)
5✔
1543
            self.lnworker.save_channel(chan)
5✔
1544
            chan.peer_state = PeerState.BAD
5✔
1545
            # raise after we send channel_reestablish, so the remote can realize they are ahead
1546
            # FIXME what if we have multiple chans with peer? timing...
1547
            fut.set_exception(GracefulDisconnect("remote ahead of us"))
5✔
1548
        elif we_are_ahead:
5✔
1549
            self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.")
5✔
1550
            self.schedule_force_closing(chan.channel_id)
5✔
1551
            # FIXME what if we have multiple chans with peer? timing...
1552
            fut.set_exception(GracefulDisconnect("we are ahead of remote"))
5✔
1553
        else:
1554
            # all good
1555
            fut.set_result((we_must_resend_revoke_and_ack, their_next_local_ctn))
5✔
1556
            # Block processing of further incoming messages until we finished our part of chan-reest.
1557
            # This is needed for the replaying of our local unacked updates to be sane (if the peer
1558
            # also replays some messages we must not react to them until we finished replaying our own).
1559
            # (it would be sufficient to only block messages related to this channel, but this is easier)
1560
            await self._chan_reest_finished[chan.channel_id].wait()
5✔
1561
            # Note: if the above event is never set, we won't detect if the connection was closed by remote...
1562

1563
    def _send_channel_reestablish(self, chan: Channel):
5✔
1564
        assert self.is_initialized()
5✔
1565
        chan_id = chan.channel_id
5✔
1566
        # ctns
1567
        next_local_ctn = chan.get_next_ctn(LOCAL)
5✔
1568
        oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
5✔
1569
        # send message
1570
        assert chan.is_static_remotekey_enabled()
5✔
1571
        latest_secret, latest_point = chan.get_secret_and_point(LOCAL, 0)
5✔
1572
        if oldest_unrevoked_remote_ctn == 0:
5✔
1573
            last_rev_secret = 0
5✔
1574
        else:
1575
            last_rev_index = oldest_unrevoked_remote_ctn - 1
5✔
1576
            last_rev_secret = chan.revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index)
5✔
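            # (hedged note: revocation secrets are keyed by the count-down index,
            #  so the secret the peer revealed for their commitment n is retrieved
            #  at START_INDEX - n; here n = oldest_unrevoked_remote_ctn - 1)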
1577
        self.send_message(
5✔
1578
            "channel_reestablish",
1579
            channel_id=chan_id,
1580
            next_commitment_number=next_local_ctn,
1581
            next_revocation_number=oldest_unrevoked_remote_ctn,
1582
            your_last_per_commitment_secret=last_rev_secret,
1583
            my_current_per_commitment_point=latest_point)
1584
        self.logger.info(
5✔
1585
            f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with '
1586
            f'(next_local_ctn={next_local_ctn}, '
1587
            f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')
1588

1589
    async def reestablish_channel(self, chan: Channel):
5✔
1590
        await self.initialized
5✔
1591
        chan_id = chan.channel_id
5✔
1592
        if chan.should_request_force_close:
5✔
1593
            if chan.get_state() != ChannelState.WE_ARE_TOXIC:
×
1594
                chan.set_state(ChannelState.REQUESTED_FCLOSE)
×
1595
            await self.request_force_close(chan_id)
×
1596
            chan.should_request_force_close = False
×
1597
            return
×
1598
        if chan.get_state() == ChannelState.WE_ARE_TOXIC:
5✔
1599
            # Depending on timing, the remote might not know we are behind.
1600
            # We should let them know, so that they force-close.
1601
            # We do "request force-close" with ctn=0, instead of leaking our actual ctns,
1602
            # to make the remote less certain that we actually lost data.
1603
            await self.request_force_close(chan_id)
5✔
1604
            return
5✔
1605
        if chan.get_state() == ChannelState.FORCE_CLOSING:
5✔
1606
            # We likely got here because we found out that we are ahead (i.e. remote lost state).
1607
            # Depending on timing, the remote might not know they are behind.
1608
            # We should let them know:
1609
            self._send_channel_reestablish(chan)
5✔
1610
            return
5✔
1611
        if self.network.blockchain().is_tip_stale() \
5✔
1612
                or not self.lnworker.wallet.is_up_to_date() \
1613
                or self.lnworker.current_target_feerate_per_kw(has_anchors=chan.has_anchors()) \
1614
            is None:
1615
            # don't try to reestablish until we can do fee estimation and are up-to-date
1616
            return
×
1617
        # if we get here, we will try to do a proper reestablish
1618
        if not (ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING):
5✔
1619
            raise Exception(f"unexpected {chan.get_state()=} for reestablish")
×
1620
        if chan.peer_state != PeerState.DISCONNECTED:
5✔
1621
            self.logger.info(
×
1622
                f'reestablish_channel was called but channel {chan.get_id_for_log()} '
1623
                f'already in peer_state {chan.peer_state!r}')
1624
            return
×
1625
        chan.peer_state = PeerState.REESTABLISHING
5✔
1626
        util.trigger_callback('channel', self.lnworker.wallet, chan)
5✔
1627
        # ctns
1628
        oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
5✔
1629
        next_local_ctn = chan.get_next_ctn(LOCAL)
5✔
1630
        oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
5✔
1631
        # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
1632
        chan.hm.discard_unsigned_remote_updates()
5✔
1633
        # send message
1634
        self._send_channel_reestablish(chan)
5✔
1635
        # wait until we receive their channel_reestablish
1636
        fut = self.channel_reestablish_msg[chan_id]
5✔
1637
        await fut
5✔
1638
        we_must_resend_revoke_and_ack, their_next_local_ctn = fut.result()
5✔
1639

1640
        def replay_updates_and_commitsig():
5✔
1641
            # Replay un-acked local updates (including commitment_signed) byte-for-byte.
1642
            # If we have sent them a commitment signature that they "lost" (due to disconnect),
1643
            # we need to make sure we replay the same local updates, as otherwise they could
1644
            # end up with two (or more) signed valid commitment transactions at the same ctn.
1645
            # Multiple valid ctxs at the same ctn is a major headache for pre-signing spending txns,
1646
            # e.g. for watchtowers, hence we must ensure these ctxs coincide.
1647
            # We replay the local updates even if they were not yet committed.
1648
            unacked = chan.hm.get_unacked_local_updates()
5✔
1649
            replayed_msgs = []
5✔
1650
            for ctn, messages in unacked.items():
5✔
1651
                if ctn < their_next_local_ctn:
5✔
1652
                    # They claim to have received these messages and the corresponding
1653
                    # commitment_signed, hence we must not replay them.
1654
                    continue
5✔
1655
                for raw_upd_msg in messages:
5✔
1656
                    self.transport.send_bytes(raw_upd_msg)
5✔
1657
                    replayed_msgs.append(raw_upd_msg)
5✔
1658
            self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {len(replayed_msgs)} unacked messages. '
5✔
1659
                             f'{[decode_msg(raw_upd_msg)[0] for raw_upd_msg in replayed_msgs]}')
1660

1661
        def resend_revoke_and_ack():
5✔
1662
            last_secret, last_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1)
5✔
1663
            next_secret, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1)
5✔
1664
            self.send_message(
5✔
1665
                "revoke_and_ack",
1666
                channel_id=chan.channel_id,
1667
                per_commitment_secret=last_secret,
1668
                next_per_commitment_point=next_point)
1669

1670
        # We need to preserve relative order of last revack and commitsig.
1671
        # note: it is not possible to recover and reestablish a channel if we are out-of-sync by
1672
        # more than one ctns, i.e. we will only ever retransmit up to one commitment_signed message.
1673
        # Hence, if we need to retransmit a revack, without loss of generality, we can either replay
1674
        # it as the first message or as the last message.
1675
        was_revoke_last = chan.hm.was_revoke_last()
5✔
1676
        if we_must_resend_revoke_and_ack and not was_revoke_last:
5✔
1677
            self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replaying a revoke_and_ack first.')
5✔
1678
            resend_revoke_and_ack()
5✔
1679
        replay_updates_and_commitsig()
5✔
1680
        if we_must_resend_revoke_and_ack and was_revoke_last:
5✔
1681
            self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replaying a revoke_and_ack last.')
5✔
1682
            resend_revoke_and_ack()
5✔
1683

1684
        chan.peer_state = PeerState.GOOD
5✔
1685
        self._chan_reest_finished[chan.channel_id].set()
5✔
1686
        if chan.is_funded():
5✔
1687
            chan_just_became_ready = (their_next_local_ctn == next_local_ctn == 1)
5✔
1688
            if chan_just_became_ready or self.features.supports(LnFeatures.OPTION_SCID_ALIAS_OPT):
5✔
1689
                self.send_channel_ready(chan)
5✔
1690

1691
        self.maybe_send_announcement_signatures(chan)
5✔
1692
        self.maybe_update_fee(chan)  # if needed, update fee ASAP, to avoid force-closures from this
5✔
1693
        # checks done
1694
        util.trigger_callback('channel', self.lnworker.wallet, chan)
5✔
1695
        # if we have sent a previous shutdown, it must be retransmitted (Bolt2)
1696
        if chan.get_state() == ChannelState.SHUTDOWN:
5✔
1697
            await self.taskgroup.spawn(self.send_shutdown(chan))
×
1698

1699
    def send_channel_ready(self, chan: Channel):
5✔
1700
        assert chan.is_funded()
5✔
1701
        if chan.sent_channel_ready:
5✔
1702
            return
×
1703
        channel_id = chan.channel_id
5✔
1704
        per_commitment_secret_index = RevocationStore.START_INDEX - 1
5✔
1705
        second_per_commitment_point = secret_to_pubkey(int.from_bytes(
5✔
1706
            get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big'))
1707
        channel_ready_tlvs = {}
5✔
1708
        if self.features.supports(LnFeatures.OPTION_SCID_ALIAS_OPT):
5✔
1709
            # LND requires that we send an alias if the option has been negotiated in INIT.
1710
            # Otherwise, the channel will not be marked as active.
1711
            # This does not apply if the channel was previously marked active without an alias.
1712
            channel_ready_tlvs['short_channel_id'] = {'alias': chan.get_local_scid_alias(create_new_if_needed=True)}
5✔
1713
        # note: if 'channel_ready' was not yet received, we might send it multiple times
1714
        self.send_message(
5✔
1715
            "channel_ready",
1716
            channel_id=channel_id,
1717
            second_per_commitment_point=second_per_commitment_point,
1718
            channel_ready_tlvs=channel_ready_tlvs)
1719
        chan.sent_channel_ready = True
5✔
1720
        self.maybe_mark_open(chan)
5✔
1721

1722
    def on_channel_ready(self, chan: Channel, payload):
5✔
1723
        self.logger.info(f"on_channel_ready. channel: {chan.channel_id.hex()}")
5✔
1724
        if chan.peer_state != PeerState.GOOD:  # should never happen
5✔
1725
            raise Exception(f"received channel_ready in unexpected {chan.peer_state=!r}")
×
1726
        if chan.is_closed():
5✔
1727
            self.logger.warning(
×
1728
                f"on_channel_ready. dropping message. illegal action. "
1729
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
1730
            return
×
1731
        # save remote alias for use in invoices
1732
        scid_alias = payload.get('channel_ready_tlvs', {}).get('short_channel_id', {}).get('alias')
5✔
1733
        if scid_alias:
5✔
1734
            chan.save_remote_scid_alias(scid_alias)
5✔
1735
        if not chan.config[LOCAL].funding_locked_received:
5✔
1736
            their_next_point = payload["second_per_commitment_point"]
×
1737
            chan.config[REMOTE].next_per_commitment_point = their_next_point
×
1738
            chan.config[LOCAL].funding_locked_received = True
×
1739
            self.lnworker.save_channel(chan)
×
1740
        self.maybe_mark_open(chan)
5✔
1741

1742
    def send_node_announcement(self, alias:str, color_hex:str):
5✔
1743
        from .channel_db import NodeInfo
×
1744
        timestamp = int(time.time())
×
1745
        node_id = privkey_to_pubkey(self.privkey)
×
1746
        features = self.features.for_node_announcement()
×
1747
        flen = features.min_len()
×
1748
        rgb_color = bytes.fromhex(color_hex)
×
1749
        alias = bytes(alias, 'utf8')
×
1750
        alias += bytes(32 - len(alias))
×
1751
        if self.lnworker.config.LIGHTNING_LISTEN is not None:
×
1752
            addr = self.lnworker.config.LIGHTNING_LISTEN
×
1753
            try:
×
1754
                hostname, _, port = addr.partition(':')
×
1755
                if not port:  # use default port if not specified
×
1756
                    port = 9735
×
1757
                addresses = NodeInfo.to_addresses_field(hostname, int(port))
×
1758
            except Exception:
×
1759
                self.logger.exception(f"Invalid lightning_listen address: {addr}")
×
1760
                return
×
1761
        else:
1762
            addresses = b''
×
1763
        raw_msg = encode_msg(
×
1764
            "node_announcement",
1765
            flen=flen,
1766
            features=features,
1767
            timestamp=timestamp,
1768
            rgb_color=rgb_color,
1769
            node_id=node_id,
1770
            alias=alias,
1771
            addrlen=len(addresses),
1772
            addresses=addresses)
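        # (hedged note, per BOLT-7: the node_announcement signature commits to the
        #  double-SHA256 of the message with the 2-byte type and the 64-byte
        #  signature field stripped, hence the raw_msg[64+2:] slice below)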
1773
        h = sha256d(raw_msg[64+2:])
×
1774
        signature = ecc.ECPrivkey(self.privkey).ecdsa_sign(h, sigencode=ecdsa_sig64_from_r_and_s)
×
1775
        message_type, payload = decode_msg(raw_msg)
×
1776
        payload['signature'] = signature
×
1777
        raw_msg = encode_msg(message_type, **payload)
×
1778
        self.transport.send_bytes(raw_msg)
×
1779

1780
    def maybe_send_channel_announcement(self, chan: Channel):
5✔
1781
        node_sigs = [chan.config[REMOTE].announcement_node_sig, chan.config[LOCAL].announcement_node_sig]
×
1782
        bitcoin_sigs = [chan.config[REMOTE].announcement_bitcoin_sig, chan.config[LOCAL].announcement_bitcoin_sig]
×
1783
        if not bitcoin_sigs[0] or not bitcoin_sigs[1]:
×
1784
            return
×
1785
        raw_msg, is_reverse = chan.construct_channel_announcement_without_sigs()
×
1786
        if is_reverse:
×
1787
            node_sigs.reverse()
×
1788
            bitcoin_sigs.reverse()
×
1789
        message_type, payload = decode_msg(raw_msg)
×
1790
        payload['node_signature_1'] = node_sigs[0]
×
1791
        payload['node_signature_2'] = node_sigs[1]
×
1792
        payload['bitcoin_signature_1'] = bitcoin_sigs[0]
×
1793
        payload['bitcoin_signature_2'] = bitcoin_sigs[1]
×
1794
        raw_msg = encode_msg(message_type, **payload)
×
1795
        self.transport.send_bytes(raw_msg)
×
1796

1797
    def maybe_mark_open(self, chan: Channel):
5✔
1798
        if not chan.sent_channel_ready:
5✔
1799
            return
×
1800
        if not chan.config[LOCAL].funding_locked_received:
5✔
1801
            return
×
1802
        self.mark_open(chan)
5✔
1803

1804
    def mark_open(self, chan: Channel):
5✔
1805
        assert chan.is_funded()
5✔
1806
        # only allow state transition from "FUNDED" to "OPEN"
1807
        old_state = chan.get_state()
5✔
1808
        if old_state == ChannelState.OPEN:
5✔
1809
            return
5✔
1810
        if old_state != ChannelState.FUNDED:
5✔
1811
            self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {repr(old_state)}")
×
1812
            return
×
1813
        assert chan.config[LOCAL].funding_locked_received
5✔
1814
        chan.set_state(ChannelState.OPEN)
5✔
1815
        util.trigger_callback('channel', self.lnworker.wallet, chan)
5✔
1816
        # peer may have sent us a channel update for the incoming direction previously
1817
        pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id)
5✔
1818
        if pending_channel_update:
5✔
1819
            chan.set_remote_update(pending_channel_update)
×
1820
        self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})")
5✔
1821
        forwarding_enabled = self.network.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS
5✔
1822
        if forwarding_enabled and chan.short_channel_id:
5✔
1823
            # send channel_update of outgoing edge to peer,
1824
            # so that channel can be used to receive payments
1825
            self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})")
5✔
1826
            chan_upd = chan.get_outgoing_gossip_channel_update()
5✔
1827
            self.transport.send_bytes(chan_upd)
5✔
1828

1829
    def maybe_send_announcement_signatures(self, chan: Channel, is_reply=False):
5✔
1830
        if not chan.is_public():
5✔
1831
            return
5✔
1832
        if chan.sent_announcement_signatures:
×
1833
            return
×
1834
        if not is_reply and chan.config[REMOTE].announcement_node_sig:
×
1835
            return
×
1836
        h = chan.get_channel_announcement_hash()
×
1837
        bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).ecdsa_sign(h, sigencode=ecdsa_sig64_from_r_and_s)
×
1838
        node_signature = ecc.ECPrivkey(self.privkey).ecdsa_sign(h, sigencode=ecdsa_sig64_from_r_and_s)
×
1839
        self.send_message(
×
1840
            "announcement_signatures",
1841
            channel_id=chan.channel_id,
1842
            short_channel_id=chan.short_channel_id,
1843
            node_signature=node_signature,
1844
            bitcoin_signature=bitcoin_signature
1845
        )
1846
        chan.config[LOCAL].announcement_node_sig = node_signature
×
1847
        chan.config[LOCAL].announcement_bitcoin_sig = bitcoin_signature
×
1848
        self.lnworker.save_channel(chan)
×
1849
        chan.sent_announcement_signatures = True
×
1850

1851
    def on_update_fail_htlc(self, chan: Channel, payload):
5✔
1852
        htlc_id = payload["id"]
5✔
1853
        reason = payload["reason"]
5✔
1854
        self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
5✔
1855
        if not chan.can_update_ctx(proposer=REMOTE):
5✔
1856
            self.logger.warning(
×
1857
                f"on_update_fail_htlc. dropping message. illegal action. "
1858
                f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}")
1859
            return
×
1860
        chan.receive_fail_htlc(htlc_id, error_bytes=reason)  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
5✔
1861
        self.maybe_send_commitment(chan)
5✔
1862

1863
    def maybe_send_commitment(self, chan: Channel) -> bool:
5✔
1864
        assert util.get_running_loop() == util.get_asyncio_loop(), f"this must be run on the asyncio thread!"
5✔
1865
        if not chan.can_update_ctx(proposer=LOCAL):
5✔
1866
            return False
×
1867
        # REMOTE should revoke first before we can sign a new ctx
1868
        if chan.hm.is_revack_pending(REMOTE):
5✔
1869
            return False
5✔
1870
        # if there are no changes, we will not (and must not) send a new commitment
1871
        if not chan.has_pending_changes(REMOTE):
5✔
1872
            return False
5✔
1873
        self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.')
5✔
1874
        sig_64, htlc_sigs = chan.sign_next_commitment()
5✔
1875
        self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
5✔
1876
        return True
5✔
1877

1878
    def create_onion_for_route(
5✔
1879
            self, *,
1880
            route: 'LNPaymentRoute',
1881
            amount_msat: int,
1882
            total_msat: int,
1883
            payment_hash: bytes,
1884
            min_final_cltv_delta: int,
1885
            payment_secret: bytes,
1886
            trampoline_onion: Optional[OnionPacket] = None,
1887
    ):
1888
        # add features learned during "init" for direct neighbour:
1889
        route[0].node_features |= self.features
5✔
1890
        local_height = self.network.get_local_height()
5✔
1891
        final_cltv_abs = local_height + min_final_cltv_delta
5✔
1892
        hops_data, amount_msat, cltv_abs = calc_hops_data_for_payment(
5✔
1893
            route,
1894
            amount_msat,
1895
            final_cltv_abs=final_cltv_abs,
1896
            total_msat=total_msat,
1897
            payment_secret=payment_secret)
1898
        num_hops = len(hops_data)
5✔
1899
        self.logger.info(f"lnpeer.pay len(route)={len(route)}")
5✔
1900
        for i in range(len(route)):
5✔
1901
            self.logger.info(f"  {i}: edge={route[i].short_channel_id} hop_data={hops_data[i]!r}")
5✔
1902
        assert final_cltv_abs <= cltv_abs, (final_cltv_abs, cltv_abs)
5✔
1903
        session_key = os.urandom(32)  # ephemeral session key for the onion
5✔
1904
        # if we are forwarding a trampoline payment, add trampoline onion
1905
        if trampoline_onion:
5✔
1906
            self.logger.info(f'adding trampoline onion to final payload')
5✔
1907
            trampoline_payload = hops_data[-1].payload
5✔
1908
            trampoline_payload["trampoline_onion_packet"] = {
5✔
1909
                "version": trampoline_onion.version,
1910
                "public_key": trampoline_onion.public_key,
1911
                "hops_data": trampoline_onion.hops_data,
1912
                "hmac": trampoline_onion.hmac
1913
            }
1914
            if t_hops_data := trampoline_onion._debug_hops_data:  # None if trampoline-forwarding
5✔
1915
                t_route = trampoline_onion._debug_route
5✔
1916
                assert t_route is not None
5✔
1917
                self.logger.info(f"lnpeer.pay len(t_route)={len(t_route)}")
5✔
1918
                for i in range(len(t_route)):
5✔
1919
                    self.logger.info(f"  {i}: t_node={t_route[i].end_node.hex()} hop_data={t_hops_data[i]!r}")
5✔
1920
        # create onion packet
1921
        payment_path_pubkeys = [x.node_id for x in route]
5✔
1922
        onion = new_onion_packet(payment_path_pubkeys, session_key, hops_data, associated_data=payment_hash) # must use another sessionkey
5✔
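        # (hedged note, per BOLT-4: associated_data=payment_hash ties every hop's
        #  HMAC to this payment, so the onion cannot be replayed for a different
        #  payment_hash)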
1923
        self.logger.info(f"starting payment. len(route)={len(hops_data)}.")
5✔
1924
        # create htlc
1925
        if cltv_abs > local_height + lnutil.NBLOCK_CLTV_DELTA_TOO_FAR_INTO_FUTURE:
5✔
1926
            raise PaymentFailure(f"htlc expiry too far into future. (in {cltv_abs-local_height} blocks)")
×
1927
        return onion, amount_msat, cltv_abs, session_key
5✔
1928

1929
    def send_htlc(
5✔
1930
        self,
1931
        *,
1932
        chan: Channel,
1933
        payment_hash: bytes,
1934
        amount_msat: int,
1935
        cltv_abs: int,
1936
        onion: OnionPacket,
1937
        session_key: Optional[bytes] = None,
1938
    ) -> UpdateAddHtlc:
1939
        assert chan.can_send_update_add_htlc(), f"cannot send updates: {chan.short_channel_id}"
5✔
1940
        htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_abs=cltv_abs, timestamp=int(time.time()))
5✔
1941
        htlc = chan.add_htlc(htlc)
5✔
1942
        if session_key:
5✔
1943
            chan.set_onion_key(htlc.htlc_id, session_key) # should it be the outer onion secret?
5✔
1944
        self.logger.info(f"starting payment. htlc: {htlc}")
5✔
1945
        self.send_message(
5✔
1946
            "update_add_htlc",
1947
            channel_id=chan.channel_id,
1948
            id=htlc.htlc_id,
1949
            cltv_expiry=htlc.cltv_abs,
1950
            amount_msat=htlc.amount_msat,
1951
            payment_hash=htlc.payment_hash,
1952
            onion_routing_packet=onion.to_bytes())
1953
        self.maybe_send_commitment(chan)
5✔
1954
        return htlc
5✔
1955

1956
    def pay(self, *,
5✔
1957
            route: 'LNPaymentRoute',
1958
            chan: Channel,
1959
            amount_msat: int,
1960
            total_msat: int,
1961
            payment_hash: bytes,
1962
            min_final_cltv_delta: int,
1963
            payment_secret: bytes,
1964
            trampoline_onion: Optional[OnionPacket] = None,
1965
        ) -> UpdateAddHtlc:
1966

1967
        assert amount_msat > 0, "amount_msat is not greater zero"
5✔
1968
        assert len(route) > 0
5✔
1969
        if not chan.can_send_update_add_htlc():
5✔
1970
            raise PaymentFailure("Channel cannot send update_add_htlc")
5✔
1971
        onion, amount_msat, cltv_abs, session_key = self.create_onion_for_route(
5✔
1972
            route=route,
1973
            amount_msat=amount_msat,
1974
            total_msat=total_msat,
1975
            payment_hash=payment_hash,
1976
            min_final_cltv_delta=min_final_cltv_delta,
1977
            payment_secret=payment_secret,
1978
            trampoline_onion=trampoline_onion
1979
        )
1980
        htlc = self.send_htlc(
5✔
1981
            chan=chan,
1982
            payment_hash=payment_hash,
1983
            amount_msat=amount_msat,
1984
            cltv_abs=cltv_abs,
1985
            onion=onion,
1986
            session_key=session_key,
1987
        )
1988
        return htlc
5✔
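    # (hedged note: Peer.pay() sends a single HTLC over one channel; payment
    #  splitting, retries and MPP bookkeeping are expected to live a layer above,
    #  in lnworker)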
1989

1990
    def send_revoke_and_ack(self, chan: Channel) -> None:
5✔
1991
        if not chan.can_update_ctx(proposer=LOCAL):
5✔
1992
            return
×
1993
        self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}')
5✔
1994
        rev = chan.revoke_current_commitment()
5✔
1995
        self.lnworker.save_channel(chan)
5✔
1996
        self.send_message("revoke_and_ack",
5✔
1997
            channel_id=chan.channel_id,
1998
            per_commitment_secret=rev.per_commitment_secret,
1999
            next_per_commitment_point=rev.next_per_commitment_point)
2000
        self.maybe_send_commitment(chan)
5✔
2001

2002
    def on_commitment_signed(self, chan: Channel, payload) -> None:
5✔
2003
        self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.')
5✔
2004
        if not chan.can_update_ctx(proposer=REMOTE):
5✔
2005
            self.logger.warning(
×
2006
                f"on_commitment_signed. dropping message. illegal action. "
2007
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
2008
            return
×
2009
        # make sure there were changes to the ctx, otherwise the remote peer is misbehaving
2010
        if not chan.has_pending_changes(LOCAL):
5✔
2011
            # TODO if feerate changed A->B->A; so there were updates but the value is identical,
2012
            #      then it might be legal to send a commitment_signature
2013
            #      see https://github.com/lightningnetwork/lightning-rfc/pull/618
2014
            raise RemoteMisbehaving('received commitment_signed without pending changes')
×
2015
        # REMOTE should wait until we have revoked
2016
        if chan.hm.is_revack_pending(LOCAL):
5✔
2017
            raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx')
×
2018
        data = payload["htlc_signature"]
5✔
2019
        htlc_sigs = list(chunks(data, 64))
5✔
2020
        chan.receive_new_commitment(payload["signature"], htlc_sigs)
5✔
2021
        self.send_revoke_and_ack(chan)
5✔
2022
        self.received_commitsig_event.set()
5✔
2023
        self.received_commitsig_event.clear()
5✔
2024

2025
    def on_update_fulfill_htlc(self, chan: Channel, payload):
5✔
2026
        preimage = payload["payment_preimage"]
5✔
2027
        payment_hash = sha256(preimage)
5✔
2028
        htlc_id = payload["id"]
5✔
2029
        self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
5✔
2030
        if not chan.can_update_ctx(proposer=REMOTE):
5✔
2031
            self.logger.warning(
×
2032
                f"on_update_fulfill_htlc. dropping message. illegal action. "
2033
                f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}")
2034
            return
×
2035
        chan.receive_htlc_settle(preimage, htlc_id)  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
5✔
2036
        self.lnworker.save_preimage(payment_hash, preimage)
5✔
2037
        self.maybe_send_commitment(chan)
5✔
2038

2039
    def on_update_fail_malformed_htlc(self, chan: Channel, payload):
5✔
2040
        htlc_id = payload["id"]
×
2041
        failure_code = payload["failure_code"]
×
2042
        self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. "
×
2043
                         f"htlc_id {htlc_id}. failure_code={failure_code}")
2044
        if not chan.can_update_ctx(proposer=REMOTE):
×
2045
            self.logger.warning(
×
2046
                f"on_update_fail_malformed_htlc. dropping message. illegal action. "
2047
                f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}")
2048
            return
×
2049
        if failure_code & OnionFailureCodeMetaFlag.BADONION == 0:
×
2050
            self.schedule_force_closing(chan.channel_id)
×
2051
            raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}")
×
2052
        reason = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"])
×
2053
        chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=reason)
×
2054
        self.maybe_send_commitment(chan)
×
2055

2056
    def on_update_add_htlc(self, chan: Channel, payload):
5✔
2057
        payment_hash = payload["payment_hash"]
5✔
2058
        htlc_id = payload["id"]
5✔
2059
        cltv_abs = payload["cltv_expiry"]
5✔
2060
        amount_msat_htlc = payload["amount_msat"]
5✔
2061
        onion_packet = payload["onion_routing_packet"]
5✔
2062
        htlc = UpdateAddHtlc(
5✔
2063
            amount_msat=amount_msat_htlc,
2064
            payment_hash=payment_hash,
2065
            cltv_abs=cltv_abs,
2066
            timestamp=int(time.time()),
2067
            htlc_id=htlc_id)
2068
        self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc={str(htlc)}")
5✔
2069
        if chan.get_state() != ChannelState.OPEN:
5✔
2070
            raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}")
×
2071
        if not chan.can_update_ctx(proposer=REMOTE):
5✔
2072
            self.logger.warning(
×
2073
                f"on_update_add_htlc. dropping message. illegal action. "
2074
                f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}")
2075
            return
×
2076
        if cltv_abs > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX:
5✔
2077
            self.schedule_force_closing(chan.channel_id)
×
2078
            raise RemoteMisbehaving(f"received update_add_htlc with {cltv_abs=} > BLOCKHEIGHT_MAX")
×
2079
        # add htlc
2080
        chan.receive_htlc(htlc, onion_packet)
5✔
2081
        util.trigger_callback('htlc_added', chan, htlc, RECEIVED)
5✔
2082

2083

2084
    async def maybe_forward_htlc(
5✔
2085
            self, *,
2086
            incoming_chan: Channel,
2087
            htlc: UpdateAddHtlc,
2088
            processed_onion: ProcessedOnionPacket,
2089
    ) -> str:
2090

2091
        # Forward HTLC
2092
        # FIXME: there are critical safety checks MISSING here
2093
        #        - for example; atm we forward first and then persist "forwarding_info",
2094
        #          so if we segfault in-between and restart, we might forward an HTLC twice...
2095
        #          (same for trampoline forwarding)
2096
        #        - we could check for the exposure to dust HTLCs, see:
2097
        #          https://github.com/ACINQ/eclair/pull/1985
2098

2099
        def log_fail_reason(reason: str):
5✔
2100
            self.logger.debug(
5✔
2101
                f"maybe_forward_htlc. will FAIL HTLC: inc_chan={incoming_chan.get_id_for_log()}. "
2102
                f"{reason}. inc_htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")
2103

2104
        forwarding_enabled = self.network.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS
5✔
2105
        if not forwarding_enabled:
5✔
2106
            log_fail_reason("forwarding is disabled")
×
2107
            raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')
×
2108
        chain = self.network.blockchain()
5✔
2109
        if chain.is_tip_stale():
5✔
2110
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
×
2111
        try:
5✔
2112
            _next_chan_scid = processed_onion.hop_data.payload["short_channel_id"]["short_channel_id"]  # type: bytes
5✔
2113
            next_chan_scid = ShortChannelID(_next_chan_scid)
5✔
2114
        except Exception:
×
2115
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
×
2116
        try:
5✔
2117
            next_amount_msat_htlc = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
5✔
2118
        except Exception:
×
2119
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
×
2120
        try:
5✔
2121
            next_cltv_abs = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
5✔
2122
        except Exception:
×
2123
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
×
2124

2125
        next_chan = self.lnworker.get_channel_by_short_id(next_chan_scid)
5✔
2126

2127
        if self.lnworker.features.supports(LnFeatures.OPTION_ZEROCONF_OPT):
5✔
2128
            next_peer = self.lnworker.get_peer_by_static_jit_scid_alias(next_chan_scid)
×
2129
        else:
2130
            next_peer = None
5✔
2131

2132
        if not next_chan and next_peer and next_peer.accepts_zeroconf():
5✔
2133
            # check if an already existing channel can be used.
2134
            # todo: split the payment
2135
            for next_chan in next_peer.channels.values():
×
2136
                if next_chan.can_pay(next_amount_msat_htlc):
×
2137
                    break
×
2138
            else:
2139
                return await self.lnworker.open_channel_just_in_time(
×
2140
                    next_peer=next_peer,
2141
                    next_amount_msat_htlc=next_amount_msat_htlc,
2142
                    next_cltv_abs=next_cltv_abs,
2143
                    payment_hash=htlc.payment_hash,
2144
                    next_onion=processed_onion.next_packet)
2145

2146
        local_height = chain.height()
5✔
2147
        if next_chan is None:
5✔
2148
            log_fail_reason(f"cannot find next_chan {next_chan_scid}")
×
2149
            raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
×
2150
        outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update(scid=next_chan_scid)[2:]
5✔
2151
        outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big")
5✔
2152
        outgoing_chan_upd_message = outgoing_chan_upd_len + outgoing_chan_upd
5✔
2153
        if not next_chan.can_send_update_add_htlc():
5✔
2154
            log_fail_reason(
×
2155
                f"next_chan {next_chan.get_id_for_log()} cannot send ctx updates. "
2156
                f"chan state {next_chan.get_state()!r}, peer state: {next_chan.peer_state!r}")
2157
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
×
2158
        if not next_chan.can_pay(next_amount_msat_htlc):
5✔
2159
            log_fail_reason(f"transient error (likely due to insufficient funds): not next_chan.can_pay(amt)")
5✔
2160
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
5✔
2161
        if htlc.cltv_abs - next_cltv_abs < next_chan.forwarding_cltv_delta:
5✔
2162
            log_fail_reason(
×
2163
                f"INCORRECT_CLTV_EXPIRY. "
2164
                f"{htlc.cltv_abs=} - {next_cltv_abs=} < {next_chan.forwarding_cltv_delta=}")
2165
            data = htlc.cltv_abs.to_bytes(4, byteorder="big") + outgoing_chan_upd_message
×
2166
            raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_CLTV_EXPIRY, data=data)
×
2167
        if htlc.cltv_abs - lnutil.MIN_FINAL_CLTV_DELTA_ACCEPTED <= local_height \
5✔
2168
                or next_cltv_abs <= local_height:
2169
            raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_SOON, data=outgoing_chan_upd_message)
×
2170
        if max(htlc.cltv_abs, next_cltv_abs) > local_height + lnutil.NBLOCK_CLTV_DELTA_TOO_FAR_INTO_FUTURE:
5✔
2171
            raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_FAR, data=b'')
×
2172
        forwarding_fees = fee_for_edge_msat(
5✔
2173
            forwarded_amount_msat=next_amount_msat_htlc,
2174
            fee_base_msat=next_chan.forwarding_fee_base_msat,
2175
            fee_proportional_millionths=next_chan.forwarding_fee_proportional_millionths)
2176
        if htlc.amount_msat - next_amount_msat_htlc < forwarding_fees:
5✔
2177
            data = next_amount_msat_htlc.to_bytes(8, byteorder="big") + outgoing_chan_upd_message
×
2178
            raise OnionRoutingFailure(code=OnionFailureCode.FEE_INSUFFICIENT, data=data)
×
2179
        if self._maybe_refuse_to_forward_htlc_that_corresponds_to_payreq_we_created(htlc.payment_hash):
5✔
2180
            log_fail_reason(f"RHASH corresponds to payreq we created")
5✔
2181
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
5✔
2182
        self.logger.info(
5✔
2183
            f"maybe_forward_htlc. will forward HTLC: inc_chan={incoming_chan.short_channel_id}. inc_htlc={str(htlc)}. "
2184
            f"next_chan={next_chan.get_id_for_log()}.")
2185

2186
        next_peer = self.lnworker.peers.get(next_chan.node_id)
5✔
2187
        if next_peer is None:
5✔
2188
            log_fail_reason(f"next_peer offline ({next_chan.node_id.hex()})")
×
2189
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
×
2190
        try:
5✔
2191
            next_htlc = next_peer.send_htlc(
5✔
2192
                chan=next_chan,
2193
                payment_hash=htlc.payment_hash,
2194
                amount_msat=next_amount_msat_htlc,
2195
                cltv_abs=next_cltv_abs,
2196
                onion=processed_onion.next_packet,
2197
            )
2198
        except BaseException as e:
×
2199
            log_fail_reason(f"error sending message to next_peer={next_chan.node_id.hex()}")
×
2200
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
×
2201

2202
        htlc_key = serialize_htlc_key(next_chan.get_scid_or_local_alias(), next_htlc.htlc_id)
5✔
2203
        return htlc_key
5✔
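For context, a minimal sketch (not part of lnpeer.py) of the proportional routing-fee formula that the FEE_INSUFFICIENT check in maybe_forward_htlc above relies on, assuming fee_for_edge_msat follows the standard BOLT-07 definition:

def example_fee_for_edge_msat(forwarded_amount_msat: int,
                              fee_base_msat: int,
                              fee_proportional_millionths: int) -> int:
    # flat base fee plus a proportional part, expressed in millionths of the forwarded amount
    return fee_base_msat + (forwarded_amount_msat * fee_proportional_millionths) // 1_000_000

# e.g. forwarding 1_000_000 msat over a channel charging base=1000 msat and 100 ppm yields 1100 msat,
# so htlc.amount_msat must exceed next_amount_msat_htlc by at least that much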
2204

2205
    @log_exceptions
5✔
2206
    async def maybe_forward_trampoline(
5✔
2207
            self, *,
2208
            payment_hash: bytes,
2209
            inc_cltv_abs: int,
2210
            outer_onion: ProcessedOnionPacket,
2211
            trampoline_onion: ProcessedOnionPacket,
2212
            fw_payment_key: str,
2213
    ) -> None:
2214

2215
        forwarding_enabled = self.network.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS
5✔
2216
        forwarding_trampoline_enabled = self.network.config.EXPERIMENTAL_LN_FORWARD_TRAMPOLINE_PAYMENTS
5✔
2217
        if not (forwarding_enabled and forwarding_trampoline_enabled):
5✔
2218
            self.logger.info(f"trampoline forwarding is disabled. failing htlc.")
×
2219
            raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')
×
2220
        payload = trampoline_onion.hop_data.payload
5✔
2221
        payment_data = payload.get('payment_data')
5✔
2222
        try:
5✔
2223
            payment_secret = payment_data['payment_secret'] if payment_data else os.urandom(32)
5✔
2224
            outgoing_node_id = payload["outgoing_node_id"]["outgoing_node_id"]
5✔
2225
            amt_to_forward = payload["amt_to_forward"]["amt_to_forward"]
5✔
2226
            out_cltv_abs = payload["outgoing_cltv_value"]["outgoing_cltv_value"]
5✔
2227
            if "invoice_features" in payload:
5✔
2228
                self.logger.info('forward_trampoline: legacy')
5✔
2229
                next_trampoline_onion = None
5✔
2230
                invoice_features = payload["invoice_features"]["invoice_features"]
5✔
2231
                invoice_routing_info = payload["invoice_routing_info"]["invoice_routing_info"]
5✔
2232
                r_tags = decode_routing_info(invoice_routing_info)
5✔
2233
                self.logger.info(f'r_tags {r_tags}')
5✔
2234
                # TODO legacy mpp payment, use total_msat from trampoline onion
2235
            else:
2236
                self.logger.info('forward_trampoline: end-to-end')
5✔
2237
                invoice_features = LnFeatures.BASIC_MPP_OPT
5✔
2238
                next_trampoline_onion = trampoline_onion.next_packet
5✔
2239
                r_tags = []
5✔
2240
        except Exception as e:
×
2241
            self.logger.exception('')
×
2242
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
×
2243

2244
        if self._maybe_refuse_to_forward_htlc_that_corresponds_to_payreq_we_created(payment_hash):
5✔
2245
            self.logger.debug(
5✔
2246
                f"maybe_forward_trampoline. will FAIL HTLC(s). "
2247
                f"RHASH corresponds to payreq we created. {payment_hash.hex()=}")
2248
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
5✔
2249

2250
        # these are the fee/cltv paid by the sender
2251
        # pay_to_node will raise if they are not sufficient
2252
        total_msat = outer_onion.hop_data.payload["payment_data"]["total_msat"]
5✔
2253
        budget = PaymentFeeBudget(
5✔
2254
            fee_msat=total_msat - amt_to_forward,
2255
            cltv=inc_cltv_abs - out_cltv_abs,
2256
        )
2257
        self.logger.info(f'trampoline forwarding. budget={budget}')
5✔
2258
        self.logger.info(f'trampoline forwarding. {inc_cltv_abs=}, {out_cltv_abs=}')
5✔
2259
        # To convert abs vs rel cltvs, we need to guess blockheight used by original sender as "current blockheight".
2260
        # Blocks might have been mined since.
2261
        # - if we skew towards the past, we decrease our own cltv_budget accordingly (which is ok)
2262
        # - if we skew towards the future, we decrease the cltv_budget for the subsequent nodes in the path,
2263
        #   which can result in them failing the payment.
2264
        # So we skew towards the past and guess that there has been 1 new block mined since the payment began:
2265
        local_height_of_onion_creator = self.network.get_local_height() - 1
5✔
2266
        cltv_budget_for_rest_of_route = out_cltv_abs - local_height_of_onion_creator
5✔
2267

2268
        if budget.fee_msat < 1000:
5✔
2269
            raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, data=b'')
5✔
2270
        if budget.cltv < 576:
5✔
2271
            raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'')
×
2272

2273
        # do we have a connection to the node?
2274
        next_peer = self.lnworker.peers.get(outgoing_node_id)
5✔
2275
        if next_peer and next_peer.accepts_zeroconf():
5✔
2276
            self.logger.info(f'JIT: found next_peer')
×
2277
            for next_chan in next_peer.channels.values():
×
2278
                if next_chan.can_pay(amt_to_forward):
×
2279
                    # todo: detect if we can do mpp
2280
                    self.logger.info(f'jit: next_chan can pay')
×
2281
                    break
×
2282
            else:
2283
                scid_alias = self.lnworker._scid_alias_of_node(next_peer.pubkey)
×
2284
                route = [RouteEdge(
×
2285
                    start_node=next_peer.pubkey,
2286
                    end_node=outgoing_node_id,
2287
                    short_channel_id=scid_alias,
2288
                    fee_base_msat=0,
2289
                    fee_proportional_millionths=0,
2290
                    cltv_delta=144,
2291
                    node_features=0
2292
                )]
2293
                next_onion, amount_msat, cltv_abs, session_key = self.create_onion_for_route(
×
2294
                    route=route,
2295
                    amount_msat=amt_to_forward,
2296
                    total_msat=amt_to_forward,
2297
                    payment_hash=payment_hash,
2298
                    min_final_cltv_delta=cltv_budget_for_rest_of_route,
2299
                    payment_secret=payment_secret,
2300
                    trampoline_onion=next_trampoline_onion,
2301
                )
2302
                await self.lnworker.open_channel_just_in_time(
×
2303
                    next_peer=next_peer,
2304
                    next_amount_msat_htlc=amt_to_forward,
2305
                    next_cltv_abs=cltv_abs,
2306
                    payment_hash=payment_hash,
2307
                    next_onion=next_onion)
2308
                return
×
2309

2310
        try:
5✔
2311
            await self.lnworker.pay_to_node(
5✔
2312
                node_pubkey=outgoing_node_id,
2313
                payment_hash=payment_hash,
2314
                payment_secret=payment_secret,
2315
                amount_to_pay=amt_to_forward,
2316
                min_final_cltv_delta=cltv_budget_for_rest_of_route,
2317
                r_tags=r_tags,
2318
                invoice_features=invoice_features,
2319
                fwd_trampoline_onion=next_trampoline_onion,
2320
                budget=budget,
2321
                attempts=100,
2322
                fw_payment_key=fw_payment_key,
2323
            )
2324
        except OnionRoutingFailure as e:
5✔
2325
            raise
×
2326
        except FeeBudgetExceeded:
5✔
2327
            raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, data=b'')
×
2328
        except PaymentFailure as e:
5✔
2329
            self.logger.debug(
5✔
2330
                f"maybe_forward_trampoline. PaymentFailure for {payment_hash.hex()=}, {payment_secret.hex()=}: {e!r}")
2331
            raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
5✔
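As a worked illustration (invented numbers, not part of lnpeer.py) of the trampoline budget computed in maybe_forward_trampoline above:

total_msat, amt_to_forward = 1_005_000, 1_000_000   # amount sent by the payer vs. amount we must deliver
inc_cltv_abs, out_cltv_abs = 801_000, 800_200       # incoming htlc expiry vs. requested outgoing expiry
fee_budget_msat = total_msat - amt_to_forward       # 5_000 msat available to pay the rest of the route
cltv_budget = inc_cltv_abs - out_cltv_abs           # 800 blocks of expiry headroom
# the checks above fail the htlc with TRAMPOLINE_FEE_INSUFFICIENT if fee_budget_msat < 1000,
# and with TRAMPOLINE_EXPIRY_TOO_SOON if cltv_budget < 576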
2332

2333
    def _maybe_refuse_to_forward_htlc_that_corresponds_to_payreq_we_created(self, payment_hash: bytes) -> bool:
5✔
2334
        """Returns True if the HTLC should be failed.
2335
        We must not forward HTLCs with a matching payment_hash to a payment request we created.
2336
        Example attack:
2337
        - Bob creates payment request with HASH1, for 1 BTC; and gives the payreq to Alice
2338
        - Alice sends htlc A->B->C, for 1 sat, with HASH1
2339
        - Bob must not release the preimage of HASH1
2340
        """
2341
        payment_info = self.lnworker.get_payment_info(payment_hash)
5✔
2342
        is_our_payreq = payment_info and payment_info.direction == RECEIVED
5✔
2343
        # note: If we don't have the preimage for a payment request, then it must be a hold invoice.
2344
        #       Hold invoices are created by other parties (e.g. a counterparty initiating a submarine swap),
2345
        #       and it is the other party choosing the payment_hash. If we failed HTLCs with payment_hashes colliding
2346
        #       with hold invoices, then a party that can make us save a hold invoice for an arbitrary hash could
2347
        #       also make us fail arbitrary HTLCs.
2348
        return bool(is_our_payreq and self.lnworker.get_preimage(payment_hash))
5✔
2349

2350
    def check_accepted_htlc(
5✔
2351
            self, *,
2352
            chan: Channel,
2353
            htlc: UpdateAddHtlc,
2354
            processed_onion: ProcessedOnionPacket,
2355
            log_fail_reason: Callable,
2356
    ):
2357
        """
2358
        Perform checks that are invariant (results do not depend on height, network conditions, etc).
2359
        May raise OnionRoutingFailure
2360
        """
2361
        try:
5✔
2362
            amt_to_forward = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
5✔
2363
        except Exception:
×
2364
            log_fail_reason(f"'amt_to_forward' missing from onion")
×
2365
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
×
2366

2367
        exc_incorrect_or_unknown_pd = OnionRoutingFailure(
5✔
2368
            code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
2369
            data=amt_to_forward.to_bytes(8, byteorder="big")) # height will be added later
2370
        try:
5✔
2371
            cltv_abs_from_onion = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
5✔
2372
        except Exception:
×
2373
            log_fail_reason(f"'outgoing_cltv_value' missing from onion")
×
2374
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
×
2375

2376
        if cltv_abs_from_onion > htlc.cltv_abs:
5✔
2377
            log_fail_reason(f"cltv_abs_from_onion != htlc.cltv_abs")
×
2378
            raise OnionRoutingFailure(
×
2379
                code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY,
2380
                data=htlc.cltv_abs.to_bytes(4, byteorder="big"))
2381
        try:
5✔
2382
            total_msat = processed_onion.hop_data.payload["payment_data"]["total_msat"]
5✔
2383
        except Exception:
×
2384
            log_fail_reason(f"'total_msat' missing from onion")
×
2385
            raise exc_incorrect_or_unknown_pd
×
2386

2387
        if chan.opening_fee:
5✔
2388
            channel_opening_fee = chan.opening_fee['channel_opening_fee']
×
2389
            total_msat -= channel_opening_fee
×
2390
            amt_to_forward -= channel_opening_fee
×
2391
        else:
2392
            channel_opening_fee = 0
5✔
2393

2394
        if amt_to_forward > htlc.amount_msat:
5✔
2395
            log_fail_reason(f"amt_to_forward != htlc.amount_msat")
×
2396
            raise OnionRoutingFailure(
×
2397
                code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT,
2398
                data=htlc.amount_msat.to_bytes(8, byteorder="big"))
2399

2400
        try:
5✔
2401
            payment_secret_from_onion = processed_onion.hop_data.payload["payment_data"]["payment_secret"]  # type: bytes
5✔
2402
        except Exception:
×
2403
            log_fail_reason(f"'payment_secret' missing from onion")
×
2404
            raise exc_incorrect_or_unknown_pd
×
2405

2406
        return payment_secret_from_onion, total_msat, channel_opening_fee, exc_incorrect_or_unknown_pd
5✔
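As a hedged illustration (not part of lnpeer.py), the nested onion payload that check_accepted_htlc above parses for a final hop has roughly this shape; the keys mirror the lookups in the code, the values are invented:

example_final_hop_payload = {
    "amt_to_forward": {"amt_to_forward": 100_000},            # msat the final node should receive
    "outgoing_cltv_value": {"outgoing_cltv_value": 800_123},  # absolute expiry the sender expects
    "payment_data": {
        "payment_secret": bytes(32),                          # must match a secret we expect for this invoice
        "total_msat": 100_000,                                # total across all MPP parts
    },
}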
2407

2408
    def check_mpp_is_waiting(self, *, payment_secret, short_channel_id, htlc, expected_msat, exc_incorrect_or_unknown_pd, log_fail_reason) -> bool:
5✔
2409
        from .lnworker import RecvMPPResolution
5✔
2410
        mpp_resolution = self.lnworker.check_mpp_status(
5✔
2411
            payment_secret=payment_secret,
2412
            short_channel_id=short_channel_id,
2413
            htlc=htlc,
2414
            expected_msat=expected_msat,
2415
        )
2416
        if mpp_resolution == RecvMPPResolution.WAITING:
5✔
2417
            return True
5✔
2418
        elif mpp_resolution == RecvMPPResolution.EXPIRED:
5✔
2419
            log_fail_reason(f"MPP_TIMEOUT")
5✔
2420
            raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'')
5✔
2421
        elif mpp_resolution == RecvMPPResolution.FAILED:
5✔
2422
            log_fail_reason(f"mpp_resolution is FAILED")
5✔
2423
            raise exc_incorrect_or_unknown_pd
5✔
2424
        elif mpp_resolution == RecvMPPResolution.ACCEPTED:
5✔
2425
            return False
5✔
2426
        else:
2427
            raise Exception(f"unexpected {mpp_resolution=}")
×
2428

2429
    def maybe_fulfill_htlc(
5✔
2430
            self, *,
2431
            chan: Channel,
2432
            htlc: UpdateAddHtlc,
2433
            processed_onion: ProcessedOnionPacket,
2434
            onion_packet_bytes: bytes,
2435
            already_forwarded: bool = False,
2436
    ) -> Tuple[Optional[bytes], Optional[Tuple[str, Callable[[], Awaitable[Optional[str]]]]]]:
2437
        """
2438
        Decide what to do with an HTLC: return preimage if it can be fulfilled, forwarding callback if it can be forwarded.
2439
        Return (preimage, (payment_key, callback)) with at most a single element not None.
2440
        """
2441
        if not processed_onion.are_we_final:
5✔
2442
            if not self.lnworker.enable_htlc_forwarding:
5✔
2443
                return None, None
5✔
2444
            # use the htlc key if we are forwarding
2445
            payment_key = serialize_htlc_key(chan.get_scid_or_local_alias(), htlc.htlc_id)
5✔
2446
            callback = lambda: self.maybe_forward_htlc(
5✔
2447
                incoming_chan=chan,
2448
                htlc=htlc,
2449
                processed_onion=processed_onion)
2450
            return None, (payment_key, callback)
5✔
2451

2452
        def log_fail_reason(reason: str):
5✔
2453
            self.logger.info(
5✔
2454
                f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. "
2455
                f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")
2456

2457
        chain = self.network.blockchain()
5✔
2458
        # Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height.
2459
        # We should not release the preimage for an HTLC that its sender could already time out,
2460
        # as they might then try to force-close and it becomes a race.
2461
        if chain.is_tip_stale() and not already_forwarded:
5✔
2462
            log_fail_reason(f"our chain tip is stale")
×
2463
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
×
2464
        local_height = chain.height()
5✔
2465

2466
        # parse parameters and perform checks that are invariant
2467
        payment_secret_from_onion, total_msat, channel_opening_fee, exc_incorrect_or_unknown_pd = self.check_accepted_htlc(
5✔
2468
            chan=chan,
2469
            htlc=htlc,
2470
            processed_onion=processed_onion,
2471
            log_fail_reason=log_fail_reason)
2472

2473
        # payment key for final onions
2474
        payment_hash = htlc.payment_hash
5✔
2475
        payment_key = (payment_hash + payment_secret_from_onion).hex()
5✔
2476

2477
        if self.check_mpp_is_waiting(
5✔
2478
                payment_secret=payment_secret_from_onion,
2479
                short_channel_id=chan.get_scid_or_local_alias(),
2480
                htlc=htlc,
2481
                expected_msat=total_msat,
2482
                exc_incorrect_or_unknown_pd=exc_incorrect_or_unknown_pd,
2483
                log_fail_reason=log_fail_reason,
2484
        ):
2485
            return None, None
5✔
2486

2487
        # TODO check against actual min_final_cltv_expiry_delta from invoice (and give 2-3 blocks of leeway?)
2488
        if local_height + MIN_FINAL_CLTV_DELTA_ACCEPTED > htlc.cltv_abs and not already_forwarded:
5✔
2489
            log_fail_reason(f"htlc.cltv_abs is unreasonably close")
×
2490
            raise exc_incorrect_or_unknown_pd
×
2491

2492
        # detect callback
2493
        # if there is a trampoline_onion, maybe_fulfill_htlc will be called again
2494
        # order is important: if we receive a trampoline onion for a hold invoice, we need to peel the onion first.
2495

2496
        if processed_onion.trampoline_onion_packet:
5✔
2497
            # TODO: we should check that all trampoline_onions are the same
2498
            trampoline_onion = self.process_onion_packet(
5✔
2499
                processed_onion.trampoline_onion_packet,
2500
                payment_hash=payment_hash,
2501
                onion_packet_bytes=onion_packet_bytes,
2502
                is_trampoline=True)
2503
            if trampoline_onion.are_we_final:
5✔
2504
                # trampoline- we are final recipient of HTLC
2505
                # note: the returned payment_key will contain the inner payment_secret
2506
                return self.maybe_fulfill_htlc(
5✔
2507
                    chan=chan,
2508
                    htlc=htlc,
2509
                    processed_onion=trampoline_onion,
2510
                    onion_packet_bytes=onion_packet_bytes,
2511
                    already_forwarded=already_forwarded,
2512
                )
2513
            else:
2514
                callback = lambda: self.maybe_forward_trampoline(
5✔
2515
                    payment_hash=payment_hash,
2516
                    inc_cltv_abs=htlc.cltv_abs, # TODO: use max or enforce same value across mpp parts
2517
                    outer_onion=processed_onion,
2518
                    trampoline_onion=trampoline_onion,
2519
                    fw_payment_key=payment_key)
2520
                return None, (payment_key, callback)
5✔
2521

2522
        # TODO don't accept payments twice for same invoice
2523
        # TODO check invoice expiry
2524
        info = self.lnworker.get_payment_info(payment_hash)
5✔
2525
        if info is None:
5✔
2526
            log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}")
×
2527
            raise exc_incorrect_or_unknown_pd
×
2528

2529
        preimage = self.lnworker.get_preimage(payment_hash)
5✔
2530
        expected_payment_secrets = [self.lnworker.get_payment_secret(htlc.payment_hash)]
5✔
2531
        if preimage:
5✔
2532
            expected_payment_secrets.append(derive_payment_secret_from_payment_preimage(preimage)) # legacy secret for old invoices
5✔
2533
        if payment_secret_from_onion not in expected_payment_secrets:
5✔
2534
            log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {expected_payment_secrets[0].hex()}')
×
2535
            raise exc_incorrect_or_unknown_pd
×
2536
        invoice_msat = info.amount_msat
5✔
2537
        if channel_opening_fee:
5✔
2538
            invoice_msat -= channel_opening_fee
×
2539

2540
        if not (invoice_msat is None or invoice_msat <= total_msat <= 2 * invoice_msat):
5✔
2541
            log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}")
×
2542
            raise exc_incorrect_or_unknown_pd
×
2543

2544
        hold_invoice_callback = self.lnworker.hold_invoice_callbacks.get(payment_hash)
5✔
2545
        if hold_invoice_callback and not preimage:
5✔
2546
            callback = lambda: hold_invoice_callback(payment_hash)
5✔
2547
            return None, (payment_key, callback)
5✔
2548

2549
        if payment_hash.hex() in self.lnworker.dont_settle_htlcs:
5✔
2550
            return None, None
×
2551

2552
        if not preimage:
5✔
2553
            if not already_forwarded:
5✔
2554
                log_fail_reason(f"missing preimage and no hold invoice callback {payment_hash.hex()}")
5✔
2555
                raise exc_incorrect_or_unknown_pd
5✔
2556
            else:
2557
                return None, None
×
2558

2559
        chan.opening_fee = None
5✔
2560
        self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}")
5✔
2561
        return preimage, None
5✔
2562

2563
    def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
5✔
2564
        self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
5✔
2565
        assert chan.can_update_ctx(proposer=LOCAL), f"cannot send updates: {chan.short_channel_id}"
5✔
2566
        assert chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
5✔
2567
        self.received_htlcs_pending_removal.add((chan, htlc_id))
5✔
2568
        chan.settle_htlc(preimage, htlc_id)
5✔
2569
        self.send_message(
5✔
2570
            "update_fulfill_htlc",
2571
            channel_id=chan.channel_id,
2572
            id=htlc_id,
2573
            payment_preimage=preimage)
2574

2575
    def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes):
5✔
2576
        self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
5✔
2577
        assert chan.can_update_ctx(proposer=LOCAL), f"cannot send updates: {chan.short_channel_id}"
5✔
2578
        self.received_htlcs_pending_removal.add((chan, htlc_id))
5✔
2579
        chan.fail_htlc(htlc_id)
5✔
2580
        self.send_message(
5✔
2581
            "update_fail_htlc",
2582
            channel_id=chan.channel_id,
2583
            id=htlc_id,
2584
            len=len(error_bytes),
2585
            reason=error_bytes)
2586

2587
    def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure):
5✔
2588
        self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
×
2589
        assert chan.can_update_ctx(proposer=LOCAL), f"cannot send updates: {chan.short_channel_id}"
×
2590
        if not (reason.code & OnionFailureCodeMetaFlag.BADONION and len(reason.data) == 32):
×
2591
            raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}")
×
2592
        self.received_htlcs_pending_removal.add((chan, htlc_id))
×
2593
        chan.fail_htlc(htlc_id)
×
2594
        self.send_message(
×
2595
            "update_fail_malformed_htlc",
2596
            channel_id=chan.channel_id,
2597
            id=htlc_id,
2598
            sha256_of_onion=reason.data,
2599
            failure_code=reason.code)
2600

2601
    def on_revoke_and_ack(self, chan: Channel, payload) -> None:
5✔
2602
        self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}')
5✔
2603
        if not chan.can_update_ctx(proposer=REMOTE):
5✔
2604
            self.logger.warning(
×
2605
                f"on_revoke_and_ack. dropping message. illegal action. "
2606
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
2607
            return
×
2608
        rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"])
5✔
2609
        chan.receive_revocation(rev)
5✔
2610
        self.lnworker.save_channel(chan)
5✔
2611
        self.maybe_send_commitment(chan)
5✔
2612
        self._received_revack_event.set()
5✔
2613
        self._received_revack_event.clear()
5✔
2614

2615
    @event_listener
5✔
2616
    async def on_event_fee(self, *args):
5✔
2617
        async def async_wrapper():
×
2618
            for chan in self.channels.values():
×
2619
                self.maybe_update_fee(chan)
×
2620
        await self.taskgroup.spawn(async_wrapper)
×
2621

2622
    def on_update_fee(self, chan: Channel, payload):
5✔
2623
        if not chan.can_update_ctx(proposer=REMOTE):
5✔
2624
            self.logger.warning(
×
2625
                f"on_update_fee. dropping message. illegal action. "
2626
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
2627
            return
×
2628
        feerate = payload["feerate_per_kw"]
5✔
2629
        chan.update_fee(feerate, False)
5✔
2630

2631
    def maybe_update_fee(self, chan: Channel):
5✔
2632
        """
2633
        called when our fee estimates change
2634
        """
2635
        if not chan.can_update_ctx(proposer=LOCAL):
5✔
2636
            return
×
2637
        if chan.get_state() != ChannelState.OPEN:
5✔
2638
            return
×
2639
        current_feerate_per_kw: Optional[int] = self.lnworker.current_target_feerate_per_kw(
5✔
2640
            has_anchors=chan.has_anchors()
2641
        )
2642
        if current_feerate_per_kw is None:
5✔
2643
            return
×
2644
        # add some buffer to anchor chan fees as we always act at the lower end and don't
2645
        # want to get kicked out of the mempool immediately if it grows
2646
        fee_buffer = current_feerate_per_kw * 0.5 if chan.has_anchors() else 0
5✔
2647
        update_feerate_per_kw = int(current_feerate_per_kw + fee_buffer)
5✔
2648
        def does_chan_fee_need_update(chan_feerate: Union[float, int]) -> Optional[bool]:
5✔
2649
            if chan.has_anchors():
5✔
2650
                # TODO: once package relay and electrum servers with submitpackage are more common,
2651
                # TODO: we should reconsider this logic and move towards 0 fee ctx
2652
                # update if we used up half of the buffer or the fee decreased a lot again
2653
                fee_increased = current_feerate_per_kw + (fee_buffer / 2) > chan_feerate
×
2654
                changed_significantly = abs((chan_feerate - update_feerate_per_kw) / chan_feerate) > 0.2
×
2655
                return fee_increased or changed_significantly
×
2656
            else:
2657
                # We raise fees more aggressively than we lower them. Overpaying is not too bad,
2658
                # but lowballing can be fatal if we can't even get into the mempool...
2659
                high_fee = 2 * current_feerate_per_kw  # type: Union[float, int]
5✔
2660
                low_fee = self.lnworker.current_low_feerate_per_kw_srk_channel()  # type: Optional[Union[float, int]]
5✔
2661
                if low_fee is None:
5✔
2662
                    return None
×
2663
                low_fee = max(low_fee, 0.75 * current_feerate_per_kw)
5✔
2664
                # make sure low_feerate and target_feerate are not too close to each other:
2665
                low_fee = min(low_fee, current_feerate_per_kw - FEERATE_PER_KW_MIN_RELAY_LIGHTNING)
5✔
2666
                assert low_fee < high_fee, (low_fee, high_fee)
5✔
2667
                return not (low_fee < chan_feerate < high_fee)
5✔
2668
        if not chan.constraints.is_initiator:
5✔
2669
            if constants.net is not constants.BitcoinRegtest:
5✔
2670
                chan_feerate = chan.get_latest_feerate(LOCAL)
5✔
2671
                ratio = chan_feerate / update_feerate_per_kw
5✔
2672
                if ratio < 0.5:
5✔
2673
                    # Note that we trust the Electrum server about fee rates
2674
                    # Thus, automated force-closing might not be a good idea
2675
                    # Maybe we should display something in the GUI instead
2676
                    self.logger.warning(
×
2677
                        f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, "
2678
                        f"current recommended feerate is {update_feerate_per_kw} sat/kw, consider force closing!")
2679
            return
5✔
2680
        # it is our responsibility to update the fee
2681
        chan_fee = chan.get_next_feerate(REMOTE)
5✔
2682
        if does_chan_fee_need_update(chan_fee):
5✔
2683
            self.logger.info(f"({chan.get_id_for_log()}) onchain fees have changed considerably. updating fee.")
×
2684
        elif chan.get_latest_ctn(REMOTE) == 0:
5✔
2685
            # workaround eclair issue https://github.com/ACINQ/eclair/issues/1730 (fixed in 2022)
2686
            self.logger.info(f"({chan.get_id_for_log()}) updating fee to bump remote ctn")
5✔
2687
            if current_feerate_per_kw == chan_fee:
5✔
2688
                update_feerate_per_kw += 1
×
2689
        else:
2690
            return
5✔
2691
        self.logger.info(f"({chan.get_id_for_log()}) current pending feerate {chan_fee}. "
5✔
2692
                         f"new feerate {update_feerate_per_kw}")
2693
        assert update_feerate_per_kw >= FEERATE_PER_KW_MIN_RELAY_LIGHTNING, f"fee below minimum: {update_feerate_per_kw}"
5✔
2694
        chan.update_fee(update_feerate_per_kw, True)
5✔
2695
        self.send_message(
5✔
2696
            "update_fee",
2697
            channel_id=chan.channel_id,
2698
            feerate_per_kw=update_feerate_per_kw)
2699
        self.maybe_send_commitment(chan)
5✔
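A small worked example (invented numbers, not part of lnpeer.py) of the anchor-channel fee buffer used in maybe_update_fee above:

current_feerate_per_kw = 1000                                     # current estimate, sat/kw
fee_buffer = current_feerate_per_kw * 0.5                         # 500: headroom against mempool growth
update_feerate_per_kw = int(current_feerate_per_kw + fee_buffer)  # we would propose 1500 sat/kw
# an update_fee is sent once the channel feerate drops below current + buffer/2 (1250 here),
# or once it deviates from the proposed value by more than 20% of the channel feerate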
2700

2701
    @log_exceptions
5✔
2702
    async def close_channel(self, chan_id: bytes):
5✔
2703
        chan = self.channels[chan_id]
5✔
2704
        self.shutdown_received[chan_id] = self.asyncio_loop.create_future()
5✔
2705
        await self.send_shutdown(chan)
5✔
2706
        payload = await self.shutdown_received[chan_id]
5✔
2707
        try:
5✔
2708
            txid = await self._shutdown(chan, payload, is_local=True)
5✔
2709
            self.logger.info(f'({chan.get_id_for_log()}) Channel closed {txid}')
5✔
2710
        except asyncio.TimeoutError:
×
2711
            txid = chan.unconfirmed_closing_txid
×
2712
            self.logger.warning(f'({chan.get_id_for_log()}) did not send closing_signed, {txid}')
×
2713
            if txid is None:
×
2714
                raise Exception('The remote peer did not send their final signature. The channel may not have been closed')
×
2715
        return txid
5✔
2716

2717
    @non_blocking_msg_handler
5✔
2718
    async def on_shutdown(self, chan: Channel, payload):
5✔
2719
        if chan.peer_state != PeerState.GOOD:  # should never happen
5✔
2720
            raise Exception(f"received shutdown in unexpected {chan.peer_state=!r}")
×
2721
        if not self.can_send_shutdown(chan, proposer=REMOTE):
5✔
2722
            self.logger.warning(
×
2723
                f"on_shutdown. illegal action. "
2724
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
2725
            self.send_error(chan.channel_id, message="cannot process 'shutdown' in current channel state.")
×
2726
        their_scriptpubkey = payload['scriptpubkey']
5✔
2727
        their_upfront_scriptpubkey = chan.config[REMOTE].upfront_shutdown_script
5✔
2728
        # BOLT-02 check if they use the upfront shutdown script they advertised
2729
        if self.is_upfront_shutdown_script() and their_upfront_scriptpubkey:
5✔
2730
            if not (their_scriptpubkey == their_upfront_scriptpubkey):
5✔
2731
                self.send_warning(
5✔
2732
                    chan.channel_id,
2733
                    "remote didn't use upfront shutdown script it committed to in channel opening",
2734
                    close_connection=True)
2735
        else:
2736
            # BOLT-02 restrict the scriptpubkey to some templates:
2737
            if self.is_shutdown_anysegwit() and match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_ANYSEGWIT):
5✔
2738
                pass
×
2739
            elif match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
5✔
2740
                pass
5✔
2741
            else:
2742
                self.send_warning(
×
2743
                    chan.channel_id,
2744
                    f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}',
2745
                    close_connection=True)
2746

2747
        chan_id = chan.channel_id
5✔
2748
        if chan_id in self.shutdown_received:
5✔
2749
            self.shutdown_received[chan_id].set_result(payload)
5✔
2750
        else:
2751
            await self.send_shutdown(chan)
5✔
2752
            txid = await self._shutdown(chan, payload, is_local=False)
5✔
2753
            self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {txid}')
5✔
2754

2755
    def can_send_shutdown(self, chan: Channel, *, proposer: HTLCOwner) -> bool:
5✔
2756
        if chan.get_state() >= ChannelState.CLOSED:
5✔
2757
            return False
×
2758
        if chan.get_state() >= ChannelState.OPENING:
5✔
2759
            return True
5✔
2760
        if proposer == LOCAL:
×
2761
            if chan.constraints.is_initiator and chan.channel_id in self.funding_created_sent:
×
2762
                return True
×
2763
            if not chan.constraints.is_initiator and chan.channel_id in self.funding_signed_sent:
×
2764
                return True
×
2765
        else:  # proposer == REMOTE
2766
            # (from BOLT-02)
2767
            #   A receiving node:
2768
            #       - if it hasn't received a funding_signed (if it is a funder) or a funding_created (if it is a fundee):
2769
            #           - SHOULD send an error and fail the channel.
2770
            # ^ that check is equivalent to `chan.get_state() < ChannelState.OPENING`, which is already checked.
2771
            pass
×
2772
        return False
×
2773

2774
    async def send_shutdown(self, chan: Channel):
5✔
2775
        if not self.can_send_shutdown(chan, proposer=LOCAL):
5✔
2776
            raise Exception(f"cannot send shutdown. chan={chan.get_id_for_log()}. {chan.get_state()=!r}")
×
2777
        if chan.config[LOCAL].upfront_shutdown_script:
5✔
2778
            scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
5✔
2779
        else:
2780
            scriptpubkey = bitcoin.address_to_script(chan.get_sweep_address())
5✔
2781
        assert scriptpubkey
5✔
2782
        # wait until no more pending updates (bolt2)
2783
        chan.set_can_send_ctx_updates(False)
5✔
2784
        while chan.has_pending_changes(REMOTE):
5✔
2785
            await asyncio.sleep(0.1)
×
2786
        self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey)
5✔
2787
        chan.set_state(ChannelState.SHUTDOWN)
5✔
2788
        # can fulfill or fail htlcs. cannot add htlcs, because state != OPEN
2789
        chan.set_can_send_ctx_updates(True)
5✔
2790

2791
    def get_shutdown_fee_range(self, chan, closing_tx, is_local):
5✔
2792
        """ return the closing fee and fee range we initially try to enforce """
2793
        config = self.network.config
5✔
2794
        our_fee = None
5✔
2795
        if config.TEST_SHUTDOWN_FEE:
5✔
2796
            our_fee = config.TEST_SHUTDOWN_FEE
5✔
2797
        else:
2798
            fee_rate_per_kb = self.network.fee_estimates.eta_target_to_fee(FEE_LN_ETA_TARGET)
5✔
2799
            if fee_rate_per_kb is None:  # fallback
5✔
2800
                from .fee_policy import FeePolicy
×
2801
                fee_rate_per_kb = FeePolicy(config.FEE_POLICY).fee_per_kb(self.network)
×
2802
            if fee_rate_per_kb is not None:
5✔
2803
                our_fee = fee_rate_per_kb * closing_tx.estimated_size() // 1000
5✔
2804
            # TODO: anchors: remove this, as commitment fee rate can be below chain head fee rate?
2805
            # BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx
2806
            max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE)
5✔
2807
            if our_fee is None:  # fallback
5✔
2808
                self.logger.warning(f"got no fee estimates for co-op close! falling back to chan.get_latest_fee")
×
2809
                our_fee = max_fee
×
2810
            our_fee = min(our_fee, max_fee)
5✔
2811
        # config modern_fee_negotiation can be set in tests
2812
        if config.TEST_SHUTDOWN_LEGACY:
5✔
2813
            our_fee_range = None
5✔
2814
        elif config.TEST_SHUTDOWN_FEE_RANGE:
5✔
2815
            our_fee_range = config.TEST_SHUTDOWN_FEE_RANGE
5✔
2816
        else:
2817
            # we aim at a fee between next block inclusion and some lower value
2818
            our_fee_range = {'min_fee_satoshis': our_fee // 2, 'max_fee_satoshis': our_fee * 2}
5✔
2819
        self.logger.info(f"Our fee range: {our_fee_range} and fee: {our_fee}")
5✔
2820
        return our_fee, our_fee_range
5✔
2821

2822
    @log_exceptions
5✔
2823
    async def _shutdown(self, chan: Channel, payload, *, is_local: bool):
5✔
2824
        # wait until no HTLCs remain in either commitment transaction
2825
        while chan.has_unsettled_htlcs():
5✔
2826
            self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...')
5✔
2827
            await asyncio.sleep(1)
5✔
2828
        # if no HTLCs remain, we must not send updates
2829
        chan.set_can_send_ctx_updates(False)
5✔
2830
        their_scriptpubkey = payload['scriptpubkey']
5✔
2831
        if chan.config[LOCAL].upfront_shutdown_script:
5✔
2832
            our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
5✔
2833
        else:
2834
            our_scriptpubkey = bitcoin.address_to_script(chan.get_sweep_address())
5✔
2835
        assert our_scriptpubkey
5✔
2836
        # estimate fee of closing tx
2837
        dummy_sig, dummy_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0)
5✔
2838
        our_sig = None  # type: Optional[bytes]
5✔
2839
        closing_tx = None  # type: Optional[PartialTransaction]
5✔
2840
        is_initiator = chan.constraints.is_initiator
5✔
2841
        our_fee, our_fee_range = self.get_shutdown_fee_range(chan, dummy_tx, is_local)
5✔
2842

2843
        def send_closing_signed(our_fee, our_fee_range, drop_remote):
5✔
2844
            nonlocal our_sig, closing_tx
2845
            if our_fee_range:
5✔
2846
                closing_signed_tlvs = {'fee_range': our_fee_range}
5✔
2847
            else:
2848
                closing_signed_tlvs = {}
5✔
2849
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=our_fee, drop_remote=drop_remote)
5✔
2850
            self.logger.info(f"Sending fee range: {closing_signed_tlvs} and fee: {our_fee}")
5✔
2851
            self.send_message(
5✔
2852
                'closing_signed',
2853
                channel_id=chan.channel_id,
2854
                fee_satoshis=our_fee,
2855
                signature=our_sig,
2856
                closing_signed_tlvs=closing_signed_tlvs,
2857
            )
2858

2859
        def verify_signature(tx: 'PartialTransaction', sig) -> bool:
5✔
2860
            their_pubkey = chan.config[REMOTE].multisig_key.pubkey
5✔
2861
            pre_hash = tx.serialize_preimage(0)
5✔
2862
            msg_hash = sha256d(pre_hash)
5✔
2863
            return ECPubkey(their_pubkey).ecdsa_verify(sig, msg_hash)
5✔
2864

2865
        async def receive_closing_signed():
5✔
2866
            nonlocal our_sig, closing_tx
2867
            try:
5✔
2868
                cs_payload = await self.wait_for_message('closing_signed', chan.channel_id)
5✔
2869
            except asyncio.exceptions.TimeoutError:
5✔
2870
                self.schedule_force_closing(chan.channel_id)
×
2871
                raise Exception("closing_signed not received, force closing.")
×
2872
            their_fee = cs_payload['fee_satoshis']
5✔
2873
            their_fee_range = cs_payload['closing_signed_tlvs'].get('fee_range')
5✔
2874
            their_sig = cs_payload['signature']
5✔
2875
            # perform checks
2876
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=False)
5✔
2877
            if verify_signature(closing_tx, their_sig):
5✔
2878
                drop_remote = False
5✔
2879
            else:
2880
                our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=True)
×
2881
                if verify_signature(closing_tx, their_sig):
×
2882
                    drop_remote = True
×
2883
                else:
2884
                    # this can happen if we consider our output too valuable to drop,
2885
                    # but the remote drops it because it violates their dust limit
2886
                    raise Exception('failed to verify their signature')
×
2887
            # at this point we know how the closing tx looks like
2888
            # check that their output is above their scriptpubkey's network dust limit
2889
            to_remote_set = closing_tx.get_output_idxs_from_scriptpubkey(their_scriptpubkey)
5✔
2890
            if not drop_remote and to_remote_set:
5✔
2891
                to_remote_idx = to_remote_set.pop()
5✔
2892
                to_remote_amount = closing_tx.outputs()[to_remote_idx].value
5✔
2893
                transaction.check_scriptpubkey_template_and_dust(their_scriptpubkey, to_remote_amount)
5✔
2894
            return their_fee, their_fee_range, their_sig, drop_remote
5✔
2895

2896
        def choose_new_fee(our_fee, our_fee_range, their_fee, their_fee_range, their_previous_fee):
5✔
2897
            assert our_fee != their_fee
5✔
2898
            fee_range_sent = our_fee_range and (is_initiator or (their_previous_fee is not None))
5✔
2899

2900
            # The sending node, if it is not the funder:
2901
            if our_fee_range and their_fee_range and not is_initiator and not self.network.config.TEST_SHUTDOWN_FEE_RANGE:
5✔
2902
                # SHOULD set max_fee_satoshis to at least the max_fee_satoshis received
2903
                our_fee_range['max_fee_satoshis'] = max(their_fee_range['max_fee_satoshis'], our_fee_range['max_fee_satoshis'])
×
2904
                # SHOULD set min_fee_satoshis to a fairly low value
2905
                our_fee_range['min_fee_satoshis'] = min(their_fee_range['min_fee_satoshis'], our_fee_range['min_fee_satoshis'])
×
2906
                # Note: the BOLT describes what the sending node SHOULD do.
2907
                # However, this assumes that we have decided to send 'closing_signed' in response to their fee_range.
2908
                # In practice, we might prefer to fail the channel in some cases (TODO)
2909

2910
            # the receiving node, if fee_satoshis matches its previously sent fee_range,
2911
            if fee_range_sent and (our_fee_range['min_fee_satoshis'] <= their_fee <= our_fee_range['max_fee_satoshis']):
5✔
2912
                # SHOULD reply with a closing_signed with the same fee_satoshis value if it is different from its previously sent fee_satoshis
2913
                our_fee = their_fee
5✔
2914

2915
            # the receiving node, if the message contains a fee_range
2916
            elif our_fee_range and their_fee_range:
5✔
2917
                overlap_min = max(our_fee_range['min_fee_satoshis'], their_fee_range['min_fee_satoshis'])
5✔
2918
                overlap_max = min(our_fee_range['max_fee_satoshis'], their_fee_range['max_fee_satoshis'])
5✔
2919
                # if there is no overlap between that and its own fee_range
2920
                if overlap_min > overlap_max:
5✔
2921
                    # TODO: the receiving node should first send a warning, and fail the channel
2922
                    # only if it doesn't receive a satisfying fee_range after a reasonable amount of time
2923
                    self.schedule_force_closing(chan.channel_id)
×
2924
                    raise Exception("There is no overlap between between their and our fee range.")
×
2925
                # otherwise, if it is the funder
2926
                if is_initiator:
5✔
2927
                    # if fee_satoshis is not in the overlap between the sent and received fee_range:
2928
                    if not (overlap_min <= their_fee <= overlap_max):
×
2929
                        # MUST fail the channel
2930
                        self.schedule_force_closing(chan.channel_id)
×
2931
                        raise Exception("Their fee is not in the overlap region, we force closed.")
×
2932
                    # otherwise, MUST reply with the same fee_satoshis.
2933
                    our_fee = their_fee
×
2934
                # otherwise (it is not the funder):
2935
                else:
2936
                    # if it has already sent a closing_signed:
2937
                    if fee_range_sent:
5✔
2938
                        # fee_satoshis is not the same as the value we sent, we MUST fail the channel
2939
                        self.schedule_force_closing(chan.channel_id)
×
2940
                        raise Exception("Expected the same fee as ours, we force closed.")
×
2941
                    # otherwise:
2942
                    # MUST propose a fee_satoshis in the overlap between received and (about-to-be) sent fee_range.
2943
                    our_fee = (overlap_min + overlap_max) // 2
5✔
2944
            else:
2945
                # otherwise, if fee_satoshis is not strictly between its last-sent fee_satoshis
2946
                # and its previously-received fee_satoshis, UNLESS it has since reconnected:
2947
                if their_previous_fee and not (min(our_fee, their_previous_fee) < their_fee < max(our_fee, their_previous_fee)):
5✔
2948
                    # SHOULD fail the connection.
2949
                    raise Exception('Their fee is not between our last sent and their last sent fee.')
×
2950
                # accept their fee if they are very close
2951
                if abs(their_fee - our_fee) < 2:
5✔
2952
                    our_fee = their_fee
5✔
2953
                else:
2954
                    # this will be "strictly between" (as in BOLT2) previous values because of the above
2955
                    our_fee = (our_fee + their_fee) // 2
5✔
2956

2957
            return our_fee, our_fee_range
5✔
2958

2959
        # Fee negotiation: both parties exchange 'funding_signed' messages.
2960
        # The funder sends the first message, the non-funder sends the last message.
2961
        # In the 'modern' case, at most 3 messages are exchanged, because choose_new_fee of the funder either returns their_fee or fails
2962
        their_fee = None
5✔
2963
        drop_remote = False  # does the peer drop its to_local output or not?
5✔
2964
        if is_initiator:
5✔
2965
            send_closing_signed(our_fee, our_fee_range, drop_remote)
5✔
2966
        while True:
5✔
2967
            their_previous_fee = their_fee
5✔
2968
            their_fee, their_fee_range, their_sig, drop_remote = await receive_closing_signed()
5✔
2969
            if our_fee == their_fee:
5✔
2970
                break
5✔
2971
            our_fee, our_fee_range = choose_new_fee(our_fee, our_fee_range, their_fee, their_fee_range, their_previous_fee)
5✔
2972
            if not is_initiator and our_fee == their_fee:
5✔
2973
                break
×
2974
            send_closing_signed(our_fee, our_fee_range, drop_remote)
5✔
2975
            if is_initiator and our_fee == their_fee:
5✔
2976
                break
5✔
2977
        if not is_initiator:
5✔
2978
            send_closing_signed(our_fee, our_fee_range, drop_remote)
5✔
2979

2980
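        # Negotiation has converged: every exit path above requires our_fee == their_fee,
        # so our_sig and their_sig should commit to the same closing transaction. The funding
        # output is a 2-of-2 multisig, hence the input needs both signatures (SIGHASH_ALL).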
        # add signatures
        closing_tx.add_signature_to_txin(
            txin_idx=0,
            signing_pubkey=chan.config[LOCAL].multisig_key.pubkey,
            sig=ecdsa_der_sig_from_ecdsa_sig64(our_sig) + Sighash.to_sigbytes(Sighash.ALL))
        closing_tx.add_signature_to_txin(
            txin_idx=0,
            signing_pubkey=chan.config[REMOTE].multisig_key.pubkey,
            sig=ecdsa_der_sig_from_ecdsa_sig64(their_sig) + Sighash.to_sigbytes(Sighash.ALL))
        # save local transaction and set state
        try:
            self.lnworker.wallet.adb.add_transaction(closing_tx)
        except UnrelatedTransactionException:
            pass  # this can happen if ~all of the balance goes to REMOTE
        chan.set_state(ChannelState.CLOSING)
        # broadcast
        await self.network.try_broadcasting(closing_tx, 'closing')
        return closing_tx.txid()

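    # Main HTLC processing loop for this peer: once per iteration it settles, fails or
    # forwards the incoming HTLCs of every channel shared with the peer, and sends any
    # pending commitment updates.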
    async def htlc_switch(self):
        # In this loop, an item of chan.unfulfilled_htlcs may go through 4 stages
        # (tuples are stored as (onion_packet_hex, forwarding_key), matching the unpacking below):
        # - 1. not forwarded yet: (onion_packet_hex, None)
        # - 2. forwarded: (onion_packet_hex, forwarding_key)
        # - 3. processed: (None, forwarding_key), not irrevocably removed yet
        # - 4. done: (None, forwarding_key), irrevocably removed

        await self.initialized
        while True:
            await self.ping_if_required()
            self._htlc_switch_iterdone_event.set()
            self._htlc_switch_iterdone_event.clear()
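            # set() immediately followed by clear() wakes every task currently waiting on the
            # event (e.g. wait_one_htlc_switch_iteration) without leaving the event set for
            # future iterations.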
            # We poll every 0.1 sec to check if there is work to do,
            # or we can also be triggered via events.
            # When forwarding an HTLC originating from this peer (the upstream),
            # we can get triggered for events that happen on the downstream peer.
            # TODO: trampoline forwarding relies on the polling
            async with ignore_after(0.1):
                async with OldTaskGroup(wait=any) as group:
                    await group.spawn(self._received_revack_event.wait())
                    await group.spawn(self.downstream_htlc_resolved_event.wait())
            self._htlc_switch_iterstart_event.set()
            self._htlc_switch_iterstart_event.clear()
            self._maybe_cleanup_received_htlcs_pending_removal()
            for chan_id, chan in self.channels.items():
                if not chan.can_update_ctx(proposer=LOCAL):
                    continue
                self.maybe_send_commitment(chan)
                done = set()
                unfulfilled = chan.unfulfilled_htlcs
                for htlc_id, (onion_packet_hex, forwarding_key) in unfulfilled.items():
                    if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
                        continue
                    htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id)
                    if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
                        assert onion_packet_hex is None
                        self.lnworker.maybe_cleanup_mpp(chan.get_scid_or_local_alias(), htlc)
                        if forwarding_key:
                            self.lnworker.maybe_cleanup_forwarding(forwarding_key)
                        done.add(htlc_id)
                        continue
                    if onion_packet_hex is None:
                        # has been processed already
                        continue
                    error_reason = None  # type: Optional[OnionRoutingFailure]
                    error_bytes = None  # type: Optional[bytes]
                    preimage = None
                    onion_packet_bytes = bytes.fromhex(onion_packet_hex)
                    onion_packet = None
                    try:
                        onion_packet = OnionPacket.from_bytes(onion_packet_bytes)
                    except OnionRoutingFailure as e:
                        error_reason = e
                    else:
                        try:
                            preimage, _forwarding_key, error_bytes = self.process_unfulfilled_htlc(
                                chan=chan,
                                htlc=htlc,
                                forwarding_key=forwarding_key,
                                onion_packet_bytes=onion_packet_bytes,
                                onion_packet=onion_packet)
                            if _forwarding_key:
                                assert forwarding_key is None
                                unfulfilled[htlc_id] = onion_packet_hex, _forwarding_key
                        except OnionRoutingFailure as e:
                            error_bytes = construct_onion_error(e, onion_packet.public_key, self.privkey, self.network.get_local_height())
                        if error_bytes:
                            error_bytes = obfuscate_onion_error(error_bytes, onion_packet.public_key, our_onion_private_key=self.privkey)

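                    # Three possible resolutions: settle with the preimage, fail with an
                    # (obfuscated) error onion, or fail as malformed when we could not even
                    # parse the onion packet.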
                    if preimage or error_reason or error_bytes:
                        if preimage:
                            self.lnworker.set_request_status(htlc.payment_hash, PR_PAID)
                            if not self.lnworker.enable_htlc_settle:
                                continue
                            self.fulfill_htlc(chan, htlc.htlc_id, preimage)
                        elif error_bytes:
                            self.fail_htlc(
                                chan=chan,
                                htlc_id=htlc.htlc_id,
                                error_bytes=error_bytes)
                        else:
                            self.fail_malformed_htlc(
                                chan=chan,
                                htlc_id=htlc.htlc_id,
                                reason=error_reason)
                        # blank onion field to mark it as processed
                        unfulfilled[htlc_id] = None, forwarding_key

                # cleanup
                for htlc_id in done:
                    unfulfilled.pop(htlc_id)
                self.maybe_send_commitment(chan)

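    # Forget received HTLCs whose removal has been irrevocably committed, and wake up any
    # tasks waiting on received_htlc_removed_event.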
    def _maybe_cleanup_received_htlcs_pending_removal(self) -> None:
        done = set()
        for chan, htlc_id in self.received_htlcs_pending_removal:
            if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
                done.add((chan, htlc_id))
        if done:
            for key in done:
                self.received_htlcs_pending_removal.remove(key)
            self.received_htlc_removed_event.set()
            self.received_htlc_removed_event.clear()

    async def wait_one_htlc_switch_iteration(self) -> None:
        """Waits until the HTLC switch does a full iteration or the peer disconnects,
        whichever happens first.
        """
        async def htlc_switch_iteration():
            await self._htlc_switch_iterstart_event.wait()
            await self._htlc_switch_iterdone_event.wait()

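        # wait=any: the task group finishes as soon as either task completes, so this
        # returns on a completed switch iteration or on disconnect, whichever comes first.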
        async with OldTaskGroup(wait=any) as group:
            await group.spawn(htlc_switch_iteration())
            await group.spawn(self.got_disconnected.wait())

    def process_unfulfilled_htlc(
            self, *,
            chan: Channel,
            htlc: UpdateAddHtlc,
            forwarding_key: Optional[str],
            onion_packet_bytes: bytes,
            onion_packet: OnionPacket) -> Tuple[Optional[bytes], Optional[str], Optional[bytes]]:
        """
        Returns (preimage, payment_key, error_bytes), with at most a single element that is not None.
        Raises an OnionRoutingFailure if we need to fail the htlc.
        """
        payment_hash = htlc.payment_hash
        processed_onion = self.process_onion_packet(
            onion_packet,
            payment_hash=payment_hash,
            onion_packet_bytes=onion_packet_bytes)

        preimage, forwarding_info = self.maybe_fulfill_htlc(
            chan=chan,
            htlc=htlc,
            processed_onion=processed_onion,
            onion_packet_bytes=onion_packet_bytes,
            already_forwarded=bool(forwarding_key))

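        # forwarding_key is None until a forward has been kicked off for this HTLC; once set,
        # it is also used as the payment_key under which forwarding results/failures are looked up.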
        if not forwarding_key:
            if forwarding_info:
                # HTLC we are supposed to forward, but haven't forwarded yet
                payment_key, forwarding_callback = forwarding_info
                if not self.lnworker.enable_htlc_forwarding:
                    return None, None, None
                if payment_key not in self.lnworker.active_forwardings:
                    async def wrapped_callback():
                        forwarding_coro = forwarding_callback()
                        try:
                            next_htlc = await forwarding_coro
                            if next_htlc:
                                htlc_key = serialize_htlc_key(chan.get_scid_or_local_alias(), htlc.htlc_id)
                                self.lnworker.active_forwardings[payment_key].append(next_htlc)
                                self.lnworker.downstream_to_upstream_htlc[next_htlc] = htlc_key
                        except OnionRoutingFailure as e:
                            if len(self.lnworker.active_forwardings[payment_key]) == 0:
                                self.lnworker.save_forwarding_failure(payment_key, failure_message=e)
                        # TODO what about other errors? e.g. TxBroadcastError for a swap.
                        #        - malicious electrum server could fake TxBroadcastError
                        #      Could we "catch-all Exception" and fail back the htlcs with e.g. TEMPORARY_NODE_FAILURE?
                        #        - we don't want to fail the inc-HTLC for a syntax error that happens in the callback
                        #      If we don't call save_forwarding_failure(), the inc-HTLC gets stuck until expiry
                        #      and then the inc-channel will get force-closed.
                        #      => forwarding_callback() could have an API with two exception types:
                        #        - type1, such as OnionRoutingFailure, that signals we need to fail back the inc-HTLC
                        #        - type2, such as TxBroadcastError, that signals we want to retry the callback
                    # add to list
                    assert len(self.lnworker.active_forwardings.get(payment_key, [])) == 0
                    self.lnworker.active_forwardings[payment_key] = []
                    fut = asyncio.ensure_future(wrapped_callback())
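                    # fire-and-forget: the forwarding task runs in the background; its outcome
                    # is recorded in lnworker state and picked up by a later htlc_switch iteration.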
                # return payment_key so this branch will not be executed again
                return None, payment_key, None
            elif preimage:
                return preimage, None, None
            else:
                # we are waiting for mpp consolidation or preimage
                return None, None, None
        else:
            # HTLC we are supposed to forward, and have already forwarded
            # for final trampoline onions, forwarding failures are stored with forwarding_key (which is the inner key)
            payment_key = forwarding_key
            preimage = self.lnworker.get_preimage(payment_hash)
            error_bytes, error_reason = self.lnworker.get_forwarding_failure(payment_key)
            if error_bytes:
                return None, None, error_bytes
            if error_reason:
                raise error_reason
            if preimage:
                return preimage, None, None
            return None, None, None

    def process_onion_packet(
            self,
            onion_packet: OnionPacket, *,
            payment_hash: bytes,
            onion_packet_bytes: bytes,
            is_trampoline: bool = False) -> ProcessedOnionPacket:

        failure_data = sha256(onion_packet_bytes)
        try:
            processed_onion = process_onion_packet(
                onion_packet,
                our_onion_private_key=self.privkey,
                associated_data=payment_hash,
                is_trampoline=is_trampoline)
        except UnsupportedOnionPacketVersion:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
        except InvalidOnionPubkey:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_KEY, data=failure_data)
        except InvalidOnionMac:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_HMAC, data=failure_data)
        except Exception as e:
            self.logger.info(f"error processing onion packet: {e!r}")
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
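        # test-only hooks: these config flags let tests force every incoming HTLC to fail
        # as malformed or with a temporary node failure.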
        if self.network.config.TEST_FAIL_HTLCS_AS_MALFORMED:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
        if self.network.config.TEST_FAIL_HTLCS_WITH_TEMP_NODE_FAILURE:
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
        return processed_onion

    def on_onion_message(self, payload):
        if hasattr(self.lnworker, 'onion_message_manager'):  # only on LNWallet
            self.lnworker.onion_message_manager.on_onion_message(payload)