• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6028015529885696

28 Oct 2025 10:43AM UTC coverage: 61.447% (-0.008%) from 61.455%
6028015529885696

push

CirrusCI

ecdsa
notary plugin

22938 of 37330 relevant lines covered (61.45%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.85
/electrum/lnpeer.py
1
#!/usr/bin/env python3
2
#
3
# Copyright (C) 2018 The Electrum developers
4
# Distributed under the MIT software license, see the accompanying
5
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
6

7
import zlib
1✔
8
from collections import OrderedDict, defaultdict
1✔
9
import asyncio
1✔
10
import os
1✔
11
import time
1✔
12
from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set, Callable, Awaitable, List
1✔
13
from datetime import datetime
1✔
14
import functools
1✔
15
from functools import partial
1✔
16
import inspect
1✔
17

18
import electrum_ecc as ecc
1✔
19
from electrum_ecc import ecdsa_sig64_from_r_and_s, ecdsa_der_sig_from_ecdsa_sig64, ECPubkey
1✔
20

21
import aiorpcx
1✔
22
from aiorpcx import ignore_after
1✔
23

24
from .crypto import sha256, sha256d, privkey_to_pubkey
1✔
25
from . import bitcoin, util
1✔
26
from . import constants
1✔
27
from .util import (log_exceptions, ignore_exceptions, chunks, OldTaskGroup,
1✔
28
                   UnrelatedTransactionException, error_text_bytes_to_safe_str, AsyncHangDetector,
29
                   NoDynamicFeeEstimates, event_listener, EventListener)
30
from . import transaction
1✔
31
from .bitcoin import make_op_return, DummyAddress
1✔
32
from .transaction import PartialTxOutput, match_script_against_template, Sighash
1✔
33
from .logging import Logger
1✔
34
from .lnrouter import RouteEdge
1✔
35
from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment, process_onion_packet,
1✔
36
                      OnionPacket, construct_onion_error, obfuscate_onion_error, OnionRoutingFailure,
37
                      ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey,
38
                      OnionFailureCodeMetaFlag)
39
from .lnchannel import Channel, RevokeAndAck, ChannelState, PeerState, ChanCloseOption, CF_ANNOUNCE_CHANNEL
1✔
40
from . import lnutil
1✔
41
from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc, ChannelConfig,
1✔
42
                     RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore,
43
                     funding_output_script, get_per_commitment_secret_from_seed,
44
                     secret_to_pubkey, PaymentFailure, LnFeatures,
45
                     LOCAL, REMOTE, HTLCOwner,
46
                     ln_compare_features, MIN_FINAL_CLTV_DELTA_ACCEPTED,
47
                     RemoteMisbehaving, ShortChannelID,
48
                     IncompatibleLightningFeatures, ChannelType, LNProtocolWarning, validate_features,
49
                     IncompatibleOrInsaneFeatures, FeeBudgetExceeded,
50
                     GossipForwardingMessage, GossipTimestampFilter, channel_id_from_funding_tx,
51
                     PaymentFeeBudget, serialize_htlc_key, Keypair, RecvMPPResolution)
52
from .lntransport import LNTransport, LNTransportBase, LightningPeerConnectionClosed, HandshakeFailed
1✔
53
from .lnmsg import encode_msg, decode_msg, UnknownOptionalMsgType, FailedToParseMsg
1✔
54
from .interface import GracefulDisconnect
1✔
55
from .lnrouter import fee_for_edge_msat
1✔
56
from .json_db import StoredDict
1✔
57
from .invoices import PR_PAID
1✔
58
from .fee_policy import FEE_LN_ETA_TARGET, FEERATE_PER_KW_MIN_RELAY_LIGHTNING
1✔
59
from .trampoline import decode_routing_info
1✔
60

61
if TYPE_CHECKING:
62
    from .lnworker import LNGossip, LNWallet
63
    from .lnrouter import LNPaymentRoute
64
    from .transaction import PartialTransaction
65

66

67
# timeout (in seconds) used when waiting for p2p messages from a peer (see wait_for_message)
LN_P2P_NETWORK_TIMEOUT = 20
68

69

70
class Peer(Logger, EventListener):
1✔
71
    # note: in general this class is NOT thread-safe. Most methods are assumed to be running on asyncio thread.
72

73
    # Messages that must be processed strictly in-order, per channel: they are
    # routed through self.ordered_message_queues instead of being dispatched directly.
    # note: the original tuple listed 'accept_channel' twice; the duplicate entry
    #       was redundant (the tuple is only used for membership tests) and was removed.
    ORDERED_MESSAGES = (
        'accept_channel', 'funding_signed', 'funding_created', 'closing_signed')
    # High-frequency messages that are not worth logging individually.
    SPAMMY_MESSAGES = (
        'ping', 'pong', 'channel_announcement', 'node_announcement', 'channel_update',
        'gossip_timestamp_filter', 'reply_channel_range', 'query_channel_range',
        'query_short_channel_ids', 'reply_short_channel_ids', 'reply_short_channel_ids_end')

    DELAY_INC_MSG_PROCESSING_SLEEP = 0.01  # seconds; throttling delay for incoming message processing
    RECV_GOSSIP_QUEUE_SOFT_MAXSIZE = 2000  # soft bound on queued incoming gossip
    RECV_GOSSIP_QUEUE_HARD_MAXSIZE = 5000  # hard bound (asyncio.Queue maxsize) for recv_gossip_queue
84
    def __init__(
            self,
            lnworker: Union['LNGossip', 'LNWallet'],
            pubkey: bytes,
            transport: LNTransportBase,
            *, is_channel_backup= False):
        """Create a Peer for a single p2p connection.

        lnworker: the owning LNGossip or LNWallet instance.
        pubkey: the remote node's public key.
        transport: the p2p transport; for outgoing connections the noise
            handshake is performed later, in initialize().
        is_channel_backup: if True, only 'init' messages are processed
            (see _process_message).
        """
        self.lnworker = lnworker
        self.network = lnworker.network
        self.asyncio_loop = self.network.asyncio_loop
        self.is_channel_backup = is_channel_backup
        self._sent_init = False  # type: bool
        self._received_init = False  # type: bool
        # resolved to True once both init messages were exchanged (see maybe_set_initialized)
        self.initialized = self.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self.querying = asyncio.Event()
        self.transport = transport
        self.pubkey = pubkey  # remote pubkey
        self.privkey = self.transport.privkey  # local privkey
        self.features = self.lnworker.features  # type: LnFeatures
        self.their_features = LnFeatures(0)  # type: LnFeatures
        self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
        # we must not be connected to ourselves
        assert self.node_ids[0] != self.node_ids[1]
        self.last_message_time = 0
        self.pong_event = asyncio.Event()
        self.reply_channel_range = None  # type: Optional[asyncio.Queue]
        # gossip uses a single queue to preserve message order
        self.recv_gossip_queue = asyncio.Queue(maxsize=self.RECV_GOSSIP_QUEUE_HARD_MAXSIZE)
        self.our_gossip_timestamp_filter = None  # type: Optional[GossipTimestampFilter]
        self.their_gossip_timestamp_filter = None  # type: Optional[GossipTimestampFilter]
        self.outgoing_gossip_reply = False # type: bool
        self.ordered_message_queues = defaultdict(partial(asyncio.Queue, maxsize=10))  # type: Dict[bytes, asyncio.Queue] # for messages that are ordered
        self.temp_id_to_id = {}  # type: Dict[bytes, Optional[bytes]]   # to forward error messages
        self.funding_created_sent = set() # for channels in PREOPENING
        self.funding_signed_sent = set()  # for channels in PREOPENING
        self.shutdown_received = {} # chan_id -> asyncio.Future()
        self.channel_reestablish_msg = defaultdict(self.asyncio_loop.create_future)  # type: Dict[bytes, asyncio.Future]
        self._chan_reest_finished = defaultdict(asyncio.Event)  # type: Dict[bytes, asyncio.Event]
        self.orphan_channel_updates = OrderedDict()  # type: OrderedDict[ShortChannelID, dict]
        Logger.__init__(self)
        self.taskgroup = OldTaskGroup()
        # HTLCs offered by REMOTE, that we started removing but are still active:
        self.received_htlcs_pending_removal = set()  # type: Set[Tuple[Channel, int]]
        self.received_htlc_removed_event = asyncio.Event()
        self._htlc_switch_iterstart_event = asyncio.Event()
        self._htlc_switch_iterdone_event = asyncio.Event()
        self._received_revack_event = asyncio.Event()
        self.received_commitsig_event = asyncio.Event()
        self.downstream_htlc_resolved_event = asyncio.Event()
        self.register_callbacks()
        self._num_gossip_messages_forwarded = 0
    def send_message(self, message_name: str, **kwargs):
        """Encode and transmit a single p2p message to the remote peer."""
        assert util.get_running_loop() == util.get_asyncio_loop(), f"this must be run on the asyncio thread!"
        assert type(message_name) is str
        is_spammy = message_name in self.SPAMMY_MESSAGES
        if not is_spammy:
            self.logger.debug(f"Sending {message_name.upper()}")
        if not (message_name.upper() == "INIT" or self.is_initialized()):
            raise Exception("tried to send message before we are initialized")
        wire_bytes = encode_msg(message_name, **kwargs)
        # outgoing channel updates are persisted so they can be replayed on reestablish
        self._store_raw_msg_if_local_update(wire_bytes, message_name=message_name, channel_id=kwargs.get("channel_id"))
        self.transport.send_bytes(wire_bytes)
    def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]):
        """Persist outgoing update_*/commitment_signed messages for replay on channel reestablish."""
        is_commitment_signed = message_name == "commitment_signed"
        if not is_commitment_signed and not message_name.startswith("update_"):
            return  # nothing to persist for other message types
        assert channel_id
        chan = self.get_channel_by_id(channel_id)
        if not chan:
            raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}")
        chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed)
        if is_commitment_signed:
            # saving now, to ensure replaying updates works (in case of channel reestablishment)
            self.lnworker.save_channel(chan)
    def maybe_set_initialized(self):
1✔
161
        if self.initialized.done():
1✔
162
            return
1✔
163
        if self._sent_init and self._received_init:
1✔
164
            self.initialized.set_result(True)
1✔
165

166
    def is_initialized(self) -> bool:
1✔
167
        return (self.initialized.done()
1✔
168
                and not self.initialized.cancelled()
169
                and self.initialized.exception() is None
170
                and self.initialized.result() is True)
171

172
    async def initialize(self):
        """Run the transport handshake (for outgoing connections) and send our 'init'."""
        # If outgoing transport, do handshake now. For incoming, it has already been done.
        if isinstance(self.transport, LNTransport):
            await self.transport.handshake()
        self.logger.info(f"handshake done for {self.transport.peer_addr or self.pubkey.hex()}")
        features = self.features.for_init_message()
        flen = features.min_len()
        # advertise our features and the chain(s) we operate on
        self.send_message(
            "init", gflen=0, flen=flen,
            features=features,
            init_tlvs={
                'networks':
                {'chains': constants.net.rev_genesis_bytes()}
            })
        self._sent_init = True
        self.maybe_set_initialized()
    @property
    def channels(self) -> Dict[bytes, Channel]:
        """All of lnworker's channels that belong to this peer, keyed by channel id."""
        peer_channels = self.lnworker.channels_for_peer(self.pubkey)
        return peer_channels
    def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
        """Return the channel with this id if it belongs to this peer, else None."""
        # note: this is faster than self.channels.get(channel_id)
        chan = self.lnworker.get_channel_by_id(channel_id)
        if not chan:
            return None
        # the id might match a channel of a different peer
        return chan if chan.node_id == self.pubkey else None
    def diagnostic_name(self):
1✔
203
        return self.lnworker.__class__.__name__ + ', ' + self.transport.name()
1✔
204

205
    async def ping_if_required(self):
1✔
206
        if time.time() - self.last_message_time > 30:
1✔
207
            self.send_message('ping', num_pong_bytes=4, byteslen=4)
×
208
            self.pong_event.clear()
×
209
            await self.pong_event.wait()
×
210

211
    async def _process_message(self, message: bytes) -> None:
        """Decode one raw p2p message and dispatch it to the matching on_* handler.

        Ordered message types go to per-channel queues; everything else is
        handled inline (awaiting async handlers before the next message).
        """
        try:
            message_type, payload = decode_msg(message)
        except UnknownOptionalMsgType as e:
            # unknown optional message types must be ignored, not treated as an error
            self.logger.info(f"received unknown message from peer. ignoring: {e!r}")
            return
        except FailedToParseMsg as e:
            self.logger.info(
                f"failed to parse message from peer. disconnecting. "
                f"msg_type={e.msg_type_name}({e.msg_type_int}). exc={e!r}")
            #self.logger.info(f"failed to parse message: message(SECRET?)={message.hex()}")
            raise GracefulDisconnect() from e
        self.last_message_time = time.time()
        if message_type not in self.SPAMMY_MESSAGES:
            self.logger.debug(f"Received {message_type.upper()}")
        # only process INIT if we are a backup
        if self.is_channel_backup is True and message_type != 'init':
            return
        if message_type in self.ORDERED_MESSAGES:
            # route through the per-channel queue; a waiter consumes it via wait_for_message
            chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
            if (
                chan_id not in self.channels
                and chan_id not in self.temp_id_to_id
                and chan_id not in self.temp_id_to_id.values()
            ):
                raise Exception(f"received {message_type} for unknown {chan_id.hex()=}")
            self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
        else:
            # resolve the channel for channel-scoped messages (except error/warning,
            # which are handled even for unknown channel ids)
            if message_type not in ('error', 'warning') and 'channel_id' in payload:
                chan = self.get_channel_by_id(payload['channel_id'])
                if chan is None:
                    self.logger.info(f"Received {message_type} for unknown channel {payload['channel_id'].hex()}")
                    return
                args = (chan, payload)
            else:
                args = (payload,)
            try:
                f = getattr(self, 'on_' + message_type)
            except AttributeError:
                #self.logger.info("Received '%s'" % message_type.upper(), payload)
                return
            # raw message is needed to check signature
            if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
                payload['raw'] = message
                payload['sender_node_id'] = self.pubkey
            # note: the message handler might be async or non-async. In either case, by default,
            #       we wait for it to complete before we return, i.e. before the next message is processed.
            if inspect.iscoroutinefunction(f):
                async with AsyncHangDetector(
                    message=f"message handler still running for {message_type.upper()}",
                    logger=self.logger,
                ):
                    await f(*args)
            else:
                f(*args)
    def non_blocking_msg_handler(func):
        """Makes a message handler non-blocking: while processing the message,
        the message_loop keeps processing subsequent incoming messages asynchronously.
        """
        assert inspect.iscoroutinefunction(func), 'func needs to be a coroutine'
        @functools.wraps(func)
        async def wrapper(self: 'Peer', *args, **kwargs):
            # spawn into the peer's taskgroup instead of awaiting the handler inline
            return await self.taskgroup.spawn(func(self, *args, **kwargs))
        return wrapper
    def on_warning(self, payload):
        """Log a remote 'warning' message (untrusted text); no channel action is taken."""
        chan_id = payload.get("channel_id")
        err_bytes = payload['data']
        is_known_chan_id = chan_id in self.channels or chan_id in self.temp_id_to_id
        warning_text = error_text_bytes_to_safe_str(err_bytes, max_len=None)
        self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: "
                         f"{warning_text}. chan_id={chan_id.hex()}. "
                         f"{is_known_chan_id=}")
    def on_error(self, payload):
        """Handle a remote 'error' message: force-close the referenced channel(s)
        and disconnect. An unknown channel_id is ignored (no disconnect)."""
        chan_id = payload.get("channel_id")
        err_bytes = payload['data']
        is_known_chan_id = (chan_id in self.channels) or (chan_id in self.temp_id_to_id)
        self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: "
                         f"{error_text_bytes_to_safe_str(err_bytes, max_len=None)}. chan_id={chan_id.hex()}. "
                         f"{is_known_chan_id=}")
        if chan := self.channels.get(chan_id):
            self.schedule_force_closing(chan_id)
            # also unblock any coroutine waiting on this channel's ordered queue
            self.ordered_message_queues[chan_id].put_nowait((None, {'error': err_bytes}))
            chan.save_remote_peer_sent_error(err_bytes)
        elif chan_id in self.temp_id_to_id:
            # the error refers to a temporary channel id; forward to the final id if known
            chan_id = self.temp_id_to_id[chan_id] or chan_id
            self.ordered_message_queues[chan_id].put_nowait((None, {'error': err_bytes}))
        elif chan_id == bytes(32):
            # if channel_id is all zero:
            # - MUST fail all channels with the sending node.
            for cid in self.channels:
                self.schedule_force_closing(cid)
                self.ordered_message_queues[cid].put_nowait((None, {'error': err_bytes}))
        else:
            # if no existing channel is referred to by channel_id:
            # - MUST ignore the message.
            return
        raise GracefulDisconnect
    def send_warning(self, channel_id: bytes, message: str = None, *, close_connection=False):
        """Send a 'warning' message to the peer; optionally disconnect afterwards.

        channel_id may be the temporary channel id while the final one is not yet
        known, or all-zeroes if the warning is not tied to a specific channel.
        Raises GracefulDisconnect when close_connection is True.
        """
        assert isinstance(channel_id, bytes)
        if message:
            encoded_data = message.encode('ascii')
        else:
            encoded_data = b''
        self.send_message('warning', channel_id=channel_id, data=encoded_data, len=len(encoded_data))
        if close_connection:
            raise GracefulDisconnect
    def send_error(self, channel_id: bytes, message: str = None, *, force_close_channel=False):
        """Send an 'error' message to the peer and disconnect.

        channel_id may be the temporary channel id while the final one is not yet
        known, or all-zeroes to refer to all channels. With force_close_channel,
        the referenced channel(s) are scheduled for force-closure.
        Always raises GracefulDisconnect.
        """
        assert isinstance(channel_id, bytes)
        if message:
            encoded_data = message.encode('ascii')
        else:
            encoded_data = b''
        self.send_message('error', channel_id=channel_id, data=encoded_data, len=len(encoded_data))
        # MUST fail the channel(s) referred to by the error message:
        #  we may violate this with force_close_channel
        if force_close_channel:
            if channel_id in self.channels:
                self.schedule_force_closing(channel_id)
            elif channel_id == bytes(32):
                for cid in self.channels:
                    self.schedule_force_closing(cid)
        raise GracefulDisconnect
    def on_ping(self, payload):
        """Reply to a 'ping' with a 'pong' of the requested size."""
        num_bytes = payload['num_pong_bytes']
        self.send_message('pong', byteslen=num_bytes)
    def on_pong(self, payload):
        """Wake up any waiter in ping_if_required()."""
        self.pong_event.set()
    async def wait_for_message(self, expected_name: str, channel_id: bytes):
        """Wait (with LN_P2P_NETWORK_TIMEOUT) for the next ordered message for this channel.

        Raises GracefulDisconnect if the peer sent an error for the channel, and
        a generic Exception if an unexpected message type arrives.
        """
        queue = self.ordered_message_queues[channel_id]
        name, payload = await util.wait_for2(queue.get(), LN_P2P_NETWORK_TIMEOUT)
        # raise exceptions for errors, so that the caller sees them
        err_bytes = payload.get("error")
        if err_bytes is not None:
            err_text = error_text_bytes_to_safe_str(err_bytes)
            raise GracefulDisconnect(
                f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {err_text}")
        if name != expected_name:
            raise Exception(f"Received unexpected '{name}'")
        return payload
    def on_init(self, payload):
        """Handle the remote 'init' message: validate and negotiate features,
        check that we share a chain, then mark the connection as initialized."""
        if self._received_init:
            self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
            return
        # legacy 'globalfeatures' bits are merged into the feature bitvector
        _their_features = int.from_bytes(payload['features'], byteorder="big")
        _their_features |= int.from_bytes(payload['globalfeatures'], byteorder="big")
        try:
            self.their_features = validate_features(_their_features)
        except IncompatibleOrInsaneFeatures as e:
            raise GracefulDisconnect(f"remote sent insane features: {repr(e)}")
        # check if features are compatible, and set self.features to what we negotiated
        try:
            self.features = ln_compare_features(self.features, self.their_features)
        except IncompatibleLightningFeatures as e:
            # propagate the failure to anyone awaiting self.initialized
            self.initialized.set_exception(e)
            raise GracefulDisconnect(f"{str(e)}")
        self.logger.info(
            f"received INIT with features={str(self.their_features.get_names())}. "
            f"negotiated={str(self.features)}")
        # check that they are on the same chain as us, if provided
        their_networks = payload["init_tlvs"].get("networks")
        if their_networks:
            their_chains = list(chunks(their_networks["chains"], 32))
            if constants.net.rev_genesis_bytes() not in their_chains:
                raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})")
        # all checks passed
        self.lnworker.on_peer_successfully_established(self)
        self._received_init = True
        self.maybe_set_initialized()
    def on_node_announcement(self, payload):
        """Queue a received node_announcement for gossip processing."""
        # drop it if we don't consume gossip (trampoline) or never asked for any
        if self.lnworker.uses_trampoline() or self.our_gossip_timestamp_filter is None:
            return
        self.recv_gossip_queue.put_nowait(('node_announcement', payload))
    def on_channel_announcement(self, payload):
        """Queue a received channel_announcement for gossip processing."""
        # drop it if we don't consume gossip (trampoline) or never asked for any
        if self.lnworker.uses_trampoline() or self.our_gossip_timestamp_filter is None:
            return
        self.recv_gossip_queue.put_nowait(('channel_announcement', payload))
    def on_channel_update(self, payload):
        """Maybe persist the update for one of our own channels, then queue it for gossip."""
        self.maybe_save_remote_update(payload)
        # drop it if we don't consume gossip (trampoline) or never asked for any
        if self.lnworker.uses_trampoline() or self.our_gossip_timestamp_filter is None:
            return
        self.recv_gossip_queue.put_nowait(('channel_update', payload))
    def on_query_channel_range(self, payload):
        """Validate a peer's query_channel_range and schedule a reply (if we forward gossip)."""
        is_gossip_worker = self.lnworker == self.lnworker.network.lngossip
        if is_gossip_worker or not self._should_forward_gossip():
            return
        if not self._is_valid_channel_range_query(payload):
            return self.send_warning(bytes(32), "received invalid query_channel_range")
        if self.outgoing_gossip_reply:
            # only one in-flight gossip reply per peer
            return self.send_warning(bytes(32), "received multiple queries at the same time")
        self.outgoing_gossip_reply = True
        self.recv_gossip_queue.put_nowait(('query_channel_range', payload))
    def on_query_short_channel_ids(self, payload):
        """Validate a peer's query_short_channel_ids and schedule a reply (if we forward gossip)."""
        is_gossip_worker = self.lnworker == self.lnworker.network.lngossip
        if is_gossip_worker or not self._should_forward_gossip():
            return
        if self.outgoing_gossip_reply:
            # only one in-flight gossip reply per peer
            return self.send_warning(bytes(32), "received multiple queries at the same time")
        if not self._is_valid_short_channel_id_query(payload):
            return self.send_warning(bytes(32), "invalid query_short_channel_ids")
        self.outgoing_gossip_reply = True
        self.recv_gossip_queue.put_nowait(('query_short_channel_ids', payload))
    def on_gossip_timestamp_filter(self, payload):
        """Record the peer's gossip filter, but only if we are willing to forward gossip."""
        if not self._should_forward_gossip():
            return
        self.set_gossip_timestamp_filter(payload)
    def set_gossip_timestamp_filter(self, payload: dict) -> None:
        """Set the gossip_timestamp_filter for this peer. If the peer requested historical gossip,
        the request is put on the queue, otherwise only the forwarding loop will check the filter"""
        if payload.get('chain_hash') != constants.net.rev_genesis_bytes():
            return  # filter is for a different chain; ignore
        # renamed local from `filter` to avoid shadowing the builtin
        ts_filter = GossipTimestampFilter.from_payload(payload)
        self.their_gossip_timestamp_filter = ts_filter
        self.logger.debug(f"got gossip_ts_filter from peer {self.pubkey.hex()}: "
                          f"{str(self.their_gossip_timestamp_filter)}")
        if ts_filter and not ts_filter.only_forwarding:
            # peer also asked for historical gossip; trigger a backfill
            self.recv_gossip_queue.put_nowait(('gossip_timestamp_filter', None))
    def maybe_save_remote_update(self, payload):
        """Store a channel_update that refers to one of our own channels with this peer.

        Updates for unknown short channel ids are kept in a small bounded cache
        (orphan_channel_updates), as they might concern one of our channels
        whose scid we don't know yet.
        """
        if not self.channels:
            return
        for chan in self.channels.values():
            if payload['short_channel_id'] in [chan.short_channel_id, chan.get_local_scid_alias()]:
                chan.set_remote_update(payload)
                self.logger.info(f"saved remote channel_update gossip msg for chan {chan.get_id_for_log()}")
                break
        else:
            # (for-else: runs only if no channel matched above)
            # Save (some bounded number of) orphan channel updates for later
            # as it might be for our own direct channel with this peer
            # (and we might not yet know the short channel id for that)
            # Background: this code is here to deal with a bug in LND,
            # see https://github.com/lightningnetwork/lnd/issues/3651 (closed 2022-08-13, lnd-v0.15.1)
            # and https://github.com/lightningnetwork/lightning-rfc/pull/657
            # This code assumes gossip_queries is set. BOLT7: "if the
            # gossip_queries feature is negotiated, [a node] MUST NOT
            # send gossip it did not generate itself"
            # NOTE: The definition of gossip_queries changed
            # https://github.com/lightning/bolts/commit/fce8bab931674a81a9ea895c9e9162e559e48a65
            short_channel_id = ShortChannelID(payload['short_channel_id'])
            self.logger.debug(f'received orphan channel update {short_channel_id}')
            self.orphan_channel_updates[short_channel_id] = payload
            # bound the cache; evict oldest entries first
            while len(self.orphan_channel_updates) > 25:
                self.orphan_channel_updates.popitem(last=False)
    def on_announcement_signatures(self, chan: Channel, payload):
        """Verify and store the peer's announcement_signatures for a public channel,
        then send (or resend) our own signatures in reply.

        Raises if either signature does not verify against the channel
        announcement hash.
        """
        h = chan.get_channel_announcement_hash()
        node_signature = payload["node_signature"]
        bitcoin_signature = payload["bitcoin_signature"]
        # bitcoin_signature is made with the remote funding (multisig) key
        if not ECPubkey(chan.config[REMOTE].multisig_key.pubkey).ecdsa_verify(bitcoin_signature, h):
            raise Exception("bitcoin_sig invalid in announcement_signatures")
        # node_signature is made with the peer's node key
        if not ECPubkey(self.pubkey).ecdsa_verify(node_signature, h):
            raise Exception("node_sig invalid in announcement_signatures")
        chan.config[REMOTE].announcement_node_sig = node_signature
        chan.config[REMOTE].announcement_bitcoin_sig = bitcoin_signature
        # persist the signatures before replying
        self.lnworker.save_channel(chan)
        self.maybe_send_announcement_signatures(chan, is_reply=True)
    def handle_disconnect(func):
        """Decorator: run *func*, swallow expected disconnection-type exceptions
        (logging them), and always run connection cleanup afterwards."""
        @functools.wraps(func)
        async def wrapper_func(self, *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except GracefulDisconnect as e:
                # deliberate disconnect; log at the level chosen by the raiser
                self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
            except (LightningPeerConnectionClosed, IncompatibleLightningFeatures,
                    aiorpcx.socks.SOCKSError) as e:
                self.logger.info(f"Disconnecting: {repr(e)}")
            finally:
                # runs on every exit path, including unexpected exceptions
                self.close_and_cleanup()
        return wrapper_func
    @ignore_exceptions  # do not kill outer taskgroup
    @log_exceptions
    @handle_disconnect
    async def main_loop(self):
        """Top-level task for this peer: spawns the message loop and the gossip
        subtasks into self.taskgroup and runs until disconnect."""
        async with self.taskgroup as group:
            await group.spawn(self._message_loop())
            await group.spawn(self._query_gossip())
            await group.spawn(self._process_gossip())
            await group.spawn(self._send_own_gossip())
            await group.spawn(self._forward_gossip())
            if self.network.lngossip != self.lnworker:
                # only wallet peers (not the gossip worker) run an htlc switch
                await group.spawn(self.htlc_switch())
    async def _process_gossip(self):
        """Drain recv_gossip_queue in batches: announcements/updates are fed to
        the global lngossip instance; gossip queries are spawned as subtasks."""
        while True:
            await asyncio.sleep(5)
            if not self.network.lngossip:
                continue  # gossip subsystem not (yet) available
            chan_anns = []
            chan_upds = []
            node_anns = []
            # drain the queue completely; blocks until at least one item arrives
            while True:
                name, payload = await self.recv_gossip_queue.get()
                if name == 'channel_announcement':
                    chan_anns.append(payload)
                elif name == 'channel_update':
                    chan_upds.append(payload)
                elif name == 'node_announcement':
                    node_anns.append(payload)
                elif name == 'query_channel_range':
                    await self.taskgroup.spawn(self._send_reply_channel_range(payload))
                elif name == 'query_short_channel_ids':
                    await self.taskgroup.spawn(self._send_reply_short_channel_ids(payload))
                elif name == 'gossip_timestamp_filter':
                    await self.taskgroup.spawn(self._handle_historical_gossip_request())
                else:
                    raise Exception('unknown message')
                if self.recv_gossip_queue.empty():
                    break
            # re-check: lngossip might have gone away while draining
            if self.network.lngossip:
                await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds)
    async def _send_own_gossip(self):
        """Periodically (re)announce our node and our public channels to this peer."""
        if self.lnworker == self.lnworker.network.lngossip:
            return  # the gossip worker has no own channels to announce
        await asyncio.sleep(10)  # small delay after connecting
        while True:
            pub_chans = [c for c in self.lnworker.channels.values() if c.is_public()]
            if pub_chans:
                cfg = self.lnworker.config
                self.send_node_announcement(cfg.LIGHTNING_NODE_ALIAS, cfg.LIGHTNING_NODE_COLOR_RGB)
                for chan in pub_chans:
                    if chan.is_open() and chan.peer_state == PeerState.GOOD:
                        self.maybe_send_channel_announcement(chan)
            await asyncio.sleep(600)
    def _should_forward_gossip(self) -> bool:
1✔
579
        if (self.network.lngossip != self.lnworker
×
580
                and not self.lnworker.uses_trampoline()
581
                and self.features.supports(LnFeatures.GOSSIP_QUERIES_REQ)):
582
            return True
×
583
        return False
×
584

585
    async def _forward_gossip(self):
        """Forward freshly-received gossip batches to this peer as they appear.

        Polls lngossip for new forwarding batches; each batch is sent in a
        separate task, bounded by gossip_request_semaphore. Only sends once the
        peer has requested gossip via gossip_timestamp_filter.
        """
        if not self._should_forward_gossip():
            return

        async def send_new_gossip_with_semaphore(gossip: List[GossipForwardingMessage]):
            # bound the number of concurrent gossip-serving tasks
            async with self.network.lngossip.gossip_request_semaphore:
                sent = await self._send_gossip_messages(gossip)
            if sent > 0:
                self.logger.debug(f"forwarded {sent} gossip messages to {self.pubkey.hex()}")

        lngossip = self.network.lngossip
        last_gossip_batch_ts = 0  # refresh timestamp of the last batch we forwarded
        while True:
            await asyncio.sleep(10)
            if not self.their_gossip_timestamp_filter:
                continue  # peer didn't request gossip

            new_gossip, last_lngossip_refresh_ts = await lngossip.get_forwarding_gossip()
            if not last_lngossip_refresh_ts > last_gossip_batch_ts:
                continue  # no new batch available
            last_gossip_batch_ts = last_lngossip_refresh_ts

            await self.taskgroup.spawn(send_new_gossip_with_semaphore(new_gossip))

609
    async def _handle_historical_gossip_request(self):
1✔
610
        """Called when a peer requests historical gossip with a gossip_timestamp_filter query."""
611
        filter = self.their_gossip_timestamp_filter
×
612
        if not self._should_forward_gossip() or not filter or filter.only_forwarding:
×
613
            return
×
614
        async with self.network.lngossip.gossip_request_semaphore:
×
615
            requested_gossip = self.lnworker.channel_db.get_gossip_in_timespan(filter)
×
616
            filter.only_forwarding = True
×
617
            sent = await self._send_gossip_messages(requested_gossip)
×
618
            if sent > 0:
×
619
                self._num_gossip_messages_forwarded += sent
×
620
                #self.logger.debug(f"forwarded {sent} historical gossip messages to {self.pubkey.hex()}")
621

622
    async def _send_gossip_messages(self, messages: 'List[GossipForwardingMessage]') -> int:
        """Send the given gossip messages to this peer.

        Skips messages that fall outside the peer's timestamp filter or that
        originated from the peer itself. Returns how many were actually sent.
        """
        sent_count = 0
        for message in messages:
            if not self.their_gossip_timestamp_filter.in_range(message.timestamp):
                continue
            if self.pubkey == message.sender_node_id:
                continue  # don't echo the peer's own gossip back at it
            await self.transport.send_bytes_and_drain(message.msg)
            sent_count += 1
            if sent_count % 250 == 0:
                # this can be a lot of messages, completely blocking the event loop
                await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
        return sent_count

634
    async def _query_gossip(self):
        """Full gossip sync against this peer (dedicated gossip worker only).

        Waits for the connection to initialize, fetches the peer's complete
        list of channel ids, subscribes to future gossip, and then keeps
        querying details for ids the lnworker still wants resolved.

        Raises GracefulDisconnect on init failure, missing gossip_queries
        support, or query timeout.
        """
        try:
            await util.wait_for2(self.initialized, LN_P2P_NETWORK_TIMEOUT)
        except Exception as e:
            raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e
        if self.lnworker == self.lnworker.network.lngossip:
            if not self.their_features.supports(LnFeatures.GOSSIP_QUERIES_OPT):
                raise GracefulDisconnect("remote does not support gossip_queries, which we need")
            try:
                ids, complete = await util.wait_for2(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT)
            except asyncio.TimeoutError as e:
                raise GracefulDisconnect("query_channel_range timed out") from e
            self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
            await self.lnworker.add_new_ids(ids)
            # subscribe to gossip from now on
            self.request_gossip(int(time.time()))
            while True:
                todo = self.lnworker.get_ids_to_query()
                if not todo:
                    await asyncio.sleep(1)
                    continue
                await self.get_short_channel_ids(todo)

656
    @staticmethod
    def _is_valid_channel_range_query(payload: dict) -> bool:
        """Sanity-check an incoming query_channel_range payload."""
        on_our_chain = payload.get('chain_hash') == constants.net.rev_genesis_bytes()
        sane_start = (payload.get('first_blocknum', -1)
                      >= constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS)
        covers_blocks = payload.get('number_of_blocks', 0) >= 1
        return on_our_chain and sane_start and covers_blocks

666
    def _is_valid_short_channel_id_query(self, payload: dict) -> bool:
        """Validate an incoming query_short_channel_ids payload.

        Requires our chain hash and the uncompressed scid encoding
        (prefix byte 0x00) with a body that is a multiple of 8 bytes.
        """
        if payload.get('chain_hash') != constants.net.rev_genesis_bytes():
            return False
        enc_short_ids = payload['encoded_short_ids']
        if enc_short_ids[0] != 0:
            self.logger.debug(f"got query_short_channel_ids with invalid encoding: {repr(enc_short_ids[0])}")
            return False
        if (len(enc_short_ids) - 1) % 8 != 0:
            # fixed: dropped extraneous f-prefix from placeholder-less string
            self.logger.debug("got query_short_channel_ids with invalid length")
            return False
        return True

678
    async def _send_reply_channel_range(self, payload: dict):
        """Answer a query_channel_range with one or more reply_channel_range messages.

        The scids are sent in chunks of at most 8100 (64800 bytes); each chunk
        declares the block range it covers, and only the last chunk sets
        sync_complete, per the BOLT-7 requirements:
        https://github.com/lightning/bolts/blob/acd383145dd8c3fecd69ce94e4a789767b984ac0/07-routing-gossip.md#requirements-5
        """
        first_blockheight: int = payload['first_blocknum']

        async with self.network.lngossip.gossip_request_semaphore:
            sorted_scids: List[ShortChannelID] = self.lnworker.channel_db.get_channels_in_range(
                first_blockheight,
                payload['number_of_blocks']
            )
            self.logger.debug(f"reply_channel_range to request "
                              f"first_height={first_blockheight}, "
                              f"num_blocks={payload['number_of_blocks']}, "
                              f"sending {len(sorted_scids)} scids")

            complete: bool = False
            while not complete:
                # create a 64800 byte chunk of skids, split the remaining scids
                encoded_scids, sorted_scids = b''.join(sorted_scids[:8100]), sorted_scids[8100:]
                complete = len(sorted_scids) == 0  # if there are no scids remaining we are done
                # number of blocks covered by the scids in this chunk
                if complete:
                    # LAST MESSAGE MUST have first_blocknum plus number_of_blocks equal or greater than
                    # the query_channel_range first_blocknum plus number_of_blocks.
                    number_of_blocks = ((payload['first_blocknum'] + payload['number_of_blocks'])
                                        - first_blockheight)
                else:
                    # we cover the range until the height of the first scid in the next chunk
                    number_of_blocks = sorted_scids[0].block_height - first_blockheight
                self.send_message('reply_channel_range',
                    chain_hash=constants.net.rev_genesis_bytes(),
                    first_blocknum=first_blockheight,
                    number_of_blocks=number_of_blocks,
                    sync_complete=complete,
                    len=1+len(encoded_scids),
                    encoded_short_ids=b'\x00' + encoded_scids)
                if not complete:
                    # next chunk starts at the first remaining scid's height
                    first_blockheight = sorted_scids[0].block_height
                    await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
            self.outgoing_gossip_reply = False

718
    async def get_channel_range(self):
        """Send query_channel_range covering the whole chain and collect the replies.

        Returns (ids, complete): the set of raw scids reported by the peer and
        whether the peer flagged its final reply as complete. Replies can be
        split across messages with differing conventions (see examples below),
        so we merge the block intervals covered by each reply until a single
        interval spans the whole queried range.
        """
        self.reply_channel_range = asyncio.Queue()
        first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
        num_blocks = self.lnworker.network.get_local_height() - first_block
        self.query_channel_range(first_block, num_blocks)
        intervals = []
        ids = set()
        # note: implementations behave differently...
        # "sane implementation that follows BOLT-07" example:
        #   query_channel_range. <<< first_block 497000, num_blocks 79038
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True
        #   on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True
        #   on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True
        #   on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True
        # lnd example:
        #   query_channel_range. <<< first_block 497000, num_blocks 79038
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True
        # ADDENDUM (01/2025): now it's 'MUST set sync_complete to false if this is not the final reply_channel_range.'
        while True:
            index, num, complete, _ids = await self.reply_channel_range.get()
            ids.update(_ids)
            intervals.append((index, index+num))
            intervals.sort()
            # merge adjacent/overlapping intervals while sanity-checking them
            while len(intervals) > 1:
                a,b = intervals[0]
                c,d = intervals[1]
                if not (a <= c and a <= b and c <= d):
                    raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}")
                if b >= c:
                    intervals = [(a,d)] + intervals[2:]
                else:
                    break
            # done once a single merged interval covers the queried range
            if len(intervals) == 1 and complete:
                a, b = intervals[0]
                if a <= first_block and b >= first_block + num_blocks:
                    break
        self.reply_channel_range = None
        return ids, complete
760

761
    def request_gossip(self, timestamp=0):
        """Send gossip_timestamp_filter asking the peer for gossip newer than
        `timestamp`; a timestamp of 0 requests the whole channel graph."""
        if timestamp == 0:
            message = 'requesting whole channel graph'
        else:
            message = f'requesting channel graph since {datetime.fromtimestamp(timestamp).isoformat()}'
        self.logger.info(message)
        full_range = 0xFFFFFFFF  # no upper bound on the requested timestamps
        self.our_gossip_timestamp_filter = GossipTimestampFilter(
            first_timestamp=timestamp,
            timestamp_range=full_range,
        )
        self.send_message(
            'gossip_timestamp_filter',
            chain_hash=constants.net.rev_genesis_bytes(),
            first_timestamp=timestamp,
            timestamp_range=full_range,
        )

778
    def query_channel_range(self, first_block, num_blocks):
        """Ask the peer which channels exist in the given block range."""
        self.logger.info(f'query channel range {first_block} {num_blocks}')
        query = dict(
            chain_hash=constants.net.rev_genesis_bytes(),
            first_blocknum=first_block,
            number_of_blocks=num_blocks,
        )
        self.send_message('query_channel_range', **query)

786
    def decode_short_ids(self, encoded):
        """Decode an encoded_short_ids field into a list of raw 8-byte scids.

        The first byte selects the encoding: 0x00 = uncompressed,
        0x01 = zlib-compressed (legacy). Raises on any other prefix.
        """
        encoding, body = encoded[0], encoded[1:]
        if encoding == 0:
            decoded = body
        elif encoding == 1:
            decoded = zlib.decompress(body)
        else:
            raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}')
        return [decoded[offset:offset + 8] for offset in range(0, len(decoded), 8)]

796
    async def on_reply_channel_range(self, payload):
        """Handle reply_channel_range: decode the scids and queue them for the
        pending get_channel_range() call, blocking further message processing
        if the consumer falls behind."""
        first_block = payload['first_blocknum']
        num_blocks = payload['number_of_blocks']
        is_complete = bool(int.from_bytes(payload['sync_complete'], 'big'))
        scids = self.decode_short_ids(payload['encoded_short_ids'])
        if self.reply_channel_range is None:
            raise Exception("received 'reply_channel_range' without corresponding 'query_channel_range'")
        while self.reply_channel_range.qsize() > 10:
            # we block process_message until the queue gets consumed
            self.logger.info("reply_channel_range queue is overflowing. sleeping...")
            await asyncio.sleep(0.1)
        self.reply_channel_range.put_nowait((first_block, num_blocks, is_complete, scids))

812
    async def _send_reply_short_channel_ids(self, payload: dict):
        """Serve a query_short_channel_ids request: send all gossip we have for
        the requested scids, then signal reply_short_channel_ids_end."""
        async with self.network.lngossip.gossip_request_semaphore:
            requested_scids = payload['encoded_short_ids']
            decoded_scids = [ShortChannelID.normalize(scid)
                             for scid in self.decode_short_ids(requested_scids)]
            self.logger.debug(f"serving query_short_channel_ids request: "
                              f"requested {len(decoded_scids)} scids")
            chan_db = self.lnworker.channel_db
            # set: dedup gossip messages shared between the requested scids
            response: Set[bytes] = set()
            for scid in decoded_scids:
                requested_msgs = chan_db.get_gossip_for_scid_request(scid)
                response.update(requested_msgs)
            self.logger.debug(f"found {len(response)} gossip messages to serve scid request")
            for index, msg in enumerate(response):
                await self.transport.send_bytes_and_drain(msg)
                if index % 250 == 0:
                    # periodically yield to the event loop; this can be many messages
                    await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
            self.send_message(
                'reply_short_channel_ids_end',
                chain_hash=constants.net.rev_genesis_bytes(),
                full_information=self.network.lngossip.is_synced()
            )
            self.outgoing_gossip_reply = False

836
    async def get_short_channel_ids(self, ids):
        """Query the peer for the given scids and block until it sends
        reply_short_channel_ids_end (which sets self.querying)."""
        querying = self.querying
        assert not querying.is_set()
        self.query_short_channel_ids(ids)
        await querying.wait()
        querying.clear()

843
    def query_short_channel_ids(self, ids):
        """Send query_short_channel_ids for the given scids.

        compression MUST NOT be used according to updated bolt
        (https://github.com/lightning/bolts/pull/981)
        """
        encoded = b'\x00' + b''.join(sorted(ids))  # 0x00 prefix = uncompressed
        self.send_message(
            'query_short_channel_ids',
            chain_hash=constants.net.rev_genesis_bytes(),
            len=len(encoded),
            encoded_short_ids=encoded)

855
    async def _message_loop(self):
        """Main receive loop: perform the handshake, then read and process
        messages from the transport until disconnect.

        Raises GracefulDisconnect if initialization fails or times out.
        """
        try:
            await util.wait_for2(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
        except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
            raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
        async for msg in self.transport.read_messages():
            await self._process_message(msg)
            if self.DELAY_INC_MSG_PROCESSING_SLEEP:
                # rate-limit message-processing a bit, to make it harder
                # for a single peer to bog down the event loop / cpu:
                await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
            # If receiving too much gossip from this peer, we need to slow them down.
            # note: if the gossip queue gets full, we will disconnect from them
            #       and throw away unprocessed gossip.
            if self.recv_gossip_queue.qsize() > self.RECV_GOSSIP_QUEUE_SOFT_MAXSIZE:
                # sleep proportionally to the backlog size
                sleep = self.recv_gossip_queue.qsize() / 1000
                self.logger.debug(
                    f"message_loop sleeping due to getting much gossip. qsize={self.recv_gossip_queue.qsize()}. "
                    f"waiting for existing gossip data to be processed first.")
                await asyncio.sleep(sleep)

876
    def on_reply_short_channel_ids_end(self, payload):
        """Peer signalled the end of its reply to our query_short_channel_ids;
        unblock the waiter in get_short_channel_ids()."""
        self.querying.set()

879
    def close_and_cleanup(self):
        """Tear down the connection to this peer and notify the lnworker.

        Safe to call multiple times: e.g. an explicit call to disconnect the
        peer is followed by a second call from handle_disconnect().
        """
        self.unregister_callbacks()
        try:
            transport = self.transport
            if transport:
                transport.close()
        except Exception:
            pass  # best-effort close; we are tearing down anyway
        self.lnworker.peer_closed(self)
        self.got_disconnected.set()

892
    def is_shutdown_anysegwit(self):
        """Whether option_shutdown_anysegwit was negotiated with this peer."""
        flag = LnFeatures.OPTION_SHUTDOWN_ANYSEGWIT_OPT
        return self.features.supports(flag)
894

895
    def is_channel_type(self):
        """Whether option_channel_type was negotiated with this peer."""
        flag = LnFeatures.OPTION_CHANNEL_TYPE_OPT
        return self.features.supports(flag)
897

898
    def accepts_zeroconf(self):
        """Whether option_zeroconf was negotiated with this peer."""
        flag = LnFeatures.OPTION_ZEROCONF_OPT
        return self.features.supports(flag)
900

901
    def is_upfront_shutdown_script(self):
        """Whether option_upfront_shutdown_script was negotiated with this peer."""
        flag = LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT
        return self.features.supports(flag)
903

904
    def use_anchors(self) -> bool:
        """Whether option_anchors_zero_fee_htlc_tx was negotiated with this peer."""
        flag = LnFeatures.OPTION_ANCHORS_ZERO_FEE_HTLC_OPT
        return self.features.supports(flag)
906

907
    def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]:
        """Extract the upfront_shutdown_script TLV from an open/accept_channel payload.

        Returns b'' when the TLV is absent or upfront-shutdown was not
        negotiated. Raises ValueError for an unknown msg_identifier.
        """
        if msg_identifier not in ('accept', 'open'):
            raise ValueError("msg_identifier must be either 'accept' or 'open'")
        channel_tlvs = payload[msg_identifier + '_channel_tlvs']
        uss_tlv = channel_tlvs.get('upfront_shutdown_script')
        script = b''
        if uss_tlv and self.is_upfront_shutdown_script():
            script = uss_tlv['shutdown_scriptpubkey']
        self.logger.info(f"upfront shutdown script received: {script}")
        return script
920

921
    def make_local_config(
        self,
        *,
        funding_sat: int,
        push_msat: int,
        initiator: HTLCOwner,
        channel_type: ChannelType,
        multisig_funding_keypair: Optional[Keypair],  # if None, will get derived from channel_seed
    ) -> LocalConfig:
        """Build and validate our LocalConfig for a new channel.

        Args:
            funding_sat: channel capacity in satoshis
            push_msat: msat pushed to the non-initiator at open
            initiator: which side funds the channel
            channel_type: negotiated channel type bits
            multisig_funding_keypair: explicit funding key; if None it is
                derived from the freshly generated channel_seed

        Raises:
            Exception: if the given funding keypair is already used by an
                existing channel of this lnworker (key reuse).
        """
        channel_seed = os.urandom(32)
        initial_msat = funding_sat * 1000 - push_msat if initiator == LOCAL else push_msat

        # sending empty bytes as the upfront_shutdown_script will give us the
        # flexibility to decide an address at closing time
        upfront_shutdown_script = b''

        if self.use_anchors():
            static_payment_key = self.lnworker.static_payment_key
            static_remotekey = None
        else:
            # non-anchor channels require static_remotekey; sweep to a wallet address
            assert channel_type & channel_type.OPTION_STATIC_REMOTEKEY
            wallet = self.lnworker.wallet
            assert wallet.txin_type == 'p2wpkh'
            addr = wallet.get_new_sweep_address_for_channel()
            static_payment_key = None
            static_remotekey = bytes.fromhex(wallet.get_public_key(addr))

        if multisig_funding_keypair:
            for chan in self.lnworker.channels.values():  # check against all chans of lnworker, for sanity
                if multisig_funding_keypair.pubkey == chan.config[LOCAL].multisig_key.pubkey:
                    raise Exception(
                        "Refusing to reuse multisig_funding_keypair for new channel. "
                        "Wait one block before opening another channel with this peer."
                    )

        dust_limit_sat = bitcoin.DUST_LIMIT_P2PKH
        # reserve 1% of capacity, but at least the dust limit
        reserve_sat = max(funding_sat // 100, dust_limit_sat)
        # for comparison of defaults, see
        # https://github.com/ACINQ/eclair/blob/afa378fbb73c265da44856b4ad0f2128a88ae6c6/eclair-core/src/main/resources/reference.conf#L66
        # https://github.com/ElementsProject/lightning/blob/0056dd75572a8857cff36fcbdb1a2295a1ac9253/lightningd/options.c#L657
        # https://github.com/lightningnetwork/lnd/blob/56b61078c5b2be007d318673a5f3b40c6346883a/config.go#L81
        max_htlc_value_in_flight_msat = self.network.config.LIGHTNING_MAX_HTLC_VALUE_IN_FLIGHT_MSAT or funding_sat * 1000
        local_config = LocalConfig.from_seed(
            channel_seed=channel_seed,
            static_remotekey=static_remotekey,
            static_payment_key=static_payment_key,
            multisig_key=multisig_funding_keypair,
            upfront_shutdown_script=upfront_shutdown_script,
            to_self_delay=self.network.config.LIGHTNING_TO_SELF_DELAY_CSV,
            dust_limit_sat=dust_limit_sat,
            max_htlc_value_in_flight_msat=max_htlc_value_in_flight_msat,
            max_accepted_htlcs=30,
            initial_msat=initial_msat,
            reserve_sat=reserve_sat,
            funding_locked_received=False,
            current_commitment_signature=None,
            current_htlc_signatures=b'',
            htlc_minimum_msat=1,
            announcement_node_sig=b'',
            announcement_bitcoin_sig=b'',
        )
        local_config.validate_params(funding_sat=funding_sat, config=self.network.config, peer_features=self.features)
        return local_config
984

985
    def temporarily_reserve_funding_tx_change_address(func):
        """Decorator: mark our change addresses used in the funding tx as
        reserved for the duration of the wrapped coroutine.

        During the channel open flow, if we initiated, we might have used a change address
        of ours in the funding tx. The funding tx is not part of the wallet history
        at that point yet, but we should already consider this change address as 'used'.
        """
        @functools.wraps(func)
        async def wrapper(self: 'Peer', *args, **kwargs):
            funding_tx = kwargs['funding_tx']  # type: PartialTransaction
            wallet = self.lnworker.wallet
            change_addresses = [
                out.address for out in funding_tx.outputs()
                if wallet.is_change(out.address)
            ]
            for address in change_addresses:
                wallet.set_reserved_state_of_address(address, reserved=True)
            try:
                return await func(self, *args, **kwargs)
            finally:
                for address in change_addresses:
                    self.lnworker.wallet.set_reserved_state_of_address(address, reserved=False)
        return wrapper
1003

1004
    @temporarily_reserve_funding_tx_change_address
1✔
1005
    async def channel_establishment_flow(
1✔
1006
            self, *,
1007
            funding_tx: 'PartialTransaction',
1008
            funding_sat: int,
1009
            push_msat: int,
1010
            public: bool,
1011
            zeroconf: bool = False,
1012
            temp_channel_id: bytes,
1013
            opening_fee: int = None,
1014
    ) -> Tuple[Channel, 'PartialTransaction']:
1015
        """Implements the channel opening flow.
1016

1017
        -> open_channel message
1018
        <- accept_channel message
1019
        -> funding_created message
1020
        <- funding_signed message
1021

1022
        Channel configurations are initialized in this method.
1023
        """
1024

1025
        if public and not self.lnworker.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS:
×
1026
            raise Exception('Cannot create public channels')
×
1027

1028
        if not self.lnworker.wallet.can_have_lightning():
×
1029
            # old wallet that cannot have lightning anymore
1030
            raise Exception('This wallet cannot create new channels')
×
1031

1032
        # will raise if init fails
1033
        await util.wait_for2(self.initialized, LN_P2P_NETWORK_TIMEOUT)
×
1034
        # trampoline is not yet in features
1035
        if self.lnworker.uses_trampoline() and not self.lnworker.is_trampoline_peer(self.pubkey):
×
1036
            raise Exception('Not a trampoline node: ' + str(self.their_features))
×
1037

1038
        channel_flags = CF_ANNOUNCE_CHANNEL if public else 0
×
1039
        feerate: Optional[int] = self.lnworker.current_target_feerate_per_kw(
×
1040
            has_anchors=self.use_anchors()
1041
        )
1042
        if feerate is None:
×
1043
            raise NoDynamicFeeEstimates()
×
1044
        # we set a channel type for internal bookkeeping
1045
        open_channel_tlvs = {}
×
1046
        assert self.their_features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT)
×
1047
        our_channel_type = ChannelType(ChannelType.OPTION_STATIC_REMOTEKEY)
×
1048
        if self.use_anchors():
×
1049
            our_channel_type |= ChannelType(ChannelType.OPTION_ANCHORS_ZERO_FEE_HTLC_TX)
×
1050
        if zeroconf:
×
1051
            our_channel_type |= ChannelType(ChannelType.OPTION_ZEROCONF)
×
1052
        # We do not set the option_scid_alias bit in channel_type because LND rejects it.
1053
        # Eclair accepts channel_type with that bit, but does not require it.
1054

1055
        # if option_channel_type is negotiated: MUST set channel_type
1056
        if self.is_channel_type():
×
1057
            # if it includes channel_type: MUST set it to a defined type representing the type it wants.
1058
            open_channel_tlvs['channel_type'] = {
×
1059
                'type': our_channel_type.to_bytes_minimal()
1060
            }
1061

1062
        if self.use_anchors():
×
1063
            multisig_funding_keypair = lnutil.derive_multisig_funding_key_if_we_opened(
×
1064
                funding_root_secret=self.lnworker.funding_root_keypair.privkey,
1065
                remote_node_id_or_prefix=self.pubkey,
1066
                nlocktime=funding_tx.locktime,
1067
            )
1068
        else:
1069
            multisig_funding_keypair = None
×
1070
        local_config = self.make_local_config(
×
1071
            funding_sat=funding_sat,
1072
            push_msat=push_msat,
1073
            initiator=LOCAL,
1074
            channel_type=our_channel_type,
1075
            multisig_funding_keypair=multisig_funding_keypair,
1076
        )
1077
        # if it includes open_channel_tlvs: MUST include upfront_shutdown_script.
1078
        open_channel_tlvs['upfront_shutdown_script'] = {
×
1079
            'shutdown_scriptpubkey': local_config.upfront_shutdown_script
1080
        }
1081
        if opening_fee:
×
1082
            # todo: maybe add payment hash
1083
            open_channel_tlvs['channel_opening_fee'] = {
×
1084
                'channel_opening_fee': opening_fee
1085
            }
1086
        # for the first commitment transaction
1087
        per_commitment_secret_first = get_per_commitment_secret_from_seed(
×
1088
            local_config.per_commitment_secret_seed,
1089
            RevocationStore.START_INDEX
1090
        )
1091
        per_commitment_point_first = secret_to_pubkey(
×
1092
            int.from_bytes(per_commitment_secret_first, 'big'))
1093

1094
        # store the temp id now, so that it is recognized for e.g. 'error' messages
1095
        self.temp_id_to_id[temp_channel_id] = None
×
1096
        self._cleanup_temp_channelids()
×
1097
        self.send_message(
×
1098
            "open_channel",
1099
            temporary_channel_id=temp_channel_id,
1100
            chain_hash=constants.net.rev_genesis_bytes(),
1101
            funding_satoshis=funding_sat,
1102
            push_msat=push_msat,
1103
            dust_limit_satoshis=local_config.dust_limit_sat,
1104
            feerate_per_kw=feerate,
1105
            max_accepted_htlcs=local_config.max_accepted_htlcs,
1106
            funding_pubkey=local_config.multisig_key.pubkey,
1107
            revocation_basepoint=local_config.revocation_basepoint.pubkey,
1108
            htlc_basepoint=local_config.htlc_basepoint.pubkey,
1109
            payment_basepoint=local_config.payment_basepoint.pubkey,
1110
            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
1111
            first_per_commitment_point=per_commitment_point_first,
1112
            to_self_delay=local_config.to_self_delay,
1113
            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
1114
            channel_flags=channel_flags,
1115
            channel_reserve_satoshis=local_config.reserve_sat,
1116
            htlc_minimum_msat=local_config.htlc_minimum_msat,
1117
            open_channel_tlvs=open_channel_tlvs,
1118
        )
1119

1120
        # <- accept_channel
1121
        payload = await self.wait_for_message('accept_channel', temp_channel_id)
×
1122
        self.logger.debug(f"received accept_channel for temp_channel_id={temp_channel_id.hex()}. {payload=}")
×
1123
        remote_per_commitment_point = payload['first_per_commitment_point']
×
1124
        funding_txn_minimum_depth = payload['minimum_depth']
×
1125
        if not zeroconf and funding_txn_minimum_depth <= 0:
×
1126
            raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}")
×
1127
        if funding_txn_minimum_depth > 30:
×
1128
            raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}")
×
1129

1130
        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
×
1131
            payload, 'accept')
1132

1133
        accept_channel_tlvs = payload.get('accept_channel_tlvs')
×
1134
        their_channel_type = accept_channel_tlvs.get('channel_type') if accept_channel_tlvs else None
×
1135
        if their_channel_type:
×
1136
            their_channel_type = ChannelType.from_bytes(their_channel_type['type'], byteorder='big').discard_unknown_and_check()
×
1137
            # if channel_type is set, and channel_type was set in open_channel,
1138
            # and they are not equal types: MUST reject the channel.
1139
            if open_channel_tlvs.get('channel_type') is not None and their_channel_type != our_channel_type:
×
1140
                raise Exception("Channel type is not the one that we sent.")
×
1141

1142
        remote_config = RemoteConfig(
×
1143
            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
1144
            multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]),
1145
            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
1146
            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
1147
            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
1148
            to_self_delay=payload['to_self_delay'],
1149
            dust_limit_sat=payload['dust_limit_satoshis'],
1150
            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
1151
            max_accepted_htlcs=payload["max_accepted_htlcs"],
1152
            initial_msat=push_msat,
1153
            reserve_sat=payload["channel_reserve_satoshis"],
1154
            htlc_minimum_msat=payload['htlc_minimum_msat'],
1155
            next_per_commitment_point=remote_per_commitment_point,
1156
            current_per_commitment_point=None,
1157
            upfront_shutdown_script=upfront_shutdown_script,
1158
            announcement_node_sig=b'',
1159
            announcement_bitcoin_sig=b'',
1160
        )
1161
        ChannelConfig.cross_validate_params(
×
1162
            local_config=local_config,
1163
            remote_config=remote_config,
1164
            funding_sat=funding_sat,
1165
            is_local_initiator=True,
1166
            initial_feerate_per_kw=feerate,
1167
            config=self.network.config,
1168
            peer_features=self.features,
1169
            has_anchors=self.use_anchors(),
1170
        )
1171

1172
        # -> funding created
1173
        # replace dummy output in funding tx
1174
        redeem_script = funding_output_script(local_config, remote_config)
×
1175
        funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script)
×
1176
        funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat)
×
1177
        funding_tx.replace_output_address(DummyAddress.CHANNEL, funding_address)
×
1178
        # find and encrypt op_return data associated to funding_address
1179
        has_onchain_backup = self.lnworker and self.lnworker.has_recoverable_channels()
×
1180
        if has_onchain_backup:
×
1181
            backup_data = self.lnworker.cb_data(self.pubkey)
×
1182
            dummy_scriptpubkey = make_op_return(backup_data)
×
1183
            for o in funding_tx.outputs():
×
1184
                if o.scriptpubkey == dummy_scriptpubkey:
×
1185
                    encrypted_data = self.lnworker.encrypt_cb_data(backup_data, funding_address)
×
1186
                    assert len(encrypted_data) == len(backup_data)
×
1187
                    o.scriptpubkey = make_op_return(encrypted_data)
×
1188
                    break
×
1189
            else:
1190
                raise Exception('op_return output not found in funding tx')
×
1191
        # must not be malleable
1192
        funding_tx.set_rbf(False)
×
1193
        if not funding_tx.is_segwit():
×
1194
            raise Exception('Funding transaction is not segwit')
×
1195
        funding_txid = funding_tx.txid()
×
1196
        assert funding_txid
×
1197
        funding_index = funding_tx.outputs().index(funding_output)
×
1198
        # build remote commitment transaction
1199
        channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index)
×
1200
        outpoint = Outpoint(funding_txid, funding_index)
×
1201
        constraints = ChannelConstraints(
×
1202
            flags=channel_flags,
1203
            capacity=funding_sat,
1204
            is_initiator=True,
1205
            funding_txn_minimum_depth=funding_txn_minimum_depth
1206
        )
1207
        storage = self.create_channel_storage(
×
1208
            channel_id, outpoint, local_config, remote_config, constraints, our_channel_type)
1209
        chan = Channel(
×
1210
            storage,
1211
            lnworker=self.lnworker,
1212
            initial_feerate=feerate
1213
        )
1214
        chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()]
×
1215
        chan.storage['has_onchain_backup'] = has_onchain_backup
×
1216
        chan.storage['init_timestamp'] = int(time.time())
×
1217
        if isinstance(self.transport, LNTransport):
×
1218
            chan.add_or_update_peer_addr(self.transport.peer_addr)
×
1219
        sig_64, _ = chan.sign_next_commitment()
×
1220
        self.temp_id_to_id[temp_channel_id] = channel_id
×
1221

1222
        self.send_message("funding_created",
×
1223
            temporary_channel_id=temp_channel_id,
1224
            funding_txid=funding_txid_bytes,
1225
            funding_output_index=funding_index,
1226
            signature=sig_64)
1227
        self.funding_created_sent.add(channel_id)
×
1228

1229
        # <- funding signed
1230
        payload = await self.wait_for_message('funding_signed', channel_id)
×
1231
        self.logger.info('received funding_signed')
×
1232
        remote_sig = payload['signature']
×
1233
        try:
×
1234
            chan.receive_new_commitment(remote_sig, [])
×
1235
        except LNProtocolWarning as e:
×
1236
            self.send_warning(channel_id, message=str(e), close_connection=True)
×
1237
        chan.open_with_first_pcp(remote_per_commitment_point, remote_sig)
×
1238
        chan.set_state(ChannelState.OPENING)
×
1239
        if zeroconf:
×
1240
            chan.set_state(ChannelState.FUNDED)
×
1241
            self.send_channel_ready(chan)
×
1242
        self.lnworker.add_new_channel(chan)
×
1243
        return chan, funding_tx
×
1244

1245
    def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints, channel_type):
        """Assemble the initial persisted state for a newly negotiated channel.

        Returns a StoredDict bound to the wallet db, seeded with the agreed
        local/remote configs and empty log/revocation structures. The channel
        starts its life in the PREOPENING state.
        """
        initial_state = {
            "node_id": self.pubkey.hex(),
            "channel_id": channel_id.hex(),
            "short_channel_id": None,
            "funding_outpoint": outpoint,
            "remote_config": remote_config,
            "local_config": local_config,
            "constraints": constraints,
            "remote_update": None,
            "state": ChannelState.PREOPENING.name,
            'onion_keys': {},
            'data_loss_protect_remote_pcp': {},
            "log": {},
            "unfulfilled_htlcs": {},
            "revocation_store": {},
            "channel_type": channel_type,
        }
        return StoredDict(initial_state, self.lnworker.db)
1264

1265
    @non_blocking_msg_handler
    async def on_open_channel(self, payload):
        """Implements the channel acceptance flow.

        <- open_channel message
        -> accept_channel message
        <- funding_created message
        -> funding_signed message

        Channel configurations are initialized in this method.
        Raises Exception if the proposal is unacceptable (wrong chain,
        unsupported channel_type, untrusted zeroconf, ...), aborting the flow.
        """

        # <- open_channel
        if payload['chain_hash'] != constants.net.rev_genesis_bytes():
            raise Exception('wrong chain_hash')

        open_channel_tlvs = payload.get('open_channel_tlvs')
        channel_type = open_channel_tlvs.get('channel_type') if open_channel_tlvs else None
        # The receiving node MAY fail the channel if:
        # option_channel_type was negotiated but the message doesn't include a channel_type
        if self.is_channel_type() and channel_type is None:
            raise Exception("sender has advertised option_channel_type, but hasn't sent the channel type")
        # MUST fail the channel if it supports channel_type,
        # channel_type was set, and the type is not suitable.
        elif self.is_channel_type() and channel_type is not None:
            channel_type = ChannelType.from_bytes(channel_type['type'], byteorder='big').discard_unknown_and_check()
            if not channel_type.complies_with_features(self.features):
                raise Exception("sender has sent a channel type we don't support")

        # zeroconf (0-conf) channels are only accepted from the configured trusted node
        if self.is_channel_type():
            is_zeroconf = bool(channel_type & ChannelType.OPTION_ZEROCONF)
            if is_zeroconf and not self.network.config.ZEROCONF_TRUSTED_NODE.startswith(self.pubkey.hex()):
                raise Exception(f"not accepting zeroconf from node {self.pubkey}")
        else:
            is_zeroconf = False

        if self.lnworker.has_recoverable_channels() and not is_zeroconf:
            # FIXME: we might want to keep the connection open
            raise Exception('not accepting channels')

        if not self.lnworker.wallet.can_have_lightning():
            # old wallet that cannot have lightning anymore
            raise Exception('This wallet does not accept new channels')

        funding_sat = payload['funding_satoshis']
        push_msat = payload['push_msat']
        feerate = payload['feerate_per_kw']  # note: we are not validating this
        temp_chan_id = payload['temporary_channel_id']
        # store the temp id now, so that it is recognized for e.g. 'error' messages
        self.temp_id_to_id[temp_chan_id] = None
        self._cleanup_temp_channelids()
        channel_opening_fee = open_channel_tlvs.get('channel_opening_fee') if open_channel_tlvs else None
        if channel_opening_fee:
            # todo check that the fee is reasonable
            pass

        # with anchors, our multisig funding key is derived from the remote node's
        # identity and their funding pubkey
        if self.use_anchors():
            multisig_funding_keypair = lnutil.derive_multisig_funding_key_if_they_opened(
                funding_root_secret=self.lnworker.funding_root_keypair.privkey,
                remote_node_id_or_prefix=self.pubkey,
                remote_funding_pubkey=payload['funding_pubkey'],
            )
        else:
            multisig_funding_keypair = None
        local_config = self.make_local_config(
            funding_sat=funding_sat,
            push_msat=push_msat,
            initiator=REMOTE,  # the remote peer is the funder in this flow
            channel_type=channel_type,
            multisig_funding_keypair=multisig_funding_keypair,
        )

        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
            payload, 'open')

        remote_config = RemoteConfig(
            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
            multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']),
            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
            to_self_delay=payload['to_self_delay'],
            dust_limit_sat=payload['dust_limit_satoshis'],
            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
            max_accepted_htlcs=payload['max_accepted_htlcs'],
            # the funder's starting balance: everything except what they push to us
            initial_msat=funding_sat * 1000 - push_msat,
            reserve_sat=payload['channel_reserve_satoshis'],
            htlc_minimum_msat=payload['htlc_minimum_msat'],
            next_per_commitment_point=payload['first_per_commitment_point'],
            current_per_commitment_point=None,
            upfront_shutdown_script=upfront_shutdown_script,
            announcement_node_sig=b'',
            announcement_bitcoin_sig=b'',
        )
        ChannelConfig.cross_validate_params(
            local_config=local_config,
            remote_config=remote_config,
            funding_sat=funding_sat,
            is_local_initiator=False,
            initial_feerate_per_kw=feerate,
            config=self.network.config,
            peer_features=self.features,
            has_anchors=self.use_anchors(),
        )

        channel_flags = ord(payload['channel_flags'])

        # -> accept channel
        # for the first commitment transaction
        per_commitment_secret_first = get_per_commitment_secret_from_seed(
            local_config.per_commitment_secret_seed,
            RevocationStore.START_INDEX
        )
        per_commitment_point_first = secret_to_pubkey(
            int.from_bytes(per_commitment_secret_first, 'big'))

        # funding confirmations we announce in accept_channel; none for zeroconf
        min_depth = 0 if is_zeroconf else 3

        accept_channel_tlvs = {
            'upfront_shutdown_script': {
                'shutdown_scriptpubkey': local_config.upfront_shutdown_script
            },
        }
        # The sender: if it sets channel_type: MUST set it to the channel_type from open_channel
        if self.is_channel_type():
            accept_channel_tlvs['channel_type'] = {
                'type': channel_type.to_bytes_minimal()
            }

        self.send_message(
            'accept_channel',
            temporary_channel_id=temp_chan_id,
            dust_limit_satoshis=local_config.dust_limit_sat,
            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
            channel_reserve_satoshis=local_config.reserve_sat,
            htlc_minimum_msat=local_config.htlc_minimum_msat,
            minimum_depth=min_depth,
            to_self_delay=local_config.to_self_delay,
            max_accepted_htlcs=local_config.max_accepted_htlcs,
            funding_pubkey=local_config.multisig_key.pubkey,
            revocation_basepoint=local_config.revocation_basepoint.pubkey,
            payment_basepoint=local_config.payment_basepoint.pubkey,
            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
            htlc_basepoint=local_config.htlc_basepoint.pubkey,
            first_per_commitment_point=per_commitment_point_first,
            accept_channel_tlvs=accept_channel_tlvs,
        )

        # <- funding created
        funding_created = await self.wait_for_message('funding_created', temp_chan_id)

        # -> funding signed
        funding_idx = funding_created['funding_output_index']
        # txid bytes arrive reversed relative to the usual hex display order
        funding_txid = funding_created['funding_txid'][::-1].hex()
        channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx)
        constraints = ChannelConstraints(
            flags=channel_flags,
            capacity=funding_sat,
            is_initiator=False,
            funding_txn_minimum_depth=min_depth,
        )
        outpoint = Outpoint(funding_txid, funding_idx)
        chan_dict = self.create_channel_storage(
            channel_id, outpoint, local_config, remote_config, constraints, channel_type)
        chan = Channel(
            chan_dict,
            lnworker=self.lnworker,
            initial_feerate=feerate,
            opening_fee = channel_opening_fee,
        )
        chan.storage['init_timestamp'] = int(time.time())
        if isinstance(self.transport, LNTransport):
            chan.add_or_update_peer_addr(self.transport.peer_addr)
        remote_sig = funding_created['signature']
        try:
            chan.receive_new_commitment(remote_sig, [])
        except LNProtocolWarning as e:
            self.send_warning(channel_id, message=str(e), close_connection=True)
        sig_64, _ = chan.sign_next_commitment()
        self.send_message('funding_signed',
            channel_id=channel_id,
            signature=sig_64,
        )
        # from here on, map the temporary id to the real channel_id
        self.temp_id_to_id[temp_chan_id] = channel_id
        self.funding_signed_sent.add(chan.channel_id)
        chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
        chan.set_state(ChannelState.OPENING)
        if is_zeroconf:
            # zeroconf: treat the channel as funded right away, without confirmations
            chan.set_state(ChannelState.FUNDED)
            self.send_channel_ready(chan)
        self.lnworker.add_new_channel(chan)
1456

1457
    def _cleanup_temp_channelids(self) -> None:
1✔
1458
        self.temp_id_to_id = {
×
1459
            tmp_id: chan_id for (tmp_id, chan_id) in self.temp_id_to_id.items()
1460
            if chan_id not in self.channels
1461
        }
1462
        if len(self.temp_id_to_id) > 25:
×
1463
            # which one of us is opening all these chans?! let's disconnect
1464
            raise Exception("temp_id_to_id is getting too large.")
×
1465

1466
    async def request_force_close(self, channel_id: bytes):
        """Try to trigger the remote peer to force-close.

        Two independent triggers are sent, because implementations differ in
        what they react to:
          1. a "channel_reestablish" claiming a stale state (ctn=0): many nodes
             (but not all) automatically force-close when seeing this;
          2. an "error" message: nodes that implement lightning/bolts/pull/950
             force-close upon receiving it. E.g. for eclair 0.7+ the old
             channel_reestablish alone is not enough, but "error" is
             (see https://github.com/ACINQ/eclair/pull/2036).
             The receiving node:
               - upon receiving `error`:
                 - MUST fail the channel referred to by `channel_id`, if that
                   channel is with the sending node.
        """
        await self.initialized
        self.logger.info(f"trying to get remote peer to force-close chan {channel_id.hex()}")
        # BOLT2 requires my_current_per_commitment_point to be a valid curve point,
        # so derive one from an arbitrary fixed secret.
        dummy_point = secret_to_pubkey(42)
        self.send_message(
            "channel_reestablish",
            channel_id=channel_id,
            next_commitment_number=0,
            next_revocation_number=0,
            your_last_per_commitment_secret=0,
            my_current_per_commitment_point=dummy_point)
        self.send_message("error", channel_id=channel_id, data=b"", len=0)
1487

1488
    def schedule_force_closing(self, channel_id: bytes):
        """ wrapper of lnworker's method, that raises if channel is not with this peer

        Accepts either a real channel_id or one that is still only known via a
        temporary-id mapping. If the Channel object is not available yet, or
        force-closing is not currently an allowed close option, this only logs.
        """
        channels_with_peer = list(self.channels.keys())
        channels_with_peer.extend(self.temp_id_to_id.values())
        if channel_id not in channels_with_peer:
            raise ValueError(f"channel {channel_id.hex()} does not belong to this peer")
        chan = self.channels.get(channel_id)
        if not chan:
            self.logger.warning(f"tried to force-close channel {channel_id.hex()} but it is not in self.channels yet")
            # bugfix: we must stop here -- the original code fell through and
            # dereferenced chan (None) below, raising AttributeError
            return
        if ChanCloseOption.LOCAL_FCLOSE in chan.get_close_options():
            self.lnworker.schedule_force_closing(channel_id)
        else:
            self.logger.info(f"tried to force-close channel {chan.get_id_for_log()} "
                             f"but close option is not allowed. {chan.get_state()=!r}")
1502

1503
    async def on_channel_reestablish(self, chan: Channel, msg):
        """Handle the peer's channel_reestablish message for `chan`.

        Compares the peer's claimed commitment numbers (ctns) with our own to
        decide whether either side lost state, validates the
        option_data_loss_protect fields, and then resolves (or fails) the
        per-channel future in self.channel_reestablish_msg that
        reestablish_channel() is awaiting.
        """
        # Note: it is critical for this message handler to block processing of further messages,
        #       until this msg is processed. If we are behind (lost state), and send chan_reest to the remote,
        #       when the remote realizes we are behind, they might send an "error" message - but the spec mandates
        #       they send chan_reest first. If we processed the error first, we might force-close and lose money!
        their_next_local_ctn = msg["next_commitment_number"]
        their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"]
        their_local_pcp = msg.get("my_current_per_commitment_point")
        their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret")
        self.logger.info(
            f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with '
            f'(their_next_local_ctn={their_next_local_ctn}, '
            f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})')
        if chan.get_state() >= ChannelState.CLOSED:
            # channel already closed (or worse): reestablish makes no sense, drop the msg
            self.logger.warning(
                f"on_channel_reestablish. dropping message. illegal action. "
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
            return
        # sanity checks of received values
        if their_next_local_ctn < 0:
            raise RemoteMisbehaving(f"channel reestablish: their_next_local_ctn < 0")
        if their_oldest_unrevoked_remote_ctn < 0:
            raise RemoteMisbehaving(f"channel reestablish: their_oldest_unrevoked_remote_ctn < 0")
        # ctns
        oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
        latest_local_ctn = chan.get_latest_ctn(LOCAL)  # note: not used below
        next_local_ctn = chan.get_next_ctn(LOCAL)  # note: not used below
        oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
        latest_remote_ctn = chan.get_latest_ctn(REMOTE)
        next_remote_ctn = chan.get_next_ctn(REMOTE)
        # compare remote ctns
        we_are_ahead = False
        they_are_ahead = False
        we_must_resend_revoke_and_ack = False
        if next_remote_ctn != their_next_local_ctn:
            if their_next_local_ctn == latest_remote_ctn and chan.hm.is_revack_pending(REMOTE):
                # We will replay the local updates (see reestablish_channel), which should contain a commitment_signed
                # (due to is_revack_pending being true), and this should remedy this situation.
                pass
            else:
                self.logger.warning(
                    f"channel_reestablish ({chan.get_id_for_log()}): "
                    f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}")
                if their_next_local_ctn < next_remote_ctn:
                    we_are_ahead = True
                else:
                    they_are_ahead = True
        # compare local ctns
        if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn:
            if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn:
                # A node:
                #    if next_revocation_number is equal to the commitment number of the last revoke_and_ack
                #    the receiving node sent, AND the receiving node hasn't already received a closing_signed:
                #        MUST re-send the revoke_and_ack.
                we_must_resend_revoke_and_ack = True
            else:
                self.logger.warning(
                    f"channel_reestablish ({chan.get_id_for_log()}): "
                    f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}")
                if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn:
                    we_are_ahead = True
                else:
                    they_are_ahead = True
        # option_data_loss_protect
        assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT)
        def are_datalossprotect_fields_valid() -> bool:
            # Verify the peer actually knows our last revoked per-commitment secret
            # (and provided their current per-commitment point at all).
            if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None:
                return False
            if their_oldest_unrevoked_remote_ctn > 0:
                our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1)
            else:
                assert their_oldest_unrevoked_remote_ctn == 0
                our_pcs = bytes(32)  # nothing revoked yet: all-zero placeholder
            if our_pcs != their_claim_of_our_last_per_commitment_secret:
                self.logger.error(
                    f"channel_reestablish ({chan.get_id_for_log()}): "
                    f"(DLP) local PCS mismatch: {our_pcs.hex()} != {their_claim_of_our_last_per_commitment_secret.hex()}")
                return False
            assert chan.is_static_remotekey_enabled()
            return True
        if not are_datalossprotect_fields_valid():
            raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid")
        fut = self.channel_reestablish_msg[chan.channel_id]
        if they_are_ahead:
            # we lost state: mark the channel toxic so we never broadcast our stale ctx
            self.logger.warning(
                f"channel_reestablish ({chan.get_id_for_log()}): "
                f"remote is ahead of us! They should force-close. Remote PCP: {their_local_pcp.hex()}")
            # data_loss_protect_remote_pcp is used in lnsweep
            chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp)
            chan.set_state(ChannelState.WE_ARE_TOXIC)
            self.lnworker.save_channel(chan)
            chan.peer_state = PeerState.BAD
            # raise after we send channel_reestablish, so the remote can realize they are ahead
            # FIXME what if we have multiple chans with peer? timing...
            fut.set_exception(GracefulDisconnect("remote ahead of us"))
        elif we_are_ahead:
            self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.")
            self.schedule_force_closing(chan.channel_id)
            # FIXME what if we have multiple chans with peer? timing...
            fut.set_exception(GracefulDisconnect("we are ahead of remote"))
        else:
            # all good
            fut.set_result((we_must_resend_revoke_and_ack, their_next_local_ctn))
            # Block processing of further incoming messages until we finished our part of chan-reest.
            # This is needed for the replaying of our local unacked updates to be sane (if the peer
            # also replays some messages we must not react to them until we finished replaying our own).
            # (it would be sufficient to only block messages related to this channel, but this is easier)
            await self._chan_reest_finished[chan.channel_id].wait()
            # Note: if the above event is never set, we won't detect if the connection was closed by remote...
1612

1613
    def _send_channel_reestablish(self, chan: Channel):
        """Send our channel_reestablish for `chan`, advertising our current ctns
        and the last per-commitment secret the peer revealed to us."""
        assert self.is_initialized()
        assert chan.is_static_remotekey_enabled()
        # commitment numbers we advertise
        next_local_ctn = chan.get_next_ctn(LOCAL)
        oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
        # with static_remotekey, our "current" per-commitment point is taken at index 0
        _unused_secret, latest_point = chan.get_secret_and_point(LOCAL, 0)
        if oldest_unrevoked_remote_ctn > 0:
            last_rev_secret = chan.revocation_store.retrieve_secret(
                RevocationStore.START_INDEX - (oldest_unrevoked_remote_ctn - 1))
        else:
            # peer has not revoked anything yet
            last_rev_secret = 0
        self.send_message(
            "channel_reestablish",
            channel_id=chan.channel_id,
            next_commitment_number=next_local_ctn,
            next_revocation_number=oldest_unrevoked_remote_ctn,
            your_last_per_commitment_secret=last_rev_secret,
            my_current_per_commitment_point=latest_point)
        self.logger.info(
            f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with '
            f'(next_local_ctn={next_local_ctn}, '
            f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')
1638

1639
    async def reestablish_channel(self, chan: Channel):
1✔
1640
        await self.initialized
1✔
1641
        chan_id = chan.channel_id
1✔
1642
        if chan.should_request_force_close:
1✔
1643
            if chan.get_state() != ChannelState.WE_ARE_TOXIC:
×
1644
                chan.set_state(ChannelState.REQUESTED_FCLOSE)
×
1645
            await self.request_force_close(chan_id)
×
1646
            chan.should_request_force_close = False
×
1647
            return
×
1648
        if chan.get_state() == ChannelState.WE_ARE_TOXIC:
1✔
1649
            # Depending on timing, the remote might not know we are behind.
1650
            # We should let them know, so that they force-close.
1651
            # We do "request force-close" with ctn=0, instead of leaking our actual ctns,
1652
            # to decrease the remote's confidence of actual data loss on our part.
1653
            await self.request_force_close(chan_id)
1✔
1654
            return
1✔
1655
        if chan.get_state() == ChannelState.FORCE_CLOSING:
1✔
1656
            # We likely got here because we found out that we are ahead (i.e. remote lost state).
1657
            # Depending on timing, the remote might not know they are behind.
1658
            # We should let them know:
1659
            self._send_channel_reestablish(chan)
1✔
1660
            return
1✔
1661
        if self.network.blockchain().is_tip_stale() \
1✔
1662
                or not self.lnworker.wallet.is_up_to_date() \
1663
                or self.lnworker.current_target_feerate_per_kw(has_anchors=chan.has_anchors()) \
1664
            is None:
1665
            # don't try to reestablish until we can do fee estimation and are up-to-date
1666
            return
×
1667
        # if we get here, we will try to do a proper reestablish
1668
        if not (ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING):
1✔
1669
            raise Exception(f"unexpected {chan.get_state()=} for reestablish")
×
1670
        if chan.peer_state != PeerState.DISCONNECTED:
1✔
1671
            self.logger.info(
×
1672
                f'reestablish_channel was called but channel {chan.get_id_for_log()} '
1673
                f'already in peer_state {chan.peer_state!r}')
1674
            return
×
1675
        chan.peer_state = PeerState.REESTABLISHING
1✔
1676
        util.trigger_callback('channel', self.lnworker.wallet, chan)
1✔
1677
        # ctns
1678
        oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
1✔
1679
        next_local_ctn = chan.get_next_ctn(LOCAL)
1✔
1680
        oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
1✔
1681
        # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
1682
        chan.hm.discard_unsigned_remote_updates()
1✔
1683
        # send message
1684
        self._send_channel_reestablish(chan)
1✔
1685
        # wait until we receive their channel_reestablish
1686
        fut = self.channel_reestablish_msg[chan_id]
1✔
1687
        await fut
1✔
1688
        we_must_resend_revoke_and_ack, their_next_local_ctn = fut.result()
1✔
1689

1690
        def replay_updates_and_commitsig():
1✔
1691
            # Replay un-acked local updates (including commitment_signed) byte-for-byte.
1692
            # If we have sent them a commitment signature that they "lost" (due to disconnect),
1693
            # we need to make sure we replay the same local updates, as otherwise they could
1694
            # end up with two (or more) signed valid commitment transactions at the same ctn.
1695
            # Multiple valid ctxs at the same ctn is a major headache for pre-signing spending txns,
1696
            # e.g. for watchtowers, hence we must ensure these ctxs coincide.
1697
            # We replay the local updates even if they were not yet committed.
1698
            unacked = chan.hm.get_unacked_local_updates()
1✔
1699
            replayed_msgs = []
1✔
1700
            for ctn, messages in unacked.items():
1✔
1701
                if ctn < their_next_local_ctn:
1✔
1702
                    # They claim to have received these messages and the corresponding
1703
                    # commitment_signed, hence we must not replay them.
1704
                    continue
1✔
1705
                for raw_upd_msg in messages:
1✔
1706
                    self.transport.send_bytes(raw_upd_msg)
1✔
1707
                    replayed_msgs.append(raw_upd_msg)
1✔
1708
            self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {len(replayed_msgs)} unacked messages. '
1✔
1709
                             f'{[decode_msg(raw_upd_msg)[0] for raw_upd_msg in replayed_msgs]}')
1710

1711
        def resend_revoke_and_ack():
1✔
1712
            last_secret, last_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1)
1✔
1713
            next_secret, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1)
1✔
1714
            self.send_message(
1✔
1715
                "revoke_and_ack",
1716
                channel_id=chan.channel_id,
1717
                per_commitment_secret=last_secret,
1718
                next_per_commitment_point=next_point)
1719

1720
        # We need to preserve relative order of last revack and commitsig.
1721
        # note: it is not possible to recover and reestablish a channel if we are out-of-sync by
1722
        # more than one ctns, i.e. we will only ever retransmit up to one commitment_signed message.
1723
        # Hence, if we need to retransmit a revack, without loss of generality, we can either replay
1724
        # it as the first message or as the last message.
1725
        was_revoke_last = chan.hm.was_revoke_last()
1✔
1726
        if we_must_resend_revoke_and_ack and not was_revoke_last:
1✔
1727
            self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replaying a revoke_and_ack first.')
1✔
1728
            resend_revoke_and_ack()
1✔
1729
        replay_updates_and_commitsig()
1✔
1730
        if we_must_resend_revoke_and_ack and was_revoke_last:
1✔
1731
            self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replaying a revoke_and_ack last.')
1✔
1732
            resend_revoke_and_ack()
1✔
1733

1734
        chan.peer_state = PeerState.GOOD
1✔
1735
        self._chan_reest_finished[chan.channel_id].set()
1✔
1736
        if chan.is_funded():
1✔
1737
            chan_just_became_ready = (their_next_local_ctn == next_local_ctn == 1)
1✔
1738
            if chan_just_became_ready or self.features.supports(LnFeatures.OPTION_SCID_ALIAS_OPT):
1✔
1739
                self.send_channel_ready(chan)
1✔
1740

1741
        self.maybe_send_announcement_signatures(chan)
1✔
1742
        self.maybe_update_fee(chan)  # if needed, update fee ASAP, to avoid force-closures from this
1✔
1743
        # checks done
1744
        util.trigger_callback('channel', self.lnworker.wallet, chan)
1✔
1745
        # if we have sent a previous shutdown, it must be retransmitted (Bolt2)
1746
        if chan.get_state() == ChannelState.SHUTDOWN:
1✔
1747
            await self.taskgroup.spawn(self.send_shutdown(chan))
×
1748

1749
    def send_channel_ready(self, chan: Channel) -> None:
        """Send 'channel_ready' (BOLT-02) for a funded channel, at most once per session.

        Includes our scid alias TLV when option_scid_alias was negotiated,
        then attempts to mark the channel open.
        """
        assert chan.is_funded()
        if chan.sent_channel_ready:
            return
        channel_id = chan.channel_id
        # derive the *second* per-commitment point (indices count down from START_INDEX)
        per_commitment_secret_index = RevocationStore.START_INDEX - 1
        second_per_commitment_point = secret_to_pubkey(int.from_bytes(
            get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big'))
        channel_ready_tlvs = {}
        if self.features.supports(LnFeatures.OPTION_SCID_ALIAS_OPT):
            # LND requires that we send an alias if the option has been negotiated in INIT.
            # otherwise, the channel will not be marked as active.
            # This does not apply if the channel was previously marked active without an alias.
            channel_ready_tlvs['short_channel_id'] = {'alias': chan.get_local_scid_alias(create_new_if_needed=True)}
        # note: if 'channel_ready' was not yet received, we might send it multiple times
        self.send_message(
            "channel_ready",
            channel_id=channel_id,
            second_per_commitment_point=second_per_commitment_point,
            channel_ready_tlvs=channel_ready_tlvs)
        chan.sent_channel_ready = True
        self.maybe_mark_open(chan)
1771

1772
    def on_channel_ready(self, chan: Channel, payload) -> None:
        """Handle received 'channel_ready' (BOLT-02).

        Stores the remote scid alias (if present) and, on first receipt, the
        remote's second per-commitment point; then attempts to mark the
        channel open.
        """
        self.logger.info(f"on_channel_ready. channel: {chan.channel_id.hex()}")
        if chan.peer_state != PeerState.GOOD:  # should never happen
            raise Exception(f"received channel_ready in unexpected {chan.peer_state=!r}")
        if chan.is_closed():
            self.logger.warning(
                f"on_channel_ready. dropping message. illegal action. "
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
            return
        # save remote alias for use in invoices
        scid_alias = payload.get('channel_ready_tlvs', {}).get('short_channel_id', {}).get('alias')
        if scid_alias:
            chan.save_remote_scid_alias(scid_alias)
        if not chan.config[LOCAL].funding_locked_received:
            # first time we see their channel_ready: persist their next point
            their_next_point = payload["second_per_commitment_point"]
            chan.config[REMOTE].next_per_commitment_point = their_next_point
            chan.config[LOCAL].funding_locked_received = True
            self.lnworker.save_channel(chan)
        self.maybe_mark_open(chan)
1791

1792
    def send_node_announcement(self, alias: str, color_hex: str) -> None:
        """Sign and broadcast a 'node_announcement' (BOLT-07) for our own node.

        alias: UTF-8 node alias; encoded into the fixed 32-byte field
               (zero-padded, truncated if longer).
        color_hex: RGB color as 6 hex characters.

        If LIGHTNING_LISTEN is configured it is advertised as our address,
        defaulting to port 9735 when no port is given; an invalid value aborts
        with a logged error.

        Fixes vs previous version: ``addr.split(':')`` raised ValueError for a
        bare hostname, making the "default port" branch unreachable and
        rejecting valid hostnames; oversized aliases crashed on padding.
        """
        from .channel_db import NodeInfo
        timestamp = int(time.time())
        node_id = privkey_to_pubkey(self.privkey)
        features = self.features.for_node_announcement()
        flen = features.min_len()
        rgb_color = bytes.fromhex(color_hex)
        # BOLT-07: alias is a fixed 32-byte field; truncate then zero-pad
        alias_bytes = alias.encode('utf8')[:32]
        alias_bytes += bytes(32 - len(alias_bytes))
        if self.lnworker.config.LIGHTNING_LISTEN is not None:
            addr = self.lnworker.config.LIGHTNING_LISTEN
            try:
                # split off an optional ':port' suffix; default to 9735 (BOLT-01)
                hostname, sep, port_str = addr.rpartition(':')
                if not sep:  # no colon present -> the whole string is the host
                    hostname, port_str = addr, ''
                port = int(port_str) if port_str else 9735
                addresses = NodeInfo.to_addresses_field(hostname, port)
            except Exception:
                self.logger.exception(f"Invalid lightning_listen address: {addr}")
                return
        else:
            addresses = b''
        raw_msg = encode_msg(
            "node_announcement",
            flen=flen,
            features=features,
            timestamp=timestamp,
            rgb_color=rgb_color,
            node_id=node_id,
            alias=alias_bytes,
            addrlen=len(addresses),
            addresses=addresses)
        # the signature covers everything after the 64-byte sig and 2-byte type fields
        h = sha256d(raw_msg[64+2:])
        signature = ecc.ECPrivkey(self.privkey).ecdsa_sign(h, sigencode=ecdsa_sig64_from_r_and_s)
        message_type, payload = decode_msg(raw_msg)
        payload['signature'] = signature
        raw_msg = encode_msg(message_type, **payload)
        self.transport.send_bytes(raw_msg)
1829

1830
    def maybe_send_channel_announcement(self, chan: Channel) -> None:
        """Broadcast 'channel_announcement' (BOLT-07) once all signatures are available.

        Requires both our own and the remote's announcement signatures
        (exchanged via 'announcement_signatures'); otherwise does nothing.
        """
        # signature lists ordered [remote, local]; may be reversed below
        node_sigs = [chan.config[REMOTE].announcement_node_sig, chan.config[LOCAL].announcement_node_sig]
        bitcoin_sigs = [chan.config[REMOTE].announcement_bitcoin_sig, chan.config[LOCAL].announcement_bitcoin_sig]
        if not bitcoin_sigs[0] or not bitcoin_sigs[1]:
            return
        raw_msg, is_reverse = chan.construct_channel_announcement_without_sigs()
        if is_reverse:
            # BOLT-07 orders the signature fields by the lexicographic node_id order
            node_sigs.reverse()
            bitcoin_sigs.reverse()
        message_type, payload = decode_msg(raw_msg)
        payload['node_signature_1'] = node_sigs[0]
        payload['node_signature_2'] = node_sigs[1]
        payload['bitcoin_signature_1'] = bitcoin_sigs[0]
        payload['bitcoin_signature_2'] = bitcoin_sigs[1]
        raw_msg = encode_msg(message_type, **payload)
        self.transport.send_bytes(raw_msg)
1846

1847
    def maybe_mark_open(self, chan: Channel) -> None:
        """Mark the channel open once 'channel_ready' has gone in both directions."""
        both_sides_ready = (
            chan.sent_channel_ready
            and chan.config[LOCAL].funding_locked_received
        )
        if both_sides_ready:
            self.mark_open(chan)
1853

1854
    def mark_open(self, chan: Channel) -> None:
        """Transition the channel from FUNDED to OPEN and do post-open housekeeping.

        No-op when already OPEN; refuses (with a log line) any other state.
        """
        assert chan.is_funded()
        # only allow state transition from "FUNDED" to "OPEN"
        old_state = chan.get_state()
        if old_state == ChannelState.OPEN:
            return
        if old_state != ChannelState.FUNDED:
            self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {repr(old_state)}")
            return
        assert chan.config[LOCAL].funding_locked_received
        chan.set_state(ChannelState.OPEN)
        util.trigger_callback('channel', self.lnworker.wallet, chan)
        # peer may have sent us a channel update for the incoming direction previously
        pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id)
        if pending_channel_update:
            chan.set_remote_update(pending_channel_update)
        self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})")
        forwarding_enabled = self.network.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS
        if forwarding_enabled and chan.short_channel_id:
            # send channel_update of outgoing edge to peer,
            # so that channel can be used to receive payments
            self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})")
            chan_upd = chan.get_outgoing_gossip_channel_update()
            self.transport.send_bytes(chan_upd)
1878

1879
    def maybe_send_announcement_signatures(self, chan: Channel, is_reply: bool = False) -> None:
        """Send 'announcement_signatures' (BOLT-07) for a public channel, if appropriate.

        When is_reply is False we only initiate if we do not already hold the
        remote's node signature (otherwise we wait to reply to theirs).
        Persists our signatures in the local channel config.
        """
        if not chan.is_public():
            return
        if chan.sent_announcement_signatures:
            return
        if not is_reply and chan.config[REMOTE].announcement_node_sig:
            return
        # both signatures commit to the same channel_announcement hash
        h = chan.get_channel_announcement_hash()
        bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).ecdsa_sign(h, sigencode=ecdsa_sig64_from_r_and_s)
        node_signature = ecc.ECPrivkey(self.privkey).ecdsa_sign(h, sigencode=ecdsa_sig64_from_r_and_s)
        self.send_message(
            "announcement_signatures",
            channel_id=chan.channel_id,
            short_channel_id=chan.short_channel_id,
            node_signature=node_signature,
            bitcoin_signature=bitcoin_signature
        )
        chan.config[LOCAL].announcement_node_sig = node_signature
        chan.config[LOCAL].announcement_bitcoin_sig = bitcoin_signature
        self.lnworker.save_channel(chan)
        chan.sent_announcement_signatures = True
1900

1901
    def on_update_fail_htlc(self, chan: Channel, payload) -> None:
        """Handle received 'update_fail_htlc': record the failure and maybe sign a new ctx."""
        htlc_id = payload["id"]
        self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
        if chan.can_update_ctx(proposer=REMOTE):
            # record the (obfuscated) failure reason; a later commitment_signed commits the removal
            chan.receive_fail_htlc(htlc_id, error_bytes=payload["reason"])  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
            self.maybe_send_commitment(chan)
        else:
            self.logger.warning(
                f"on_update_fail_htlc. dropping message. illegal action. "
                f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}")
1912

1913
    def maybe_send_commitment(self, chan: Channel) -> bool:
        """Sign and send a new remote commitment if allowed and there are pending changes.

        Returns True iff a 'commitment_signed' was sent.
        """
        assert util.get_running_loop() == util.get_asyncio_loop(), f"this must be run on the asyncio thread!"
        if not chan.can_update_ctx(proposer=LOCAL):
            return False
        # REMOTE should revoke first before we can sign a new ctx
        if chan.hm.is_revack_pending(REMOTE):
            return False
        # if there are no changes, we will not (and must not) send a new commitment
        if not chan.has_pending_changes(REMOTE):
            return False
        self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.')
        sig_64, htlc_sigs = chan.sign_next_commitment()
        # htlc_sigs are concatenated 64-byte signatures, one per HTLC output
        self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
        return True
1927

1928
    def send_htlc(
        self,
        *,
        chan: Channel,
        payment_hash: bytes,
        amount_msat: int,
        cltv_abs: int,
        onion: OnionPacket,
        session_key: Optional[bytes] = None,
    ) -> UpdateAddHtlc:
        """Add an outgoing HTLC to `chan`, send 'update_add_htlc', and maybe sign.

        Returns the UpdateAddHtlc with the htlc_id assigned by the channel.
        """
        assert chan.can_send_update_add_htlc(), f"cannot send updates: {chan.short_channel_id}"
        htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_abs=cltv_abs, timestamp=int(time.time()))
        htlc = chan.add_htlc(htlc)
        if session_key:
            # remember the onion session key so a returned error can be decrypted later
            chan.set_onion_key(htlc.htlc_id, session_key) # should it be the outer onion secret?
        self.logger.info(f"starting payment. htlc: {htlc}")
        self.send_message(
            "update_add_htlc",
            channel_id=chan.channel_id,
            id=htlc.htlc_id,
            cltv_expiry=htlc.cltv_abs,
            amount_msat=htlc.amount_msat,
            payment_hash=htlc.payment_hash,
            onion_routing_packet=onion.to_bytes())
        self.maybe_send_commitment(chan)
        return htlc
1954

1955
    def pay(self, *,
            route: 'LNPaymentRoute',
            chan: Channel,
            amount_msat: int,
            total_msat: int,
            payment_hash: bytes,
            min_final_cltv_delta: int,
            payment_secret: bytes,
            trampoline_onion: Optional[OnionPacket] = None,
        ) -> UpdateAddHtlc:
        """Build an onion for `route` and send the first-hop HTLC on `chan`.

        Returns the locally-added UpdateAddHtlc. Raises PaymentFailure when the
        channel cannot currently send update_add_htlc.
        """
        assert amount_msat > 0, "amount_msat is not greater zero"
        assert len(route) > 0
        if not chan.can_send_update_add_htlc():
            raise PaymentFailure("Channel cannot send update_add_htlc")
        # build the onion; the returned amount/cltv include all hop fees/deltas
        onion, first_hop_amount_msat, first_hop_cltv_abs, session_key = self.lnworker.create_onion_for_route(
            route=route,
            amount_msat=amount_msat,
            total_msat=total_msat,
            payment_hash=payment_hash,
            min_final_cltv_delta=min_final_cltv_delta,
            payment_secret=payment_secret,
            trampoline_onion=trampoline_onion,
        )
        return self.send_htlc(
            chan=chan,
            payment_hash=payment_hash,
            amount_msat=first_hop_amount_msat,
            cltv_abs=first_hop_cltv_abs,
            onion=onion,
            session_key=session_key,
        )
1988

1989
    def send_revoke_and_ack(self, chan: Channel) -> None:
        """Revoke our current commitment and send 'revoke_and_ack'.

        The channel is persisted *before* sending, so a released revocation
        secret is never forgotten. Afterwards, tries to sign pending changes.
        """
        if not chan.can_update_ctx(proposer=LOCAL):
            return
        self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}')
        rev = chan.revoke_current_commitment()
        self.lnworker.save_channel(chan)
        self.send_message("revoke_and_ack",
            channel_id=chan.channel_id,
            per_commitment_secret=rev.per_commitment_secret,
            next_per_commitment_point=rev.next_per_commitment_point)
        self.maybe_send_commitment(chan)
2000

2001
    def on_commitment_signed(self, chan: Channel, payload) -> None:
        """Handle received 'commitment_signed': verify signatures, then revoke our old ctx.

        Raises RemoteMisbehaving when the peer signed without pending changes
        or before we revoked the previous commitment.
        """
        self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.')
        if not chan.can_update_ctx(proposer=REMOTE):
            self.logger.warning(
                f"on_commitment_signed. dropping message. illegal action. "
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
            return
        # make sure there were changes to the ctx, otherwise the remote peer is misbehaving
        if not chan.has_pending_changes(LOCAL):
            # TODO if feerate changed A->B->A; so there were updates but the value is identical,
            #      then it might be legal to send a commitment_signature
            #      see https://github.com/lightningnetwork/lightning-rfc/pull/618
            raise RemoteMisbehaving('received commitment_signed without pending changes')
        # REMOTE should wait until we have revoked
        if chan.hm.is_revack_pending(LOCAL):
            raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx')
        data = payload["htlc_signature"]
        # one 64-byte signature per HTLC output in the new commitment tx
        htlc_sigs = list(chunks(data, 64))
        chan.receive_new_commitment(payload["signature"], htlc_sigs)
        self.send_revoke_and_ack(chan)
        # pulse the event (set then clear) to wake any coroutine awaiting a commitsig
        self.received_commitsig_event.set()
        self.received_commitsig_event.clear()
2023

2024
    def on_update_fulfill_htlc(self, chan: Channel, payload) -> None:
        """Handle 'update_fulfill_htlc': learn the preimage, persist it, maybe sign."""
        preimage = payload["payment_preimage"]
        htlc_id = payload["id"]
        self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
        if not chan.can_update_ctx(proposer=REMOTE):
            self.logger.warning(
                f"on_update_fulfill_htlc. dropping message. illegal action. "
                f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}")
            return
        chan.receive_htlc_settle(preimage, htlc_id)  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
        # the preimage proves payment; store it keyed by its sha256 (the payment hash)
        self.lnworker.save_preimage(sha256(preimage), preimage)
        self.maybe_send_commitment(chan)
2037

2038
    def on_update_fail_malformed_htlc(self, chan: Channel, payload) -> None:
        """Handle 'update_fail_malformed_htlc' from the peer.

        The failure_code must carry the BADONION flag; otherwise the peer is
        misbehaving and we schedule a force-close.
        """
        htlc_id = payload["id"]
        failure_code = payload["failure_code"]
        self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. "
                         f"htlc_id {htlc_id}. failure_code={failure_code}")
        if not chan.can_update_ctx(proposer=REMOTE):
            self.logger.warning(
                f"on_update_fail_malformed_htlc. dropping message. illegal action. "
                f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}")
            return
        if failure_code & OnionFailureCodeMetaFlag.BADONION == 0:
            self.schedule_force_closing(chan.channel_id)
            raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}")
        # reconstruct a failure object from the code + hash of the onion we sent
        reason = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"])
        chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=reason)
        self.maybe_send_commitment(chan)
2054

2055
    def on_update_add_htlc(self, chan: Channel, payload) -> None:
        """Handle received 'update_add_htlc': validate and add the HTLC to the channel.

        Raises RemoteMisbehaving when the channel is not OPEN or the cltv is
        beyond the representable block height.
        """
        payment_hash = payload["payment_hash"]
        htlc_id = payload["id"]
        cltv_abs = payload["cltv_expiry"]
        amount_msat_htlc = payload["amount_msat"]
        onion_packet = payload["onion_routing_packet"]
        htlc = UpdateAddHtlc(
            amount_msat=amount_msat_htlc,
            payment_hash=payment_hash,
            cltv_abs=cltv_abs,
            timestamp=int(time.time()),
            htlc_id=htlc_id)
        self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc={str(htlc)}")
        if chan.get_state() != ChannelState.OPEN:
            raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}")
        if not chan.can_update_ctx(proposer=REMOTE):
            self.logger.warning(
                f"on_update_add_htlc. dropping message. illegal action. "
                f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}")
            return
        if cltv_abs > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX:
            # a cltv past the max locktime block height could never be enforced on-chain
            self.schedule_force_closing(chan.channel_id)
            raise RemoteMisbehaving(f"received update_add_htlc with {cltv_abs=} > BLOCKHEIGHT_MAX")
        # add htlc
        chan.receive_htlc(htlc, onion_packet)
        util.trigger_callback('htlc_added', chan, htlc, RECEIVED)
2081

2082
    def check_accepted_htlc(
            self, *,
            chan: Channel,
            htlc: UpdateAddHtlc,
            processed_onion: ProcessedOnionPacket,
            log_fail_reason: Callable[[str], None],
    ) -> tuple[bytes, int, int, OnionRoutingFailure]:
        """
        Perform checks that are invariant (results do not depend on height, network conditions, etc).
        May raise OnionRoutingFailure

        Returns (payment_secret_from_onion, total_msat, channel_opening_fee,
        exc_incorrect_or_unknown_pd); the last element is a prebuilt failure
        for "incorrect or unknown payment details" cases raised downstream.
        """
        try:
            amt_to_forward = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
        except Exception:
            log_fail_reason(f"'amt_to_forward' missing from onion")
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')

        exc_incorrect_or_unknown_pd = OnionRoutingFailure(
            code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
            data=amt_to_forward.to_bytes(8, byteorder="big")) # height will be added later
        try:
            cltv_abs_from_onion = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
        except Exception:
            log_fail_reason(f"'outgoing_cltv_value' missing from onion")
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')

        # the htlc must lock funds at least until the expiry the onion asks for
        if cltv_abs_from_onion > htlc.cltv_abs:
            log_fail_reason(f"cltv_abs_from_onion != htlc.cltv_abs")
            raise OnionRoutingFailure(
                code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY,
                data=htlc.cltv_abs.to_bytes(4, byteorder="big"))
        try:
            total_msat = processed_onion.hop_data.payload["payment_data"]["total_msat"]  # type: int
        except Exception:
            log_fail_reason(f"'total_msat' missing from onion")
            raise exc_incorrect_or_unknown_pd

        if chan.opening_fee:
            # JIT channel: the opening fee is deducted from the delivered amounts
            channel_opening_fee = chan.opening_fee['channel_opening_fee']  # type: int
            total_msat -= channel_opening_fee
            amt_to_forward -= channel_opening_fee
        else:
            channel_opening_fee = 0

        # the htlc must carry at least as much as the onion says is forwarded
        if amt_to_forward > htlc.amount_msat:
            log_fail_reason(f"amt_to_forward != htlc.amount_msat")
            raise OnionRoutingFailure(
                code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT,
                data=htlc.amount_msat.to_bytes(8, byteorder="big"))

        try:
            payment_secret_from_onion = processed_onion.hop_data.payload["payment_data"]["payment_secret"]  # type: bytes
        except Exception:
            log_fail_reason(f"'payment_secret' missing from onion")
            raise exc_incorrect_or_unknown_pd

        return payment_secret_from_onion, total_msat, channel_opening_fee, exc_incorrect_or_unknown_pd
2139

2140
    def check_mpp_is_waiting(
        self,
        *,
        payment_secret: bytes,
        short_channel_id: ShortChannelID,
        htlc: UpdateAddHtlc,
        expected_msat: int,
        exc_incorrect_or_unknown_pd: OnionRoutingFailure,
        log_fail_reason: Callable[[str], None],
    ) -> bool:
        """Return True while the MPP set this htlc belongs to is still incomplete.

        Returns False once all parts have arrived. Raises OnionRoutingFailure
        when the set has expired (MPP_TIMEOUT) or was marked failed.
        """
        mpp_resolution = self.lnworker.check_mpp_status(
            payment_secret=payment_secret,
            short_channel_id=short_channel_id,
            htlc=htlc,
            expected_msat=expected_msat,
        )
        if mpp_resolution == RecvMPPResolution.COMPLETE:
            return False
        if mpp_resolution == RecvMPPResolution.WAITING:
            return True
        if mpp_resolution == RecvMPPResolution.EXPIRED:
            log_fail_reason(f"MPP_TIMEOUT")
            raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'')
        if mpp_resolution == RecvMPPResolution.FAILED:
            log_fail_reason(f"mpp_resolution is FAILED")
            raise exc_incorrect_or_unknown_pd
        raise Exception(f"unexpected {mpp_resolution=}")
2168

2169
    def maybe_fulfill_htlc(
1✔
2170
            self, *,
2171
            chan: Channel,
2172
            htlc: UpdateAddHtlc,
2173
            processed_onion: ProcessedOnionPacket,
2174
            onion_packet_bytes: bytes,
2175
            already_forwarded: bool = False,
2176
    ) -> Tuple[Optional[bytes], Optional[Tuple[str, Callable[[], Awaitable[Optional[str]]]]]]:
2177
        """
2178
        Decide what to do with an HTLC: return preimage if it can be fulfilled, forwarding callback if it can be forwarded.
2179
        Return (preimage, (payment_key, callback)) with at most a single element not None.
2180
        """
2181
        if not processed_onion.are_we_final:
1✔
2182
            if not self.lnworker.enable_htlc_forwarding:
1✔
2183
                return None, None
1✔
2184
            # use the htlc key if we are forwarding
2185
            payment_key = serialize_htlc_key(chan.get_scid_or_local_alias(), htlc.htlc_id)
1✔
2186
            callback = lambda: self.lnworker.maybe_forward_htlc(
1✔
2187
                incoming_chan=chan,
2188
                htlc=htlc,
2189
                processed_onion=processed_onion)
2190
            return None, (payment_key, callback)
1✔
2191

2192
        def log_fail_reason(reason: str):
1✔
2193
            self.logger.info(
1✔
2194
                f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. "
2195
                f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")
2196

2197
        chain = self.network.blockchain()
1✔
2198
        # Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height.
2199
        # We should not release the preimage for an HTLC that its sender could already time out as
2200
        # then they might try to force-close and it becomes a race.
2201
        if chain.is_tip_stale() and not already_forwarded:
1✔
2202
            log_fail_reason(f"our chain tip is stale")
×
2203
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
×
2204
        local_height = chain.height()
1✔
2205

2206
        # parse parameters and perform checks that are invariant
2207
        payment_secret_from_onion, total_msat, channel_opening_fee, exc_incorrect_or_unknown_pd = self.check_accepted_htlc(
1✔
2208
            chan=chan,
2209
            htlc=htlc,
2210
            processed_onion=processed_onion,
2211
            log_fail_reason=log_fail_reason)
2212

2213
        # payment key for final onions
2214
        payment_hash = htlc.payment_hash
1✔
2215
        payment_key = (payment_hash + payment_secret_from_onion).hex()
1✔
2216

2217
        if self.check_mpp_is_waiting(
1✔
2218
                payment_secret=payment_secret_from_onion,
2219
                short_channel_id=chan.get_scid_or_local_alias(),
2220
                htlc=htlc,
2221
                expected_msat=total_msat,
2222
                exc_incorrect_or_unknown_pd=exc_incorrect_or_unknown_pd,
2223
                log_fail_reason=log_fail_reason,
2224
        ):
2225
            return None, None
1✔
2226

2227
        # TODO check against actual min_final_cltv_expiry_delta from invoice (and give 2-3 blocks of leeway?)
2228
        # note: payment_bundles might get split here, e.g. one payment is "already forwarded" and the other is not.
2229
        #       In practice, for the swap prepayment use case, this does not matter.
2230
        if local_height + MIN_FINAL_CLTV_DELTA_ACCEPTED > htlc.cltv_abs and not already_forwarded:
1✔
2231
            log_fail_reason(f"htlc.cltv_abs is unreasonably close")
×
2232
            raise exc_incorrect_or_unknown_pd
×
2233

2234
        # detect callback
2235
        # if there is a trampoline_onion, maybe_fulfill_htlc will be called again
2236
        # order is important: if we receive a trampoline onion for a hold invoice, we need to peel the onion first.
2237

2238
        if processed_onion.trampoline_onion_packet:
1✔
2239
            # TODO: we should check that all trampoline_onions are the same
2240
            trampoline_onion = self.process_onion_packet(
1✔
2241
                processed_onion.trampoline_onion_packet,
2242
                payment_hash=payment_hash,
2243
                onion_packet_bytes=onion_packet_bytes,
2244
                is_trampoline=True)
2245
            if trampoline_onion.are_we_final:
1✔
2246
                # trampoline- we are final recipient of HTLC
2247
                # note: the returned payment_key will contain the inner payment_secret
2248
                return self.maybe_fulfill_htlc(
1✔
2249
                    chan=chan,
2250
                    htlc=htlc,
2251
                    processed_onion=trampoline_onion,
2252
                    onion_packet_bytes=onion_packet_bytes,
2253
                    already_forwarded=already_forwarded,
2254
                )
2255
            else:
2256
                callback = lambda: self.lnworker.maybe_forward_trampoline(
1✔
2257
                    payment_hash=payment_hash,
2258
                    inc_cltv_abs=htlc.cltv_abs, # TODO: use max or enforce same value across mpp parts
2259
                    outer_onion=processed_onion,
2260
                    trampoline_onion=trampoline_onion,
2261
                    fw_payment_key=payment_key)
2262
                return None, (payment_key, callback)
1✔
2263

2264
        # TODO don't accept payments twice for same invoice
2265
        # note: we don't check invoice expiry (bolt11 'x' field) on the receiver-side.
2266
        #       - semantics are weird: would make sense for simple-payment-receives, but not
2267
        #         if htlc is expected to be pending for a while, e.g. for a hold-invoice.
2268
        info = self.lnworker.get_payment_info(payment_hash)
1✔
2269
        if info is None:
1✔
2270
            log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}")
×
2271
            raise exc_incorrect_or_unknown_pd
×
2272

2273
        preimage = self.lnworker.get_preimage(payment_hash)
1✔
2274
        expected_payment_secret = self.lnworker.get_payment_secret(htlc.payment_hash)
1✔
2275
        if payment_secret_from_onion != expected_payment_secret:
1✔
2276
            log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {expected_payment_secret.hex()}')
×
2277
            raise exc_incorrect_or_unknown_pd
×
2278
        invoice_msat = info.amount_msat
1✔
2279
        if channel_opening_fee:
1✔
2280
            invoice_msat -= channel_opening_fee
×
2281

2282
        if not (invoice_msat is None or invoice_msat <= total_msat <= 2 * invoice_msat):
1✔
2283
            log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}")
×
2284
            raise exc_incorrect_or_unknown_pd
×
2285

2286
        hold_invoice_callback = self.lnworker.hold_invoice_callbacks.get(payment_hash)
1✔
2287
        if hold_invoice_callback and not preimage:
1✔
2288
            callback = lambda: hold_invoice_callback(payment_hash)
1✔
2289
            return None, (payment_key, callback)
1✔
2290

2291
        if payment_hash.hex() in self.lnworker.dont_settle_htlcs:
1✔
2292
            return None, None
×
2293

2294
        if not preimage:
1✔
2295
            if not already_forwarded:
1✔
2296
                log_fail_reason(f"missing preimage and no hold invoice callback {payment_hash.hex()}")
1✔
2297
                raise exc_incorrect_or_unknown_pd
1✔
2298
            else:
2299
                return None, None
×
2300

2301
        chan.opening_fee = None
1✔
2302
        self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}")
1✔
2303
        return preimage, None
1✔
2304

2305
    def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
        """Settle a received HTLC by revealing its preimage to the remote peer.

        Marks the HTLC as pending removal, settles it on the channel state
        machine, and sends 'update_fulfill_htlc' to the peer.
        """
        # fix: log message previously said "_fulfill_htlc", which did not match the method name
        self.logger.info(f"fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
        assert chan.can_update_ctx(proposer=LOCAL), f"cannot send updates: {chan.short_channel_id}"
        # the HTLC must be irrevocably committed by both sides before we may settle it
        assert chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
        # remember it until its removal is irrevocably committed (see htlc_switch cleanup)
        self.received_htlcs_pending_removal.add((chan, htlc_id))
        chan.settle_htlc(preimage, htlc_id)
        self.send_message(
            "update_fulfill_htlc",
            channel_id=chan.channel_id,
            id=htlc_id,
            payment_preimage=preimage)
    def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes):
        """Fail a received HTLC, sending the encrypted failure reason upstream.

        error_bytes: onion-wrapped failure blob, forwarded verbatim in
        'update_fail_htlc' so the origin node can decrypt it.
        """
        self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
        assert chan.can_update_ctx(proposer=LOCAL), f"cannot send updates: {chan.short_channel_id}"
        # remember it until its removal is irrevocably committed
        self.received_htlcs_pending_removal.add((chan, htlc_id))
        chan.fail_htlc(htlc_id)
        self.send_message(
            "update_fail_htlc",
            channel_id=chan.channel_id,
            id=htlc_id,
            len=len(error_bytes),
            reason=error_bytes)
2329
    def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure):
        """Fail an HTLC whose onion we could not even parse, via 'update_fail_malformed_htlc'.

        Per BOLT-02, the failure code must have the BADONION flag set and the
        data must be the 32-byte sha256 of the onion; anything else is a
        programming error here, hence the raise.
        """
        self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
        assert chan.can_update_ctx(proposer=LOCAL), f"cannot send updates: {chan.short_channel_id}"
        if not (reason.code & OnionFailureCodeMetaFlag.BADONION and len(reason.data) == 32):
            raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}")
        # remember it until its removal is irrevocably committed
        self.received_htlcs_pending_removal.add((chan, htlc_id))
        chan.fail_htlc(htlc_id)
        self.send_message(
            "update_fail_malformed_htlc",
            channel_id=chan.channel_id,
            id=htlc_id,
            sha256_of_onion=reason.data,
            failure_code=reason.code)
2343
    def on_revoke_and_ack(self, chan: Channel, payload) -> None:
        """Handle 'revoke_and_ack': the peer revokes their previous commitment tx."""
        self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}')
        if not chan.can_update_ctx(proposer=REMOTE):
            # drop instead of raising: message arrived in a state where updates are not allowed
            self.logger.warning(
                f"on_revoke_and_ack. dropping message. illegal action. "
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
            return
        rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"])
        chan.receive_revocation(rev)
        # persist before acting on the new state
        self.lnworker.save_channel(chan)
        # the revocation may unblock queued updates; try to sign a new commitment
        self.maybe_send_commitment(chan)
        # pulse the event so waiters (e.g. htlc_switch) wake up exactly once
        self._received_revack_event.set()
        self._received_revack_event.clear()
2357
    @event_listener
    async def on_event_fee(self, *args):
        """Fee-estimate update event: re-check commitment feerates of all our channels."""
        async def async_wrapper():
            for chan in self.channels.values():
                self.maybe_update_fee(chan)
        # run on our taskgroup so exceptions are handled/logged there;
        # note: the (async) callable itself is passed, spawn invokes it
        await self.taskgroup.spawn(async_wrapper)
2364
    def on_update_fee(self, chan: Channel, payload):
        """Handle 'update_fee' from the peer: apply their proposed commitment feerate."""
        if not chan.can_update_ctx(proposer=REMOTE):
            # drop instead of raising: updates not allowed in current channel/peer state
            self.logger.warning(
                f"on_update_fee. dropping message. illegal action. "
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
            return
        feerate = payload["feerate_per_kw"]
        # False: the fee update was proposed by the remote, not initiated by us
        chan.update_fee(feerate, False)
2373
    def maybe_update_fee(self, chan: Channel):
        """
        called when our fee estimates change

        If we are the channel funder, decide whether the commitment feerate
        should be updated and, if so, send 'update_fee' followed by a new
        commitment. If we are not the funder, only warn when the feerate
        looks dangerously low.
        """
        if not chan.can_update_ctx(proposer=LOCAL):
            return
        if chan.get_state() != ChannelState.OPEN:
            return
        current_feerate_per_kw: Optional[int] = self.lnworker.current_target_feerate_per_kw(
            has_anchors=chan.has_anchors()
        )
        if current_feerate_per_kw is None:
            # no fee estimates available; nothing we can do
            return
        # add some buffer to anchor chan fees as we always act at the lower end and don't
        # want to get kicked out of the mempool immediately if it grows
        fee_buffer = current_feerate_per_kw * 0.5 if chan.has_anchors() else 0
        update_feerate_per_kw = int(current_feerate_per_kw + fee_buffer)
        def does_chan_fee_need_update(chan_feerate: Union[float, int]) -> Optional[bool]:
            # Returns None when we lack the data to decide (non-anchor path only).
            if chan.has_anchors():
                # TODO: once package relay and electrum servers with submitpackage are more common,
                # TODO: we should reconsider this logic and move towards 0 fee ctx
                # update if we used up half of the buffer or the fee decreased a lot again
                fee_increased = current_feerate_per_kw + (fee_buffer / 2) > chan_feerate
                changed_significantly = abs((chan_feerate - update_feerate_per_kw) / chan_feerate) > 0.2
                return fee_increased or changed_significantly
            else:
                # We raise fees more aggressively than we lower them. Overpaying is not too bad,
                # but lowballing can be fatal if we can't even get into the mempool...
                high_fee = 2 * current_feerate_per_kw  # type: Union[float, int]
                low_fee = self.lnworker.current_low_feerate_per_kw_srk_channel()  # type: Optional[Union[float, int]]
                if low_fee is None:
                    return None
                low_fee = max(low_fee, 0.75 * current_feerate_per_kw)
                # make sure low_feerate and target_feerate are not too close to each other:
                low_fee = min(low_fee, current_feerate_per_kw - FEERATE_PER_KW_MIN_RELAY_LIGHTNING)
                assert low_fee < high_fee, (low_fee, high_fee)
                return not (low_fee < chan_feerate < high_fee)
        if not chan.constraints.is_initiator:
            # we are not the funder: only the funder may send 'update_fee'
            if constants.net is not constants.BitcoinRegtest:
                chan_feerate = chan.get_latest_feerate(LOCAL)
                ratio = chan_feerate / update_feerate_per_kw
                if ratio < 0.5:
                    # Note that we trust the Electrum server about fee rates
                    # Thus, automated force-closing might not be a good idea
                    # Maybe we should display something in the GUI instead
                    self.logger.warning(
                        f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, "
                        f"current recommended feerate is {update_feerate_per_kw} sat/kw, consider force closing!")
            return
        # it is our responsibility to update the fee
        chan_fee = chan.get_next_feerate(REMOTE)
        if does_chan_fee_need_update(chan_fee):
            self.logger.info(f"({chan.get_id_for_log()}) onchain fees have changed considerably. updating fee.")
        elif chan.get_latest_ctn(REMOTE) == 0:
            # workaround eclair issue https://github.com/ACINQ/eclair/issues/1730 (fixed in 2022)
            self.logger.info(f"({chan.get_id_for_log()}) updating fee to bump remote ctn")
            if current_feerate_per_kw == chan_fee:
                # nudge the fee so the update is not a no-op
                update_feerate_per_kw += 1
        else:
            return
        self.logger.info(f"({chan.get_id_for_log()}) current pending feerate {chan_fee}. "
                         f"new feerate {update_feerate_per_kw}")
        assert update_feerate_per_kw >= FEERATE_PER_KW_MIN_RELAY_LIGHTNING, f"fee below minimum: {update_feerate_per_kw}"
        # True: we initiate this fee update
        chan.update_fee(update_feerate_per_kw, True)
        self.send_message(
            "update_fee",
            channel_id=chan.channel_id,
            feerate_per_kw=update_feerate_per_kw)
        self.maybe_send_commitment(chan)

2443
    @log_exceptions
    async def close_channel(self, chan_id: bytes):
        """Cooperatively close the channel, initiated by us.

        Sends our 'shutdown', waits for the remote 'shutdown' (delivered via
        the shutdown_received future resolved by on_shutdown), then runs the
        closing-fee negotiation. Returns the closing transaction's txid.
        """
        chan = self.channels[chan_id]
        # future resolved by on_shutdown when the peer's 'shutdown' arrives
        self.shutdown_received[chan_id] = self.asyncio_loop.create_future()
        await self.send_shutdown(chan)
        payload = await self.shutdown_received[chan_id]
        try:
            txid = await self._shutdown(chan, payload, is_local=True)
            self.logger.info(f'({chan.get_id_for_log()}) Channel closed {txid}')
        except asyncio.TimeoutError:
            # negotiation timed out; a closing tx may nevertheless exist already
            txid = chan.unconfirmed_closing_txid
            self.logger.warning(f'({chan.get_id_for_log()}) did not send closing_signed, {txid}')
            if txid is None:
                # fix: corrected grammar of error message ("been be closed" -> "been closed")
                raise Exception('The remote peer did not send their final signature. The channel may not have been closed')
        return txid

2459
    @non_blocking_msg_handler
    async def on_shutdown(self, chan: Channel, payload):
        """Handle 'shutdown' from the peer.

        Validates their scriptpubkey (upfront-shutdown commitment and BOLT-02
        templates), then either resolves the future created by close_channel
        (if we initiated the close) or responds with our own 'shutdown' and
        runs the fee negotiation ourselves.
        """
        if chan.peer_state != PeerState.GOOD:  # should never happen
            raise Exception(f"received shutdown in unexpected {chan.peer_state=!r}")
        if not self.can_send_shutdown(chan, proposer=REMOTE):
            self.logger.warning(
                f"on_shutdown. illegal action. "
                f"chan={chan.get_id_for_log()}. {chan.get_state()=!r}. {chan.peer_state=!r}")
            self.send_error(chan.channel_id, message="cannot process 'shutdown' in current channel state.")
        their_scriptpubkey = payload['scriptpubkey']
        their_upfront_scriptpubkey = chan.config[REMOTE].upfront_shutdown_script
        # BOLT-02 check if they use the upfront shutdown script they advertised
        if self.is_upfront_shutdown_script() and their_upfront_scriptpubkey:
            if not (their_scriptpubkey == their_upfront_scriptpubkey):
                # send_warning with close_connection=True raises, aborting the handler
                self.send_warning(
                    chan.channel_id,
                    "remote didn't use upfront shutdown script it committed to in channel opening",
                    close_connection=True)
        else:
            # BOLT-02 restrict the scriptpubkey to some templates:
            if self.is_shutdown_anysegwit() and match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_ANYSEGWIT):
                pass
            elif match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
                pass
            else:
                self.send_warning(
                    chan.channel_id,
                    f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}',
                    close_connection=True)

        chan_id = chan.channel_id
        if chan_id in self.shutdown_received:
            # we initiated the close; hand the payload to close_channel
            self.shutdown_received[chan_id].set_result(payload)
        else:
            # remote-initiated close: reply with our shutdown and negotiate
            await self.send_shutdown(chan)
            txid = await self._shutdown(chan, payload, is_local=False)
            self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {txid}')
2497
    def can_send_shutdown(self, chan: Channel, *, proposer: HTLCOwner) -> bool:
        """Whether a 'shutdown' message from `proposer` is acceptable in the
        channel's current state."""
        state = chan.get_state()
        if state >= ChannelState.CLOSED:
            return False
        if state >= ChannelState.OPENING:
            return True
        # state < OPENING from here on
        if proposer == LOCAL:
            is_initiator = chan.constraints.is_initiator
            if is_initiator and chan.channel_id in self.funding_created_sent:
                return True
            if not is_initiator and chan.channel_id in self.funding_signed_sent:
                return True
        else:  # proposer == REMOTE
            # (from BOLT-02)
            #   A receiving node:
            #       - if it hasn't received a funding_signed (if it is a funder) or a funding_created (if it is a fundee):
            #           - SHOULD send an error and fail the channel.
            # ^ that check is equivalent to `chan.get_state() < ChannelState.OPENING`, which already holds here.
            pass
        return False

2516
    async def send_shutdown(self, chan: Channel):
        """Send our 'shutdown' message and move the channel into SHUTDOWN state.

        Raises if shutdown is not currently allowed for this channel.
        """
        if not self.can_send_shutdown(chan, proposer=LOCAL):
            raise Exception(f"cannot send shutdown. chan={chan.get_id_for_log()}. {chan.get_state()=!r}")
        if chan.config[LOCAL].upfront_shutdown_script:
            # we committed to this script at channel opening and must reuse it
            scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
        else:
            scriptpubkey = bitcoin.address_to_script(chan.get_sweep_address())
        assert scriptpubkey
        # wait until no more pending updates (bolt2)
        chan.set_can_send_ctx_updates(False)
        while chan.has_pending_changes(REMOTE):
            await asyncio.sleep(0.1)
        self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey)
        chan.set_state(ChannelState.SHUTDOWN)
        # can fulfill or fail htlcs. cannot add htlcs, because state != OPEN
        chan.set_can_send_ctx_updates(True)
2533
    def get_shutdown_fee_range(self, chan: Channel, closing_tx: 'PartialTransaction', is_local: bool):
        """ return the closing fee and fee range we initially try to enforce

        closing_tx: dummy closing tx used only to estimate the tx size.
        Returns (our_fee, our_fee_range) where our_fee_range is either None
        (legacy negotiation) or a dict with 'min_fee_satoshis'/'max_fee_satoshis'.
        """
        config = self.network.config
        our_fee = None
        if config.TEST_SHUTDOWN_FEE:
            # test hook: force an exact closing fee
            our_fee = config.TEST_SHUTDOWN_FEE
        else:
            fee_rate_per_kb = self.network.fee_estimates.eta_target_to_fee(FEE_LN_ETA_TARGET)
            if fee_rate_per_kb is None:  # fallback
                from .fee_policy import FeePolicy
                fee_rate_per_kb = FeePolicy(config.FEE_POLICY).fee_per_kb(self.network)
            if fee_rate_per_kb is not None:
                our_fee = fee_rate_per_kb * closing_tx.estimated_size() // 1000
            # TODO: anchors: remove this, as commitment fee rate can be below chain head fee rate?
            # BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx
            max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE)
            if our_fee is None:  # fallback
                self.logger.warning(f"got no fee estimates for co-op close! falling back to chan.get_latest_fee")
                our_fee = max_fee
            our_fee = min(our_fee, max_fee)
        # config modern_fee_negotiation can be set in tests
        if config.TEST_SHUTDOWN_LEGACY:
            our_fee_range = None
        elif config.TEST_SHUTDOWN_FEE_RANGE:
            our_fee_range = config.TEST_SHUTDOWN_FEE_RANGE
        else:
            # we aim at a fee between next block inclusion and some lower value
            our_fee_range = {'min_fee_satoshis': our_fee // 2, 'max_fee_satoshis': our_fee * 2}
        self.logger.info(f"Our fee range: {our_fee_range} and fee: {our_fee}")
        return our_fee, our_fee_range

2564
    @log_exceptions
    async def _shutdown(self, chan: Channel, payload, *, is_local: bool):
        """Run the cooperative-close fee negotiation and broadcast the closing tx.

        payload: the peer's 'shutdown' message (provides their scriptpubkey).
        is_local: True if we initiated the close (affects which ctx's base fee
        caps our proposal). Returns the closing tx's txid.
        """
        # wait until no HTLCs remain in either commitment transaction
        while chan.has_unsettled_htlcs():
            self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...')
            await asyncio.sleep(1)
        # if no HTLCs remain, we must not send updates
        chan.set_can_send_ctx_updates(False)
        their_scriptpubkey = payload['scriptpubkey']
        if chan.config[LOCAL].upfront_shutdown_script:
            our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
        else:
            our_scriptpubkey = bitcoin.address_to_script(chan.get_sweep_address())
        assert our_scriptpubkey
        # estimate fee of closing tx
        dummy_sig, dummy_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0)
        our_sig = None  # type: Optional[bytes]
        closing_tx = None  # type: Optional[PartialTransaction]
        is_initiator = chan.constraints.is_initiator
        our_fee, our_fee_range = self.get_shutdown_fee_range(chan, dummy_tx, is_local)

        def send_closing_signed(our_fee, our_fee_range, drop_remote):
            # Build the closing tx at our_fee, sign it, and send 'closing_signed'.
            nonlocal our_sig, closing_tx
            if our_fee_range:
                closing_signed_tlvs = {'fee_range': our_fee_range}
            else:
                closing_signed_tlvs = {}
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=our_fee, drop_remote=drop_remote)
            self.logger.info(f"Sending fee range: {closing_signed_tlvs} and fee: {our_fee}")
            self.send_message(
                'closing_signed',
                channel_id=chan.channel_id,
                fee_satoshis=our_fee,
                signature=our_sig,
                closing_signed_tlvs=closing_signed_tlvs,
            )

        def verify_signature(tx: 'PartialTransaction', sig) -> bool:
            # Verify the peer's ECDSA signature on the candidate closing tx.
            their_pubkey = chan.config[REMOTE].multisig_key.pubkey
            pre_hash = tx.serialize_preimage(0)
            msg_hash = sha256d(pre_hash)
            return ECPubkey(their_pubkey).ecdsa_verify(sig, msg_hash)

        async def receive_closing_signed():
            # Wait for the peer's 'closing_signed' and validate its signature,
            # trying both "keep remote output" and "drop remote output" tx shapes.
            nonlocal our_sig, closing_tx
            try:
                cs_payload = await self.wait_for_message('closing_signed', chan.channel_id)
            except asyncio.exceptions.TimeoutError:
                self.schedule_force_closing(chan.channel_id)
                raise Exception("closing_signed not received, force closing.")
            their_fee = cs_payload['fee_satoshis']
            their_fee_range = cs_payload['closing_signed_tlvs'].get('fee_range')
            their_sig = cs_payload['signature']
            # perform checks
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=False)
            if verify_signature(closing_tx, their_sig):
                drop_remote = False
            else:
                our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=True)
                if verify_signature(closing_tx, their_sig):
                    drop_remote = True
                else:
                    # this can happen if we consider our output too valuable to drop,
                    # but the remote drops it because it violates their dust limit
                    raise Exception('failed to verify their signature')
            # at this point we know how the closing tx looks like
            # check that their output is above their scriptpubkey's network dust limit
            to_remote_set = closing_tx.get_output_idxs_from_scriptpubkey(their_scriptpubkey)
            if not drop_remote and to_remote_set:
                to_remote_idx = to_remote_set.pop()
                to_remote_amount = closing_tx.outputs()[to_remote_idx].value
                transaction.check_scriptpubkey_template_and_dust(their_scriptpubkey, to_remote_amount)
            return their_fee, their_fee_range, their_sig, drop_remote

        def choose_new_fee(our_fee, our_fee_range, their_fee, their_fee_range, their_previous_fee):
            # BOLT-02 closing-fee negotiation step: given their counter-offer,
            # compute our next (fee, fee_range) proposal, or raise (possibly
            # force-closing) when the negotiation cannot converge.
            assert our_fee != their_fee
            fee_range_sent = our_fee_range and (is_initiator or (their_previous_fee is not None))

            # The sending node, if it is not the funder:
            if our_fee_range and their_fee_range and not is_initiator and not self.network.config.TEST_SHUTDOWN_FEE_RANGE:
                # SHOULD set max_fee_satoshis to at least the max_fee_satoshis received
                our_fee_range['max_fee_satoshis'] = max(their_fee_range['max_fee_satoshis'], our_fee_range['max_fee_satoshis'])
                # SHOULD set min_fee_satoshis to a fairly low value
                our_fee_range['min_fee_satoshis'] = min(their_fee_range['min_fee_satoshis'], our_fee_range['min_fee_satoshis'])
                # Note: the BOLT describes what the sending node SHOULD do.
                # However, this assumes that we have decided to send 'closing_signed' in response to their fee_range.
                # In practice, we might prefer to fail the channel in some cases (TODO)

            # the receiving node, if fee_satoshis matches its previously sent fee_range,
            if fee_range_sent and (our_fee_range['min_fee_satoshis'] <= their_fee <= our_fee_range['max_fee_satoshis']):
                # SHOULD reply with a closing_signed with the same fee_satoshis value if it is different from its previously sent fee_satoshis
                our_fee = their_fee

            # the receiving node, if the message contains a fee_range
            elif our_fee_range and their_fee_range:
                overlap_min = max(our_fee_range['min_fee_satoshis'], their_fee_range['min_fee_satoshis'])
                overlap_max = min(our_fee_range['max_fee_satoshis'], their_fee_range['max_fee_satoshis'])
                # if there is no overlap between that and its own fee_range
                if overlap_min > overlap_max:
                    # TODO: the receiving node should first send a warning, and fail the channel
                    # only if it doesn't receive a satisfying fee_range after a reasonable amount of time
                    self.schedule_force_closing(chan.channel_id)
                    # fix: removed duplicated word "between between" in the error message
                    raise Exception("There is no overlap between their and our fee range.")
                # otherwise, if it is the funder
                if is_initiator:
                    # if fee_satoshis is not in the overlap between the sent and received fee_range:
                    if not (overlap_min <= their_fee <= overlap_max):
                        # MUST fail the channel
                        self.schedule_force_closing(chan.channel_id)
                        raise Exception("Their fee is not in the overlap region, we force closed.")
                    # otherwise, MUST reply with the same fee_satoshis.
                    our_fee = their_fee
                # otherwise (it is not the funder):
                else:
                    # if it has already sent a closing_signed:
                    if fee_range_sent:
                        # fee_satoshis is not the same as the value we sent, we MUST fail the channel
                        self.schedule_force_closing(chan.channel_id)
                        raise Exception("Expected the same fee as ours, we force closed.")
                    # otherwise:
                    # MUST propose a fee_satoshis in the overlap between received and (about-to-be) sent fee_range.
                    our_fee = (overlap_min + overlap_max) // 2
            else:
                # otherwise, if fee_satoshis is not strictly between its last-sent fee_satoshis
                # and its previously-received fee_satoshis, UNLESS it has since reconnected:
                if their_previous_fee and not (min(our_fee, their_previous_fee) < their_fee < max(our_fee, their_previous_fee)):
                    # SHOULD fail the connection.
                    raise Exception('Their fee is not between our last sent and their last sent fee.')
                # accept their fee if they are very close
                if abs(their_fee - our_fee) < 2:
                    our_fee = their_fee
                else:
                    # this will be "strictly between" (as in BOLT2) previous values because of the above
                    our_fee = (our_fee + their_fee) // 2

            return our_fee, our_fee_range

        # Fee negotiation: both parties exchange 'closing_signed' messages.
        # The funder sends the first message, the non-funder sends the last message.
        # In the 'modern' case, at most 3 messages are exchanged, because choose_new_fee of the funder either returns their_fee or fails
        their_fee = None
        drop_remote = False  # does the peer drop its to_local output or not?
        if is_initiator:
            send_closing_signed(our_fee, our_fee_range, drop_remote)
        while True:
            their_previous_fee = their_fee
            their_fee, their_fee_range, their_sig, drop_remote = await receive_closing_signed()
            if our_fee == their_fee:
                break
            our_fee, our_fee_range = choose_new_fee(our_fee, our_fee_range, their_fee, their_fee_range, their_previous_fee)
            if not is_initiator and our_fee == their_fee:
                break
            send_closing_signed(our_fee, our_fee_range, drop_remote)
            if is_initiator and our_fee == their_fee:
                break
        if not is_initiator:
            send_closing_signed(our_fee, our_fee_range, drop_remote)

        # add signatures
        closing_tx.add_signature_to_txin(
            txin_idx=0,
            signing_pubkey=chan.config[LOCAL].multisig_key.pubkey,
            sig=ecdsa_der_sig_from_ecdsa_sig64(our_sig) + Sighash.to_sigbytes(Sighash.ALL))
        closing_tx.add_signature_to_txin(
            txin_idx=0,
            signing_pubkey=chan.config[REMOTE].multisig_key.pubkey,
            sig=ecdsa_der_sig_from_ecdsa_sig64(their_sig) + Sighash.to_sigbytes(Sighash.ALL))
        # save local transaction and set state
        try:
            self.lnworker.wallet.adb.add_transaction(closing_tx)
        except UnrelatedTransactionException:
            pass  # this can happen if (~all the balance goes to REMOTE)
        chan.set_state(ChannelState.CLOSING)
        # broadcast
        await self.network.try_broadcasting(closing_tx, 'closing')
        return closing_tx.txid()
2741
    async def htlc_switch(self):
        """Main HTLC processing loop for this peer.

        Repeatedly scans ``chan.unfulfilled_htlcs`` of every channel shared with
        this peer and fulfills, fails, or forwards each incoming HTLC once it is
        irrevocably committed.

        An item of chan.unfulfilled_htlcs is an (onion_packet_hex, forwarding_key)
        tuple and may go through 4 stages:
        - 1. not forwarded yet: (onion_packet_hex, None)
        - 2. forwarded: (onion_packet_hex, forwarding_key)
        - 3. processed: (None, forwarding_key), not irrevocably removed yet
        - 4. done: (None, forwarding_key), irrevocably removed
        """
        await self.initialized
        while True:
            await self.ping_if_required()
            # signal observers (see wait_one_htlc_switch_iteration) that an iteration finished
            self._htlc_switch_iterdone_event.set()
            self._htlc_switch_iterdone_event.clear()
            # We poll every 0.1 sec to check if there is work to do,
            # or we can also be triggered via events.
            # When forwarding an HTLC originating from this peer (the upstream),
            # we can get triggered for events that happen on the downstream peer.
            # TODO: trampoline forwarding relies on the polling
            async with ignore_after(0.1):
                async with OldTaskGroup(wait=any) as group:
                    await group.spawn(self._received_revack_event.wait())
                    await group.spawn(self.downstream_htlc_resolved_event.wait())
            # signal observers that a new iteration started
            self._htlc_switch_iterstart_event.set()
            self._htlc_switch_iterstart_event.clear()
            self._maybe_cleanup_received_htlcs_pending_removal()
            for chan_id, chan in self.channels.items():
                if not chan.can_update_ctx(proposer=LOCAL):
                    continue
                self.maybe_send_commitment(chan)
                done = set()  # htlc_ids fully handled this iteration; dropped from unfulfilled below
                unfulfilled = chan.unfulfilled_htlcs
                for htlc_id, (onion_packet_hex, forwarding_key) in unfulfilled.items():
                    # only act on HTLCs that are irrevocably committed by both sides
                    if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
                        continue
                    htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id)
                    if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
                        # stage 4: settled/failed in both commitments -> final bookkeeping cleanup
                        assert onion_packet_hex is None
                        self.lnworker.maybe_cleanup_mpp(chan.get_scid_or_local_alias(), htlc)
                        if forwarding_key:
                            self.lnworker.maybe_cleanup_forwarding(forwarding_key)
                        done.add(htlc_id)
                        continue
                    if onion_packet_hex is None:
                        # has been processed already
                        continue
                    error_reason = None  # type: Optional[OnionRoutingFailure]
                    error_bytes = None  # type: Optional[bytes]
                    preimage = None
                    onion_packet_bytes = bytes.fromhex(onion_packet_hex)
                    onion_packet = None
                    try:
                        onion_packet = OnionPacket.from_bytes(onion_packet_bytes)
                    except OnionRoutingFailure as e:
                        # onion could not even be parsed: will use fail_malformed_htlc below
                        error_reason = e
                    else:
                        try:
                            preimage, _forwarding_key, error_bytes = self.process_unfulfilled_htlc(
                                chan=chan,
                                htlc=htlc,
                                forwarding_key=forwarding_key,
                                onion_packet_bytes=onion_packet_bytes,
                                onion_packet=onion_packet)
                            if _forwarding_key:
                                # transition stage 1 -> 2: remember that forwarding started
                                assert forwarding_key is None
                                unfulfilled[htlc_id] = onion_packet_hex, _forwarding_key
                        except OnionRoutingFailure as e:
                            # locally-generated failure: wrap it into an onion error blob
                            error_bytes = construct_onion_error(e, onion_packet.public_key, self.privkey, self.network.get_local_height())
                        if error_bytes:
                            error_bytes = obfuscate_onion_error(error_bytes, onion_packet.public_key, our_onion_private_key=self.privkey)

                    if preimage or error_reason or error_bytes:
                        if preimage:
                            self.lnworker.set_request_status(htlc.payment_hash, PR_PAID)
                            if not self.lnworker.enable_htlc_settle:
                                # settling disabled (e.g. in tests): keep HTLC pending, retry later
                                continue
                            self.fulfill_htlc(chan, htlc.htlc_id, preimage)
                        elif error_bytes:
                            self.fail_htlc(
                                chan=chan,
                                htlc_id=htlc.htlc_id,
                                error_bytes=error_bytes)
                        else:
                            self.fail_malformed_htlc(
                                chan=chan,
                                htlc_id=htlc.htlc_id,
                                reason=error_reason)
                        # blank onion field to mark it as processed (stage 3)
                        unfulfilled[htlc_id] = None, forwarding_key

                # cleanup
                for htlc_id in done:
                    unfulfilled.pop(htlc_id)
                self.maybe_send_commitment(chan)

2834
    def _maybe_cleanup_received_htlcs_pending_removal(self) -> None:
1✔
2835
        done = set()
1✔
2836
        for chan, htlc_id in self.received_htlcs_pending_removal:
1✔
2837
            if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
1✔
2838
                done.add((chan, htlc_id))
1✔
2839
        if done:
1✔
2840
            for key in done:
1✔
2841
                self.received_htlcs_pending_removal.remove(key)
1✔
2842
            self.received_htlc_removed_event.set()
1✔
2843
            self.received_htlc_removed_event.clear()
1✔
2844

2845
    async def wait_one_htlc_switch_iteration(self) -> None:
        """Block until the HTLC switch completes one full iteration, or until
        the peer disconnects — whichever happens first.
        """
        async def one_full_iteration():
            # wait for the switch to start a fresh iteration, then finish it
            await self._htlc_switch_iterstart_event.wait()
            await self._htlc_switch_iterdone_event.wait()

        # first completed task wins; the other gets cancelled
        async with OldTaskGroup(wait=any) as group:
            await group.spawn(one_full_iteration())
            await group.spawn(self.got_disconnected.wait())
2856

2857
    def process_unfulfilled_htlc(
            self, *,
            chan: Channel,
            htlc: UpdateAddHtlc,
            forwarding_key: Optional[str],
            onion_packet_bytes: bytes,
            onion_packet: OnionPacket) -> Tuple[Optional[bytes], Optional[str], Optional[bytes]]:
        """Decide what to do with an incoming, committed, still-unfulfilled HTLC.

        return (preimage, payment_key, error_bytes) with at most a single element that is not None:
        - preimage:    we can fulfill the HTLC
        - payment_key: forwarding was just kicked off; caller stores it as the
          forwarding_key so the kick-off branch is not entered again
        - error_bytes: failure blob to fail the HTLC back upstream

        raise an OnionRoutingFailure if we need to fail the htlc
        """
        payment_hash = htlc.payment_hash
        # peel our layer of the onion (raises OnionRoutingFailure on bad onions)
        processed_onion = self.process_onion_packet(
            onion_packet,
            payment_hash=payment_hash,
            onion_packet_bytes=onion_packet_bytes)

        preimage, forwarding_info = self.maybe_fulfill_htlc(
            chan=chan,
            htlc=htlc,
            processed_onion=processed_onion,
            onion_packet_bytes=onion_packet_bytes,
            already_forwarded=bool(forwarding_key))

        if not forwarding_key:
            if forwarding_info:
                # HTLC we are supposed to forward, but haven't forwarded yet
                payment_key, forwarding_callback = forwarding_info
                if not self.lnworker.enable_htlc_forwarding:
                    return None, None, None
                if payment_key not in self.lnworker.active_forwardings:
                    # wrapper runs the forwarding coroutine in the background and
                    # records either the downstream htlc or the failure
                    async def wrapped_callback():
                        forwarding_coro = forwarding_callback()
                        try:
                            next_htlc = await forwarding_coro
                            if next_htlc:
                                htlc_key = serialize_htlc_key(chan.get_scid_or_local_alias(), htlc.htlc_id)
                                self.lnworker.active_forwardings[payment_key].append(next_htlc)
                                self.lnworker.downstream_to_upstream_htlc[next_htlc] = htlc_key
                        except OnionRoutingFailure as e:
                            # only record the failure if no downstream htlc was created
                            if len(self.lnworker.active_forwardings[payment_key]) == 0:
                                self.lnworker.save_forwarding_failure(payment_key, failure_message=e)
                        # TODO what about other errors? e.g. TxBroadcastError for a swap.
                        #        - malicious electrum server could fake TxBroadcastError
                        #      Could we "catch-all Exception" and fail back the htlcs with e.g. TEMPORARY_NODE_FAILURE?
                        #        - we don't want to fail the inc-HTLC for a syntax error that happens in the callback
                        #      If we don't call save_forwarding_failure(), the inc-HTLC gets stuck until expiry
                        #      and then the inc-channel will get force-closed.
                        #      => forwarding_callback() could have an API with two exceptions types:
                        #        - type1, such as OnionRoutingFailure, that signals we need to fail back the inc-HTLC
                        #        - type2, such as TxBroadcastError, that signals we want to retry the callback
                    # add to list
                    assert len(self.lnworker.active_forwardings.get(payment_key, [])) == 0
                    self.lnworker.active_forwardings[payment_key] = []
                    fut = asyncio.ensure_future(wrapped_callback())
                # return payment_key so this branch will not be executed again
                return None, payment_key, None
            elif preimage:
                return preimage, None, None
            else:
                # we are waiting for mpp consolidation or preimage
                return None, None, None
        else:
            # HTLC we are supposed to forward, and have already forwarded
            # for final trampoline onions, forwarding failures are stored with forwarding_key (which is the inner key)
            payment_key = forwarding_key
            preimage = self.lnworker.get_preimage(payment_hash)
            error_bytes, error_reason = self.lnworker.get_forwarding_failure(payment_key)
            if error_bytes:
                return None, None, error_bytes
            if error_reason:
                raise error_reason
            if preimage:
                return preimage, None, None
            # downstream not resolved yet: keep waiting
            return None, None, None
2932

2933
    def process_onion_packet(
            self,
            onion_packet: OnionPacket, *,
            payment_hash: bytes,
            onion_packet_bytes: bytes,
            is_trampoline: bool = False) -> ProcessedOnionPacket:
        """Peel one layer off *onion_packet* using our node key.

        Low-level onion parsing errors are translated into the matching BOLT-4
        failure codes so callers can simply relay the OnionRoutingFailure back
        to the sender.

        Raises: OnionRoutingFailure
        """
        # failure data for malformed-onion errors is the sha256 of the raw onion
        err_data = sha256(onion_packet_bytes)
        try:
            peeled_onion = process_onion_packet(
                onion_packet,
                our_onion_private_key=self.privkey,
                associated_data=payment_hash,
                is_trampoline=is_trampoline)
        except UnsupportedOnionPacketVersion:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=err_data)
        except InvalidOnionPubkey:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_KEY, data=err_data)
        except InvalidOnionMac:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_HMAC, data=err_data)
        except Exception as e:
            # unexpected parse failure: log it and report a generic version error
            self.logger.info(f"error processing onion packet: {e!r}")
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=err_data)
        # test-only config hooks that deliberately fail incoming HTLCs
        if self.network.config.TEST_FAIL_HTLCS_AS_MALFORMED:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=err_data)
        if self.network.config.TEST_FAIL_HTLCS_WITH_TEMP_NODE_FAILURE:
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
        return peeled_onion
2961

2962
    def on_onion_message(self, payload):
        """Hand an incoming onion message to the lnworker's manager, if any."""
        try:
            manager = self.lnworker.onion_message_manager  # only on LNWallet
        except AttributeError:
            return
        manager.on_onion_message(payload)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc