• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 21485572389

29 Jan 2026 04:09PM UTC coverage: 65.247% (+0.2%) from 65.074%
21485572389

Pull #10089

github

web-flow
Merge 22d34d15e into 19b2ad797
Pull Request #10089: Onion message forwarding

1152 of 1448 new or added lines in 23 files covered. (79.56%)

4109 existing lines in 29 files now uncovered.

139515 of 213825 relevant lines covered (65.25%)

20529.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.59
/peer/test_utils.go
1
package peer
2

3
import (
4
        "bytes"
5
        crand "crypto/rand"
6
        "encoding/binary"
7
        "errors"
8
        "io"
9
        "math/rand"
10
        "net"
11
        "sync/atomic"
12
        "testing"
13
        "time"
14

15
        "github.com/btcsuite/btcd/btcec/v2"
16
        "github.com/btcsuite/btcd/btcutil"
17
        "github.com/btcsuite/btcd/chaincfg/chainhash"
18
        "github.com/btcsuite/btcd/wire"
19
        sphinx "github.com/lightningnetwork/lightning-onion"
20
        "github.com/lightningnetwork/lnd/actor"
21
        "github.com/lightningnetwork/lnd/chainntnfs"
22
        "github.com/lightningnetwork/lnd/channeldb"
23
        "github.com/lightningnetwork/lnd/channelnotifier"
24
        "github.com/lightningnetwork/lnd/fn/v2"
25
        graphdb "github.com/lightningnetwork/lnd/graph/db"
26
        "github.com/lightningnetwork/lnd/htlcswitch"
27
        "github.com/lightningnetwork/lnd/input"
28
        "github.com/lightningnetwork/lnd/keychain"
29
        "github.com/lightningnetwork/lnd/lntest/channels"
30
        "github.com/lightningnetwork/lnd/lntest/mock"
31
        "github.com/lightningnetwork/lnd/lntypes"
32
        "github.com/lightningnetwork/lnd/lnwallet"
33
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
34
        "github.com/lightningnetwork/lnd/lnwire"
35
        "github.com/lightningnetwork/lnd/netann"
36
        "github.com/lightningnetwork/lnd/onionmessage"
37
        "github.com/lightningnetwork/lnd/pool"
38
        "github.com/lightningnetwork/lnd/queue"
39
        "github.com/lightningnetwork/lnd/shachain"
40
        "github.com/lightningnetwork/lnd/subscribe"
41
        "github.com/stretchr/testify/require"
42
)
43

44
const (
	// broadcastHeight is the best block height reported by the mock chain
	// backend handed to the test peer.
	broadcastHeight = 100

	// timeout bounds how long a test waits for a value to arrive on a
	// channel before giving up.
	timeout = 5 * time.Second

	// testCltvRejectDelta is the minimum delta between an htlc's expiry
	// and the current height; htlcs below it are rejected.
	testCltvRejectDelta = 13
)
55

56
var (
57
        testKeyLoc = keychain.KeyLocator{Family: keychain.KeyFamilyNodeKey}
58
)
59

60
// noUpdate is a function which can be used as a parameter in
61
// createTestPeerWithChannel to call the setup code with no custom values on
62
// the channels set up.
63
var noUpdate = func(a, b *channeldb.OpenChannel) {}
9✔
64

65
type peerTestCtx struct {
66
        peer          *Brontide
67
        channel       *lnwallet.LightningChannel
68
        notifier      *mock.ChainNotifier
69
        publishTx     <-chan *wire.MsgTx
70
        mockSwitch    *mockMessageSwitch
71
        db            *channeldb.DB
72
        privKey       *btcec.PrivateKey
73
        mockConn      *mockMessageConn
74
        customChan    chan *customMsg
75
        chanStatusMgr *netann.ChanStatusManager
76
}
77

78
// createTestPeerWithChannel creates a channel between two nodes, and returns a
79
// peer for one of the nodes, together with the channel seen from both nodes.
80
// It takes an updateChan function which can be used to modify the default
81
// values on the channel states for each peer.
82
func createTestPeerWithChannel(t *testing.T, updateChan func(a,
83
        b *channeldb.OpenChannel)) (*peerTestCtx, error) {
14✔
84

14✔
85
        params := createTestPeer(t)
14✔
86

14✔
87
        var (
14✔
88
                publishTx     = params.publishTx
14✔
89
                mockSwitch    = params.mockSwitch
14✔
90
                alicePeer     = params.peer
14✔
91
                notifier      = params.notifier
14✔
92
                aliceKeyPriv  = params.privKey
14✔
93
                dbAlice       = params.db
14✔
94
                chanStatusMgr = params.chanStatusMgr
14✔
95
        )
14✔
96

14✔
97
        err := chanStatusMgr.Start()
14✔
98
        require.NoError(t, err)
14✔
99
        t.Cleanup(func() {
28✔
100
                require.NoError(t, chanStatusMgr.Stop())
14✔
101
        })
14✔
102

103
        aliceKeyPub := alicePeer.IdentityKey()
14✔
104
        estimator := alicePeer.cfg.FeeEstimator
14✔
105

14✔
106
        channelCapacity := btcutil.Amount(10 * 1e8)
14✔
107
        channelBal := channelCapacity / 2
14✔
108
        aliceDustLimit := btcutil.Amount(200)
14✔
109
        bobDustLimit := btcutil.Amount(1300)
14✔
110
        csvTimeoutAlice := uint32(5)
14✔
111
        csvTimeoutBob := uint32(4)
14✔
112
        isAliceInitiator := true
14✔
113

14✔
114
        prevOut := &wire.OutPoint{
14✔
115
                Hash:  channels.TestHdSeed,
14✔
116
                Index: 0,
14✔
117
        }
14✔
118
        fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
14✔
119

14✔
120
        bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
14✔
121
                channels.BobsPrivKey,
14✔
122
        )
14✔
123

14✔
124
        aliceCfg := channeldb.ChannelConfig{
14✔
125
                ChannelStateBounds: channeldb.ChannelStateBounds{
14✔
126
                        MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
14✔
127
                        ChanReserve:      btcutil.Amount(rand.Int63()),
14✔
128
                        MinHTLC:          lnwire.MilliSatoshi(rand.Int63()),
14✔
129
                        MaxAcceptedHtlcs: uint16(rand.Int31()),
14✔
130
                },
14✔
131
                CommitmentParams: channeldb.CommitmentParams{
14✔
132
                        DustLimit: aliceDustLimit,
14✔
133
                        CsvDelay:  uint16(csvTimeoutAlice),
14✔
134
                },
14✔
135
                MultiSigKey: keychain.KeyDescriptor{
14✔
136
                        PubKey: aliceKeyPub,
14✔
137
                },
14✔
138
                RevocationBasePoint: keychain.KeyDescriptor{
14✔
139
                        PubKey: aliceKeyPub,
14✔
140
                },
14✔
141
                PaymentBasePoint: keychain.KeyDescriptor{
14✔
142
                        PubKey: aliceKeyPub,
14✔
143
                },
14✔
144
                DelayBasePoint: keychain.KeyDescriptor{
14✔
145
                        PubKey: aliceKeyPub,
14✔
146
                },
14✔
147
                HtlcBasePoint: keychain.KeyDescriptor{
14✔
148
                        PubKey: aliceKeyPub,
14✔
149
                },
14✔
150
        }
14✔
151
        bobCfg := channeldb.ChannelConfig{
14✔
152
                ChannelStateBounds: channeldb.ChannelStateBounds{
14✔
153
                        MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
14✔
154
                        ChanReserve:      btcutil.Amount(rand.Int63()),
14✔
155
                        MinHTLC:          lnwire.MilliSatoshi(rand.Int63()),
14✔
156
                        MaxAcceptedHtlcs: uint16(rand.Int31()),
14✔
157
                },
14✔
158
                CommitmentParams: channeldb.CommitmentParams{
14✔
159
                        DustLimit: bobDustLimit,
14✔
160
                        CsvDelay:  uint16(csvTimeoutBob),
14✔
161
                },
14✔
162
                MultiSigKey: keychain.KeyDescriptor{
14✔
163
                        PubKey: bobKeyPub,
14✔
164
                },
14✔
165
                RevocationBasePoint: keychain.KeyDescriptor{
14✔
166
                        PubKey: bobKeyPub,
14✔
167
                },
14✔
168
                PaymentBasePoint: keychain.KeyDescriptor{
14✔
169
                        PubKey: bobKeyPub,
14✔
170
                },
14✔
171
                DelayBasePoint: keychain.KeyDescriptor{
14✔
172
                        PubKey: bobKeyPub,
14✔
173
                },
14✔
174
                HtlcBasePoint: keychain.KeyDescriptor{
14✔
175
                        PubKey: bobKeyPub,
14✔
176
                },
14✔
177
        }
14✔
178

14✔
179
        bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize())
14✔
180
        if err != nil {
14✔
181
                return nil, err
×
182
        }
×
183
        bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot)
14✔
184
        bobFirstRevoke, err := bobPreimageProducer.AtIndex(0)
14✔
185
        if err != nil {
14✔
186
                return nil, err
×
187
        }
×
188
        bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:])
14✔
189

14✔
190
        aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize())
14✔
191
        if err != nil {
14✔
192
                return nil, err
×
193
        }
×
194
        alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot)
14✔
195
        aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0)
14✔
196
        if err != nil {
14✔
197
                return nil, err
×
198
        }
×
199
        aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:])
14✔
200

14✔
201
        aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns(
14✔
202
                channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint,
14✔
203
                bobCommitPoint, *fundingTxIn, channeldb.SingleFunderTweaklessBit,
14✔
204
                isAliceInitiator, 0,
14✔
205
        )
14✔
206
        if err != nil {
14✔
207
                return nil, err
×
208
        }
×
209

210
        dbBob := channeldb.OpenForTesting(t, t.TempDir())
14✔
211

14✔
212
        feePerKw, err := estimator.EstimateFeePerKW(1)
14✔
213
        if err != nil {
14✔
214
                return nil, err
×
215
        }
×
216

217
        // TODO(roasbeef): need to factor in commit fee?
218
        aliceCommit := channeldb.ChannelCommitment{
14✔
219
                CommitHeight:  0,
14✔
220
                LocalBalance:  lnwire.NewMSatFromSatoshis(channelBal),
14✔
221
                RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
14✔
222
                FeePerKw:      btcutil.Amount(feePerKw),
14✔
223
                CommitFee:     feePerKw.FeeForWeight(input.CommitWeight),
14✔
224
                CommitTx:      aliceCommitTx,
14✔
225
                CommitSig:     bytes.Repeat([]byte{1}, 71),
14✔
226
        }
14✔
227
        bobCommit := channeldb.ChannelCommitment{
14✔
228
                CommitHeight:  0,
14✔
229
                LocalBalance:  lnwire.NewMSatFromSatoshis(channelBal),
14✔
230
                RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
14✔
231
                FeePerKw:      btcutil.Amount(feePerKw),
14✔
232
                CommitFee:     feePerKw.FeeForWeight(input.CommitWeight),
14✔
233
                CommitTx:      bobCommitTx,
14✔
234
                CommitSig:     bytes.Repeat([]byte{1}, 71),
14✔
235
        }
14✔
236

14✔
237
        var chanIDBytes [8]byte
14✔
238
        if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil {
14✔
239
                return nil, err
×
240
        }
×
241

242
        shortChanID := lnwire.NewShortChanIDFromInt(
14✔
243
                binary.BigEndian.Uint64(chanIDBytes[:]),
14✔
244
        )
14✔
245

14✔
246
        aliceChannelState := &channeldb.OpenChannel{
14✔
247
                LocalChanCfg:            aliceCfg,
14✔
248
                RemoteChanCfg:           bobCfg,
14✔
249
                IdentityPub:             aliceKeyPub,
14✔
250
                FundingOutpoint:         *prevOut,
14✔
251
                ShortChannelID:          shortChanID,
14✔
252
                ChanType:                channeldb.SingleFunderTweaklessBit,
14✔
253
                IsInitiator:             isAliceInitiator,
14✔
254
                Capacity:                channelCapacity,
14✔
255
                RemoteCurrentRevocation: bobCommitPoint,
14✔
256
                RevocationProducer:      alicePreimageProducer,
14✔
257
                RevocationStore:         shachain.NewRevocationStore(),
14✔
258
                LocalCommitment:         aliceCommit,
14✔
259
                RemoteCommitment:        aliceCommit,
14✔
260
                Db:                      dbAlice.ChannelStateDB(),
14✔
261
                Packager:                channeldb.NewChannelPackager(shortChanID),
14✔
262
                FundingTxn:              channels.TestFundingTx,
14✔
263
        }
14✔
264
        bobChannelState := &channeldb.OpenChannel{
14✔
265
                LocalChanCfg:            bobCfg,
14✔
266
                RemoteChanCfg:           aliceCfg,
14✔
267
                IdentityPub:             bobKeyPub,
14✔
268
                FundingOutpoint:         *prevOut,
14✔
269
                ChanType:                channeldb.SingleFunderTweaklessBit,
14✔
270
                IsInitiator:             !isAliceInitiator,
14✔
271
                Capacity:                channelCapacity,
14✔
272
                RemoteCurrentRevocation: aliceCommitPoint,
14✔
273
                RevocationProducer:      bobPreimageProducer,
14✔
274
                RevocationStore:         shachain.NewRevocationStore(),
14✔
275
                LocalCommitment:         bobCommit,
14✔
276
                RemoteCommitment:        bobCommit,
14✔
277
                Db:                      dbBob.ChannelStateDB(),
14✔
278
                Packager:                channeldb.NewChannelPackager(shortChanID),
14✔
279
        }
14✔
280

14✔
281
        // Set custom values on the channel states.
14✔
282
        updateChan(aliceChannelState, bobChannelState)
14✔
283

14✔
284
        aliceAddr := alicePeer.cfg.Addr.Address
14✔
285
        if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil {
14✔
286
                return nil, err
×
287
        }
×
288

289
        bobAddr := &net.TCPAddr{
14✔
290
                IP:   net.ParseIP("127.0.0.1"),
14✔
291
                Port: 18556,
14✔
292
        }
14✔
293

14✔
294
        if err := bobChannelState.SyncPending(bobAddr, 0); err != nil {
14✔
295
                return nil, err
×
296
        }
×
297

298
        aliceSigner := input.NewMockSigner(
14✔
299
                []*btcec.PrivateKey{aliceKeyPriv}, nil,
14✔
300
        )
14✔
301
        bobSigner := input.NewMockSigner(
14✔
302
                []*btcec.PrivateKey{bobKeyPriv}, nil,
14✔
303
        )
14✔
304

14✔
305
        alicePool := lnwallet.NewSigPool(1, aliceSigner)
14✔
306
        channelAlice, err := lnwallet.NewLightningChannel(
14✔
307
                aliceSigner, aliceChannelState, alicePool,
14✔
308
                lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
14✔
309
                lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
14✔
310
                        lnwallet.EmptyMockJobHandler,
14✔
311
                )),
14✔
312
        )
14✔
313
        if err != nil {
14✔
314
                return nil, err
×
315
        }
×
316
        _ = alicePool.Start()
14✔
317
        t.Cleanup(func() {
28✔
318
                require.NoError(t, alicePool.Stop())
14✔
319
        })
14✔
320

321
        bobPool := lnwallet.NewSigPool(1, bobSigner)
14✔
322
        channelBob, err := lnwallet.NewLightningChannel(
14✔
323
                bobSigner, bobChannelState, bobPool,
14✔
324
                lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
14✔
325
                lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
14✔
326
                        lnwallet.EmptyMockJobHandler,
14✔
327
                )),
14✔
328
        )
14✔
329
        if err != nil {
14✔
330
                return nil, err
×
331
        }
×
332
        _ = bobPool.Start()
14✔
333
        t.Cleanup(func() {
28✔
334
                require.NoError(t, bobPool.Stop())
14✔
335
        })
14✔
336

337
        alicePeer.remoteFeatures = lnwire.NewFeatureVector(
14✔
338
                nil, lnwire.Features,
14✔
339
        )
14✔
340

14✔
341
        chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
14✔
342
        alicePeer.activeChannels.Store(chanID, channelAlice)
14✔
343

14✔
344
        alicePeer.cg.WgAdd(1)
14✔
345
        go alicePeer.channelManager()
14✔
346

14✔
347
        return &peerTestCtx{
14✔
348
                peer:       alicePeer,
14✔
349
                channel:    channelBob,
14✔
350
                notifier:   notifier,
14✔
351
                publishTx:  publishTx,
14✔
352
                mockSwitch: mockSwitch,
14✔
353
                mockConn:   params.mockConn,
14✔
354
        }, nil
14✔
355
}
356

357
// mockMessageSwitch is a mock implementation of the messageSwitch interface
358
// used for testing without relying on a *htlcswitch.Switch in unit tests.
359
type mockMessageSwitch struct {
360
        links []htlcswitch.ChannelUpdateHandler
361
}
362

363
// BestHeight currently returns a dummy value.
364
func (m *mockMessageSwitch) BestHeight() uint32 {
×
365
        return 0
×
366
}
×
367

368
// CircuitModifier currently returns a dummy value.
369
func (m *mockMessageSwitch) CircuitModifier() htlcswitch.CircuitModifier {
×
370
        return nil
×
371
}
×
372

373
// RemoveLink currently does nothing.
374
func (m *mockMessageSwitch) RemoveLink(cid lnwire.ChannelID) {}
8✔
375

376
// CreateAndAddLink currently returns a dummy value.
377
func (m *mockMessageSwitch) CreateAndAddLink(cfg htlcswitch.ChannelLinkConfig,
378
        lnChan *lnwallet.LightningChannel) error {
×
379

×
380
        return nil
×
381
}
×
382

383
// GetLinksByInterface returns the active links.
384
func (m *mockMessageSwitch) GetLinksByInterface(pub [33]byte) (
385
        []htlcswitch.ChannelUpdateHandler, error) {
19✔
386

19✔
387
        return m.links, nil
19✔
388
}
19✔
389

390
// mockUpdateHandler is a mock implementation of the ChannelUpdateHandler
391
// interface. It is used in mockMessageSwitch's GetLinksByInterface method.
392
type mockUpdateHandler struct {
393
        cid                  lnwire.ChannelID
394
        isOutgoingAddBlocked atomic.Bool
395
        isIncomingAddBlocked atomic.Bool
396
}
397

398
// newMockUpdateHandler creates a new mockUpdateHandler.
399
func newMockUpdateHandler(cid lnwire.ChannelID) *mockUpdateHandler {
9✔
400
        return &mockUpdateHandler{
9✔
401
                cid: cid,
9✔
402
        }
9✔
403
}
9✔
404

405
// HandleChannelUpdate currently does nothing.
406
func (m *mockUpdateHandler) HandleChannelUpdate(msg lnwire.Message) {}
×
407

408
// ChanID returns the mockUpdateHandler's cid.
409
func (m *mockUpdateHandler) ChanID() lnwire.ChannelID { return m.cid }
18✔
410

411
// Bandwidth currently returns a dummy value.
412
func (m *mockUpdateHandler) Bandwidth() lnwire.MilliSatoshi { return 0 }
×
413

414
// EligibleToForward currently returns a dummy value.
415
func (m *mockUpdateHandler) EligibleToForward() bool { return false }
×
416

417
// MayAddOutgoingHtlc currently returns nil.
418
func (m *mockUpdateHandler) MayAddOutgoingHtlc(lnwire.MilliSatoshi) error { return nil }
×
419

420
type mockMessageConn struct {
421
        t *testing.T
422

423
        // MessageConn embeds our interface so that the mock does not need to
424
        // implement every function. The mock will panic if an unspecified function
425
        // is called.
426
        MessageConn
427

428
        // writtenMessages is a channel that our mock pushes written messages into.
429
        writtenMessages chan []byte
430

431
        readMessages   chan []byte
432
        curReadMessage []byte
433

434
        // writeRaceDetectingCounter is incremented on any function call
435
        // associated with writing to the connection. The race detector will
436
        // trigger on this counter if a data race exists.
437
        writeRaceDetectingCounter int
438

439
        // readRaceDetectingCounter is incremented on any function call
440
        // associated with reading from the connection. The race detector will
441
        // trigger on this counter if a data race exists.
442
        readRaceDetectingCounter int
443
}
444

445
func (m *mockUpdateHandler) EnableAdds(dir htlcswitch.LinkDirection) bool {
×
446
        if dir == htlcswitch.Outgoing {
×
447
                return m.isOutgoingAddBlocked.Swap(false)
×
448
        }
×
449

450
        return m.isIncomingAddBlocked.Swap(false)
×
451
}
452

453
func (m *mockUpdateHandler) DisableAdds(dir htlcswitch.LinkDirection) bool {
12✔
454
        if dir == htlcswitch.Outgoing {
20✔
455
                return !m.isOutgoingAddBlocked.Swap(true)
8✔
456
        }
8✔
457

458
        return !m.isIncomingAddBlocked.Swap(true)
4✔
459
}
460

461
func (m *mockUpdateHandler) IsFlushing(dir htlcswitch.LinkDirection) bool {
×
462
        switch dir {
×
463
        case htlcswitch.Outgoing:
×
464
                return m.isOutgoingAddBlocked.Load()
×
465
        case htlcswitch.Incoming:
×
466
                return m.isIncomingAddBlocked.Load()
×
467
        }
468

469
        return false
×
470
}
471

472
func (m *mockUpdateHandler) OnFlushedOnce(hook func()) {
4✔
473
        hook()
4✔
474
}
4✔
475
func (m *mockUpdateHandler) OnCommitOnce(
476
        _ htlcswitch.LinkDirection, hook func(),
477
) {
8✔
478

8✔
479
        hook()
8✔
480
}
8✔
481
func (m *mockUpdateHandler) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
×
482
        // TODO(proofofkeags): Implement
×
483
        c := make(chan fn.Result[lntypes.ChannelParty], 1)
×
484

×
485
        c <- fn.Errf[lntypes.ChannelParty]("InitStfu not yet implemented")
×
486

×
487
        return c
×
488
}
×
489

490
func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn {
20✔
491
        return &mockMessageConn{
20✔
492
                t:               t,
20✔
493
                writtenMessages: make(chan []byte, expectedMessages),
20✔
494
                readMessages:    make(chan []byte, 1),
20✔
495
        }
20✔
496
}
20✔
497

498
// SetWriteDeadline mocks setting write deadline for our conn.
499
func (m *mockMessageConn) SetWriteDeadline(time.Time) error {
13✔
500
        m.writeRaceDetectingCounter++
13✔
501
        return nil
13✔
502
}
13✔
503

504
// Flush mocks a message conn flush.
505
func (m *mockMessageConn) Flush() (int, error) {
13✔
506
        m.writeRaceDetectingCounter++
13✔
507
        return 0, nil
13✔
508
}
13✔
509

510
// WriteMessage mocks sending of a message on our connection. It will push
511
// the bytes sent into the mock's writtenMessages channel.
512
func (m *mockMessageConn) WriteMessage(msg []byte) error {
13✔
513
        m.writeRaceDetectingCounter++
13✔
514

13✔
515
        msgCopy := make([]byte, len(msg))
13✔
516
        copy(msgCopy, msg)
13✔
517

13✔
518
        select {
13✔
519
        case m.writtenMessages <- msgCopy:
13✔
520
        case <-time.After(timeout):
×
521
                m.t.Fatalf("timeout sending message: %v", msgCopy)
×
522
        }
523

524
        return nil
13✔
525
}
526

527
// assertWrite asserts that our mock as had WriteMessage called with the byte
528
// slice we expect.
529
func (m *mockMessageConn) assertWrite(expected []byte) {
4✔
530
        select {
4✔
531
        case actual := <-m.writtenMessages:
4✔
532
                require.Equal(m.t, expected, actual)
4✔
533

534
        case <-time.After(timeout):
×
535
                m.t.Fatalf("timeout waiting for write: %v", expected)
×
536
        }
537
}
538

539
func (m *mockMessageConn) SetReadDeadline(t time.Time) error {
11✔
540
        m.readRaceDetectingCounter++
11✔
541
        return nil
11✔
542
}
11✔
543

544
func (m *mockMessageConn) ReadNextHeader() (uint32, error) {
7✔
545
        m.readRaceDetectingCounter++
7✔
546
        m.curReadMessage = <-m.readMessages
7✔
547
        return uint32(len(m.curReadMessage)), nil
7✔
548
}
7✔
549

550
func (m *mockMessageConn) ReadNextBody(buf []byte) ([]byte, error) {
4✔
551
        m.readRaceDetectingCounter++
4✔
552
        return m.curReadMessage, nil
4✔
553
}
4✔
554

555
func (m *mockMessageConn) RemoteAddr() net.Addr {
23✔
556
        return nil
23✔
557
}
23✔
558

559
func (m *mockMessageConn) LocalAddr() net.Addr {
3✔
560
        return nil
3✔
561
}
3✔
562

563
func (m *mockMessageConn) Close() error {
1✔
564
        return nil
1✔
565
}
1✔
566

567
// createTestPeer creates a new peer for testing and returns a context struct
568
// containing necessary handles and mock objects for conducting tests on peer
569
// functionalities.
570
func createTestPeer(t *testing.T) *peerTestCtx {
19✔
571
        nodeKeyLocator := keychain.KeyLocator{
19✔
572
                Family: keychain.KeyFamilyNodeKey,
19✔
573
        }
19✔
574

19✔
575
        aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
19✔
576
                channels.AlicesPrivKey,
19✔
577
        )
19✔
578

19✔
579
        aliceKeySigner := keychain.NewPrivKeyMessageSigner(
19✔
580
                aliceKeyPriv, nodeKeyLocator,
19✔
581
        )
19✔
582

19✔
583
        aliceAddr := &net.TCPAddr{
19✔
584
                IP:   net.ParseIP("127.0.0.1"),
19✔
585
                Port: 18555,
19✔
586
        }
19✔
587
        cfgAddr := &lnwire.NetAddress{
19✔
588
                IdentityKey: aliceKeyPub,
19✔
589
                Address:     aliceAddr,
19✔
590
                ChainNet:    wire.SimNet,
19✔
591
        }
19✔
592

19✔
593
        errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
19✔
594
        require.NoError(t, err)
19✔
595

19✔
596
        chainIO := &mock.ChainIO{
19✔
597
                BestHeight: broadcastHeight,
19✔
598
        }
19✔
599

19✔
600
        publishTx := make(chan *wire.MsgTx)
19✔
601
        wallet := &lnwallet.LightningWallet{
19✔
602
                WalletController: &mock.WalletController{
19✔
603
                        RootKey:               aliceKeyPriv,
19✔
604
                        PublishedTransactions: publishTx,
19✔
605
                },
19✔
606
        }
19✔
607

19✔
608
        const chanActiveTimeout = time.Minute
19✔
609

19✔
610
        dbAliceGraph := graphdb.MakeTestGraph(t)
19✔
611
        require.NoError(t, dbAliceGraph.Start())
19✔
612
        t.Cleanup(func() {
38✔
613
                require.NoError(t, dbAliceGraph.Stop())
19✔
614
        })
19✔
615

616
        dbAliceChannel := channeldb.OpenForTesting(t, t.TempDir())
19✔
617

19✔
618
        nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
19✔
619

19✔
620
        chanStatusMgr, err := netann.NewChanStatusManager(&netann.
19✔
621
                ChanStatusConfig{
19✔
622
                ChanStatusSampleInterval: 30 * time.Second,
19✔
623
                ChanEnableTimeout:        chanActiveTimeout,
19✔
624
                ChanDisableTimeout:       2 * time.Minute,
19✔
625
                DB:                       dbAliceChannel.ChannelStateDB(),
19✔
626
                Graph:                    dbAliceGraph,
19✔
627
                MessageSigner:            nodeSignerAlice,
19✔
628
                OurPubKey:                aliceKeyPub,
19✔
629
                OurKeyLoc:                testKeyLoc,
19✔
630
                IsChannelActive: func(lnwire.ChannelID) bool {
19✔
631
                        return true
×
632
                },
×
633
                ApplyChannelUpdate: func(*lnwire.ChannelUpdate1,
634
                        *wire.OutPoint, bool) error {
×
635

×
636
                        return nil
×
637
                },
×
638
        })
639
        require.NoError(t, err)
19✔
640

19✔
641
        interceptableSwitchNotifier := &mock.ChainNotifier{
19✔
642
                EpochChan: make(chan *chainntnfs.BlockEpoch, 1),
19✔
643
        }
19✔
644
        interceptableSwitchNotifier.EpochChan <- &chainntnfs.BlockEpoch{
19✔
645
                Height: 1,
19✔
646
        }
19✔
647

19✔
648
        interceptableSwitch, err := htlcswitch.NewInterceptableSwitch(
19✔
649
                &htlcswitch.InterceptableSwitchConfig{
19✔
650
                        CltvRejectDelta:    testCltvRejectDelta,
19✔
651
                        CltvInterceptDelta: testCltvRejectDelta + 3,
19✔
652
                        Notifier:           interceptableSwitchNotifier,
19✔
653
                },
19✔
654
        )
19✔
655
        require.NoError(t, err)
19✔
656

19✔
657
        // TODO(yy): create interface for lnwallet.LightningChannel so we can
19✔
658
        // easily mock it without the following setups.
19✔
659
        notifier := &mock.ChainNotifier{
19✔
660
                SpendChan: make(chan *chainntnfs.SpendDetail),
19✔
661
                EpochChan: make(chan *chainntnfs.BlockEpoch),
19✔
662
                ConfChan:  make(chan *chainntnfs.TxConfirmation),
19✔
663
        }
19✔
664

19✔
665
        mockSwitch := &mockMessageSwitch{}
19✔
666

19✔
667
        // TODO(yy): change ChannelNotifier to be an interface.
19✔
668
        channelNotifier := channelnotifier.New(dbAliceChannel.ChannelStateDB())
19✔
669
        require.NoError(t, channelNotifier.Start())
19✔
670
        t.Cleanup(func() {
38✔
671
                require.NoError(t, channelNotifier.Stop(),
19✔
672
                        "stop channel notifier failed")
19✔
673
        })
19✔
674

675
        writeBufferPool := pool.NewWriteBuffer(
19✔
676
                pool.DefaultWriteBufferGCInterval,
19✔
677
                pool.DefaultWriteBufferExpiryInterval,
19✔
678
        )
19✔
679

19✔
680
        writePool := pool.NewWrite(
19✔
681
                writeBufferPool, 1, timeout,
19✔
682
        )
19✔
683
        require.NoError(t, writePool.Start())
19✔
684

19✔
685
        readBufferPool := pool.NewReadBuffer(
19✔
686
                pool.DefaultReadBufferGCInterval,
19✔
687
                pool.DefaultReadBufferExpiryInterval,
19✔
688
        )
19✔
689

19✔
690
        readPool := pool.NewRead(
19✔
691
                readBufferPool, 1, timeout,
19✔
692
        )
19✔
693
        require.NoError(t, readPool.Start())
19✔
694

19✔
695
        mockConn := newMockConn(t, 1)
19✔
696

19✔
697
        receivedCustomChan := make(chan *customMsg)
19✔
698

19✔
699
        var pubKey [33]byte
19✔
700
        copy(pubKey[:], aliceKeyPub.SerializeCompressed())
19✔
701

19✔
702
        // We have to have a valid server key for brontide to start up properly.
19✔
703
        serverKey, err := btcec.NewPrivateKey()
19✔
704
        require.NoError(t, err)
19✔
705

19✔
706
        var serverKeyArr [33]byte
19✔
707
        copy(serverKeyArr[:], serverKey.PubKey().SerializeCompressed())
19✔
708

19✔
709
        router := sphinx.NewRouter(
19✔
710
                &sphinx.PrivKeyECDH{PrivKey: aliceKeyPriv},
19✔
711
                sphinx.NewMemoryReplayLog(),
19✔
712
        )
19✔
713
        require.NoError(t, router.Start())
19✔
714
        t.Cleanup(func() {
38✔
715
                router.Stop()
19✔
716
        })
19✔
717

718
        onionMsgServer := subscribe.NewServer()
19✔
719
        require.NoError(t, onionMsgServer.Start())
19✔
720
        t.Cleanup(func() {
38✔
721
                require.NoError(t, onionMsgServer.Stop())
19✔
722
        })
19✔
723

724
        actorSystem := actor.NewActorSystem()
19✔
725
        t.Cleanup(func() {
38✔
726
                require.NoError(t, actorSystem.Shutdown())
19✔
727
        })
19✔
728

729
        // Create the onion endpoint for tests.
730
        onionEndpoint, err := onionmessage.NewOnionEndpoint(
19✔
731
                actorSystem.Receptionist(),
19✔
732
                router,
19✔
733
                &noopNodeIDResolver{},
19✔
734
                onionmessage.WithMessageServer(onionMsgServer),
19✔
735
        )
19✔
736
        require.NoError(t, err)
19✔
737

19✔
738
        estimator := chainfee.NewStaticEstimator(12500, 0)
19✔
739

19✔
740
        cfg := &Config{
19✔
741
                Addr:              cfgAddr,
19✔
742
                PubKeyBytes:       pubKey,
19✔
743
                ServerPubKey:      serverKeyArr,
19✔
744
                OnionEndpoint:     onionEndpoint,
19✔
745
                ActorSystem:       actorSystem,
19✔
746
                ErrorBuffer:       errBuffer,
19✔
747
                ChainIO:           chainIO,
19✔
748
                Switch:            mockSwitch,
19✔
749
                ChanActiveTimeout: chanActiveTimeout,
19✔
750
                InterceptSwitch:   interceptableSwitch,
19✔
751
                ChannelDB:         dbAliceChannel.ChannelStateDB(),
19✔
752
                FeeEstimator:      estimator,
19✔
753
                Wallet:            wallet,
19✔
754
                ChainNotifier:     notifier,
19✔
755
                ChanStatusMgr:     chanStatusMgr,
19✔
756
                Features: lnwire.NewFeatureVector(
19✔
757
                        nil, lnwire.Features,
19✔
758
                ),
19✔
759
                DisconnectPeer: func(b *btcec.PublicKey) error {
19✔
760
                        return nil
×
761
                },
×
762
                ChannelNotifier:               channelNotifier,
763
                PrunePersistentPeerConnection: func([33]byte) {},
1✔
764
                LegacyFeatures:                lnwire.EmptyFeatureVector(),
765
                WritePool:                     writePool,
766
                ReadPool:                      readPool,
767
                Conn:                          mockConn,
768
                HandleCustomMessage: func(
769
                        peer [33]byte, msg *lnwire.Custom) error {
1✔
770

1✔
771
                        receivedCustomChan <- &customMsg{
1✔
772
                                peer: peer,
1✔
773
                                msg:  *msg,
1✔
774
                        }
1✔
775

1✔
776
                        return nil
1✔
777
                },
1✔
778
                PongBuf: make([]byte, lnwire.MaxPongBytes),
779
                FetchLastChanUpdate: func(chanID lnwire.ShortChannelID,
780
                ) (*lnwire.ChannelUpdate1, error) {
2✔
781

2✔
782
                        return &lnwire.ChannelUpdate1{}, nil
2✔
783
                },
2✔
784
        }
785

786
        alicePeer := NewBrontide(*cfg)
19✔
787

19✔
788
        return &peerTestCtx{
19✔
789
                publishTx:     publishTx,
19✔
790
                mockSwitch:    mockSwitch,
19✔
791
                peer:          alicePeer,
19✔
792
                notifier:      notifier,
19✔
793
                db:            dbAliceChannel,
19✔
794
                privKey:       aliceKeyPriv,
19✔
795
                mockConn:      mockConn,
19✔
796
                customChan:    receivedCustomChan,
19✔
797
                chanStatusMgr: chanStatusMgr,
19✔
798
        }
19✔
799
}
800

801
// startPeer invokes the `Start` method on the specified peer and handles any
802
// initial startup messages for testing.
803
func startPeer(t *testing.T, mockConn *mockMessageConn,
804
        peer *Brontide) <-chan struct{} {
3✔
805

3✔
806
        // Start the peer in a goroutine so that we can handle and test for
3✔
807
        // startup messages. Successfully sending and receiving init message,
3✔
808
        // indicates a successful startup.
3✔
809
        done := make(chan struct{})
3✔
810
        go func() {
6✔
811
                require.NoError(t, peer.Start())
3✔
812
                close(done)
3✔
813
        }()
3✔
814

815
        // Receive the init message that should be the first message received on
816
        // startup.
817
        rawMsg, err := fn.RecvOrTimeout[[]byte](
3✔
818
                mockConn.writtenMessages, timeout,
3✔
819
        )
3✔
820
        require.NoError(t, err)
3✔
821

3✔
822
        msgReader := bytes.NewReader(rawMsg)
3✔
823
        nextMsg, err := lnwire.ReadMessage(msgReader, 0)
3✔
824
        require.NoError(t, err)
3✔
825

3✔
826
        _, ok := nextMsg.(*lnwire.Init)
3✔
827
        require.True(t, ok)
3✔
828

3✔
829
        // Write the reply for the init message to complete the startup.
3✔
830
        initReplyMsg := lnwire.NewInitMessage(
3✔
831
                lnwire.NewRawFeatureVector(
3✔
832
                        lnwire.DataLossProtectRequired,
3✔
833
                        lnwire.GossipQueriesOptional,
3✔
834
                ),
3✔
835
                lnwire.NewRawFeatureVector(),
3✔
836
        )
3✔
837

3✔
838
        var b bytes.Buffer
3✔
839
        _, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
3✔
840
        require.NoError(t, err)
3✔
841

3✔
842
        ok = fn.SendOrQuit[[]byte, struct{}](
3✔
843
                mockConn.readMessages, b.Bytes(), make(chan struct{}),
3✔
844
        )
3✔
845
        require.True(t, ok)
3✔
846

3✔
847
        return done
3✔
848
}
849

850
// noopNodeIDResolver is a resolver that returns an error for all lookups.
851
// Used in tests that don't need onion message SCID resolution.
852
type noopNodeIDResolver struct{}
853

854
// RemotePubFromSCID implements onionmessage.NodeIDResolver.
855
func (n *noopNodeIDResolver) RemotePubFromSCID(
NEW
856
        lnwire.ShortChannelID) (*btcec.PublicKey, error) {
×
NEW
857

×
NEW
858
        return nil, errors.New("noop resolver")
×
NEW
859
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc