• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5304010765238272

17 Aug 2023 02:17PM UTC coverage: 59.027% (+0.02%) from 59.008%
5304010765238272

Pull #8493

CirrusCI

ecdsa
storage.append: fail if the file length is not what we expect
Pull Request #8493: partial-writes using jsonpatch

165 of 165 new or added lines in 9 files covered. (100.0%)

18653 of 31601 relevant lines covered (59.03%)

2.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.69
/electrum/lnpeer.py
1
#!/usr/bin/env python3
2
#
3
# Copyright (C) 2018 The Electrum developers
4
# Distributed under the MIT software license, see the accompanying
5
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
6

7
import zlib
5✔
8
from collections import OrderedDict, defaultdict
5✔
9
import asyncio
5✔
10
import os
5✔
11
import time
5✔
12
from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set, Callable
5✔
13
from datetime import datetime
5✔
14
import functools
5✔
15

16
import aiorpcx
5✔
17
from aiorpcx import ignore_after
5✔
18

19
from .crypto import sha256, sha256d
5✔
20
from . import bitcoin, util
5✔
21
from . import ecc
5✔
22
from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string
5✔
23
from . import constants
5✔
24
from .util import (bfh, log_exceptions, ignore_exceptions, chunks, OldTaskGroup,
5✔
25
                   UnrelatedTransactionException, error_text_bytes_to_safe_str)
26
from . import transaction
5✔
27
from .bitcoin import make_op_return
5✔
28
from .transaction import PartialTxOutput, match_script_against_template, Sighash
5✔
29
from .logging import Logger
5✔
30
from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment,
5✔
31
                      process_onion_packet, OnionPacket, construct_onion_error, OnionRoutingFailure,
32
                      ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey,
33
                      OnionFailureCodeMetaFlag)
34
from .lnchannel import Channel, RevokeAndAck, RemoteCtnTooFarInFuture, ChannelState, PeerState, ChanCloseOption
5✔
35
from . import lnutil
5✔
36
from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc, ChannelConfig,
5✔
37
                     RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore,
38
                     funding_output_script, get_per_commitment_secret_from_seed,
39
                     secret_to_pubkey, PaymentFailure, LnFeatures,
40
                     LOCAL, REMOTE, HTLCOwner,
41
                     ln_compare_features, privkey_to_pubkey, MIN_FINAL_CLTV_EXPIRY_ACCEPTED,
42
                     LightningPeerConnectionClosed, HandshakeFailed,
43
                     RemoteMisbehaving, ShortChannelID,
44
                     IncompatibleLightningFeatures, derive_payment_secret_from_payment_preimage,
45
                     ChannelType, LNProtocolWarning, validate_features, IncompatibleOrInsaneFeatures)
46
from .lnutil import FeeUpdate, channel_id_from_funding_tx
5✔
47
from .lntransport import LNTransport, LNTransportBase
5✔
48
from .lnmsg import encode_msg, decode_msg, UnknownOptionalMsgType, FailedToParseMsg
5✔
49
from .interface import GracefulDisconnect
5✔
50
from .lnrouter import fee_for_edge_msat
5✔
51
from .lnutil import ln_dummy_address
5✔
52
from .json_db import StoredDict
5✔
53
from .invoices import PR_PAID
5✔
54
from .simple_config import FEE_LN_ETA_TARGET
5✔
55

56
if TYPE_CHECKING:
5✔
57
    from .lnworker import LNGossip, LNWallet
×
58
    from .lnrouter import LNPaymentRoute
×
59
    from .transaction import PartialTransaction
×
60

61

62
LN_P2P_NETWORK_TIMEOUT = 20
5✔
63

64

65
class Peer(Logger):
5✔
66
    # note: in general this class is NOT thread-safe. Most methods are assumed to be running on asyncio thread.
67

68
    LOGGING_SHORTCUT = 'P'
5✔
69

70
    ORDERED_MESSAGES = (
5✔
71
        'accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'closing_signed')
72
    SPAMMY_MESSAGES = (
5✔
73
        'ping', 'pong', 'channel_announcement', 'node_announcement', 'channel_update',)
74

75
    DELAY_INC_MSG_PROCESSING_SLEEP = 0.01
5✔
76

77
    def __init__(
            self,
            lnworker: Union['LNGossip', 'LNWallet'],
            pubkey: bytes,
            transport: LNTransportBase,
            *, is_channel_backup=False):
        """Set up the per-connection state for one remote peer.

        lnworker: owner of this peer; its network, asyncio loop and features are read here.
        pubkey: public key of the remote node.
        transport: transport object; its privkey is used as our local key.
        is_channel_backup: stored as-is; process_message() drops every
            non-init message when this is True.
        """
        # --- owner / environment ---
        self.lnworker = lnworker
        self.network = lnworker.network
        self.asyncio_loop = self.network.asyncio_loop
        self.is_channel_backup = is_channel_backup

        # --- init handshake bookkeeping ---
        self._sent_init: bool = False
        self._received_init: bool = False
        self.initialized = self.asyncio_loop.create_future()
        self.got_disconnected = asyncio.Event()
        self.querying = asyncio.Event()

        # --- identity ---
        self.transport = transport
        self.pubkey = pubkey  # remote pubkey
        self.privkey = self.transport.privkey  # local privkey
        self.features: LnFeatures = self.lnworker.features
        self.their_features: LnFeatures = LnFeatures(0)
        self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
        assert self.node_ids[0] != self.node_ids[1]

        # --- message plumbing ---
        self.last_message_time = 0
        self.pong_event = asyncio.Event()
        self.reply_channel_range = asyncio.Queue()
        # gossip uses a single queue to preserve message order
        self.gossip_queue = asyncio.Queue()
        # for messages that are ordered
        self.ordered_message_queues: Dict[bytes, asyncio.Queue] = defaultdict(asyncio.Queue)
        # to forward error messages
        self.temp_id_to_id: Dict[bytes, Optional[bytes]] = {}
        self.funding_created_sent = set()  # for channels in PREOPENING
        self.funding_signed_sent = set()  # for channels in PREOPENING
        self.shutdown_received = {}  # chan_id -> asyncio.Future()
        self.announcement_signatures = defaultdict(asyncio.Queue)
        self.channel_reestablish_msg = defaultdict(self.asyncio_loop.create_future)
        self.orphan_channel_updates: 'OrderedDict[ShortChannelID, dict]' = OrderedDict()
        Logger.__init__(self)
        self.taskgroup = OldTaskGroup()

        # --- htlc state ---
        # HTLCs offered by REMOTE, that we started removing but are still active:
        self.received_htlcs_pending_removal: Set[Tuple[Channel, int]] = set()
        self.received_htlc_removed_event = asyncio.Event()
        self._htlc_switch_iterstart_event = asyncio.Event()
        self._htlc_switch_iterdone_event = asyncio.Event()
        self._received_revack_event = asyncio.Event()
        self.received_commitsig_event = asyncio.Event()
        self.downstream_htlc_resolved_event = asyncio.Event()
123

124
    def send_message(self, message_name: str, **kwargs):
        """Encode a message and hand the raw bytes to the transport.

        Must be called on the asyncio thread. Only "init" may be sent
        before the peer is initialized; anything else raises Exception.
        Raw update_*/commitment_signed messages are additionally stored via
        _store_raw_msg_if_local_update().
        """
        assert util.get_running_loop() == util.get_asyncio_loop(), "this must be run on the asyncio thread!"
        assert type(message_name) is str
        upper_name = message_name.upper()
        if message_name not in self.SPAMMY_MESSAGES:
            self.logger.debug(f"Sending {upper_name}")
        if not (upper_name == "INIT" or self.is_initialized()):
            raise Exception("tried to send message before we are initialized")
        raw_msg = encode_msg(message_name, **kwargs)
        self._store_raw_msg_if_local_update(
            raw_msg,
            message_name=message_name,
            channel_id=kwargs.get("channel_id"),
        )
        self.transport.send_bytes(raw_msg)
134

135
    def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]):
5✔
136
        is_commitment_signed = message_name == "commitment_signed"
5✔
137
        if not (message_name.startswith("update_") or is_commitment_signed):
5✔
138
            return
5✔
139
        assert channel_id
5✔
140
        chan = self.get_channel_by_id(channel_id)
5✔
141
        if not chan:
5✔
142
            raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}")
×
143
        chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed)
5✔
144
        if is_commitment_signed:
5✔
145
            # saving now, to ensure replaying updates works (in case of channel reestablishment)
146
            self.lnworker.save_channel(chan)
5✔
147

148
    def maybe_set_initialized(self):
5✔
149
        if self.initialized.done():
5✔
150
            return
5✔
151
        if self._sent_init and self._received_init:
5✔
152
            self.initialized.set_result(True)
5✔
153

154
    def is_initialized(self) -> bool:
5✔
155
        return (self.initialized.done()
5✔
156
                and not self.initialized.cancelled()
157
                and self.initialized.exception() is None
158
                and self.initialized.result() is True)
159

160
    async def initialize(self):
        """Perform the transport handshake (if needed) and send our init message."""
        # If outgoing transport, do handshake now. For incoming, it has already been done.
        if isinstance(self.transport, LNTransport):
            await self.transport.handshake()
        self.logger.info(f"handshake done for {self.transport.peer_addr or self.pubkey.hex()}")
        features = self.features.for_init_message()
        # number of bytes needed to hold the feature bits, rounded up
        whole_bytes, leftover_bits = divmod(int.bit_length(features), 8)
        flen = whole_bytes + (1 if leftover_bits else 0)
        init_tlvs = {
            'networks': {'chains': constants.net.rev_genesis_bytes()},
        }
        self.send_message(
            "init",
            gflen=0,
            flen=flen,
            features=features,
            init_tlvs=init_tlvs,
        )
        self._sent_init = True
        self.maybe_set_initialized()
177

178
    @property
    def channels(self) -> Dict[bytes, Channel]:
        """Channels with this peer, as returned by lnworker.channels_for_peer()."""
        remote_pubkey = self.pubkey
        return self.lnworker.channels_for_peer(remote_pubkey)
181

182
    def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
        """Return the channel with this id if it belongs to this peer, else None."""
        # note: this is faster than self.channels.get(channel_id)
        chan = self.lnworker.get_channel_by_id(channel_id)
        if not chan:
            return None
        # channel exists, but only hand it out if it is with this very peer
        return None if chan.node_id != self.pubkey else chan
190

191
    def diagnostic_name(self):
5✔
192
        return self.lnworker.__class__.__name__ + ', ' + self.transport.name()
5✔
193

194
    async def ping_if_required(self):
5✔
195
        if time.time() - self.last_message_time > 30:
5✔
196
            self.send_message('ping', num_pong_bytes=4, byteslen=4)
×
197
            self.pong_event.clear()
×
198
            await self.pong_event.wait()
×
199

200
    def process_message(self, message: bytes):
        """Decode one raw incoming message and route it.

        - unknown optional message types are logged and ignored;
        - unparsable messages raise GracefulDisconnect;
        - if this peer is a channel backup, everything but 'init' is dropped;
        - ORDERED_MESSAGES go into the per-channel queue;
        - everything else is dispatched to the matching on_<type> handler
          (coroutine handlers are spawned on the taskgroup).
        """
        try:
            message_type, payload = decode_msg(message)
        except UnknownOptionalMsgType as e:
            self.logger.info(f"received unknown message from peer. ignoring: {e!r}")
            return
        except FailedToParseMsg as e:
            self.logger.info(
                f"failed to parse message from peer. disconnecting. "
                f"msg_type={e.msg_type_name}({e.msg_type_int}). exc={e!r}")
            raise GracefulDisconnect() from e
        self.last_message_time = time.time()
        if message_type not in self.SPAMMY_MESSAGES:
            self.logger.debug(f"Received {message_type.upper()}")
        # only process INIT if we are a backup
        if self.is_channel_backup is True and message_type != 'init':
            return

        if message_type in self.ORDERED_MESSAGES:
            chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
            self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
            return

        # channel-scoped messages (except error/warning) get the channel as first arg
        if message_type not in ('error', 'warning') and 'channel_id' in payload:
            channel_id = payload['channel_id']
            chan = self.get_channel_by_id(channel_id)
            if chan is None:
                self.logger.info(f"Received {message_type} for unknown channel {channel_id.hex()}")
                return
            args = (chan, payload)
        else:
            args = (payload,)
        try:
            handler = getattr(self, 'on_' + message_type)
        except AttributeError:
            # no handler defined for this message type: silently ignore
            return
        # raw message is needed to check signature
        if message_type in ('node_announcement', 'channel_announcement', 'channel_update'):
            payload['raw'] = message
        execution_result = handler(*args)
        if asyncio.iscoroutinefunction(handler):
            asyncio.ensure_future(self.taskgroup.spawn(execution_result))
241

242
    def on_warning(self, payload):
        """Log a warning message received from the remote peer."""
        chan_id = payload.get("channel_id")
        err_bytes = payload['data']
        is_known_chan_id = (chan_id in self.channels) or (chan_id in self.temp_id_to_id)
        safe_text = error_text_bytes_to_safe_str(err_bytes)
        chan_id_hex = chan_id.hex()
        self.logger.info(
            f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: "
            f"{safe_text}. chan_id={chan_id_hex}. "
            f"{is_known_chan_id=}")
249

250
    def on_error(self, payload):
        """Handle an error message from the remote peer.

        Logs it, schedules force-closing of the affected channel(s), pushes
        the error to whoever waits on the ordered message queue, and raises
        GracefulDisconnect -- unless the channel id is unknown, in which case
        the message is ignored.
        """
        chan_id = payload.get("channel_id")
        err_bytes = payload['data']
        is_known_chan_id = (chan_id in self.channels) or (chan_id in self.temp_id_to_id)
        self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: "
                         f"{error_text_bytes_to_safe_str(err_bytes)}. chan_id={chan_id.hex()}. "
                         f"{is_known_chan_id=}")

        def notify_waiters(cid):
            # wakes up wait_for_message(), which turns this into an exception
            self.ordered_message_queues[cid].put_nowait((None, {'error': err_bytes}))

        if chan_id in self.channels:
            self.schedule_force_closing(chan_id)
            notify_waiters(chan_id)
        elif chan_id in self.temp_id_to_id:
            notify_waiters(self.temp_id_to_id[chan_id] or chan_id)
        elif chan_id == bytes(32):
            # if channel_id is all zero:
            # - MUST fail all channels with the sending node.
            for cid in self.channels:
                self.schedule_force_closing(cid)
                notify_waiters(cid)
        else:
            # if no existing channel is referred to by channel_id:
            # - MUST ignore the message.
            return
        raise GracefulDisconnect
274

275
    async def send_warning(self, channel_id: bytes, message: str = None, *, close_connection=False):
5✔
276
        """Sends a warning and disconnects if close_connection.
277

278
        Note:
279
        * channel_id is the temporary channel id when the channel id is not yet available
280

281
        A sending node:
282
        MAY set channel_id to all zero if the warning is not related to a specific channel.
283

284
        when failure was caused by an invalid signature check:
285
        * SHOULD include the raw, hex-encoded transaction in reply to a funding_created,
286
          funding_signed, closing_signed, or commitment_signed message.
287
        """
288
        assert isinstance(channel_id, bytes)
5✔
289
        encoded_data = b'' if not message else message.encode('ascii')
5✔
290
        self.send_message('warning', channel_id=channel_id, data=encoded_data, len=len(encoded_data))
5✔
291
        if close_connection:
5✔
292
            raise GracefulDisconnect
5✔
293

294
    async def send_error(self, channel_id: bytes, message: str = None, *, force_close_channel=False):
        """Send an error message, optionally force-close, then disconnect.

        Note:
        * channel_id is the temporary channel id when the channel id is not yet available

        A sending node:
        * SHOULD send error for protocol violations or internal errors that make channels
          unusable or that make further communication unusable.
        * SHOULD send error with the unknown channel_id in reply to messages of type
          32-255 related to unknown channels.
        * MUST fail the channel(s) referred to by the error message.
        * MAY set channel_id to all zero to indicate all channels.

        when failure was caused by an invalid signature check:
        * SHOULD include the raw, hex-encoded transaction in reply to a funding_created,
          funding_signed, closing_signed, or commitment_signed message.

        Always raises GracefulDisconnect after the message was sent.
        """
        assert isinstance(channel_id, bytes)
        encoded_data = message.encode('ascii') if message else b''
        self.send_message('error', channel_id=channel_id, data=encoded_data, len=len(encoded_data))
        # MUST fail the channel(s) referred to by the error message:
        #  we may violate this with force_close_channel
        if force_close_channel:
            if channel_id in self.channels:
                to_close = (channel_id,)
            elif channel_id == bytes(32):
                # all-zero id refers to every channel with this peer
                to_close = self.channels
            else:
                to_close = ()
            for cid in to_close:
                self.schedule_force_closing(cid)
        raise GracefulDisconnect
324

325
    def on_ping(self, payload):
5✔
326
        l = payload['num_pong_bytes']
5✔
327
        self.send_message('pong', byteslen=l)
5✔
328

329
    def on_pong(self, payload):
5✔
330
        self.pong_event.set()
5✔
331

332
    async def wait_for_message(self, expected_name: str, channel_id: bytes):
        """Wait for the next ordered message on a channel and return its payload.

        Waits at most LN_P2P_NETWORK_TIMEOUT seconds. Raises
        GracefulDisconnect if the queued item is an error forwarded by
        on_error(), and Exception if the message name is not the expected one.
        """
        queue = self.ordered_message_queues[channel_id]
        name, payload = await util.wait_for2(queue.get(), LN_P2P_NETWORK_TIMEOUT)
        # raise exceptions for errors, so that the caller sees them
        err_bytes = payload.get("error")
        if err_bytes is not None:
            err_text = error_text_bytes_to_safe_str(err_bytes)
            raise GracefulDisconnect(
                f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {err_text}")
        if name == expected_name:
            return payload
        raise Exception(f"Received unexpected '{name}'")
343

344
    def on_init(self, payload):
        """Process the remote's init message: validate and negotiate features.

        A second init is logged and ignored. Raises GracefulDisconnect if
        the features are insane or incompatible, or if the remote lists
        chains and ours is not among them.
        """
        if self._received_init:
            self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
            return
        # merge 'features' and the legacy 'globalfeatures' bit fields
        feature_bits = 0
        for field_name in ('features', 'globalfeatures'):
            feature_bits |= int.from_bytes(payload[field_name], byteorder="big")
        try:
            self.their_features = validate_features(feature_bits)
        except IncompatibleOrInsaneFeatures as e:
            raise GracefulDisconnect(f"remote sent insane features: {repr(e)}")
        # check if features are compatible, and set self.features to what we negotiated
        try:
            self.features = ln_compare_features(self.features, self.their_features)
        except IncompatibleLightningFeatures as e:
            self.initialized.set_exception(e)
            raise GracefulDisconnect(f"{str(e)}")
        # check that they are on the same chain as us, if provided
        their_networks = payload["init_tlvs"].get("networks")
        if their_networks:
            their_chains = list(chunks(their_networks["chains"], 32))
            our_chain = constants.net.rev_genesis_bytes()
            if our_chain not in their_chains:
                raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})")
        # all checks passed
        self.lnworker.on_peer_successfully_established(self)
        self._received_init = True
        self.maybe_set_initialized()
370

371
    def on_node_announcement(self, payload):
5✔
372
        if not self.lnworker.uses_trampoline():
×
373
            self.gossip_queue.put_nowait(('node_announcement', payload))
×
374

375
    def on_channel_announcement(self, payload):
5✔
376
        if not self.lnworker.uses_trampoline():
×
377
            self.gossip_queue.put_nowait(('channel_announcement', payload))
×
378

379
    def on_channel_update(self, payload):
5✔
380
        self.maybe_save_remote_update(payload)
5✔
381
        if not self.lnworker.uses_trampoline():
5✔
382
            self.gossip_queue.put_nowait(('channel_update', payload))
5✔
383

384
    def maybe_save_remote_update(self, payload):
5✔
385
        if not self.channels:
5✔
386
            return
×
387
        for chan in self.channels.values():
5✔
388
            if payload['short_channel_id'] in [chan.short_channel_id, chan.get_local_scid_alias()]:
5✔
389
                chan.set_remote_update(payload)
5✔
390
                self.logger.info(f"saved remote channel_update gossip msg for chan {chan.get_id_for_log()}")
5✔
391
                break
5✔
392
        else:
393
            # Save (some bounded number of) orphan channel updates for later
394
            # as it might be for our own direct channel with this peer
395
            # (and we might not yet know the short channel id for that)
396
            # Background: this code is here to deal with a bug in LND,
397
            # see https://github.com/lightningnetwork/lnd/issues/3651
398
            # and https://github.com/lightningnetwork/lightning-rfc/pull/657
399
            # This code assumes gossip_queries is set. BOLT7: "if the
400
            # gossip_queries feature is negotiated, [a node] MUST NOT
401
            # send gossip it did not generate itself"
402
            short_channel_id = ShortChannelID(payload['short_channel_id'])
×
403
            self.logger.info(f'received orphan channel update {short_channel_id}')
×
404
            self.orphan_channel_updates[short_channel_id] = payload
×
405
            while len(self.orphan_channel_updates) > 25:
×
406
                self.orphan_channel_updates.popitem(last=False)
×
407

408
    def on_announcement_signatures(self, chan: Channel, payload):
        """Handle the remote's announcement_signatures for a channel.

        If our side was already announced, (re)send our signatures;
        otherwise queue the payload for this channel.
        """
        if not chan.config[LOCAL].was_announced:
            self.announcement_signatures[chan.channel_id].put_nowait(payload)
            return
        # return values are not needed here
        _h, _local_node_sig, _local_bitcoin_sig = self.send_announcement_signatures(chan)
413

414
    def handle_disconnect(func):
        """Decorator for coroutine methods: log expected disconnects and always clean up.

        GracefulDisconnect, LightningPeerConnectionClosed,
        IncompatibleLightningFeatures and SOCKS errors are logged and
        swallowed; close_and_cleanup() runs in every case.
        """
        @functools.wraps(func)
        async def disconnect_guard(self, *args, **kwargs):
            try:
                result = await func(self, *args, **kwargs)
                return result
            except GracefulDisconnect as e:
                self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
            except (LightningPeerConnectionClosed, IncompatibleLightningFeatures,
                    aiorpcx.socks.SOCKSError) as e:
                self.logger.info(f"Disconnecting: {repr(e)}")
            finally:
                self.close_and_cleanup()
        return disconnect_guard
427

428
    @ignore_exceptions  # do not kill outer taskgroup
    @log_exceptions
    @handle_disconnect
    async def main_loop(self):
        """Run the peer's long-lived tasks inside its taskgroup."""
        async with self.taskgroup as group:
            for start_task in (
                    self._message_loop,
                    self.htlc_switch,
                    self.query_gossip,
                    self.process_gossip,
            ):
                await group.spawn(start_task())
437

438
    async def process_gossip(self):
        """Forever: every 5s drain the gossip queue and pass the batch to lngossip."""
        while True:
            await asyncio.sleep(5)
            if not self.network.lngossip:
                continue
            batches = {
                'channel_announcement': [],
                'channel_update': [],
                'node_announcement': [],
            }
            # take at least one item (blocking), then everything already queued
            drained = False
            while not drained:
                name, payload = await self.gossip_queue.get()
                batch = batches.get(name)
                if batch is None:
                    raise Exception('unknown message')
                batch.append(payload)
                drained = self.gossip_queue.empty()
            if self.network.lngossip:
                await self.network.lngossip.process_gossip(
                    batches['channel_announcement'],
                    batches['node_announcement'],
                    batches['channel_update'])
460

461
    async def query_gossip(self):
        """Wait for initialization; if we are the gossip worker, sync channel ids.

        Raises GracefulDisconnect if initialization fails or the channel
        range query times out. For the gossip worker this then loops
        forever, querying whatever ids the lnworker asks for.
        """
        try:
            await util.wait_for2(self.initialized, LN_P2P_NETWORK_TIMEOUT)
        except Exception as e:
            raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e
        if not (self.lnworker == self.lnworker.network.lngossip):
            return
        try:
            ids, complete = await util.wait_for2(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT)
        except asyncio.TimeoutError as e:
            raise GracefulDisconnect("query_channel_range timed out") from e
        self.logger.info(f'Received {len(ids)} channel ids. (complete: {complete})')
        await self.lnworker.add_new_ids(ids)
        while True:
            todo = self.lnworker.get_ids_to_query()
            if todo:
                await self.get_short_channel_ids(todo)
            else:
                # nothing to ask for right now; poll again shortly
                await asyncio.sleep(1)
479

480
    async def get_channel_range(self):
        """Ask the peer for all short channel ids since the first LN block.

        Sends query_channel_range, then consumes the replies queued by
        on_reply_channel_range() until the merged block intervals cover the
        whole requested range and the last reply is flagged complete.

        Returns (ids, complete): the set of all ids received and the
        `complete` flag of the final reply.

        Raises Exception if the reply intervals are inconsistent.
        """
        first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
        num_blocks = self.lnworker.network.get_local_height() - first_block
        self.query_channel_range(first_block, num_blocks)
        intervals = []
        ids = set()
        # note: implementations behave differently...
        # "sane implementation that follows BOLT-07" example:
        #   query_channel_range. <<< first_block 497000, num_blocks 79038
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True
        #   on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True
        #   on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True
        #   on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True
        # lnd example:
        #   query_channel_range. <<< first_block 497000, num_blocks 79038
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
        #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True
        while True:
            index, num, complete, _ids = await self.reply_channel_range.get()
            ids.update(_ids)
            intervals.append((index, index+num))
            intervals.sort()
            # merge the two lowest intervals for as long as they touch/overlap
            while len(intervals) > 1:
                a, b = intervals[0]
                c, d = intervals[1]
                if not (a <= c and a <= b and c <= d):
                    raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}")
                if b >= c:
                    # The second interval may lie entirely inside the first
                    # (d < b); keep the larger end so coverage never shrinks.
                    intervals = [(a, max(b, d))] + intervals[2:]
                else:
                    break
            if len(intervals) == 1 and complete:
                a, b = intervals[0]
                if a <= first_block and b >= first_block + num_blocks:
                    break
        return ids, complete
519

520
    def request_gossip(self, timestamp=0):
        """Send gossip_timestamp_filter starting at `timestamp` (0 = whole graph)."""
        if timestamp == 0:
            log_line = 'requesting whole channel graph'
        else:
            log_line = f'requesting channel graph since {datetime.fromtimestamp(timestamp).ctime()}'
        self.logger.info(log_line)
        self.send_message(
            'gossip_timestamp_filter',
            chain_hash=constants.net.rev_genesis_bytes(),
            first_timestamp=timestamp,
            timestamp_range=b'\xff'*4)
530

531
    def query_channel_range(self, first_block, num_blocks):
        """Send a query_channel_range message for the given block range."""
        self.logger.info(f'query channel range {first_block} {num_blocks}')
        chain_hash = constants.net.rev_genesis_bytes()
        self.send_message(
            'query_channel_range',
            chain_hash=chain_hash,
            first_blocknum=first_block,
            number_of_blocks=num_blocks)
538

539
    def decode_short_ids(self, encoded):
        """Decode an encoded_short_ids field into a list of 8-byte ids.

        The first byte selects the encoding: 0 means the ids follow
        uncompressed, 1 means they are zlib-compressed.

        Raises Exception if the field is empty, if the encoding byte is
        unknown, or if the decoded data is not a whole number of 8-byte ids.
        """
        if not encoded:
            # data comes from the remote peer: fail with a clear message
            # instead of an IndexError on encoded[0]
            raise Exception('decode_short_ids: empty encoded_short_ids')
        if encoded[0] == 0:
            decoded = encoded[1:]
        elif encoded[0] == 1:
            decoded = zlib.decompress(encoded[1:])
        else:
            raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}')
        if len(decoded) % 8 != 0:
            # a trailing partial id would otherwise be returned as a short chunk
            raise Exception(f'decode_short_ids: decoded length {len(decoded)} is not a multiple of 8')
        return [decoded[i:i+8] for i in range(0, len(decoded), 8)]
548

549
    def on_reply_channel_range(self, payload):
5✔
550
        first = payload['first_blocknum']
×
551
        num = payload['number_of_blocks']
×
552
        complete = bool(int.from_bytes(payload['sync_complete'], 'big'))
×
553
        encoded = payload['encoded_short_ids']
×
554
        ids = self.decode_short_ids(encoded)
×
555
        #self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, num_ids {len(ids)}, complete {repr(payload['complete'])}")
556
        self.reply_channel_range.put_nowait((first, num, complete, ids))
×
557

558
    async def get_short_channel_ids(self, ids):
5✔
559
        self.logger.info(f'Querying {len(ids)} short_channel_ids')
×
560
        assert not self.querying.is_set()
×
561
        self.query_short_channel_ids(ids)
×
562
        await self.querying.wait()
×
563
        self.querying.clear()
×
564

565
    def query_short_channel_ids(self, ids, compressed=True):
        """Send query_short_channel_ids for the (sorted) ids.

        The ids are concatenated and prefixed with the encoding byte:
        0x01 + zlib data if `compressed`, else 0x00 + raw data.
        """
        concatenated = b''.join(sorted(ids))
        if compressed:
            prefix, encoded = b'\x01', zlib.compress(concatenated)
        else:
            prefix, encoded = b'\x00', concatenated
        self.send_message(
            'query_short_channel_ids',
            chain_hash=constants.net.rev_genesis_bytes(),
            len=1+len(encoded),
            encoded_short_ids=prefix+encoded)
575

576
    async def _message_loop(self):
        """Initialize the connection, then process incoming messages one by one.

        Raises GracefulDisconnect if initialize() fails with an OS error,
        a timeout or a failed handshake.
        """
        try:
            await util.wait_for2(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
        except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
            raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
        async for msg in self.transport.read_messages():
            self.process_message(msg)
            if not self.DELAY_INC_MSG_PROCESSING_SLEEP:
                continue
            # rate-limit message-processing a bit, to make it harder
            # for a single peer to bog down the event loop / cpu:
            await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
587

588
    def on_reply_short_channel_ids_end(self, payload):
        """Handle 'reply_short_channel_ids_end' by flagging the pending query as finished.

        The payload itself is not inspected; setting self.querying releases
        whoever is waiting on that event.
        """
        query_done = self.querying
        query_done.set()
590

591
    def close_and_cleanup(self):
        """Close the transport (best effort), notify lnworker and signal disconnection.

        note: This method might get called multiple times!
              E.g. if you call close_and_cleanup() to cause a disconnection from the peer,
              it will get called a second time in handle_disconnect().
        """
        try:
            transport = self.transport
            if transport:
                transport.close()
        except Exception:
            # closing is best-effort: a failure here must not prevent the cleanup below
            pass
        self.lnworker.peer_closed(self)
        self.got_disconnected.set()
602

603
    def is_shutdown_anysegwit(self):
        """Return whether self.features supports option_shutdown_anysegwit."""
        wanted = LnFeatures.OPTION_SHUTDOWN_ANYSEGWIT_OPT
        return self.features.supports(wanted)
605

606
    def is_channel_type(self):
        """Return whether self.features supports option_channel_type."""
        wanted = LnFeatures.OPTION_CHANNEL_TYPE_OPT
        return self.features.supports(wanted)
608

609
    def is_upfront_shutdown_script(self):
        """Return whether self.features supports option_upfront_shutdown_script."""
        wanted = LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT
        return self.features.supports(wanted)
611

612
    def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]:
        """Extract the peer's upfront shutdown script from an open/accept_channel payload.

        msg_identifier selects the TLV stream to read: 'open' reads
        payload['open_channel_tlvs'], 'accept' reads payload['accept_channel_tlvs'].

        Returns the 'shutdown_scriptpubkey' of the 'upfront_shutdown_script' TLV
        if the peer sent one and option_upfront_shutdown_script is supported,
        otherwise b''.

        Raises ValueError if msg_identifier is neither 'accept' nor 'open'.
        """
        if msg_identifier not in ('accept', 'open'):
            raise ValueError("msg_identifier must be either 'accept' or 'open'")

        # The TLV stream may be missing (or empty) in the payload. The channel
        # open/accept flows read the same key with payload.get(...), so be
        # equally tolerant here instead of failing with a KeyError.
        tlvs = payload.get(msg_identifier + '_channel_tlvs') or {}
        uss_tlv = tlvs.get('upfront_shutdown_script')

        if uss_tlv and self.is_upfront_shutdown_script():
            upfront_shutdown_script = uss_tlv['shutdown_scriptpubkey']
        else:
            upfront_shutdown_script = b''
        self.logger.info(f"upfront shutdown script received: {upfront_shutdown_script}")
        return upfront_shutdown_script
625

626
    def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner, channel_type: ChannelType) -> LocalConfig:
        """Create and validate our side's configuration for a new channel.

        `initiator` tells which side funds the channel: if it is LOCAL we start
        with the funding amount minus the pushed amount, otherwise with just
        the pushed amount. The returned LocalConfig has already been checked
        with validate_params().
        """
        channel_seed = os.urandom(32)
        if initiator == LOCAL:
            initial_msat = funding_sat * 1000 - push_msat
        else:
            initial_msat = push_msat

        assert channel_type & channel_type.OPTION_STATIC_REMOTEKEY
        wallet = self.lnworker.wallet
        assert wallet.txin_type == 'p2wpkh'
        sweep_addr = wallet.get_new_sweep_address_for_channel()
        static_remotekey = bytes.fromhex(wallet.get_public_key(sweep_addr))

        dust_limit_sat = bitcoin.DUST_LIMIT_P2PKH
        net_config = self.network.config
        # for comparison of defaults, see
        # https://github.com/ACINQ/eclair/blob/afa378fbb73c265da44856b4ad0f2128a88ae6c6/eclair-core/src/main/resources/reference.conf#L66
        # https://github.com/ElementsProject/lightning/blob/0056dd75572a8857cff36fcbdb1a2295a1ac9253/lightningd/options.c#L657
        # https://github.com/lightningnetwork/lnd/blob/56b61078c5b2be007d318673a5f3b40c6346883a/config.go#L81
        local_config = LocalConfig.from_seed(
            channel_seed=channel_seed,
            static_remotekey=static_remotekey,
            # sending empty bytes as the upfront_shutdown_script will give us the
            # flexibility to decide an address at closing time
            upfront_shutdown_script=b'',
            to_self_delay=net_config.LIGHTNING_TO_SELF_DELAY_CSV,
            dust_limit_sat=dust_limit_sat,
            max_htlc_value_in_flight_msat=funding_sat * 1000,
            max_accepted_htlcs=30,
            initial_msat=initial_msat,
            # reserve is 1% of the funding amount, but never below the dust limit
            reserve_sat=max(funding_sat // 100, dust_limit_sat),
            funding_locked_received=False,
            was_announced=False,
            current_commitment_signature=None,
            current_htlc_signatures=b'',
            htlc_minimum_msat=1,
        )
        local_config.validate_params(funding_sat=funding_sat, config=net_config, peer_features=self.features)
        return local_config
664

665
    def temporarily_reserve_funding_tx_change_address(func):
        """Decorator that keeps the funding tx's change addresses reserved while `func` runs.

        During the channel open flow, if we initiated, we might have used a change address
        of ours in the funding tx. The funding tx is not part of the wallet history
        at that point yet, but we should already consider this change address as 'used'.

        The wrapped coroutine must be called with the transaction passed as the
        keyword argument 'funding_tx'.
        """
        @functools.wraps(func)
        async def wrapper(self: 'Peer', *args, **kwargs):
            funding_tx = kwargs['funding_tx']  # type: PartialTransaction
            wallet = self.lnworker.wallet
            change_addrs = []
            for txout in funding_tx.outputs():
                if wallet.is_change(txout.address):
                    change_addrs.append(txout.address)
            for address in change_addrs:
                wallet.set_reserved_state_of_address(address, reserved=True)
            try:
                result = await func(self, *args, **kwargs)
            finally:
                # release the reservation whether or not the wrapped flow succeeded
                for address in change_addrs:
                    self.lnworker.wallet.set_reserved_state_of_address(address, reserved=False)
            return result
        return wrapper
683

684
    @temporarily_reserve_funding_tx_change_address
    async def channel_establishment_flow(
            self, *,
            funding_tx: 'PartialTransaction',
            funding_sat: int,
            push_msat: int,
            temp_channel_id: bytes
    ) -> Tuple[Channel, 'PartialTransaction']:
        """Implements the channel opening flow.

        -> open_channel message
        <- accept_channel message
        -> funding_created message
        <- funding_signed message

        Channel configurations are initialized in this method.

        funding_tx must contain a placeholder output paying funding_sat to
        ln_dummy_address(); it is replaced here by the real funding output.
        The transaction is modified in place.

        Returns the new channel (in state OPENING, already handed to lnworker)
        together with the updated funding transaction.

        Raises Exception if the peer is not acceptable (trampoline check), if the
        values in accept_channel are not acceptable (minimum depth, channel type),
        or if the funding transaction does not have the expected shape.
        """
        # will raise if init fails
        await util.wait_for2(self.initialized, LN_P2P_NETWORK_TIMEOUT)
        # trampoline is not yet in features
        if self.lnworker.uses_trampoline() and not self.lnworker.is_trampoline_peer(self.pubkey):
            raise Exception('Not a trampoline node: ' + str(self.their_features))

        feerate = self.lnworker.current_feerate_per_kw()
        # we set a channel type for internal bookkeeping
        open_channel_tlvs = {}
        assert self.their_features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT)
        our_channel_type = ChannelType(ChannelType.OPTION_STATIC_REMOTEKEY)
        # We do not set the option_scid_alias bit in channel_type because LND rejects it.
        # Eclair accepts channel_type with that bit, but does not require it.

        # if option_channel_type is negotiated: MUST set channel_type
        if self.is_channel_type():
            # if it includes channel_type: MUST set it to a defined type representing the type it wants.
            open_channel_tlvs['channel_type'] = {
                'type': our_channel_type.to_bytes_minimal()
            }

        local_config = self.make_local_config(funding_sat, push_msat, LOCAL, our_channel_type)
        # if it includes open_channel_tlvs: MUST include upfront_shutdown_script.
        open_channel_tlvs['upfront_shutdown_script'] = {
            'shutdown_scriptpubkey': local_config.upfront_shutdown_script
        }

        # for the first commitment transaction
        per_commitment_secret_first = get_per_commitment_secret_from_seed(
            local_config.per_commitment_secret_seed,
            RevocationStore.START_INDEX
        )
        per_commitment_point_first = secret_to_pubkey(
            int.from_bytes(per_commitment_secret_first, 'big'))

        # store the temp id now, so that it is recognized for e.g. 'error' messages
        # TODO: this is never cleaned up; the dict grows unbounded until disconnect
        self.temp_id_to_id[temp_channel_id] = None
        self.send_message(
            "open_channel",
            temporary_channel_id=temp_channel_id,
            chain_hash=constants.net.rev_genesis_bytes(),
            funding_satoshis=funding_sat,
            push_msat=push_msat,
            dust_limit_satoshis=local_config.dust_limit_sat,
            feerate_per_kw=feerate,
            max_accepted_htlcs=local_config.max_accepted_htlcs,
            funding_pubkey=local_config.multisig_key.pubkey,
            revocation_basepoint=local_config.revocation_basepoint.pubkey,
            htlc_basepoint=local_config.htlc_basepoint.pubkey,
            payment_basepoint=local_config.payment_basepoint.pubkey,
            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
            first_per_commitment_point=per_commitment_point_first,
            to_self_delay=local_config.to_self_delay,
            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
            channel_flags=0x00,  # not willing to announce channel
            channel_reserve_satoshis=local_config.reserve_sat,
            htlc_minimum_msat=local_config.htlc_minimum_msat,
            open_channel_tlvs=open_channel_tlvs,
        )

        # <- accept_channel
        payload = await self.wait_for_message('accept_channel', temp_channel_id)
        self.logger.debug(f"received accept_channel for temp_channel_id={temp_channel_id.hex()}. {payload=}")
        remote_per_commitment_point = payload['first_per_commitment_point']
        # only accept a confirmation depth in the range 1..30
        funding_txn_minimum_depth = payload['minimum_depth']
        if funding_txn_minimum_depth <= 0:
            raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}")
        if funding_txn_minimum_depth > 30:
            raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}")

        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
            payload, 'accept')

        accept_channel_tlvs = payload.get('accept_channel_tlvs')
        their_channel_type = accept_channel_tlvs.get('channel_type') if accept_channel_tlvs else None
        if their_channel_type:
            their_channel_type = ChannelType.from_bytes(their_channel_type['type'], byteorder='big').discard_unknown_and_check()
            # if channel_type is set, and channel_type was set in open_channel,
            # and they are not equal types: MUST reject the channel.
            if open_channel_tlvs.get('channel_type') is not None and their_channel_type != our_channel_type:
                raise Exception("Channel type is not the one that we sent.")

        # the remote starts with exactly what we push to them
        remote_config = RemoteConfig(
            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
            multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]),
            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
            to_self_delay=payload['to_self_delay'],
            dust_limit_sat=payload['dust_limit_satoshis'],
            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
            max_accepted_htlcs=payload["max_accepted_htlcs"],
            initial_msat=push_msat,
            reserve_sat=payload["channel_reserve_satoshis"],
            htlc_minimum_msat=payload['htlc_minimum_msat'],
            next_per_commitment_point=remote_per_commitment_point,
            current_per_commitment_point=None,
            upfront_shutdown_script=upfront_shutdown_script,
        )
        ChannelConfig.cross_validate_params(
            local_config=local_config,
            remote_config=remote_config,
            funding_sat=funding_sat,
            is_local_initiator=True,
            initial_feerate_per_kw=feerate,
            config=self.network.config,
            peer_features=self.features,
        )

        # -> funding created
        # replace dummy output in funding tx
        redeem_script = funding_output_script(local_config, remote_config)
        funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script)
        funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat)
        dummy_output = PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat)
        # err 1: the placeholder is missing; err 2: it is still present after removing one copy
        if dummy_output not in funding_tx.outputs(): raise Exception("LN dummy output (err 1)")
        funding_tx._outputs.remove(dummy_output)
        if dummy_output in funding_tx.outputs(): raise Exception("LN dummy output (err 2)")
        funding_tx.add_outputs([funding_output])
        # find and encrypt op_return data associated to funding_address
        has_onchain_backup = self.lnworker and self.lnworker.has_recoverable_channels()
        if has_onchain_backup:
            backup_data = self.lnworker.cb_data(self.pubkey)
            dummy_scriptpubkey = make_op_return(backup_data)
            for o in funding_tx.outputs():
                if o.scriptpubkey == dummy_scriptpubkey:
                    encrypted_data = self.lnworker.encrypt_cb_data(backup_data, funding_address)
                    # same length, so swapping the script does not change the tx size
                    assert len(encrypted_data) == len(backup_data)
                    o.scriptpubkey = make_op_return(encrypted_data)
                    break
            else:
                raise Exception('op_return output not found in funding tx')
        # must not be malleable
        funding_tx.set_rbf(False)
        if not funding_tx.is_segwit():
            raise Exception('Funding transaction is not segwit')
        funding_txid = funding_tx.txid()
        assert funding_txid
        funding_index = funding_tx.outputs().index(funding_output)
        # build remote commitment transaction
        channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index)
        outpoint = Outpoint(funding_txid, funding_index)
        constraints = ChannelConstraints(
            capacity=funding_sat,
            is_initiator=True,
            funding_txn_minimum_depth=funding_txn_minimum_depth
        )
        storage = self.create_channel_storage(
            channel_id, outpoint, local_config, remote_config, constraints, our_channel_type)
        chan = Channel(
            storage,
            lnworker=self.lnworker,
            initial_feerate=feerate
        )
        chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()]
        chan.storage['has_onchain_backup'] = has_onchain_backup
        if isinstance(self.transport, LNTransport):
            chan.add_or_update_peer_addr(self.transport.peer_addr)
        sig_64, _ = chan.sign_next_commitment()
        # from now on the temporary id maps to the real channel id
        self.temp_id_to_id[temp_channel_id] = channel_id

        self.send_message("funding_created",
            temporary_channel_id=temp_channel_id,
            funding_txid=funding_txid_bytes,
            funding_output_index=funding_index,
            signature=sig_64)
        self.funding_created_sent.add(channel_id)

        # <- funding signed
        payload = await self.wait_for_message('funding_signed', channel_id)
        self.logger.info('received funding_signed')
        remote_sig = payload['signature']
        try:
            chan.receive_new_commitment(remote_sig, [])
        except LNProtocolWarning as e:
            # NOTE(review): presumably send_warning(close_connection=True) raises and so
            # aborts the flow here; otherwise execution would continue below - verify.
            await self.send_warning(channel_id, message=str(e), close_connection=True)
        chan.open_with_first_pcp(remote_per_commitment_point, remote_sig)
        chan.set_state(ChannelState.OPENING)
        self.lnworker.add_new_channel(chan)
        return chan, funding_tx
882

883
    def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints, channel_type):
        """Build the initial storage for a new channel with this peer.

        The channel starts in state PREOPENING, without a short channel id or
        remote update, and with all bookkeeping tables empty.
        """
        storage = {
            "node_id": self.pubkey.hex(),
            "channel_id": channel_id.hex(),
            "short_channel_id": None,
            "funding_outpoint": outpoint,
            "remote_config": remote_config,
            "local_config": local_config,
            "constraints": constraints,
            "remote_update": None,
            "state": ChannelState.PREOPENING.name,
        }
        empty_tables = (
            'onion_keys',
            'data_loss_protect_remote_pcp',
            "log",
            "fail_htlc_reasons",  # htlc_id -> onion_packet
            "unfulfilled_htlcs",  # htlc_id -> error_bytes, failure_message
            "revocation_store",
        )
        for table_name in empty_tables:
            storage[table_name] = {}
        storage["channel_type"] = channel_type
        # set db to None, because we do not want to write updates until channel is saved
        return StoredDict(storage, None, [])
904

905
    async def on_open_channel(self, payload):
        """Implements the channel acceptance flow.

        <- open_channel message
        -> accept_channel message
        <- funding_created message
        -> funding_signed message

        Channel configurations are initialized in this method.

        `payload` is the parsed open_channel message. On success the new channel
        is put into state OPENING and handed to lnworker; nothing is returned.

        Raises Exception if we do not accept channels (recoverable channels are
        enabled), if the chain hash is wrong, or if no acceptable channel type
        can be determined.
        """
        if self.lnworker.has_recoverable_channels():
            # FIXME: we might want to keep the connection open
            raise Exception('not accepting channels')
        # <- open_channel
        if payload['chain_hash'] != constants.net.rev_genesis_bytes():
            raise Exception('wrong chain_hash')
        funding_sat = payload['funding_satoshis']
        push_msat = payload['push_msat']
        feerate = payload['feerate_per_kw']  # note: we are not validating this
        temp_chan_id = payload['temporary_channel_id']
        # store the temp id now, so that it is recognized for e.g. 'error' messages
        # TODO: this is never cleaned up; the dict grows unbounded until disconnect
        self.temp_id_to_id[temp_chan_id] = None

        open_channel_tlvs = payload.get('open_channel_tlvs')
        channel_type_tlv = open_channel_tlvs.get('channel_type') if open_channel_tlvs else None
        if self.is_channel_type():
            # The receiving node MAY fail the channel if:
            # option_channel_type was negotiated but the message doesn't include a channel_type
            if channel_type_tlv is None:
                raise Exception("sender has advertized option_channel_type, but hasn't sent the channel type")
            # MUST fail the channel if it supports channel_type,
            # channel_type was set, and the type is not suitable.
            channel_type = ChannelType.from_bytes(channel_type_tlv['type'], byteorder='big').discard_unknown_and_check()
            if not channel_type.complies_with_features(self.features):
                raise Exception("sender has sent a channel type we don't support")
        else:
            # option_channel_type was not negotiated, so any channel_type TLV is ignored.
            # Like the initiator flow, we still need a channel type for internal
            # bookkeeping: previously channel_type stayed None (or a raw TLV dict)
            # here and make_local_config() failed with an AttributeError.
            if not self.their_features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT):
                raise Exception("option_channel_type was not negotiated and peer does not support option_static_remotekey")
            channel_type = ChannelType(ChannelType.OPTION_STATIC_REMOTEKEY)

        local_config = self.make_local_config(funding_sat, push_msat, REMOTE, channel_type)

        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
            payload, 'open')

        # the remote funds the channel: they start with everything they do not push to us
        remote_config = RemoteConfig(
            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
            multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']),
            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
            to_self_delay=payload['to_self_delay'],
            dust_limit_sat=payload['dust_limit_satoshis'],
            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
            max_accepted_htlcs=payload['max_accepted_htlcs'],
            initial_msat=funding_sat * 1000 - push_msat,
            reserve_sat=payload['channel_reserve_satoshis'],
            htlc_minimum_msat=payload['htlc_minimum_msat'],
            next_per_commitment_point=payload['first_per_commitment_point'],
            current_per_commitment_point=None,
            upfront_shutdown_script=upfront_shutdown_script,
        )
        ChannelConfig.cross_validate_params(
            local_config=local_config,
            remote_config=remote_config,
            funding_sat=funding_sat,
            is_local_initiator=False,
            initial_feerate_per_kw=feerate,
            config=self.network.config,
            peer_features=self.features,
        )

        # note: we ignore payload['channel_flags'],  which e.g. contains 'announce_channel'.
        #       Notably, if the remote sets 'announce_channel' to True, we will ignore that too,
        #       but we will not play along with actually announcing the channel (so we keep it private).

        # -> accept channel
        # for the first commitment transaction
        per_commitment_secret_first = get_per_commitment_secret_from_seed(
            local_config.per_commitment_secret_seed,
            RevocationStore.START_INDEX
        )
        per_commitment_point_first = secret_to_pubkey(
            int.from_bytes(per_commitment_secret_first, 'big'))
        min_depth = 3
        accept_channel_tlvs = {
            'upfront_shutdown_script': {
                'shutdown_scriptpubkey': local_config.upfront_shutdown_script
            },
        }
        # The sender: if it sets channel_type: MUST set it to the channel_type from open_channel
        if self.is_channel_type():
            accept_channel_tlvs['channel_type'] = {
                'type': channel_type.to_bytes_minimal()
            }

        self.send_message(
            'accept_channel',
            temporary_channel_id=temp_chan_id,
            dust_limit_satoshis=local_config.dust_limit_sat,
            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
            channel_reserve_satoshis=local_config.reserve_sat,
            htlc_minimum_msat=local_config.htlc_minimum_msat,
            minimum_depth=min_depth,
            to_self_delay=local_config.to_self_delay,
            max_accepted_htlcs=local_config.max_accepted_htlcs,
            funding_pubkey=local_config.multisig_key.pubkey,
            revocation_basepoint=local_config.revocation_basepoint.pubkey,
            payment_basepoint=local_config.payment_basepoint.pubkey,
            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
            htlc_basepoint=local_config.htlc_basepoint.pubkey,
            first_per_commitment_point=per_commitment_point_first,
            accept_channel_tlvs=accept_channel_tlvs,
        )

        # <- funding created
        funding_created = await self.wait_for_message('funding_created', temp_chan_id)

        # -> funding signed
        funding_idx = funding_created['funding_output_index']
        # the txid bytes from the message are reversed before being hex-encoded
        funding_txid = funding_created['funding_txid'][::-1].hex()
        channel_id, _ = channel_id_from_funding_tx(funding_txid, funding_idx)
        constraints = ChannelConstraints(
            capacity=funding_sat,
            is_initiator=False,
            funding_txn_minimum_depth=min_depth
        )
        outpoint = Outpoint(funding_txid, funding_idx)
        chan_dict = self.create_channel_storage(
            channel_id, outpoint, local_config, remote_config, constraints, channel_type)
        chan = Channel(
            chan_dict,
            lnworker=self.lnworker,
            initial_feerate=feerate
        )
        chan.storage['init_timestamp'] = int(time.time())
        if isinstance(self.transport, LNTransport):
            chan.add_or_update_peer_addr(self.transport.peer_addr)
        remote_sig = funding_created['signature']
        try:
            chan.receive_new_commitment(remote_sig, [])
        except LNProtocolWarning as e:
            # NOTE(review): presumably send_warning(close_connection=True) raises and so
            # aborts the flow here; otherwise execution would continue below - verify.
            await self.send_warning(channel_id, message=str(e), close_connection=True)
        sig_64, _ = chan.sign_next_commitment()
        self.send_message('funding_signed',
            channel_id=channel_id,
            signature=sig_64,
        )
        # from now on the temporary id maps to the real channel id
        self.temp_id_to_id[temp_chan_id] = channel_id
        self.funding_signed_sent.add(chan.channel_id)
        chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
        chan.set_state(ChannelState.OPENING)
        self.lnworker.add_new_channel(chan)
1055

1056
    async def request_force_close(self, channel_id: bytes):
        """Try to trigger the remote peer to force-close.

        Waits for the connection to be initialized, then sends a deliberately
        outdated "channel_reestablish" followed by an "error" for the channel.
        """
        await self.initialized
        # First, we intentionally send a "channel_reestablish" msg with an old state.
        # Many nodes (but not all) automatically force-close when seeing this.
        stale_reestablish = dict(
            channel_id=channel_id,
            next_commitment_number=0,
            next_revocation_number=0,
            your_last_per_commitment_secret=0,
            my_current_per_commitment_point=secret_to_pubkey(42),  # we need a valid point (BOLT2)
        )
        self.send_message("channel_reestablish", **stale_reestablish)
        # Newish nodes that have lightning/bolts/pull/950 force-close upon receiving an "error" msg,
        # so send that too. E.g. old "channel_reestablish" is not enough for eclair 0.7+,
        # but "error" is. see https://github.com/ACINQ/eclair/pull/2036
        # The receiving node:
        #   - upon receiving `error`:
        #     - MUST fail the channel referred to by `channel_id`, if that channel is with the sending node.
        self.send_message("error", channel_id=channel_id, data=b"", len=0)
1076

1077
    def schedule_force_closing(self, channel_id: bytes):
        """Wrapper of lnworker's method, that raises if channel is not with this peer.

        A channel id counts as belonging to this peer if it is a key of
        self.channels or a value of self.temp_id_to_id. If the id is known but
        the channel object is not in self.channels yet, a warning is logged and
        nothing is scheduled. Otherwise the force-close is scheduled via
        lnworker, provided the channel's close options allow a local force-close.

        Raises ValueError if channel_id does not belong to this peer.
        """
        if channel_id not in self.channels and channel_id not in self.temp_id_to_id.values():
            raise ValueError(f"channel {channel_id.hex()} does not belong to this peer")
        chan = self.channels.get(channel_id)
        if not chan:
            self.logger.warning(f"tried to force-close channel {channel_id.hex()} but it is not in self.channels yet")
            # there is no channel object to inspect or close yet; falling through
            # would only fail with an AttributeError on None
            return
        if ChanCloseOption.LOCAL_FCLOSE in chan.get_close_options():
            self.lnworker.schedule_force_closing(channel_id)
        else:
            self.logger.info(f"tried to force-close channel {chan.get_id_for_log()} "
                             f"but close option is not allowed. {chan.get_state()=!r}")
1091

1092
    def on_channel_reestablish(self, chan, msg):
5✔
1093
        their_next_local_ctn = msg["next_commitment_number"]
5✔
1094
        their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"]
5✔
1095
        their_local_pcp = msg.get("my_current_per_commitment_point")
5✔
1096
        their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret")
5✔
1097
        self.logger.info(
5✔
1098
            f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with '
1099
            f'(their_next_local_ctn={their_next_local_ctn}, '
1100
            f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})')
1101
        # sanity checks of received values
1102
        if their_next_local_ctn < 0:
5✔
1103
            raise RemoteMisbehaving(f"channel reestablish: their_next_local_ctn < 0")
×
1104
        if their_oldest_unrevoked_remote_ctn < 0:
5✔
1105
            raise RemoteMisbehaving(f"channel reestablish: their_oldest_unrevoked_remote_ctn < 0")
×
1106
        # ctns
1107
        oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
5✔
1108
        latest_local_ctn = chan.get_latest_ctn(LOCAL)
5✔
1109
        next_local_ctn = chan.get_next_ctn(LOCAL)
5✔
1110
        oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
5✔
1111
        latest_remote_ctn = chan.get_latest_ctn(REMOTE)
5✔
1112
        next_remote_ctn = chan.get_next_ctn(REMOTE)
5✔
1113
        # compare remote ctns
1114
        we_are_ahead = False
5✔
1115
        they_are_ahead = False
5✔
1116
        we_must_resend_revoke_and_ack = False
5✔
1117
        if next_remote_ctn != their_next_local_ctn:
5✔
1118
            if their_next_local_ctn == latest_remote_ctn and chan.hm.is_revack_pending(REMOTE):
5✔
1119
                # We will replay the local updates (see reestablish_channel), which should contain a commitment_signed
1120
                # (due to is_revack_pending being true), and this should remedy this situation.
1121
                pass
5✔
1122
            else:
1123
                self.logger.warning(
5✔
1124
                    f"channel_reestablish ({chan.get_id_for_log()}): "
1125
                    f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}")
1126
                if their_next_local_ctn < next_remote_ctn:
5✔
1127
                    we_are_ahead = True
5✔
1128
                else:
1129
                    they_are_ahead = True
5✔
1130
        # compare local ctns
1131
        if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn:
5✔
1132
            if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn:
5✔
1133
                # A node:
1134
                #    if next_revocation_number is equal to the commitment number of the last revoke_and_ack
1135
                #    the receiving node sent, AND the receiving node hasn't already received a closing_signed:
1136
                #        MUST re-send the revoke_and_ack.
1137
                we_must_resend_revoke_and_ack = True
5✔
1138
            else:
1139
                self.logger.warning(
5✔
1140
                    f"channel_reestablish ({chan.get_id_for_log()}): "
1141
                    f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}")
1142
                if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn:
5✔
1143
                    we_are_ahead = True
5✔
1144
                else:
1145
                    they_are_ahead = True
5✔
1146
        # option_data_loss_protect
1147
        assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT)
5✔
1148
        def are_datalossprotect_fields_valid() -> bool:
5✔
1149
            if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None:
5✔
1150
                return False
×
1151
            if their_oldest_unrevoked_remote_ctn > 0:
5✔
1152
                our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1)
5✔
1153
            else:
1154
                assert their_oldest_unrevoked_remote_ctn == 0
5✔
1155
                our_pcs = bytes(32)
5✔
1156
            if our_pcs != their_claim_of_our_last_per_commitment_secret:
5✔
1157
                self.logger.error(
×
1158
                    f"channel_reestablish ({chan.get_id_for_log()}): "
1159
                    f"(DLP) local PCS mismatch: {our_pcs.hex()} != {their_claim_of_our_last_per_commitment_secret.hex()}")
1160
                return False
×
1161
            assert chan.is_static_remotekey_enabled()
5✔
1162
            return True
5✔
1163
        if not are_datalossprotect_fields_valid():
5✔
1164
            raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid")
×
1165
        fut = self.channel_reestablish_msg[chan.channel_id]
5✔
1166
        if they_are_ahead:
5✔
1167
            self.logger.warning(
5✔
1168
                f"channel_reestablish ({chan.get_id_for_log()}): "
1169
                f"remote is ahead of us! They should force-close. Remote PCP: {their_local_pcp.hex()}")
1170
            # data_loss_protect_remote_pcp is used in lnsweep
1171
            chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp)
5✔
1172
            chan.set_state(ChannelState.WE_ARE_TOXIC)
5✔
1173
            self.lnworker.save_channel(chan)
5✔
1174
            chan.peer_state = PeerState.BAD
5✔
1175
            # raise after we send channel_reestablish, so the remote can realize they are ahead
1176
            fut.set_exception(RemoteMisbehaving("remote ahead of us"))
5✔
1177
        elif we_are_ahead:
5✔
1178
            self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.")
5✔
1179
            self.schedule_force_closing(chan.channel_id)
5✔
1180
            fut.set_exception(RemoteMisbehaving("we are ahead of remote"))
5✔
1181
        else:
1182
            # all good
1183
            fut.set_result((we_must_resend_revoke_and_ack, their_next_local_ctn))
5✔
1184

1185
    async def reestablish_channel(self, chan: Channel):
        """Run our side of the ``channel_reestablish`` handshake for *chan*.

        Sends our ``channel_reestablish`` message, waits for the outcome that is
        published on ``self.channel_reestablish_msg[chan_id]``, and then
        retransmits what the remote may have missed: un-acked local updates
        (including ``commitment_signed``) and, if required, our last
        ``revoke_and_ack``.

        Returns early, without performing the handshake, when a force-close has
        to be requested from the remote, or when the channel is not in
        ``PeerState.DISCONNECTED``.  If an exception was set on the future,
        awaiting it re-raises that exception here.
        """
        await self.initialized
        chan_id = chan.channel_id
        if chan.should_request_force_close:
            # We only ask the remote to force-close; no reestablish is attempted.
            chan.set_state(ChannelState.REQUESTED_FCLOSE)
            await self.request_force_close(chan_id)
            chan.should_request_force_close = False
            return
        assert ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING
        if chan.peer_state != PeerState.DISCONNECTED:
            self.logger.info(
                f'reestablish_channel was called but channel {chan.get_id_for_log()} '
                f'already in peer_state {chan.peer_state!r}')
            return
        chan.peer_state = PeerState.REESTABLISHING
        util.trigger_callback('channel', self.lnworker.wallet, chan)
        # ctns
        # NOTE(review): latest_local_ctn, latest_remote_ctn and next_remote_ctn are
        # not read anywhere below in this method.
        oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
        latest_local_ctn = chan.get_latest_ctn(LOCAL)
        next_local_ctn = chan.get_next_ctn(LOCAL)
        oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
        latest_remote_ctn = chan.get_latest_ctn(REMOTE)
        next_remote_ctn = chan.get_next_ctn(REMOTE)
        # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
        chan.hm.discard_unsigned_remote_updates()
        # send message
        assert chan.is_static_remotekey_enabled()
        # NOTE(review): the point sent as my_current_per_commitment_point is always the
        # one for ctn 0; presumably acceptable because static_remotekey is asserted
        # above -- confirm. latest_secret is not used.
        latest_secret, latest_point = chan.get_secret_and_point(LOCAL, 0)
        if oldest_unrevoked_remote_ctn == 0:
            # The remote has not revoked anything yet, so there is no secret to echo back.
            last_rev_secret = 0
        else:
            last_rev_index = oldest_unrevoked_remote_ctn - 1
            last_rev_secret = chan.revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index)
        self.send_message(
            "channel_reestablish",
            channel_id=chan_id,
            next_commitment_number=next_local_ctn,
            next_revocation_number=oldest_unrevoked_remote_ctn,
            your_last_per_commitment_secret=last_rev_secret,
            my_current_per_commitment_point=latest_point)
        self.logger.info(
            f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with '
            f'(next_local_ctn={next_local_ctn}, '
            f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')

        # wait until we receive their channel_reestablish
        # The future's result is the tuple (we_must_resend_revoke_and_ack, their_next_local_ctn).
        fut = self.channel_reestablish_msg[chan_id]
        await fut
        we_must_resend_revoke_and_ack, their_next_local_ctn = fut.result()

        def replay_updates_and_commitsig():
            # Replay un-acked local updates (including commitment_signed) byte-for-byte.
            # If we have sent them a commitment signature that they "lost" (due to disconnect),
            # we need to make sure we replay the same local updates, as otherwise they could
            # end up with two (or more) signed valid commitment transactions at the same ctn.
            # Multiple valid ctxs at the same ctn is a major headache for pre-signing spending txns,
            # e.g. for watchtowers, hence we must ensure these ctxs coincide.
            # We replay the local updates even if they were not yet committed.
            unacked = chan.hm.get_unacked_local_updates()
            replayed_msgs = []
            for ctn, messages in unacked.items():
                if ctn < their_next_local_ctn:
                    # They claim to have received these messages and the corresponding
                    # commitment_signed, hence we must not replay them.
                    continue
                for raw_upd_msg in messages:
                    # Raw bytes go straight to the transport, bypassing send_message.
                    self.transport.send_bytes(raw_upd_msg)
                    replayed_msgs.append(raw_upd_msg)
            self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {len(replayed_msgs)} unacked messages. '
                             f'{[decode_msg(raw_upd_msg)[0] for raw_upd_msg in replayed_msgs]}')

        def resend_revoke_and_ack():
            # Re-create the revoke_and_ack from the secret at (oldest unrevoked - 1) and
            # the point at (oldest unrevoked + 1); last_point and next_secret are unused.
            last_secret, last_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1)
            next_secret, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1)
            self.send_message(
                "revoke_and_ack",
                channel_id=chan.channel_id,
                per_commitment_secret=last_secret,
                next_per_commitment_point=next_point)

        # We need to preserve relative order of last revack and commitsig.
        # note: it is not possible to recover and reestablish a channel if we are out-of-sync by
        # more than one ctns, i.e. we will only ever retransmit up to one commitment_signed message.
        # Hence, if we need to retransmit a revack, without loss of generality, we can either replay
        # it as the first message or as the last message.
        was_revoke_last = chan.hm.was_revoke_last()
        if we_must_resend_revoke_and_ack and not was_revoke_last:
            self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replaying a revoke_and_ack first.')
            resend_revoke_and_ack()
        replay_updates_and_commitsig()
        if we_must_resend_revoke_and_ack and was_revoke_last:
            self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replaying a revoke_and_ack last.')
            resend_revoke_and_ack()

        chan.peer_state = PeerState.GOOD
        chan_just_became_ready = False
        # Both sides still expecting commitment number 1 means no commitment was exchanged yet.
        if chan.is_funded() and their_next_local_ctn == next_local_ctn == 1:
            chan_just_became_ready = True
        if chan_just_became_ready or self.features.supports(LnFeatures.OPTION_SCID_ALIAS_OPT):
            self.send_channel_ready(chan)
        # checks done
        if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
            self.mark_open(chan)
        util.trigger_callback('channel', self.lnworker.wallet, chan)
        # if we have sent a previous shutdown, it must be retransmitted (Bolt2)
        if chan.get_state() == ChannelState.SHUTDOWN:
            await self.send_shutdown(chan)
×
1292

1293
    def send_channel_ready(self, chan: Channel):
        """Send a ``channel_ready`` message for *chan* to the peer.

        The message carries the per-commitment point derived from our seed at
        index ``RevocationStore.START_INDEX - 1`` and, when the scid-alias
        feature is supported, our local scid alias.  Afterwards the channel is
        marked open if it is funded and ``funding_locked_received`` is set in
        the local config.
        """
        chan_id = chan.channel_id
        secret_index = RevocationStore.START_INDEX - 1
        seed = chan.config[LOCAL].per_commitment_secret_seed
        secret = get_per_commitment_secret_from_seed(seed, secret_index)
        second_point = secret_to_pubkey(int.from_bytes(secret, 'big'))

        tlvs = {}
        if self.features.supports(LnFeatures.OPTION_SCID_ALIAS_OPT):
            # LND requires that we send an alias if the option has been negotiated in INIT.
            # otherwise, the channel will not be marked as active.
            # This does not apply if the channel was previously marked active without an alias.
            alias = chan.get_local_scid_alias(create_new_if_needed=True)
            tlvs['short_channel_id'] = {'alias': alias}

        # note: if 'channel_ready' was not yet received, we might send it multiple times
        self.send_message(
            "channel_ready",
            channel_id=chan_id,
            second_per_commitment_point=second_point,
            channel_ready_tlvs=tlvs)
        if not chan.is_funded():
            return
        if chan.config[LOCAL].funding_locked_received:
            self.mark_open(chan)
5✔
1314

1315
    def on_channel_ready(self, chan: Channel, payload):
        """Handle a ``channel_ready`` message received for *chan*.

        Stores the remote scid alias when the payload carries one.  The first
        time the message is seen (``funding_locked_received`` not yet set), the
        remote's next per-commitment point is recorded and the channel is
        persisted.  Finally the channel is marked open if it is funded.
        """
        self.logger.info(f"on_channel_ready. channel: {chan.channel_id.hex()}")
        # save remote alias for use in invoices
        tlvs = payload.get('channel_ready_tlvs', {})
        remote_alias = tlvs.get('short_channel_id', {}).get('alias')
        if remote_alias:
            chan.save_remote_scid_alias(remote_alias)

        already_received = chan.config[LOCAL].funding_locked_received
        if not already_received:
            chan.config[REMOTE].next_per_commitment_point = payload["second_per_commitment_point"]
            chan.config[LOCAL].funding_locked_received = True
            self.lnworker.save_channel(chan)
        if chan.is_funded():
            self.mark_open(chan)
5✔
1329

1330
    def on_network_update(self, chan: Channel, funding_tx_depth: int):
        """
        Only called when the channel is OPEN.

        Runs on the Network thread.

        As written this method never announces a channel: the only reachable
        statement inside the guarded branch is the ``return``.
        """
        if not chan.config[LOCAL].was_announced and funding_tx_depth >= 6:
            # don't announce our channels
            # FIXME should this be a field in chan.local_state maybe?
            return
            # NOTE: everything below is unreachable because it follows the return
            # above. If enabled, it would flag the channel as announced, persist it,
            # and schedule handle_announcements() on self.asyncio_loop.
            chan.config[LOCAL].was_announced = True
            self.lnworker.save_channel(chan)
            coro = self.handle_announcements(chan)
            asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
1344

1345
    @log_exceptions
    async def handle_announcements(self, chan: Channel):
        """Exchange announcement signatures for *chan* and send ``channel_announcement``.

        Sends our ``announcement_signatures``, waits for the remote's message on
        ``self.announcement_signatures[chan.channel_id]``, verifies both remote
        signatures against the announcement hash, and then sends the
        ``channel_announcement`` with node ids, keys and signatures in
        ascending node-id order.

        Raises ``Exception`` if either remote signature fails verification.
        """
        msg_hash, our_node_sig, our_bitcoin_sig = self.send_announcement_signatures(chan)
        remote_msg = await self.announcement_signatures[chan.channel_id].get()
        their_node_sig = remote_msg["node_signature"]
        their_bitcoin_sig = remote_msg["bitcoin_signature"]
        if not ecc.verify_signature(chan.config[REMOTE].multisig_key.pubkey, their_bitcoin_sig, msg_hash):
            raise Exception("bitcoin_sig invalid in announcement_signatures")
        if not ecc.verify_signature(self.pubkey, their_node_sig, msg_hash):
            raise Exception("node_sig invalid in announcement_signatures")

        # All three sequences are laid out as (remote, local).
        # NOTE(review): self.node_ids is presumably in that same order -- confirm.
        node_sigs = (their_node_sig, our_node_sig)
        bitcoin_sigs = (their_bitcoin_sig, our_bitcoin_sig)
        bitcoin_keys = (chan.config[REMOTE].multisig_key.pubkey, chan.config[LOCAL].multisig_key.pubkey)
        node_ids = self.node_ids

        # Pick the slot order so that the smaller node id ends up in position 1.
        if node_ids[0] > node_ids[1]:
            first, second = 1, 0
        else:
            first, second = 0, 1

        self.send_message("channel_announcement",
            node_signatures_1=node_sigs[first],
            node_signatures_2=node_sigs[second],
            bitcoin_signature_1=bitcoin_sigs[first],
            bitcoin_signature_2=bitcoin_sigs[second],
            len=0,
            #features not set (defaults to zeros)
            chain_hash=constants.net.rev_genesis_bytes(),
            short_channel_id=chan.short_channel_id,
            node_id_1=node_ids[first],
            node_id_2=node_ids[second],
            bitcoin_key_1=bitcoin_keys[first],
            bitcoin_key_2=bitcoin_keys[second]
        )
1382

1383
    def mark_open(self, chan: Channel):
        """Move *chan* from ``FUNDED`` to ``OPEN``, if that transition is allowed.

        Does nothing when the channel is already ``OPEN``; logs and returns when
        it is in any state other than ``FUNDED``.  On a successful transition a
        stored orphan channel update (if any) is applied and, when forwarding is
        enabled in the config, our outgoing channel update is sent to the peer.
        """
        assert chan.is_funded()
        # only allow state transition from "FUNDED" to "OPEN"
        current_state = chan.get_state()
        if current_state == ChannelState.OPEN:
            return
        if current_state != ChannelState.FUNDED:
            self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {current_state!r}")
            return
        assert chan.config[LOCAL].funding_locked_received
        chan.set_state(ChannelState.OPEN)
        util.trigger_callback('channel', self.lnworker.wallet, chan)
        # peer may have sent us a channel update for the incoming direction previously
        orphan_update = self.orphan_channel_updates.get(chan.short_channel_id)
        if orphan_update:
            chan.set_remote_update(orphan_update)
        self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})")
        if not self.network.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS:
            return
        # send channel_update of outgoing edge to peer,
        # so that channel can be used to receive payments
        self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})")
        self.transport.send_bytes(chan.get_outgoing_gossip_channel_update())
5✔
1407

1408
    def send_announcement_signatures(self, chan: Channel):
        """Sign the channel announcement for *chan* and send ``announcement_signatures``.

        Returns a tuple ``(msg_hash, node_signature, bitcoin_signature)`` where
        ``msg_hash`` is the double-SHA256 of the unsigned announcement with its
        first 258 bytes stripped.
        """
        unsigned_announcement = chan.construct_channel_announcement_without_sigs()
        # Skip the leading 256 + 2 bytes.
        # NOTE(review): presumably the message type plus the four signature slots -- confirm.
        msg_hash = sha256d(unsigned_announcement[258:])
        funding_key = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey)
        bitcoin_signature = funding_key.sign(msg_hash, sig_string_from_r_and_s)
        node_key = ecc.ECPrivkey(self.privkey)
        node_signature = node_key.sign(msg_hash, sig_string_from_r_and_s)
        self.send_message(
            "announcement_signatures",
            channel_id=chan.channel_id,
            short_channel_id=chan.short_channel_id,
            node_signature=node_signature,
            bitcoin_signature=bitcoin_signature,
        )
        return msg_hash, node_signature, bitcoin_signature
×
1421

1422
    def on_update_fail_htlc(self, chan: Channel, payload):
        """Handle ``update_fail_htlc``: record the failure on *chan*, then try to sign a new commitment."""
        failed_htlc_id = payload["id"]
        error_bytes = payload["reason"]
        self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {failed_htlc_id}")
        # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
        chan.receive_fail_htlc(failed_htlc_id, error_bytes=error_bytes)
        self.maybe_send_commitment(chan)
5✔
1428

1429
    def maybe_send_commitment(self, chan: Channel) -> bool:
        """Sign and send ``commitment_signed`` for *chan* if that is currently allowed.

        Returns ``True`` when a commitment was sent.  Returns ``False`` when the
        remote still owes us a revocation, or when there are no pending changes
        for the remote's commitment.  Must run on the asyncio thread.
        """
        assert util.get_running_loop() == util.get_asyncio_loop(), "this must be run on the asyncio thread!"
        # REMOTE should revoke first before we can sign a new ctx;
        # if there are no changes, we will not (and must not) send a new commitment
        if chan.hm.is_revack_pending(REMOTE) or not chan.has_pending_changes(REMOTE):
            return False
        self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.')
        commitment_sig, htlc_sigs = chan.sign_next_commitment()
        self.send_message(
            "commitment_signed",
            channel_id=chan.channel_id,
            signature=commitment_sig,
            num_htlcs=len(htlc_sigs),
            htlc_signature=b"".join(htlc_sigs),
        )
        return True
5✔
1441

1442
    def pay(self, *,
            route: 'LNPaymentRoute',
            chan: Channel,
            amount_msat: int,
            total_msat: int,
            payment_hash: bytes,
            min_final_cltv_expiry: int,
            payment_secret: bytes,
            trampoline_onion=None) -> UpdateAddHtlc:
        """Build the onion for *route*, add an HTLC to *chan* and send ``update_add_htlc``.

        Returns the HTLC as returned by ``chan.add_htlc``.

        Raises ``PaymentFailure`` if the channel cannot currently send
        ``update_add_htlc`` or if the computed expiry is too far in the future.
        """
        assert amount_msat > 0, "amount_msat is not greater zero"
        assert len(route) > 0
        if not chan.can_send_update_add_htlc():
            raise PaymentFailure("Channel cannot send update_add_htlc")
        # add features learned during "init" for direct neighbour:
        route[0].node_features |= self.features
        current_height = self.network.get_local_height()
        final_cltv = current_height + min_final_cltv_expiry
        # amount_msat is rebound to the amount computed for the first hop.
        hops_data, amount_msat, cltv = calc_hops_data_for_payment(
            route,
            amount_msat,
            final_cltv,
            total_msat=total_msat,
            payment_secret=payment_secret)
        self.logger.info(f"lnpeer.pay len(route)={len(route)}")
        for idx, edge in enumerate(route):
            self.logger.info(f"  {idx}: edge={edge.short_channel_id} hop_data={hops_data[idx]!r}")
        assert final_cltv <= cltv, (final_cltv, cltv)
        session_key = os.urandom(32)  # session_key
        # if we are forwarding a trampoline payment, add trampoline onion
        if trampoline_onion:
            self.logger.info('adding trampoline onion to final payload')
            target_payload = hops_data[len(hops_data) - 2].payload
            target_payload["trampoline_onion_packet"] = {
                "version": trampoline_onion.version,
                "public_key": trampoline_onion.public_key,
                "hops_data": trampoline_onion.hops_data,
                "hmac": trampoline_onion.hmac,
            }
        # create onion packet
        path_pubkeys = [edge.node_id for edge in route]
        # must use another sessionkey
        onion = new_onion_packet(path_pubkeys, session_key, hops_data, associated_data=payment_hash)
        self.logger.info(f"starting payment. len(route)={len(hops_data)}.")
        # create htlc
        latest_acceptable_expiry = current_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE
        if cltv > latest_acceptable_expiry:
            raise PaymentFailure(f"htlc expiry too far into future. (in {cltv-current_height} blocks)")
        htlc = chan.add_htlc(UpdateAddHtlc(
            amount_msat=amount_msat,
            payment_hash=payment_hash,
            cltv_expiry=cltv,
            timestamp=int(time.time())))
        chan.set_onion_key(htlc.htlc_id, session_key)  # should it be the outer onion secret?
        self.logger.info(f"starting payment. htlc: {htlc}")
        self.send_message(
            "update_add_htlc",
            channel_id=chan.channel_id,
            id=htlc.htlc_id,
            cltv_expiry=htlc.cltv_expiry,
            amount_msat=htlc.amount_msat,
            payment_hash=htlc.payment_hash,
            onion_routing_packet=onion.to_bytes())
        self.maybe_send_commitment(chan)
        return htlc
5✔
1503

1504
    def send_revoke_and_ack(self, chan: Channel):
        """Revoke our current commitment on *chan*, persist the channel and send ``revoke_and_ack``.

        Afterwards tries to send a new ``commitment_signed`` in case changes are pending.
        """
        self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}')
        revocation = chan.revoke_current_commitment()
        # Persist the channel before the revocation goes out on the wire.
        self.lnworker.save_channel(chan)
        self.send_message(
            "revoke_and_ack",
            channel_id=chan.channel_id,
            per_commitment_secret=revocation.per_commitment_secret,
            next_per_commitment_point=revocation.next_per_commitment_point,
        )
        self.maybe_send_commitment(chan)
5✔
1513

1514
    def on_commitment_signed(self, chan: Channel, payload):
        """Handle ``commitment_signed``: accept the new commitment and answer with ``revoke_and_ack``.

        Ignored entirely when the channel's peer state is ``BAD``.

        Raises ``RemoteMisbehaving`` if there are no pending changes to commit,
        or if we have not yet revoked our previous commitment.
        """
        if chan.peer_state == PeerState.BAD:
            return
        self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.')
        # make sure there were changes to the ctx, otherwise the remote peer is misbehaving
        no_pending_changes = not chan.has_pending_changes(LOCAL)
        if no_pending_changes:
            # TODO if feerate changed A->B->A; so there were updates but the value is identical,
            #      then it might be legal to send a commitment_signature
            #      see https://github.com/lightningnetwork/lightning-rfc/pull/618
            raise RemoteMisbehaving('received commitment_signed without pending changes')
        # REMOTE should wait until we have revoked
        if chan.hm.is_revack_pending(LOCAL):
            raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx')
        # The HTLC signatures arrive concatenated; split them into 64-byte pieces.
        htlc_sigs = list(chunks(payload["htlc_signature"], 64))
        chan.receive_new_commitment(payload["signature"], htlc_sigs)
        self.send_revoke_and_ack(chan)
        # Pulse the event: set it and clear it again immediately.
        commitsig_event = self.received_commitsig_event
        commitsig_event.set()
        commitsig_event.clear()
5✔
1533

1534
    def on_update_fulfill_htlc(self, chan: Channel, payload):
        """Handle ``update_fulfill_htlc``: settle the HTLC on *chan* and store the preimage.

        Afterwards tries to send a new ``commitment_signed``.
        """
        payment_preimage = payload["payment_preimage"]
        rhash = sha256(payment_preimage)
        settled_htlc_id = payload["id"]
        self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {settled_htlc_id}")
        # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
        chan.receive_htlc_settle(payment_preimage, settled_htlc_id)
        self.lnworker.save_preimage(rhash, payment_preimage)
        self.maybe_send_commitment(chan)
5✔
1542

1543
    def on_update_fail_malformed_htlc(self, chan: Channel, payload):
        """Handle ``update_fail_malformed_htlc`` for *chan*.

        Raises ``RemoteMisbehaving`` (after scheduling a force-close) when the
        failure code does not have the ``BADONION`` flag set.  Otherwise the
        HTLC is failed with an ``OnionRoutingFailure`` built from the payload
        and a new ``commitment_signed`` is attempted.
        """
        failed_htlc_id = payload["id"]
        code = payload["failure_code"]
        self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. "
                         f"htlc_id {failed_htlc_id}. failure_code={code}")
        has_badonion_flag = (code & OnionFailureCodeMetaFlag.BADONION) != 0
        if not has_badonion_flag:
            self.schedule_force_closing(chan.channel_id)
            raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {code}")
        failure = OnionRoutingFailure(code=code, data=payload["sha256_of_onion"])
        chan.receive_fail_htlc(failed_htlc_id, error_bytes=None, reason=failure)
        self.maybe_send_commitment(chan)
×
1554

1555
    def on_update_add_htlc(self, chan: Channel, payload):
        """Handle ``update_add_htlc``: validate and register an incoming HTLC on *chan*.

        Raises ``RemoteMisbehaving`` if the channel is not ``OPEN``, or (after
        scheduling a force-close) if ``cltv_expiry`` exceeds
        ``bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX``.
        """
        rhash = payload["payment_hash"]
        incoming_id = payload["id"]
        expiry = payload["cltv_expiry"]
        amount = payload["amount_msat"]
        raw_onion = payload["onion_routing_packet"]
        htlc = UpdateAddHtlc(
            amount_msat=amount,
            payment_hash=rhash,
            cltv_expiry=expiry,
            timestamp=int(time.time()),
            htlc_id=incoming_id,
        )
        self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc={str(htlc)}")
        if chan.get_state() != ChannelState.OPEN:
            raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}")
        if expiry > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX:
            self.schedule_force_closing(chan.channel_id)
            raise RemoteMisbehaving(f"received update_add_htlc with cltv_expiry > BLOCKHEIGHT_MAX. value was {expiry}")
        # add htlc
        chan.receive_htlc(htlc, raw_onion)
        util.trigger_callback('htlc_added', chan, htlc, RECEIVED)
5✔
1576

1577
    def maybe_forward_htlc(
            self, *,
            incoming_chan: Channel,
            htlc: UpdateAddHtlc,
            processed_onion: ProcessedOnionPacket) -> Tuple[bytes, int]:
        """Forward an incoming HTLC to the next channel named in the onion payload.

        Checks, in order: that forwarding is enabled, that the chain tip is not
        stale, that the next channel exists and can send/pay, the cltv delta and
        expiry bounds, the forwarding fee, and that the payment hash does not
        belong to a payment request we created.  On success an HTLC is added to
        the next channel and ``update_add_htlc`` is sent to that peer.

        Returns ``(next_chan_scid, htlc_id)`` of the outgoing HTLC.

        Raises ``OnionRoutingFailure`` with the matching failure code when any
        check fails or when sending to the next peer raises.
        """
        # Forward HTLC
        # FIXME: there are critical safety checks MISSING here
        #        - for example; atm we forward first and then persist "forwarding_info",
        #          so if we segfault in-between and restart, we might forward an HTLC twice...
        #          (same for trampoline forwarding)
        #        - we could check for the exposure to dust HTLCs, see:
        #          https://github.com/ACINQ/eclair/pull/1985

        def log_fail_reason(reason: str):
            self.logger.debug(
                f"maybe_forward_htlc. will FAIL HTLC: inc_chan={incoming_chan.get_id_for_log()}. "
                f"{reason}. inc_htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")

        forwarding_enabled = self.network.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS
        if not forwarding_enabled:
            log_fail_reason("forwarding is disabled")
            raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')
        chain = self.network.blockchain()
        if chain.is_tip_stale():
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
        try:
            _next_chan_scid = processed_onion.hop_data.payload["short_channel_id"]["short_channel_id"]  # type: bytes
            next_chan_scid = ShortChannelID(_next_chan_scid)
        except Exception:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
        next_chan = self.lnworker.get_channel_by_short_id(next_chan_scid)
        local_height = chain.height()
        if next_chan is None:
            log_fail_reason(f"cannot find next_chan {next_chan_scid}")
            raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
        # Failure data: our channel_update minus its first two bytes, prefixed
        # with its own 2-byte big-endian length.
        outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update(scid=next_chan_scid)[2:]
        outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big")
        outgoing_chan_upd_message = outgoing_chan_upd_len + outgoing_chan_upd
        if not next_chan.can_send_update_add_htlc():
            log_fail_reason(
                f"next_chan {next_chan.get_id_for_log()} cannot send ctx updates. "
                f"chan state {next_chan.get_state()!r}, peer state: {next_chan.peer_state!r}")
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
        try:
            next_amount_msat_htlc = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
        except Exception:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
        if not next_chan.can_pay(next_amount_msat_htlc):
            log_fail_reason(f"transient error (likely due to insufficient funds): not next_chan.can_pay(amt)")
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
        try:
            next_cltv_expiry = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
        except Exception:
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
        if htlc.cltv_expiry - next_cltv_expiry < next_chan.forwarding_cltv_expiry_delta:
            log_fail_reason(
                f"INCORRECT_CLTV_EXPIRY. "
                f"{htlc.cltv_expiry=} - {next_cltv_expiry=} < {next_chan.forwarding_cltv_expiry_delta=}")
            data = htlc.cltv_expiry.to_bytes(4, byteorder="big") + outgoing_chan_upd_message
            raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_CLTV_EXPIRY, data=data)
        if htlc.cltv_expiry - lnutil.MIN_FINAL_CLTV_EXPIRY_ACCEPTED <= local_height \
                or next_cltv_expiry <= local_height:
            raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_SOON, data=outgoing_chan_upd_message)
        if max(htlc.cltv_expiry, next_cltv_expiry) > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
            raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_FAR, data=b'')
        forwarding_fees = fee_for_edge_msat(
            forwarded_amount_msat=next_amount_msat_htlc,
            fee_base_msat=next_chan.forwarding_fee_base_msat,
            fee_proportional_millionths=next_chan.forwarding_fee_proportional_millionths)
        if htlc.amount_msat - next_amount_msat_htlc < forwarding_fees:
            data = next_amount_msat_htlc.to_bytes(8, byteorder="big") + outgoing_chan_upd_message
            raise OnionRoutingFailure(code=OnionFailureCode.FEE_INSUFFICIENT, data=data)
        if self._maybe_refuse_to_forward_htlc_that_corresponds_to_payreq_we_created(htlc.payment_hash):
            log_fail_reason(f"RHASH corresponds to payreq we created")
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
        self.logger.info(
            f"maybe_forward_htlc. will forward HTLC: inc_chan={incoming_chan.short_channel_id}. inc_htlc={str(htlc)}. "
            f"next_chan={next_chan.get_id_for_log()}.")
        next_peer = self.lnworker.peers.get(next_chan.node_id)
        if next_peer is None:
            log_fail_reason(f"next_peer offline ({next_chan.node_id.hex()})")
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message)
        next_htlc = UpdateAddHtlc(
            amount_msat=next_amount_msat_htlc,
            payment_hash=htlc.payment_hash,
            cltv_expiry=next_cltv_expiry,
            timestamp=int(time.time()))
        next_htlc = next_chan.add_htlc(next_htlc)
        try:
            next_peer.send_message(
                "update_add_htlc",
                channel_id=next_chan.channel_id,
                id=next_htlc.htlc_id,
                cltv_expiry=next_cltv_expiry,
                amount_msat=next_amount_msat_htlc,
                payment_hash=next_htlc.payment_hash,
                onion_routing_packet=processed_onion.next_packet.to_bytes()
            )
        except Exception as e:
            # Only ordinary errors are turned into an onion failure; KeyboardInterrupt,
            # SystemExit and task cancellation must propagate untouched. The original
            # error is chained so it stays visible in tracebacks.
            log_fail_reason(f"error sending message to next_peer={next_chan.node_id.hex()}")
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=outgoing_chan_upd_message) from e
        next_peer.maybe_send_commitment(next_chan)
        return next_chan_scid, next_htlc.htlc_id
5✔
1681

1682
    @log_exceptions
    async def maybe_forward_trampoline(
            self, *,
            payment_hash: bytes,
            cltv_expiry: int,
            outer_onion: ProcessedOnionPacket,
            trampoline_onion: ProcessedOnionPacket):
        """Forward a trampoline payment to the node named in the trampoline onion.

        The forwarding instructions are read from the payload of ``trampoline_onion``;
        the outgoing payment is made through ``self.lnworker.pay_to_node``.

        Raises OnionRoutingFailure if forwarding is disabled in the config, if the
        trampoline payload cannot be read, if ``payment_hash`` belongs to a payment
        request we created, or if the outgoing payment raises PaymentFailure.
        An OnionRoutingFailure raised by ``pay_to_node`` is propagated unchanged.
        """
        config = self.network.config
        forwarding_enabled = config.EXPERIMENTAL_LN_FORWARD_PAYMENTS
        forwarding_trampoline_enabled = config.EXPERIMENTAL_LN_FORWARD_TRAMPOLINE_PAYMENTS
        if not forwarding_enabled or not forwarding_trampoline_enabled:
            self.logger.info("trampoline forwarding is disabled. failing htlc.")
            raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')

        payload = trampoline_onion.hop_data.payload
        payment_data = payload.get('payment_data')
        # legacy case: the payload carries payment_data; otherwise a random secret is used
        payment_secret = payment_data['payment_secret'] if payment_data else os.urandom(32)

        try:
            outgoing_node_id = payload["outgoing_node_id"]["outgoing_node_id"]
            amt_to_forward = payload["amt_to_forward"]["amt_to_forward"]
            cltv_from_onion = payload["outgoing_cltv_value"]["outgoing_cltv_value"]
            if "invoice_features" not in payload:
                self.logger.info('forward_trampoline: end-to-end')
                invoice_features = LnFeatures.BASIC_MPP_OPT
                next_trampoline_onion = trampoline_onion.next_packet
            else:
                self.logger.info('forward_trampoline: legacy')
                next_trampoline_onion = None
                invoice_features = payload["invoice_features"]["invoice_features"]
                # Read but not used yet; a payload without it is still rejected below.
                invoice_routing_info = payload["invoice_routing_info"]["invoice_routing_info"]
                # TODO use invoice_routing_info
                # TODO legacy mpp payment, use total_msat from trampoline onion
        except Exception:
            # any problem reading the payload is reported as an invalid onion payload
            self.logger.exception('')
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')

        if self._maybe_refuse_to_forward_htlc_that_corresponds_to_payreq_we_created(payment_hash):
            self.logger.debug(
                f"maybe_forward_trampoline. will FAIL HTLC(s). "
                f"RHASH corresponds to payreq we created. {payment_hash.hex()=}")
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')

        # these are the fee/cltv paid by the sender
        # pay_to_node will raise if they are not sufficient
        trampoline_cltv_delta = cltv_expiry - cltv_from_onion
        total_msat = outer_onion.hop_data.payload["payment_data"]["total_msat"]
        trampoline_fee = total_msat - amt_to_forward
        self.logger.info(f'trampoline cltv and fee: {trampoline_cltv_delta, trampoline_fee}')

        try:
            await self.lnworker.pay_to_node(
                node_pubkey=outgoing_node_id,
                payment_hash=payment_hash,
                payment_secret=payment_secret,
                amount_to_pay=amt_to_forward,
                min_cltv_expiry=cltv_from_onion,
                r_tags=[],
                invoice_features=invoice_features,
                fwd_trampoline_onion=next_trampoline_onion,
                fwd_trampoline_fee=trampoline_fee,
                fwd_trampoline_cltv_delta=trampoline_cltv_delta,
                attempts=1)
        except OnionRoutingFailure:
            # already in the form the caller expects
            raise
        except PaymentFailure as e:
            self.logger.debug(
                f"maybe_forward_trampoline. PaymentFailure for {payment_hash.hex()=}, {payment_secret.hex()=}: {e!r}")
            # FIXME: adapt the error code
            raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
1754

1755
    def _maybe_refuse_to_forward_htlc_that_corresponds_to_payreq_we_created(self, payment_hash: bytes) -> bool:
5✔
1756
        """Returns True if the HTLC should be failed.
1757
        We must not forward HTLCs with a matching payment_hash to a payment request we created.
1758
        Example attack:
1759
        - Bob creates payment request with HASH1, for 1 BTC; and gives the payreq to Alice
1760
        - Alice sends htlc A->B->C, for 1 sat, with HASH1
1761
        - Bob must not release the preimage of HASH1
1762
        """
1763
        payment_info = self.lnworker.get_payment_info(payment_hash)
5✔
1764
        is_our_payreq = payment_info and payment_info.direction == RECEIVED
5✔
1765
        # note: If we don't have the preimage for a payment request, then it must be a hold invoice.
1766
        #       Hold invoices are created by other parties (e.g. a counterparty initiating a submarine swap),
1767
        #       and it is the other party choosing the payment_hash. If we failed HTLCs with payment_hashes colliding
1768
        #       with hold invoices, then a party that can make us save a hold invoice for an arbitrary hash could
1769
        #       also make us fail arbitrary HTLCs.
1770
        return bool(is_our_payreq and self.lnworker.get_preimage(payment_hash))
5✔
1771

1772
    def maybe_fulfill_htlc(
            self, *,
            chan: Channel,
            htlc: UpdateAddHtlc,
            processed_onion: ProcessedOnionPacket,
            onion_packet_bytes: bytes,
            is_trampoline: bool = False) -> Tuple[Optional[bytes], Optional[Callable]]:
        """As a final recipient of an HTLC, decide if we should fulfill it.

        Return (preimage, forwarding_callback) with at most a single element not None.
        (None, None) is returned while the MPP status is still WAITING.

        When the onion carries a trampoline onion, that onion is processed first: if we
        are its final recipient this method calls itself with ``is_trampoline=True``,
        otherwise a callback wrapping ``maybe_forward_trampoline`` is returned.

        Raises OnionRoutingFailure when the HTLC must be failed.
        """
        def log_fail_reason(reason: str):
            self.logger.info(
                f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. "
                f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")

        def onion_field(tlv_name: str, field_name: str):
            # Plain nested lookup; callers wrap it in try/except to map failures to onion errors.
            return processed_onion.hop_data.payload[tlv_name][field_name]

        try:
            amt_to_forward = onion_field("amt_to_forward", "amt_to_forward")
        except Exception:
            log_fail_reason("'amt_to_forward' missing from onion")
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')

        # Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height.
        # We should not release the preimage for an HTLC that its sender could already time out as
        # then they might try to force-close and it becomes a race.
        chain = self.network.blockchain()
        if chain.is_tip_stale():
            log_fail_reason("our chain tip is stale")
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
        local_height = chain.height()
        # Built once and reused by every "incorrect or unknown payment details" failure below.
        failure_data = amt_to_forward.to_bytes(8, byteorder="big") + local_height.to_bytes(4, byteorder="big")
        exc_incorrect_or_unknown_pd = OnionRoutingFailure(
            code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
            data=failure_data)
        if local_height + MIN_FINAL_CLTV_EXPIRY_ACCEPTED > htlc.cltv_expiry:
            log_fail_reason("htlc.cltv_expiry is unreasonably close")
            raise exc_incorrect_or_unknown_pd
        try:
            cltv_from_onion = onion_field("outgoing_cltv_value", "outgoing_cltv_value")
        except Exception:
            log_fail_reason("'outgoing_cltv_value' missing from onion")
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')

        # the cltv/amount equality checks are skipped when called for a trampoline onion
        if not is_trampoline and cltv_from_onion != htlc.cltv_expiry:
            log_fail_reason("cltv_from_onion != htlc.cltv_expiry")
            raise OnionRoutingFailure(
                code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY,
                data=htlc.cltv_expiry.to_bytes(4, byteorder="big"))
        try:
            total_msat = onion_field("payment_data", "total_msat")
        except Exception:
            log_fail_reason("'total_msat' missing from onion")
            raise exc_incorrect_or_unknown_pd

        if not is_trampoline and amt_to_forward != htlc.amount_msat:
            log_fail_reason("amt_to_forward != htlc.amount_msat")
            raise OnionRoutingFailure(
                code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT,
                data=htlc.amount_msat.to_bytes(8, byteorder="big"))

        try:
            payment_secret_from_onion = onion_field("payment_data", "payment_secret")  # type: bytes
        except Exception:
            log_fail_reason("'payment_secret' missing from onion")
            raise exc_incorrect_or_unknown_pd

        from .lnworker import RecvMPPResolution
        mpp_resolution = self.lnworker.check_mpp_status(
            payment_secret=payment_secret_from_onion,
            short_channel_id=chan.short_channel_id,
            htlc=htlc,
            expected_msat=total_msat,
        )
        if mpp_resolution == RecvMPPResolution.WAITING:
            return None, None
        if mpp_resolution == RecvMPPResolution.EXPIRED:
            log_fail_reason("MPP_TIMEOUT")
            raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'')
        if mpp_resolution == RecvMPPResolution.FAILED:
            log_fail_reason("mpp_resolution is FAILED")
            raise exc_incorrect_or_unknown_pd
        if mpp_resolution != RecvMPPResolution.ACCEPTED:
            raise Exception(f"unexpected {mpp_resolution=}")
        # ACCEPTED: continue

        payment_hash = htlc.payment_hash

        # detect callback
        # if there is a trampoline_onion, maybe_fulfill_htlc will be called again
        # order is important: if we receive a trampoline onion for a hold invoice, we need to peel the onion first.
        trampoline_packet = processed_onion.trampoline_onion_packet
        if trampoline_packet:
            # TODO: we should check that all trampoline_onions are the same
            trampoline_onion = self.process_onion_packet(
                trampoline_packet,
                payment_hash=payment_hash,
                onion_packet_bytes=onion_packet_bytes,
                is_trampoline=True)
            if not trampoline_onion.are_we_final:
                def forward_trampoline():
                    return self.maybe_forward_trampoline(
                        payment_hash=payment_hash,
                        cltv_expiry=htlc.cltv_expiry,  # TODO: use max or enforce same value across mpp parts
                        outer_onion=processed_onion,
                        trampoline_onion=trampoline_onion)
                return None, forward_trampoline
            # trampoline- we are final recipient of HTLC
            preimage, cb = self.maybe_fulfill_htlc(
                chan=chan,
                htlc=htlc,
                processed_onion=trampoline_onion,
                onion_packet_bytes=onion_packet_bytes,
                is_trampoline=True)
            return (preimage, None) if preimage else (None, cb)

        preimage = self.lnworker.get_preimage(payment_hash)
        hold_invoice_callback = self.lnworker.hold_invoice_callbacks.get(payment_hash)
        if hold_invoice_callback:
            if preimage:
                return preimage, None
            return None, lambda: hold_invoice_callback(payment_hash)

        # TODO don't accept payments twice for same invoice
        # TODO check invoice expiry
        info = self.lnworker.get_payment_info(payment_hash)
        if info is None:
            log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}")
            raise exc_incorrect_or_unknown_pd

        preimage = self.lnworker.get_preimage(payment_hash)
        if not preimage:
            self.logger.info(f"missing preimage and no hold invoice callback {payment_hash.hex()}")
            raise exc_incorrect_or_unknown_pd

        expected_payment_secrets = [
            self.lnworker.get_payment_secret(htlc.payment_hash),
            derive_payment_secret_from_payment_preimage(preimage),  # legacy secret for old invoices
        ]
        if payment_secret_from_onion not in expected_payment_secrets:
            log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {expected_payment_secrets[0].hex()}')
            raise exc_incorrect_or_unknown_pd
        invoice_msat = info.amount_msat
        # an invoice without amount accepts any total; otherwise allow up to twice the invoice amount
        if invoice_msat is not None and not (invoice_msat <= total_msat <= 2 * invoice_msat):
            log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}")
            raise exc_incorrect_or_unknown_pd
        self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}")
        return preimage, None
1920

1921
    def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
        """Settle a received HTLC with ``preimage`` and send ``update_fulfill_htlc``.

        Records ``(chan, htlc_id)`` in ``self.received_htlcs_pending_removal`` before
        settling the HTLC on the channel. Two preconditions are checked with asserts:
        the channel can send commitment updates, and the HTLC proposed by REMOTE is
        already irrevocably added.
        """
        self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
        assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
        assert chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
        pending_key = (chan, htlc_id)
        self.received_htlcs_pending_removal.add(pending_key)
        chan.settle_htlc(preimage, htlc_id)
        fulfill_fields = {
            "channel_id": chan.channel_id,
            "id": htlc_id,
            "payment_preimage": preimage,
        }
        self.send_message("update_fulfill_htlc", **fulfill_fields)
1932

1933
    def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes):
        """Fail a received HTLC and send ``update_fail_htlc`` with ``error_bytes`` as reason.

        Records ``(chan, htlc_id)`` in ``self.received_htlcs_pending_removal`` before
        failing the HTLC on the channel. Asserts that the channel can send commitment
        updates.
        """
        self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
        assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
        pending_key = (chan, htlc_id)
        self.received_htlcs_pending_removal.add(pending_key)
        chan.fail_htlc(htlc_id)
        fail_fields = {
            "channel_id": chan.channel_id,
            "id": htlc_id,
            "len": len(error_bytes),
            "reason": error_bytes,
        }
        self.send_message("update_fail_htlc", **fail_fields)
1944

1945
    def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure):
        """Fail a received HTLC and send ``update_fail_malformed_htlc``.

        ``reason`` must have the BADONION flag set in its code and carry exactly
        32 bytes of data (sent as ``sha256_of_onion``); otherwise an Exception is raised
        before any state is touched. Asserts that the channel can send commitment updates.
        """
        self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
        assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
        has_badonion_flag = reason.code & OnionFailureCodeMetaFlag.BADONION
        if not has_badonion_flag or len(reason.data) != 32:
            raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}")
        pending_key = (chan, htlc_id)
        self.received_htlcs_pending_removal.add(pending_key)
        chan.fail_htlc(htlc_id)
        malformed_fields = {
            "channel_id": chan.channel_id,
            "id": htlc_id,
            "sha256_of_onion": reason.data,
            "failure_code": reason.code,
        }
        self.send_message("update_fail_malformed_htlc", **malformed_fields)
1958

1959
    def on_revoke_and_ack(self, chan: Channel, payload):
        """Process a ``revoke_and_ack`` payload for ``chan``.

        Does nothing when the channel's peer_state is BAD. Otherwise applies the
        revocation to the channel, saves the channel, possibly sends a new commitment,
        and pulses ``self._received_revack_event``.
        """
        if chan.peer_state == PeerState.BAD:
            return
        self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}')
        per_commitment_secret = payload["per_commitment_secret"]
        next_per_commitment_point = payload["next_per_commitment_point"]
        chan.receive_revocation(RevokeAndAck(per_commitment_secret, next_per_commitment_point))
        self.lnworker.save_channel(chan)
        self.maybe_send_commitment(chan)
        # set() immediately followed by clear()
        # NOTE(review): presumably an asyncio.Event, so this wakes current waiters and re-arms - confirm
        revack_event = self._received_revack_event
        revack_event.set()
        revack_event.clear()
1969

1970
    def on_update_fee(self, chan: Channel, payload):
        """Apply the ``feerate_per_kw`` from an ``update_fee`` payload to ``chan``.

        The second argument to ``chan.update_fee`` is False here.
        NOTE(review): presumably it marks the update as not originating from us - confirm.
        """
        chan.update_fee(payload["feerate_per_kw"], False)
1973

1974
    async def maybe_update_fee(self, chan: Channel):
        """Called when our fee estimates change.

        Does nothing if the channel cannot send commitment updates.
        If we are not the channel initiator, only a warning is logged when the channel
        feerate is below half of the current estimate (skipped on regtest).
        If we are the initiator, ``update_fee`` is sent when the estimate is below half
        or above twice the pending feerate, or when the remote ctn is still 0.
        """
        if not chan.can_send_ctx_updates():
            return
        feerate_per_kw = self.lnworker.current_feerate_per_kw()

        if not chan.constraints.is_initiator:
            if constants.net is constants.BitcoinRegtest:
                return
            chan_feerate = chan.get_latest_feerate(LOCAL)
            ratio = chan_feerate / feerate_per_kw
            if ratio < 0.5:
                # Note that we trust the Electrum server about fee rates
                # Thus, automated force-closing might not be a good idea
                # Maybe we should display something in the GUI instead
                self.logger.warning(
                    f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, "
                    f"current recommended feerate is {feerate_per_kw} sat/kw, consider force closing!")
            return

        chan_fee = chan.get_next_feerate(REMOTE)
        if feerate_per_kw < chan_fee / 2:
            self.logger.info("FEES HAVE FALLEN")
        elif feerate_per_kw > chan_fee * 2:
            self.logger.info("FEES HAVE RISEN")
        elif chan.get_latest_ctn(REMOTE) != 0:
            # estimate is close enough to the pending feerate: nothing to do
            return
        else:
            # workaround eclair issue https://github.com/ACINQ/eclair/issues/1730
            self.logger.info("updating fee to bump remote ctn")
            if feerate_per_kw == chan_fee:
                feerate_per_kw += 1

        self.logger.info(f"(chan: {chan.get_id_for_log()}) current pending feerate {chan_fee}. "
                         f"new feerate {feerate_per_kw}")
        chan.update_fee(feerate_per_kw, True)
        self.send_message(
            "update_fee",
            channel_id=chan.channel_id,
            feerate_per_kw=feerate_per_kw)
        self.maybe_send_commitment(chan)
2013

2014
    @log_exceptions
    async def close_channel(self, chan_id: bytes):
        """Initiate a cooperative close of the channel ``chan_id`` and return the closing txid.

        Sends ``shutdown``, waits for the ``shutdown`` payload delivered through the
        future stored in ``self.shutdown_received[chan_id]``, then runs ``self._shutdown``.

        If ``_shutdown`` times out, the channel's ``unconfirmed_closing_txid`` is returned
        instead.

        Raises:
            KeyError: if ``chan_id`` is not in ``self.channels``.
            Exception: if ``_shutdown`` timed out and no closing txid is known.
        """
        chan = self.channels[chan_id]
        # NOTE(review): this future is not removed from self.shutdown_received in this method.
        self.shutdown_received[chan_id] = self.asyncio_loop.create_future()
        await self.send_shutdown(chan)
        payload = await self.shutdown_received[chan_id]
        try:
            txid = await self._shutdown(chan, payload, is_local=True)
            self.logger.info(f'({chan.get_id_for_log()}) Channel closed {txid}')
        except asyncio.TimeoutError:
            # fall back to a closing tx that may already be known for this channel
            txid = chan.unconfirmed_closing_txid
            self.logger.info(f'({chan.get_id_for_log()}) did not send closing_signed, {txid}')
            if txid is None:
                raise Exception('The remote peer did not send their final signature. The channel may not have been closed')
        return txid
2029

2030
    async def on_shutdown(self, chan: Channel, payload):
        """Handle a received ``shutdown`` payload for ``chan``.

        Validates the remote scriptpubkey (against their upfront shutdown script when
        one applies, otherwise against the allowed script templates) and sends a
        warning with ``close_connection=True`` on violation.
        Then either resolves the future stored by ``close_channel`` for this channel, or,
        if there is none, replies with our own ``shutdown`` and runs ``_shutdown``.
        """
        # TODO: A receiving node: if it hasn't received a funding_signed (if it is a
        #  funder) or a funding_created (if it is a fundee):
        #  SHOULD send an error and fail the channel.
        their_scriptpubkey = payload['scriptpubkey']
        their_upfront_scriptpubkey = chan.config[REMOTE].upfront_shutdown_script
        # BOLT-02 check if they use the upfront shutdown script they advertised
        if self.is_upfront_shutdown_script() and their_upfront_scriptpubkey:
            if their_scriptpubkey != their_upfront_scriptpubkey:
                await self.send_warning(
                    chan.channel_id,
                    "remote didn't use upfront shutdown script it commited to in channel opening",
                    close_connection=True)
        # BOLT-02 restrict the scriptpubkey to some templates:
        elif not (
                (self.is_shutdown_anysegwit()
                 and match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_ANYSEGWIT))
                or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0)):
            await self.send_warning(
                chan.channel_id,
                f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}',
                close_connection=True)

        chan_id = chan.channel_id
        if chan_id in self.shutdown_received:
            # a future is registered for this channel: hand the payload to it
            self.shutdown_received[chan_id].set_result(payload)
            return
        # no future registered: reply with our own shutdown and negotiate
        chan = self.channels[chan_id]
        await self.send_shutdown(chan)
        txid = await self._shutdown(chan, payload, is_local=False)
        self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {txid}')
2063

2064
    def can_send_shutdown(self, chan: Channel):
        """Return True if a ``shutdown`` message may be sent for ``chan``.

        That is the case once the channel state is at least OPENING, or, before that,
        once we have sent ``funding_created`` (as initiator) or ``funding_signed``
        (as non-initiator) for this channel.
        """
        if chan.get_state() >= ChannelState.OPENING:
            return True
        if chan.constraints.is_initiator:
            sent_funding_msgs = self.funding_created_sent
        else:
            sent_funding_msgs = self.funding_signed_sent
        return chan.channel_id in sent_funding_msgs
2072

2073
    async def send_shutdown(self, chan: Channel):
        """Send a ``shutdown`` message for ``chan`` and move it to the SHUTDOWN state.

        Uses our upfront shutdown script if one is configured, otherwise the script of
        the channel's sweep address. Commitment updates are disabled while waiting for
        pending changes to clear, and re-enabled after the state change.

        Raises Exception('cannot send shutdown') if ``can_send_shutdown`` is False.
        """
        if not self.can_send_shutdown(chan):
            raise Exception('cannot send shutdown')
        scriptpubkey = (
            chan.config[LOCAL].upfront_shutdown_script
            or bfh(bitcoin.address_to_script(chan.sweep_address)))
        assert scriptpubkey
        # wait until no more pending updates (bolt2)
        chan.set_can_send_ctx_updates(False)
        while chan.has_pending_changes(REMOTE):
            await asyncio.sleep(0.1)
        shutdown_fields = {
            "channel_id": chan.channel_id,
            "len": len(scriptpubkey),
            "scriptpubkey": scriptpubkey,
        }
        self.send_message('shutdown', **shutdown_fields)
        chan.set_state(ChannelState.SHUTDOWN)
        # can fulfill or fail htlcs. cannot add htlcs, because state != OPEN
        chan.set_can_send_ctx_updates(True)
2089

2090
    def get_shutdown_fee_range(self, chan, closing_tx, is_local):
        """Return the closing fee and fee range we initially try to enforce.

        Returns a tuple ``(our_fee, our_fee_range)``. ``our_fee_range`` is None when
        ``config.TEST_SHUTDOWN_LEGACY`` is set, ``config.TEST_SHUTDOWN_FEE_RANGE`` when
        that is set, and otherwise a dict with 'min_fee_satoshis' (half of our fee) and
        'max_fee_satoshis' (twice our fee).
        """
        config = self.network.config
        if config.TEST_SHUTDOWN_FEE:
            our_fee = config.TEST_SHUTDOWN_FEE
        else:
            fee_rate_per_kb = config.eta_target_to_fee(FEE_LN_ETA_TARGET)
            if fee_rate_per_kb is None:  # fallback
                fee_rate_per_kb = config.fee_per_kb()
            estimated_fee = None
            if fee_rate_per_kb is not None:
                estimated_fee = fee_rate_per_kb * closing_tx.estimated_size() // 1000
            # TODO: anchors: remove this, as commitment fee rate can be below chain head fee rate?
            # BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx
            max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE)
            if estimated_fee is None:  # fallback
                self.logger.warning("got no fee estimates for co-op close! falling back to chan.get_latest_fee")
                our_fee = max_fee
            else:
                our_fee = min(estimated_fee, max_fee)

        # the TEST_SHUTDOWN_* config values can be set in tests
        if config.TEST_SHUTDOWN_LEGACY:
            our_fee_range = None
        else:
            # we aim at a fee between next block inclusion and some lower value
            our_fee_range = config.TEST_SHUTDOWN_FEE_RANGE or {
                'min_fee_satoshis': our_fee // 2,
                'max_fee_satoshis': our_fee * 2,
            }
        self.logger.info(f"Our fee range: {our_fee_range} and fee: {our_fee}")
        return our_fee, our_fee_range
2119

2120
    @log_exceptions
5✔
2121
    async def _shutdown(self, chan: Channel, payload, *, is_local: bool):
5✔
2122
        # wait until no HTLCs remain in either commitment transaction
2123
        while chan.has_unsettled_htlcs():
5✔
2124
            self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...')
5✔
2125
            await asyncio.sleep(1)
5✔
2126
        # if no HTLCs remain, we must not send updates
2127
        chan.set_can_send_ctx_updates(False)
5✔
2128
        their_scriptpubkey = payload['scriptpubkey']
5✔
2129
        if chan.config[LOCAL].upfront_shutdown_script:
5✔
2130
            our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
5✔
2131
        else:
2132
            our_scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
5✔
2133
        assert our_scriptpubkey
5✔
2134
        # estimate fee of closing tx
2135
        dummy_sig, dummy_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0)
5✔
2136
        our_sig = None  # type: Optional[bytes]
5✔
2137
        closing_tx = None  # type: Optional[PartialTransaction]
5✔
2138
        is_initiator = chan.constraints.is_initiator
5✔
2139
        our_fee, our_fee_range = self.get_shutdown_fee_range(chan, dummy_tx, is_local)
5✔
2140

2141
        def send_closing_signed(our_fee, our_fee_range, drop_remote):
5✔
2142
            nonlocal our_sig, closing_tx
2143
            if our_fee_range:
5✔
2144
                closing_signed_tlvs = {'fee_range': our_fee_range}
5✔
2145
            else:
2146
                closing_signed_tlvs = {}
5✔
2147
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=our_fee, drop_remote=drop_remote)
5✔
2148
            self.logger.info(f"Sending fee range: {closing_signed_tlvs} and fee: {our_fee}")
5✔
2149
            self.send_message(
5✔
2150
                'closing_signed',
2151
                channel_id=chan.channel_id,
2152
                fee_satoshis=our_fee,
2153
                signature=our_sig,
2154
                closing_signed_tlvs=closing_signed_tlvs,
2155
            )
2156

2157
        def verify_signature(tx, sig):
5✔
2158
            their_pubkey = chan.config[REMOTE].multisig_key.pubkey
5✔
2159
            preimage_hex = tx.serialize_preimage(0)
5✔
2160
            pre_hash = sha256d(bfh(preimage_hex))
5✔
2161
            return ecc.verify_signature(their_pubkey, sig, pre_hash)
5✔
2162

2163
        async def receive_closing_signed():
5✔
2164
            nonlocal our_sig, closing_tx
2165
            try:
5✔
2166
                cs_payload = await self.wait_for_message('closing_signed', chan.channel_id)
5✔
2167
            except asyncio.exceptions.TimeoutError:
5✔
2168
                self.schedule_force_closing(chan.channel_id)
×
2169
                raise Exception("closing_signed not received, force closing.")
×
2170
            their_fee = cs_payload['fee_satoshis']
5✔
2171
            their_fee_range = cs_payload['closing_signed_tlvs'].get('fee_range')
5✔
2172
            their_sig = cs_payload['signature']
5✔
2173
            # perform checks
2174
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=False)
5✔
2175
            if verify_signature(closing_tx, their_sig):
5✔
2176
                drop_remote = False
5✔
2177
            else:
2178
                our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=True)
×
2179
                if verify_signature(closing_tx, their_sig):
×
2180
                    drop_remote = True
×
2181
                else:
2182
                    # this can happen if we consider our output too valuable to drop,
2183
                    # but the remote drops it because it violates their dust limit
2184
                    raise Exception('failed to verify their signature')
×
2185
            # at this point we know how the closing tx looks like
2186
            # check that their output is above their scriptpubkey's network dust limit
2187
            to_remote_set = closing_tx.get_output_idxs_from_scriptpubkey(their_scriptpubkey.hex())
5✔
2188
            if not drop_remote and to_remote_set:
5✔
2189
                to_remote_idx = to_remote_set.pop()
5✔
2190
                to_remote_amount = closing_tx.outputs()[to_remote_idx].value
5✔
2191
                transaction.check_scriptpubkey_template_and_dust(their_scriptpubkey, to_remote_amount)
5✔
2192
            return their_fee, their_fee_range, their_sig, drop_remote
5✔
2193

2194
        def choose_new_fee(our_fee, our_fee_range, their_fee, their_fee_range, their_previous_fee):
            """Decide which fee we propose next, given the peer's latest proposal.

            our_fee: the fee we proposed last.
            our_fee_range: our fee range, a dict with 'min_fee_satoshis' and
                'max_fee_satoshis' keys, or a falsy value if we do not use one.
                NOTE: when we are not the initiator this dict may be widened in place.
            their_fee: the fee the peer just proposed; must differ from our_fee (asserted).
            their_fee_range: the peer's fee range (same keys), or a falsy value.
            their_previous_fee: the fee the peer proposed before their_fee, or None.

            Returns the tuple (our_fee, our_fee_range) to use for our next message.
            Raises Exception if the negotiation cannot continue; in the fee_range
            failure cases a force-close of the channel is scheduled before raising.
            """
            assert our_fee != their_fee
            fee_range_sent = our_fee_range and (is_initiator or (their_previous_fee is not None))

            # The sending node, if it is not the funder:
            if our_fee_range and their_fee_range and not is_initiator and not self.network.config.TEST_SHUTDOWN_FEE_RANGE:
                # SHOULD set max_fee_satoshis to at least the max_fee_satoshis received
                our_fee_range['max_fee_satoshis'] = max(their_fee_range['max_fee_satoshis'], our_fee_range['max_fee_satoshis'])
                # SHOULD set min_fee_satoshis to a fairly low value
                our_fee_range['min_fee_satoshis'] = min(their_fee_range['min_fee_satoshis'], our_fee_range['min_fee_satoshis'])
                # Note: the BOLT describes what the sending node SHOULD do.
                # However, this assumes that we have decided to send 'closing_signed' in response to their fee_range.
                # In practice, we might prefer to fail the channel in some cases (TODO)

            # the receiving node, if fee_satoshis matches its previously sent fee_range,
            if fee_range_sent and (our_fee_range['min_fee_satoshis'] <= their_fee <= our_fee_range['max_fee_satoshis']):
                # SHOULD reply with a closing_signed with the same fee_satoshis value if it is different from its previously sent fee_satoshis
                our_fee = their_fee

            # the receiving node, if the message contains a fee_range
            elif our_fee_range and their_fee_range:
                overlap_min = max(our_fee_range['min_fee_satoshis'], their_fee_range['min_fee_satoshis'])
                overlap_max = min(our_fee_range['max_fee_satoshis'], their_fee_range['max_fee_satoshis'])
                # if there is no overlap between that and its own fee_range
                if overlap_min > overlap_max:
                    # TODO: the receiving node should first send a warning, and fail the channel
                    # only if it doesn't receive a satisfying fee_range after a reasonable amount of time
                    self.schedule_force_closing(chan.channel_id)
                    raise Exception("There is no overlap between their and our fee range.")
                # otherwise, if it is the funder
                if is_initiator:
                    # if fee_satoshis is not in the overlap between the sent and received fee_range:
                    if not (overlap_min <= their_fee <= overlap_max):
                        # MUST fail the channel
                        self.schedule_force_closing(chan.channel_id)
                        raise Exception("Their fee is not in the overlap region, we force closed.")
                    # otherwise, MUST reply with the same fee_satoshis.
                    our_fee = their_fee
                # otherwise (it is not the funder):
                else:
                    # if it has already sent a closing_signed:
                    if fee_range_sent:
                        # fee_satoshis is not the same as the value we sent, we MUST fail the channel
                        self.schedule_force_closing(chan.channel_id)
                        raise Exception("Expected the same fee as ours, we force closed.")
                    # otherwise:
                    # MUST propose a fee_satoshis in the overlap between received and (about-to-be) sent fee_range.
                    our_fee = (overlap_min + overlap_max) // 2
            else:
                # Legacy negotiation (at least one side has no fee_range).
                # otherwise, if fee_satoshis is not strictly between its last-sent fee_satoshis
                # and its previously-received fee_satoshis, UNLESS it has since reconnected:
                if their_previous_fee and not (min(our_fee, their_previous_fee) < their_fee < max(our_fee, their_previous_fee)):
                    # SHOULD fail the connection.
                    raise Exception('Their fee is not between our last sent and their last sent fee.')
                # accept their fee if they are very close
                if abs(their_fee - our_fee) < 2:
                    our_fee = their_fee
                else:
                    # this will be "strictly between" (as in BOLT2) previous values because of the above
                    our_fee = (our_fee + their_fee) // 2

            return our_fee, our_fee_range
2256

2257
        # Fee negotiation: both parties exchange 'closing_signed' messages.
2258
        # The funder sends the first message, the non-funder sends the last message.
2259
        # In the 'modern' case, at most 3 messages are exchanged, because choose_new_fee of the funder either returns their_fee or fails
2260
        their_fee = None
5✔
2261
        drop_remote = False  # does the peer drop its to_local output or not?
5✔
2262
        if is_initiator:
5✔
2263
            send_closing_signed(our_fee, our_fee_range, drop_remote)
5✔
2264
        while True:
3✔
2265
            their_previous_fee = their_fee
5✔
2266
            their_fee, their_fee_range, their_sig, drop_remote = await receive_closing_signed()
5✔
2267
            if our_fee == their_fee:
5✔
2268
                break
×
2269
            our_fee, our_fee_range = choose_new_fee(our_fee, our_fee_range, their_fee, their_fee_range, their_previous_fee)
5✔
2270
            if not is_initiator and our_fee == their_fee:
5✔
2271
                break
×
2272
            send_closing_signed(our_fee, our_fee_range, drop_remote)
5✔
2273
            if is_initiator and our_fee == their_fee:
5✔
2274
                break
5✔
2275
        if not is_initiator:
5✔
2276
            send_closing_signed(our_fee, our_fee_range, drop_remote)
×
2277

2278
        # add signatures
2279
        closing_tx.add_signature_to_txin(
5✔
2280
            txin_idx=0,
2281
            signing_pubkey=chan.config[LOCAL].multisig_key.pubkey.hex(),
2282
            sig=(der_sig_from_sig_string(our_sig) + Sighash.to_sigbytes(Sighash.ALL)).hex())
2283
        closing_tx.add_signature_to_txin(
5✔
2284
            txin_idx=0,
2285
            signing_pubkey=chan.config[REMOTE].multisig_key.pubkey.hex(),
2286
            sig=(der_sig_from_sig_string(their_sig) + Sighash.to_sigbytes(Sighash.ALL)).hex())
2287
        # save local transaction and set state
2288
        try:
5✔
2289
            self.lnworker.wallet.adb.add_transaction(closing_tx)
5✔
2290
        except UnrelatedTransactionException:
×
2291
            pass  # this can happen if (~all the balance goes to REMOTE)
×
2292
        chan.set_state(ChannelState.CLOSING)
5✔
2293
        # broadcast
2294
        await self.network.try_broadcasting(closing_tx, 'closing')
5✔
2295
        return closing_tx.txid()
5✔
2296

2297
    async def htlc_switch(self):
        """Endless loop that resolves the HTLCs the remote peer has added on our channels.

        Each iteration pings the peer if required, signals "iteration done",
        waits (at most 0.1 sec) for a wake-up event, signals "iteration start",
        and then, for every channel that can send ctx updates, tries to
        fulfill, fail, or forward each unfulfilled HTLC.
        """
        await self.initialized
        while True:
            await self.ping_if_required()
            # pulse the "done" event: wake up whoever is waiting now, then re-arm it
            self._htlc_switch_iterdone_event.set()
            self._htlc_switch_iterdone_event.clear()
            # We poll every 0.1 sec to check if there is work to do,
            # or we can also be triggered via events.
            # When forwarding an HTLC originating from this peer (the upstream),
            # we can get triggered for events that happen on the downstream peer.
            # TODO: trampoline forwarding relies on the polling
            async with ignore_after(0.1):
                async with OldTaskGroup(wait=any) as wakeups:
                    await wakeups.spawn(self._received_revack_event.wait())
                    await wakeups.spawn(self.downstream_htlc_resolved_event.wait())
            # pulse the "start" event in the same way
            self._htlc_switch_iterstart_event.set()
            self._htlc_switch_iterstart_event.clear()
            self._maybe_cleanup_received_htlcs_pending_removal()
            for _chan_id, chan in self.channels.items():
                if not chan.can_send_ctx_updates():
                    continue
                self.maybe_send_commitment(chan)
                resolved = set()  # ids of HTLCs we fulfilled or failed in this pass
                pending = chan.unfulfilled_htlcs
                for htlc_id, entry in pending.items():
                    local_ctn, remote_ctn, onion_hex, fwd = entry
                    if fwd:
                        self.lnworker.downstream_htlc_to_upstream_peer_map[fwd] = self.pubkey
                    if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
                        continue
                    htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id)
                    malformed: Optional[OnionRoutingFailure] = None
                    failure_msg: Optional[bytes] = None
                    preimage = None
                    new_fwd = None
                    raw_onion = bytes.fromhex(onion_hex)
                    onion_packet = None
                    try:
                        onion_packet = OnionPacket.from_bytes(raw_onion)
                    except OnionRoutingFailure as exc:
                        # the onion itself could not be parsed
                        malformed = exc
                    else:
                        try:
                            preimage, new_fwd, failure_msg = self.process_unfulfilled_htlc(
                                chan=chan,
                                htlc=htlc,
                                forwarding_info=fwd,
                                onion_packet_bytes=raw_onion,
                                onion_packet=onion_packet)
                        except OnionRoutingFailure as exc:
                            failure_msg = construct_onion_error(exc, onion_packet, our_onion_private_key=self.privkey)

                    if new_fwd:
                        # remember where the HTLC was forwarded to; it stays unfulfilled for now
                        pending[htlc_id] = local_ctn, remote_ctn, onion_hex, new_fwd
                        self.lnworker.downstream_htlc_to_upstream_peer_map[new_fwd] = self.pubkey
                        continue
                    if preimage:
                        self.lnworker.set_request_status(htlc.payment_hash, PR_PAID)
                        if not self.lnworker.enable_htlc_settle:
                            # settling is disabled: leave the HTLC pending
                            continue
                        self.fulfill_htlc(chan, htlc.htlc_id, preimage)
                    elif failure_msg:
                        self.fail_htlc(
                            chan=chan,
                            htlc_id=htlc.htlc_id,
                            error_bytes=failure_msg)
                    elif malformed:
                        self.fail_malformed_htlc(
                            chan=chan,
                            htlc_id=htlc.htlc_id,
                            reason=malformed)
                    else:
                        # nothing to do for this HTLC yet
                        continue
                    resolved.add(htlc_id)
                # cleanup
                for htlc_id in resolved:
                    _local_ctn, _remote_ctn, _onion_hex, old_fwd = pending.pop(htlc_id)
                    if old_fwd:
                        self.lnworker.downstream_htlc_to_upstream_peer_map.pop(old_fwd, None)
                self.maybe_send_commitment(chan)
2373

2374
    def _maybe_cleanup_received_htlcs_pending_removal(self) -> None:
        """Forget received HTLCs whose removal has become irrevocable.

        If at least one entry was dropped from received_htlcs_pending_removal,
        received_htlc_removed_event is set and immediately cleared again.
        """
        removed = {
            (chan, htlc_id)
            for chan, htlc_id in self.received_htlcs_pending_removal
            if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
        }
        if not removed:
            return
        for entry in removed:
            self.received_htlcs_pending_removal.remove(entry)
        # pulse the event: wake up current waiters, then re-arm it
        self.received_htlc_removed_event.set()
        self.received_htlc_removed_event.clear()
2384

2385
    async def wait_one_htlc_switch_iteration(self) -> None:
        """Block until the HTLC switch has completed one full iteration,
        or until the peer has disconnected - whichever comes first.
        """
        async def _next_full_iteration():
            # a full iteration: first see it start, then see it finish
            await self._htlc_switch_iterstart_event.wait()
            await self._htlc_switch_iterdone_event.wait()

        async with OldTaskGroup(wait=any) as waiters:
            await waiters.spawn(_next_full_iteration())
            await waiters.spawn(self.got_disconnected.wait())
2396

2397
    def process_unfulfilled_htlc(
            self, *,
            chan: Channel,
            htlc: UpdateAddHtlc,
            forwarding_info: Union[str, Tuple[str, int], None],
            onion_packet_bytes: bytes,
            onion_packet: OnionPacket,
    ) -> Tuple[Optional[bytes], Union[str, Tuple[str, int], None], Optional[bytes]]:
        """Work out what to do with a received HTLC that is not resolved yet.

        forwarding_info is falsy if the HTLC has not been forwarded yet.
        Otherwise it is what an earlier call returned as fw_info: either a
        (next channel id as hex, next htlc id) tuple for a regular forward,
        or the hex-encoded payment key for a trampoline forward.

        return (preimage, fw_info, error_bytes) with at most a single element that is not None
        raise an OnionRoutingFailure if we need to fail the htlc
        """
        payment_hash = htlc.payment_hash
        processed_onion = self.process_onion_packet(
            onion_packet,
            payment_hash=payment_hash,
            onion_packet_bytes=onion_packet_bytes)
        if processed_onion.are_we_final:
            # either we are final recipient; or if trampoline, see cases below
            if not forwarding_info:
                preimage, forwarding_callback = self.maybe_fulfill_htlc(
                    chan=chan,
                    htlc=htlc,
                    processed_onion=processed_onion,
                    onion_packet_bytes=onion_packet_bytes)
                if forwarding_callback:
                    payment_secret = processed_onion.hop_data.payload["payment_data"]["payment_secret"]
                    payment_key = payment_hash + payment_secret
                    # trampoline- HTLC we are supposed to forward, but haven't forwarded yet
                    if not self.lnworker.enable_htlc_forwarding:
                        pass
                    elif payment_key in self.lnworker.final_onion_forwardings:
                        # we are already forwarding this payment
                        self.logger.info("we are already forwarding this.")
                    else:
                        # add to list of ongoing payments
                        self.lnworker.final_onion_forwardings.add(payment_key)
                        # clear previous failures
                        self.lnworker.final_onion_forwarding_failures.pop(payment_key, None)

                        async def wrapped_callback():
                            # Run the forwarding and record its failure (if any), so that a
                            # later call (with forwarding_info set) can pick it up.
                            forwarding_coro = forwarding_callback()
                            try:
                                await forwarding_coro
                            except OnionRoutingFailure as e:
                                self.lnworker.final_onion_forwarding_failures[payment_key] = e
                            finally:
                                # remove from list of payments, so that another attempt can be initiated
                                self.lnworker.final_onion_forwardings.remove(payment_key)

                        # NOTE(review): the returned future is not kept; asyncio only holds
                        # weak references to tasks - confirm this cannot be collected early.
                        asyncio.ensure_future(wrapped_callback())
                        fw_info = payment_key.hex()
                        return None, fw_info, None
            else:
                # trampoline- HTLC we are supposed to forward, and have already forwarded
                payment_key = bytes.fromhex(forwarding_info)
                preimage = self.lnworker.get_preimage(payment_hash)
                # get (and not pop) failure because the incoming payment might be multi-part
                error_reason = self.lnworker.final_onion_forwarding_failures.get(payment_key)
                if error_reason:
                    self.logger.info(f'trampoline forwarding failure: {error_reason.code_name()}')
                    raise error_reason

        elif not forwarding_info:
            # HTLC we are supposed to forward, but haven't forwarded yet
            if not self.lnworker.enable_htlc_forwarding:
                return None, None, None
            next_chan_id, next_htlc_id = self.maybe_forward_htlc(
                incoming_chan=chan,
                htlc=htlc,
                processed_onion=processed_onion)
            fw_info = (next_chan_id.hex(), next_htlc_id)
            return None, fw_info, None
        else:
            # HTLC we are supposed to forward, and have already forwarded
            preimage = self.lnworker.get_preimage(payment_hash)
            next_chan_id_hex, htlc_id = forwarding_info
            next_chan = self.lnworker.get_channel_by_short_id(bytes.fromhex(next_chan_id_hex))
            if next_chan:
                error_bytes, error_reason = next_chan.pop_fail_htlc_reason(htlc_id)
                if error_bytes:
                    return None, None, error_bytes
                if error_reason:
                    raise error_reason
        # Reached when the HTLC was neither newly forwarded nor failed:
        # settle it if we know the preimage, otherwise there is nothing to do yet.
        if preimage:
            return preimage, None, None
        return None, None, None

2482
    def process_onion_packet(
            self,
            onion_packet: OnionPacket, *,
            payment_hash: bytes,
            onion_packet_bytes: bytes,
            is_trampoline: bool = False) -> ProcessedOnionPacket:
        """Process an onion packet with our private key, using payment_hash as associated data.

        Returns the processed onion. Any error raised while processing is
        converted into an OnionRoutingFailure whose data is the sha256 of
        onion_packet_bytes. Two TEST_* config options can force a failure
        even though processing succeeded.
        """
        onion_digest = sha256(onion_packet_bytes)

        def bad_onion(code):
            # failure that carries the hash of the offending onion
            return OnionRoutingFailure(code=code, data=onion_digest)

        try:
            result = process_onion_packet(
                onion_packet,
                associated_data=payment_hash,
                our_onion_private_key=self.privkey,
                is_trampoline=is_trampoline)
        except UnsupportedOnionPacketVersion:
            raise bad_onion(OnionFailureCode.INVALID_ONION_VERSION)
        except InvalidOnionPubkey:
            raise bad_onion(OnionFailureCode.INVALID_ONION_KEY)
        except InvalidOnionMac:
            raise bad_onion(OnionFailureCode.INVALID_ONION_HMAC)
        except Exception as exc:
            # anything unexpected is logged and reported as an invalid onion version
            self.logger.info(f"error processing onion packet: {exc!r}")
            raise bad_onion(OnionFailureCode.INVALID_ONION_VERSION)

        config = self.network.config
        if config.TEST_FAIL_HTLCS_AS_MALFORMED:
            raise bad_onion(OnionFailureCode.INVALID_ONION_VERSION)
        if config.TEST_FAIL_HTLCS_WITH_TEMP_NODE_FAILURE:
            raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
        return result
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc