• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5175397198856192

13 Aug 2025 01:48PM UTC coverage: 60.887% (-0.2%) from 61.069%
5175397198856192

Pull #10123

CirrusCI

accumulator
hww: fix crash when disabling keystore for hww
Pull Request #10123: wizard: add script and derivation to keystorewizard flow. fixes #10063

5 of 9 new or added lines in 1 file covered. (55.56%)

57 existing lines in 3 files now uncovered.

22538 of 37016 relevant lines covered (60.89%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.04
/electrum/lnonion.py
1
# -*- coding: utf-8 -*-
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2018 The Electrum developers
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import io
1✔
27
import hashlib
1✔
28
from typing import Sequence, List, Tuple, NamedTuple, TYPE_CHECKING, Dict, Any, Optional, Union
1✔
29
from enum import IntEnum
1✔
30

31
import electrum_ecc as ecc
1✔
32

33
from .crypto import sha256, hmac_oneshot, chacha20_encrypt, get_ecdh, chacha20_poly1305_encrypt, chacha20_poly1305_decrypt
1✔
34
from .util import profiler, xor_bytes, bfh
1✔
35
from .lnutil import (PaymentFailure, NUM_MAX_HOPS_IN_PAYMENT_PATH,
1✔
36
                     NUM_MAX_EDGES_IN_PAYMENT_PATH, ShortChannelID, OnionFailureCodeMetaFlag)
37
from .lnmsg import OnionWireSerializer, read_bigsize_int, write_bigsize_int
1✔
38
from . import lnmsg
1✔
39

40
if TYPE_CHECKING:
1✔
41
    from .lnrouter import LNPaymentRoute
×
42

43

44
HOPS_DATA_SIZE = 1300      # also sometimes called routingInfoSize in bolt-04
1✔
45
TRAMPOLINE_HOPS_DATA_SIZE = 400
1✔
46
PER_HOP_HMAC_SIZE = 32
1✔
47
ONION_MESSAGE_LARGE_SIZE = 32768
1✔
48

49
class UnsupportedOnionPacketVersion(Exception): pass
1✔
50
class InvalidOnionMac(Exception): pass
1✔
51
class InvalidOnionPubkey(Exception): pass
1✔
52
class InvalidPayloadSize(Exception): pass
1✔
53

54

55
class OnionHopsDataSingle:  # called HopData in lnd
1✔
56

57
    def __init__(self, *, payload: dict = None, tlv_stream_name: str = 'payload', blind_fields: dict = None):
1✔
58
        if payload is None:
1✔
59
            payload = {}
1✔
60
        self.payload = payload
1✔
61
        self.hmac = None
1✔
62
        self.tlv_stream_name = tlv_stream_name
1✔
63
        if blind_fields is None:
1✔
64
            blind_fields = {}
1✔
65
        self.blind_fields = blind_fields
1✔
66
        self._raw_bytes_payload = None  # used in unit tests
1✔
67

68
    def to_bytes(self) -> bytes:
1✔
69
        hmac_ = self.hmac if self.hmac is not None else bytes(PER_HOP_HMAC_SIZE)
1✔
70
        if self._raw_bytes_payload is not None:
1✔
71
            ret = self._raw_bytes_payload
1✔
72
            ret += hmac_
1✔
73
            return ret
1✔
74
        # adding TLV payload. note: legacy hop data format no longer supported.
75
        payload_fd = io.BytesIO()
1✔
76
        OnionWireSerializer.write_tlv_stream(fd=payload_fd,
1✔
77
                                             tlv_stream_name=self.tlv_stream_name,
78
                                             **self.payload)
79
        payload_bytes = payload_fd.getvalue()
1✔
80
        with io.BytesIO() as fd:
1✔
81
            fd.write(write_bigsize_int(len(payload_bytes)))
1✔
82
            fd.write(payload_bytes)
1✔
83
            fd.write(hmac_)
1✔
84
            return fd.getvalue()
1✔
85

86
    @classmethod
1✔
87
    def from_fd(cls, fd: io.BytesIO, *, tlv_stream_name: str = 'payload') -> 'OnionHopsDataSingle':
1✔
88
        first_byte = fd.read(1)
1✔
89
        if len(first_byte) == 0:
1✔
90
            raise Exception(f"unexpected EOF")
×
91
        fd.seek(-1, io.SEEK_CUR)  # undo read
1✔
92
        if first_byte == b'\x00':
1✔
93
            # legacy hop data format
94
            raise Exception("legacy hop data format no longer supported")
×
95
        elif first_byte == b'\x01':
1✔
96
            # reserved for future use
97
            raise Exception("unsupported hop payload: length==1")
×
98
        else:  # tlv format
99
            hop_payload_length = read_bigsize_int(fd)
1✔
100
            hop_payload = fd.read(hop_payload_length)
1✔
101
            if hop_payload_length != len(hop_payload):
1✔
102
                raise Exception(f"unexpected EOF")
×
103
            ret = OnionHopsDataSingle(tlv_stream_name=tlv_stream_name)
1✔
104
            ret.payload = OnionWireSerializer.read_tlv_stream(fd=io.BytesIO(hop_payload),
1✔
105
                                                              tlv_stream_name=tlv_stream_name)
106
            ret.hmac = fd.read(PER_HOP_HMAC_SIZE)
1✔
107
            assert len(ret.hmac) == PER_HOP_HMAC_SIZE
1✔
108
            return ret
1✔
109

110
    def __repr__(self):
1✔
111
        return f"<OnionHopsDataSingle. payload={self.payload}. hmac={self.hmac}>"
1✔
112

113

114
class OnionPacket:
1✔
115

116
    def __init__(self, public_key: bytes, hops_data: bytes, hmac: bytes):
1✔
117
        assert len(public_key) == 33
1✔
118
        assert len(hops_data) in [HOPS_DATA_SIZE, TRAMPOLINE_HOPS_DATA_SIZE, ONION_MESSAGE_LARGE_SIZE]
1✔
119
        assert len(hmac) == PER_HOP_HMAC_SIZE
1✔
120
        self.version = 0
1✔
121
        self.public_key = public_key
1✔
122
        self.hops_data = hops_data  # also called RoutingInfo in bolt-04
1✔
123
        self.hmac = hmac
1✔
124
        if not ecc.ECPubkey.is_pubkey_bytes(public_key):
1✔
125
            raise InvalidOnionPubkey()
×
126
        # for debugging our own onions:
127
        self._debug_hops_data = None  # type: Optional[Sequence[OnionHopsDataSingle]]
1✔
128
        self._debug_route = None      # type: Optional[LNPaymentRoute]
1✔
129

130
    def to_bytes(self) -> bytes:
1✔
131
        ret = bytes([self.version])
1✔
132
        ret += self.public_key
1✔
133
        ret += self.hops_data
1✔
134
        ret += self.hmac
1✔
135
        if len(ret) - 66 not in [HOPS_DATA_SIZE, TRAMPOLINE_HOPS_DATA_SIZE, ONION_MESSAGE_LARGE_SIZE]:
1✔
136
            raise Exception('unexpected length {}'.format(len(ret)))
×
137
        return ret
1✔
138

139
    @classmethod
1✔
140
    def from_bytes(cls, b: bytes):
1✔
141
        if len(b) - 66 not in [HOPS_DATA_SIZE, TRAMPOLINE_HOPS_DATA_SIZE, ONION_MESSAGE_LARGE_SIZE]:
1✔
142
            raise Exception('unexpected length {}'.format(len(b)))
×
143
        version = b[0]
1✔
144
        if version != 0:
1✔
145
            raise UnsupportedOnionPacketVersion('version {} is not supported'.format(version))
×
146
        return OnionPacket(
1✔
147
            public_key=b[1:34],
148
            hops_data=b[34:-32],
149
            hmac=b[-32:]
150
        )
151

152

153
def get_bolt04_onion_key(key_type: bytes, secret: bytes) -> bytes:
1✔
154
    if key_type not in (b'rho', b'mu', b'um', b'ammag', b'pad', b'blinded_node_id'):
1✔
155
        raise Exception('invalid key_type {}'.format(key_type))
×
156
    key = hmac_oneshot(key_type, msg=secret, digest=hashlib.sha256)
1✔
157
    return key
1✔
158

159

160
def get_shared_secrets_along_route(payment_path_pubkeys: Sequence[bytes],
1✔
161
                                   session_key: bytes) -> Tuple[Sequence[bytes], Sequence[bytes]]:
162
    num_hops = len(payment_path_pubkeys)
1✔
163
    hop_shared_secrets = num_hops * [b'']
1✔
164
    hop_blinded_node_ids = num_hops * [b'']
1✔
165
    ephemeral_key = session_key
1✔
166
    # compute shared key for each hop
167
    for i in range(0, num_hops):
1✔
168
        hop_shared_secrets[i] = get_ecdh(ephemeral_key, payment_path_pubkeys[i])
1✔
169
        hop_blinded_node_ids[i] = get_blinded_node_id(payment_path_pubkeys[i], hop_shared_secrets[i])
1✔
170
        ephemeral_pubkey = ecc.ECPrivkey(ephemeral_key).get_public_key_bytes()
1✔
171
        blinding_factor = sha256(ephemeral_pubkey + hop_shared_secrets[i])
1✔
172
        blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big")
1✔
173
        ephemeral_key_int = int.from_bytes(ephemeral_key, byteorder="big")
1✔
174
        ephemeral_key_int = ephemeral_key_int * blinding_factor_int % ecc.CURVE_ORDER
1✔
175
        ephemeral_key = ephemeral_key_int.to_bytes(32, byteorder="big")
1✔
176
    return hop_shared_secrets, hop_blinded_node_ids
1✔
177

178

179
def get_blinded_node_id(node_id: bytes, shared_secret: bytes):
1✔
180
    # blinded node id
181
    # B(i) = HMAC256("blinded_node_id", ss(i)) * N(i)
182
    ss_bni_hmac = get_bolt04_onion_key(b'blinded_node_id', shared_secret)
1✔
183
    ss_bni_hmac_int = int.from_bytes(ss_bni_hmac, byteorder="big")
1✔
184
    blinded_node_id = ecc.ECPubkey(node_id) * ss_bni_hmac_int
1✔
185
    return blinded_node_id.get_public_key_bytes()
1✔
186

187

188
def new_onion_packet(
1✔
189
    payment_path_pubkeys: Sequence[bytes],
190
    session_key: bytes,
191
    hops_data: Sequence[OnionHopsDataSingle],
192
    *,
193
    associated_data: bytes = b'',
194
    trampoline: bool = False,
195
    onion_message: bool = False
196
) -> OnionPacket:
197
    num_hops = len(payment_path_pubkeys)
1✔
198
    assert num_hops == len(hops_data)
1✔
199
    hop_shared_secrets, _ = get_shared_secrets_along_route(payment_path_pubkeys, session_key)
1✔
200

201
    payload_size = 0
1✔
202
    for i in range(num_hops):
1✔
203
        # FIXME: serializing here and again below. cache bytes in OnionHopsDataSingle? _raw_bytes_payload?
204
        payload_size += PER_HOP_HMAC_SIZE + len(hops_data[i].to_bytes())
1✔
205
    if trampoline:
1✔
206
        data_size = TRAMPOLINE_HOPS_DATA_SIZE
1✔
207
    elif onion_message:
1✔
208
        if payload_size <= HOPS_DATA_SIZE:
1✔
209
            data_size = HOPS_DATA_SIZE
1✔
210
        else:
211
            data_size = ONION_MESSAGE_LARGE_SIZE
1✔
212
    else:
213
        data_size = HOPS_DATA_SIZE
1✔
214

215
    if payload_size > data_size:
1✔
216
        raise InvalidPayloadSize(f'payload too big for onion packet (max={data_size}, required={payload_size})')
1✔
217

218
    filler = _generate_filler(b'rho', hops_data, hop_shared_secrets, data_size)
1✔
219
    next_hmac = bytes(PER_HOP_HMAC_SIZE)
1✔
220

221
    # Our starting packet needs to be filled out with random bytes, we
222
    # generate some deterministically using the session private key.
223
    pad_key = get_bolt04_onion_key(b'pad', session_key)
1✔
224
    mix_header = generate_cipher_stream(pad_key, data_size)
1✔
225

226
    # compute routing info and MAC for each hop
227
    for i in range(num_hops-1, -1, -1):
1✔
228
        rho_key = get_bolt04_onion_key(b'rho', hop_shared_secrets[i])
1✔
229
        mu_key = get_bolt04_onion_key(b'mu', hop_shared_secrets[i])
1✔
230
        hops_data[i].hmac = next_hmac
1✔
231
        stream_bytes = generate_cipher_stream(rho_key, data_size)
1✔
232
        hop_data_bytes = hops_data[i].to_bytes()
1✔
233
        mix_header = mix_header[:-len(hop_data_bytes)]
1✔
234
        mix_header = hop_data_bytes + mix_header
1✔
235
        mix_header = xor_bytes(mix_header, stream_bytes)
1✔
236
        if i == num_hops - 1 and len(filler) != 0:
1✔
237
            mix_header = mix_header[:-len(filler)] + filler
1✔
238
        packet = mix_header + associated_data
1✔
239
        next_hmac = hmac_oneshot(mu_key, msg=packet, digest=hashlib.sha256)
1✔
240

241
    return OnionPacket(
1✔
242
        public_key=ecc.ECPrivkey(session_key).get_public_key_bytes(),
243
        hops_data=mix_header,
244
        hmac=next_hmac)
245

246

247
def encrypt_onionmsg_data_tlv(*, shared_secret, **kwargs):
1✔
248
    rho_key = get_bolt04_onion_key(b'rho', shared_secret)
1✔
249
    with io.BytesIO() as encrypted_data_tlv_fd:
1✔
250
        OnionWireSerializer.write_tlv_stream(
1✔
251
            fd=encrypted_data_tlv_fd,
252
            tlv_stream_name='encrypted_data_tlv',
253
            **kwargs)
254
        encrypted_data_tlv_bytes = encrypted_data_tlv_fd.getvalue()
1✔
255
        encrypted_recipient_data = chacha20_poly1305_encrypt(
1✔
256
            key=rho_key, nonce=bytes(12),
257
            data=encrypted_data_tlv_bytes)
258
        return encrypted_recipient_data
1✔
259

260

261
def decrypt_onionmsg_data_tlv(*, shared_secret: bytes, encrypted_recipient_data: bytes) -> dict:
1✔
262
    rho_key = get_bolt04_onion_key(b'rho', shared_secret)
1✔
263
    recipient_data_bytes = chacha20_poly1305_decrypt(key=rho_key, nonce=bytes(12), data=encrypted_recipient_data)
1✔
264

265
    with io.BytesIO(recipient_data_bytes) as fd:
1✔
266
        recipient_data = OnionWireSerializer.read_tlv_stream(fd=fd, tlv_stream_name='encrypted_data_tlv')
1✔
267

268
    return recipient_data
1✔
269

270

271
def calc_hops_data_for_payment(
1✔
272
        route: 'LNPaymentRoute',
273
        amount_msat: int,  # that final recipient receives
274
        *,
275
        final_cltv_abs: int,
276
        total_msat: int,
277
        payment_secret: bytes,
278
) -> Tuple[List[OnionHopsDataSingle], int, int]:
279
    """Returns the hops_data to be used for constructing an onion packet,
280
    and the amount_msat and cltv_abs to be used on our immediate channel.
281
    """
282
    if len(route) > NUM_MAX_EDGES_IN_PAYMENT_PATH:
1✔
283
        raise PaymentFailure(f"too long route ({len(route)} edges)")
×
284
    # payload that will be seen by the last hop:
285
    amt = amount_msat
1✔
286
    cltv_abs = final_cltv_abs
1✔
287
    hop_payload = {
1✔
288
        "amt_to_forward": {"amt_to_forward": amt},
289
        "outgoing_cltv_value": {"outgoing_cltv_value": cltv_abs},
290
    }
291
    # for multipart payments we need to tell the receiver about the total and
292
    # partial amounts
293
    hop_payload["payment_data"] = {
1✔
294
        "payment_secret": payment_secret,
295
        "total_msat": total_msat,
296
        "amount_msat": amt
297
    }
298
    hops_data = [OnionHopsDataSingle(payload=hop_payload)]
1✔
299
    # payloads, backwards from last hop (but excluding the first edge):
300
    for edge_index in range(len(route) - 1, 0, -1):
1✔
301
        route_edge = route[edge_index]
1✔
302
        hop_payload = {
1✔
303
            "amt_to_forward": {"amt_to_forward": amt},
304
            "outgoing_cltv_value": {"outgoing_cltv_value": cltv_abs},
305
            "short_channel_id": {"short_channel_id": route_edge.short_channel_id},
306
        }
307
        hops_data.append(
1✔
308
            OnionHopsDataSingle(payload=hop_payload))
309
        amt += route_edge.fee_for_edge(amt)
1✔
310
        cltv_abs += route_edge.cltv_delta
1✔
311
    hops_data.reverse()
1✔
312
    return hops_data, amt, cltv_abs
1✔
313

314

315
def _generate_filler(key_type: bytes, hops_data: Sequence[OnionHopsDataSingle],
1✔
316
                     shared_secrets: Sequence[bytes], data_size:int) -> bytes:
317
    num_hops = len(hops_data)
1✔
318

319
    # generate filler that matches all but the last hop (no HMAC for last hop)
320
    filler_size = 0
1✔
321
    for hop_data in hops_data[:-1]:
1✔
322
        filler_size += len(hop_data.to_bytes())
1✔
323
    filler = bytearray(filler_size)
1✔
324

325
    for i in range(0, num_hops-1):  # -1, as last hop does not obfuscate
1✔
326
        # Sum up how many frames were used by prior hops.
327
        filler_start = data_size
1✔
328
        for hop_data in hops_data[:i]:
1✔
329
            filler_start -= len(hop_data.to_bytes())
1✔
330
        # The filler is the part dangling off of the end of the
331
        # routingInfo, so offset it from there, and use the current
332
        # hop's frame count as its size.
333
        filler_end = data_size + len(hops_data[i].to_bytes())
1✔
334

335
        stream_key = get_bolt04_onion_key(key_type, shared_secrets[i])
1✔
336
        stream_bytes = generate_cipher_stream(stream_key, 2 * data_size)
1✔
337
        filler = xor_bytes(filler, stream_bytes[filler_start:filler_end])
1✔
338
        filler += bytes(filler_size - len(filler))  # right pad with zeroes
1✔
339

340
    return filler
1✔
341

342

343
def generate_cipher_stream(stream_key: bytes, num_bytes: int) -> bytes:
1✔
344
    return chacha20_encrypt(key=stream_key,
1✔
345
                            nonce=bytes(8),
346
                            data=bytes(num_bytes))
347

348

349
class ProcessedOnionPacket(NamedTuple):
1✔
350
    are_we_final: bool
1✔
351
    hop_data: OnionHopsDataSingle
1✔
352
    next_packet: OnionPacket
1✔
353
    trampoline_onion_packet: OnionPacket
1✔
354

355

356
# TODO replay protection
357
def process_onion_packet(
1✔
358
        onion_packet: OnionPacket,
359
        our_onion_private_key: bytes,
360
        *,
361
        associated_data: bytes = b'',
362
        is_trampoline=False,
363
        tlv_stream_name='payload') -> ProcessedOnionPacket:
364
    if not ecc.ECPubkey.is_pubkey_bytes(onion_packet.public_key):
1✔
365
        raise InvalidOnionPubkey()
×
366
    shared_secret = get_ecdh(our_onion_private_key, onion_packet.public_key)
1✔
367
    # check message integrity
368
    mu_key = get_bolt04_onion_key(b'mu', shared_secret)
1✔
369
    calculated_mac = hmac_oneshot(
1✔
370
        mu_key, msg=onion_packet.hops_data+associated_data,
371
        digest=hashlib.sha256)
372
    if onion_packet.hmac != calculated_mac:
1✔
373
        raise InvalidOnionMac()
×
374
    # peel an onion layer off
375
    rho_key = get_bolt04_onion_key(b'rho', shared_secret)
1✔
376
    data_size = TRAMPOLINE_HOPS_DATA_SIZE if is_trampoline else HOPS_DATA_SIZE
1✔
377
    stream_bytes = generate_cipher_stream(rho_key, 2 * data_size)
1✔
378
    padded_header = onion_packet.hops_data + bytes(data_size)
1✔
379
    next_hops_data = xor_bytes(padded_header, stream_bytes)
1✔
380
    next_hops_data_fd = io.BytesIO(next_hops_data)
1✔
381
    hop_data = OnionHopsDataSingle.from_fd(next_hops_data_fd, tlv_stream_name=tlv_stream_name)
1✔
382
    # trampoline
383
    trampoline_onion_packet = hop_data.payload.get('trampoline_onion_packet')
1✔
384
    if trampoline_onion_packet:
1✔
385
        top_version = trampoline_onion_packet.get('version')
1✔
386
        top_public_key = trampoline_onion_packet.get('public_key')
1✔
387
        top_hops_data = trampoline_onion_packet.get('hops_data')
1✔
388
        top_hops_data_fd = io.BytesIO(top_hops_data)
1✔
389
        top_hmac = trampoline_onion_packet.get('hmac')
1✔
390
        trampoline_onion_packet = OnionPacket(
1✔
391
            public_key=top_public_key,
392
            hops_data=top_hops_data_fd.read(TRAMPOLINE_HOPS_DATA_SIZE),
393
            hmac=top_hmac)
394
    # calc next ephemeral key
395
    blinding_factor = sha256(onion_packet.public_key + shared_secret)
1✔
396
    blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big")
1✔
397
    next_public_key_int = ecc.ECPubkey(onion_packet.public_key) * blinding_factor_int
1✔
398
    next_public_key = next_public_key_int.get_public_key_bytes()
1✔
399
    next_onion_packet = OnionPacket(
1✔
400
        public_key=next_public_key,
401
        hops_data=next_hops_data_fd.read(data_size),
402
        hmac=hop_data.hmac)
403
    if hop_data.hmac == bytes(PER_HOP_HMAC_SIZE):
1✔
404
        # we are the destination / exit node
405
        are_we_final = True
1✔
406
    else:
407
        # we are an intermediate node; forwarding
408
        are_we_final = False
1✔
409
    return ProcessedOnionPacket(are_we_final, hop_data, next_onion_packet, trampoline_onion_packet)
1✔
410

411

412
class FailedToDecodeOnionError(Exception): pass
1✔
413

414

415
class OnionRoutingFailure(Exception):
1✔
416

417
    def __init__(self, code: Union[int, 'OnionFailureCode'], data: bytes):
1✔
418
        self.code = code
1✔
419
        self.data = data
1✔
420

421
    def __repr__(self):
1✔
422
        return repr((self.code, self.data))
1✔
423

424
    def to_bytes(self) -> bytes:
1✔
425
        ret = self.code.to_bytes(2, byteorder="big")
1✔
426
        ret += self.data
1✔
427
        return ret
1✔
428

429
    @classmethod
1✔
430
    def from_bytes(cls, failure_msg: bytes):
1✔
431
        failure_code = int.from_bytes(failure_msg[:2], byteorder='big')
1✔
432
        try:
1✔
433
            failure_code = OnionFailureCode(failure_code)
1✔
434
        except ValueError:
×
435
            pass  # unknown failure code
×
436
        failure_data = failure_msg[2:]
1✔
437
        return OnionRoutingFailure(failure_code, failure_data)
1✔
438

439
    def code_name(self) -> str:
1✔
440
        if isinstance(self.code, OnionFailureCode):
1✔
441
            return str(self.code.name)
1✔
442
        return f"Unknown error ({self.code!r})"
×
443

444
    def decode_data(self) -> Optional[Dict[str, Any]]:
1✔
445
        try:
1✔
446
            message_type, payload = OnionWireSerializer.decode_msg(self.to_bytes())
1✔
447
        except lnmsg.FailedToParseMsg:
1✔
448
            payload = None
1✔
449
        return payload
1✔
450

451

452
def construct_onion_error(
1✔
453
        error: OnionRoutingFailure,
454
        their_public_key: bytes,
455
        our_onion_private_key: bytes,
456
        local_height: int
457
) -> bytes:
458
    # add local height
459
    if error.code == OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS:
1✔
460
        error.data += local_height.to_bytes(4, byteorder="big")
1✔
461
    # create payload
462
    failure_msg = error.to_bytes()
1✔
463
    failure_len = len(failure_msg)
1✔
464
    pad_len = 256 - failure_len
1✔
465
    assert pad_len >= 0
1✔
466
    error_packet =  failure_len.to_bytes(2, byteorder="big")
1✔
467
    error_packet += failure_msg
1✔
468
    error_packet += pad_len.to_bytes(2, byteorder="big")
1✔
469
    error_packet += bytes(pad_len)
1✔
470
    # add hmac
471
    shared_secret = get_ecdh(our_onion_private_key, their_public_key)
1✔
472
    um_key = get_bolt04_onion_key(b'um', shared_secret)
1✔
473
    hmac_ = hmac_oneshot(um_key, msg=error_packet, digest=hashlib.sha256)
1✔
474
    error_packet = hmac_ + error_packet
1✔
475
    return error_packet
1✔
476

477
def obfuscate_onion_error(error_packet, their_public_key, our_onion_private_key):
1✔
478
    shared_secret = get_ecdh(our_onion_private_key, their_public_key)
1✔
479
    ammag_key = get_bolt04_onion_key(b'ammag', shared_secret)
1✔
480
    stream_bytes = generate_cipher_stream(ammag_key, len(error_packet))
1✔
481
    error_packet = xor_bytes(error_packet, stream_bytes)
1✔
482
    return error_packet
1✔
483

484

485
def _decode_onion_error(error_packet: bytes, payment_path_pubkeys: Sequence[bytes],
1✔
486
                        session_key: bytes) -> Tuple[bytes, int]:
487
    """Returns the decoded error bytes, and the index of the sender of the error."""
488
    num_hops = len(payment_path_pubkeys)
1✔
489
    hop_shared_secrets, _ = get_shared_secrets_along_route(payment_path_pubkeys, session_key)
1✔
490
    for i in range(num_hops):
1✔
491
        ammag_key = get_bolt04_onion_key(b'ammag', hop_shared_secrets[i])
1✔
492
        um_key = get_bolt04_onion_key(b'um', hop_shared_secrets[i])
1✔
493
        stream_bytes = generate_cipher_stream(ammag_key, len(error_packet))
1✔
494
        error_packet = xor_bytes(error_packet, stream_bytes)
1✔
495
        hmac_computed = hmac_oneshot(um_key, msg=error_packet[32:], digest=hashlib.sha256)
1✔
496
        hmac_found = error_packet[:32]
1✔
497
        if hmac_computed == hmac_found:
1✔
498
            return error_packet, i
1✔
UNCOV
499
    raise FailedToDecodeOnionError()
×
500

501

502
def decode_onion_error(error_packet: bytes, payment_path_pubkeys: Sequence[bytes],
1✔
503
                       session_key: bytes) -> (OnionRoutingFailure, int):
504
    """Returns the failure message, and the index of the sender of the error."""
505
    decrypted_error, sender_index = _decode_onion_error(error_packet, payment_path_pubkeys, session_key)
1✔
506
    failure_msg = get_failure_msg_from_onion_error(decrypted_error)
1✔
507
    return failure_msg, sender_index
1✔
508

509

510
def get_failure_msg_from_onion_error(decrypted_error_packet: bytes) -> OnionRoutingFailure:
1✔
511
    # get failure_msg bytes from error packet
512
    failure_len = int.from_bytes(decrypted_error_packet[32:34], byteorder='big')
1✔
513
    failure_msg = decrypted_error_packet[34:34+failure_len]
1✔
514
    # create failure message object
515
    return OnionRoutingFailure.from_bytes(failure_msg)
1✔
516

517

518

519
# TODO maybe we should rm this and just use OnionWireSerializer and onion_wire.csv
520
BADONION = OnionFailureCodeMetaFlag.BADONION
1✔
521
PERM     = OnionFailureCodeMetaFlag.PERM
1✔
522
NODE     = OnionFailureCodeMetaFlag.NODE
1✔
523
UPDATE   = OnionFailureCodeMetaFlag.UPDATE
1✔
524
class OnionFailureCode(IntEnum):
1✔
525
    INVALID_REALM =                           PERM | 1
1✔
526
    TEMPORARY_NODE_FAILURE =                  NODE | 2
1✔
527
    PERMANENT_NODE_FAILURE =                  PERM | NODE | 2
1✔
528
    REQUIRED_NODE_FEATURE_MISSING =           PERM | NODE | 3
1✔
529
    INVALID_ONION_VERSION =                   BADONION | PERM | 4
1✔
530
    INVALID_ONION_HMAC =                      BADONION | PERM | 5
1✔
531
    INVALID_ONION_KEY =                       BADONION | PERM | 6
1✔
532
    TEMPORARY_CHANNEL_FAILURE =               UPDATE | 7
1✔
533
    PERMANENT_CHANNEL_FAILURE =               PERM | 8
1✔
534
    REQUIRED_CHANNEL_FEATURE_MISSING =        PERM | 9
1✔
535
    UNKNOWN_NEXT_PEER =                       PERM | 10
1✔
536
    AMOUNT_BELOW_MINIMUM =                    UPDATE | 11
1✔
537
    FEE_INSUFFICIENT =                        UPDATE | 12
1✔
538
    INCORRECT_CLTV_EXPIRY =                   UPDATE | 13
1✔
539
    EXPIRY_TOO_SOON =                         UPDATE | 14
1✔
540
    INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS =    PERM | 15
1✔
541
    _LEGACY_INCORRECT_PAYMENT_AMOUNT =        PERM | 16
1✔
542
    FINAL_EXPIRY_TOO_SOON =                   17
1✔
543
    FINAL_INCORRECT_CLTV_EXPIRY =             18
1✔
544
    FINAL_INCORRECT_HTLC_AMOUNT =             19
1✔
545
    CHANNEL_DISABLED =                        UPDATE | 20
1✔
546
    EXPIRY_TOO_FAR =                          21
1✔
547
    INVALID_ONION_PAYLOAD =                   PERM | 22
1✔
548
    MPP_TIMEOUT =                             23
1✔
549
    TRAMPOLINE_FEE_INSUFFICIENT =             NODE | 51
1✔
550
    TRAMPOLINE_EXPIRY_TOO_SOON =              NODE | 52
1✔
551

552

553
# don't use these elsewhere, the names are ambiguous without context
554
del BADONION; del PERM; del NODE; del UPDATE
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc