• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 21485572389

29 Jan 2026 04:09PM UTC coverage: 65.247% (+0.2%) from 65.074%
21485572389

Pull #10089

github

web-flow
Merge 22d34d15e into 19b2ad797
Pull Request #10089: Onion message forwarding

1152 of 1448 new or added lines in 23 files covered. (79.56%)

4109 existing lines in 29 files now uncovered.

139515 of 213825 relevant lines covered (65.25%)

20529.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.87
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "log/slog"
9
        "runtime/debug"
10
        "strings"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        "github.com/btcsuite/btcd/btcec/v2"
16
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
17
        "github.com/btcsuite/btcd/btcutil"
18
        "github.com/btcsuite/btcd/chaincfg"
19
        "github.com/btcsuite/btcd/chaincfg/chainhash"
20
        "github.com/btcsuite/btcd/txscript"
21
        "github.com/btcsuite/btcd/wire"
22
        "github.com/lightninglabs/neutrino/cache"
23
        "github.com/lightninglabs/neutrino/cache/lru"
24
        "github.com/lightningnetwork/lnd/batch"
25
        "github.com/lightningnetwork/lnd/chainntnfs"
26
        "github.com/lightningnetwork/lnd/channeldb"
27
        "github.com/lightningnetwork/lnd/fn/v2"
28
        "github.com/lightningnetwork/lnd/graph"
29
        graphdb "github.com/lightningnetwork/lnd/graph/db"
30
        "github.com/lightningnetwork/lnd/graph/db/models"
31
        "github.com/lightningnetwork/lnd/input"
32
        "github.com/lightningnetwork/lnd/keychain"
33
        "github.com/lightningnetwork/lnd/lnpeer"
34
        "github.com/lightningnetwork/lnd/lnutils"
35
        "github.com/lightningnetwork/lnd/lnwallet"
36
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
37
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
38
        "github.com/lightningnetwork/lnd/lnwire"
39
        "github.com/lightningnetwork/lnd/multimutex"
40
        "github.com/lightningnetwork/lnd/netann"
41
        "github.com/lightningnetwork/lnd/routing/route"
42
        "github.com/lightningnetwork/lnd/ticker"
43
        "golang.org/x/time/rate"
44
)
45

46
const (
47
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
48
        // for a specific channel and direction that we'll accept over an
49
        // interval.
50
        DefaultMaxChannelUpdateBurst = 10
51

52
        // DefaultChannelUpdateInterval is the default interval we'll use to
53
        // determine how often we should allow a new update for a specific
54
        // channel and direction.
55
        DefaultChannelUpdateInterval = time.Minute
56

57
        // maxPrematureUpdates tracks the max amount of premature channel
58
        // updates that we'll hold onto.
59
        maxPrematureUpdates = 100
60

61
        // maxFutureMessages tracks the max amount of future messages that
62
        // we'll hold onto.
63
        maxFutureMessages = 1000
64

65
        // DefaultSubBatchDelay is the default delay we'll use when
66
        // broadcasting the next announcement batch.
67
        DefaultSubBatchDelay = 5 * time.Second
68

69
        // maxRejectedUpdates tracks the max amount of rejected channel updates
70
        // we'll maintain. This is the global size across all peers. We'll
71
        // allocate ~3 MB max to the cache.
72
        maxRejectedUpdates = 10_000
73

74
        // DefaultProofMatureDelta specifies the default value used for
75
        // ProofMatureDelta, which is the number of confirmations needed before
76
        // processing the announcement signatures.
77
        DefaultProofMatureDelta = 6
78
)
79

80
var (
81
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
82
        // is in the process of being shut down.
83
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
84

85
        // ErrGossipSyncerNotFound signals that we were unable to find an active
86
        // gossip syncer corresponding to a gossip query message received from
87
        // the remote peer.
88
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
89

90
        // ErrNoFundingTransaction is returned when we are unable to find the
91
        // funding transaction described by the short channel ID on chain.
92
        ErrNoFundingTransaction = errors.New(
93
                "unable to find the funding transaction",
94
        )
95

96
        // ErrInvalidFundingOutput is returned if the channel funding output
97
        // fails validation.
98
        ErrInvalidFundingOutput = errors.New(
99
                "channel funding output validation failed",
100
        )
101

102
        // ErrChannelSpent is returned when we go to validate a channel, but
103
        // the purported funding output has actually already been spent on
104
        // chain.
105
        ErrChannelSpent = errors.New("channel output has been spent")
106

107
        // emptyPubkey is used to compare compressed pubkeys against an empty
108
        // byte array.
109
        emptyPubkey [33]byte
110
)
111

112
// optionalMsgFields is a set of optional message fields that external callers
113
// can provide and that prove useful when processing a specific network
114
// announcement. Each field is populated by the functional option of the
// matching name.
115
type optionalMsgFields struct {
116
        // capacity is populated by the ChannelCapacity option.
        capacity      *btcutil.Amount
117
        // channelPoint is populated by the ChannelPoint option.
        channelPoint  *wire.OutPoint
118
        // remoteAlias is populated by the RemoteAlias option.
        remoteAlias   *lnwire.ShortChannelID
119
        // tapscriptRoot is populated by the TapscriptRoot option.
        tapscriptRoot fn.Option[chainhash.Hash]
120
}
121

122
// apply applies the optional fields within the functional options.
123
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
49✔
124
        for _, optionalMsgField := range optionalMsgFields {
56✔
125
                optionalMsgField(f)
7✔
126
        }
7✔
127
}
128

129
// OptionalMsgField is a functional option parameter that can be used to provide
130
// external information that is not included within a network message but serves
131
// useful when processing it.
132
type OptionalMsgField func(*optionalMsgFields)
133

134
// ChannelCapacity is an optional field that lets the gossiper know of the
135
// capacity of a channel.
136
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
29✔
137
        return func(f *optionalMsgFields) {
32✔
138
                f.capacity = &capacity
3✔
139
        }
3✔
140
}
141

142
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
143
// of a channel.
144
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
32✔
145
        return func(f *optionalMsgFields) {
38✔
146
                f.channelPoint = &op
6✔
147
        }
6✔
148
}
149

150
// TapscriptRoot is an optional field that lets the gossiper know of the root of
151
// the tapscript tree for a custom channel.
152
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
28✔
153
        return func(f *optionalMsgFields) {
30✔
154
                f.tapscriptRoot = root
2✔
155
        }
2✔
156
}
157

158
// RemoteAlias is an optional field that lets the gossiper know that a locally
159
// sent channel update is actually an update for the peer that should replace
160
// the ShortChannelID field with the remote's alias. This is only used for
161
// channels with peers where the option-scid-alias feature bit was negotiated.
162
// The channel update will be added to the graph under the original SCID, but
163
// will be modified and re-signed with this alias.
164
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
28✔
165
        return func(f *optionalMsgFields) {
30✔
166
                f.remoteAlias = alias
2✔
167
        }
2✔
168
}
169

170
// networkMsg couples a routing related wire message with the peer that
171
// originally sent it.
172
type networkMsg struct {
173
        peer              lnpeer.Peer
174
        source            *btcec.PublicKey
175
        msg               lnwire.Message
176
        optionalMsgFields *optionalMsgFields
177

178
        isRemote bool
179

180
        err chan error
181
}
182

183
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
184
// wishes to update a particular set of channels. New ChannelUpdate messages
185
// will be crafted to be sent out during the next broadcast epoch and the fee
186
// updates committed to the lower layer.
187
type chanPolicyUpdateRequest struct {
188
        // edgesToUpdate is the set of edges the caller wants updated.
        edgesToUpdate []EdgeWithInfo
189
        // errChan carries the outcome of the request back to the caller;
        // PropagateChanPolicyUpdate blocks on it after handing the request
        // over.
        errChan       chan error
190
}
191

192
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
193
// syncer at all times.
194
type PinnedSyncers map[route.Vertex]struct{}
195

196
// Config defines the configuration for the service. ALL elements within the
197
// configuration MUST be non-nil for the service to carry out its duties.
198
type Config struct {
199
        // ChainParams holds the chain parameters for the active network this
200
        // node is participating on.
201
        ChainParams *chaincfg.Params
202

203
        // Graph is the subsystem which is responsible for managing the
204
        // topology of lightning network. After incoming channel, node, channel
205
        // updates announcements are validated they are sent to the router in
206
        // order to be included in the LN graph.
207
        Graph graph.ChannelGraphSource
208

209
        // ChainIO represents an abstraction over a source that can query the
210
        // blockchain.
211
        ChainIO lnwallet.BlockChainIO
212

213
        // ChanSeries is an interface that provides access to a time series
214
        // view of the current known channel graph. Each GossipSyncer enabled
215
        // peer will utilize this in order to create and respond to channel
216
        // graph time series queries.
217
        ChanSeries ChannelGraphTimeSeries
218

219
        // Notifier is used for receiving notifications of incoming blocks.
220
        // With each new incoming block found we process previously premature
221
        // announcements.
222
        //
223
        // TODO(roasbeef): could possibly just replace this with an epoch
224
        // channel.
225
        Notifier chainntnfs.ChainNotifier
226

227
        // Broadcast broadcasts a particular set of announcements to all peers
228
        // that the daemon is connected to. If supplied, the skips parameter
229
        // indicates the set of peers that should be excluded from the
230
        // broadcast.
231
        Broadcast func(skips map[route.Vertex]struct{},
232
                msg ...lnwire.Message) error
233

234
        // NotifyWhenOnline is a function that allows the gossiper to be
235
        // notified when a certain peer comes online, allowing it to
236
        // retry sending a peer message.
237
        //
238
        // NOTE: The peerChan channel must be buffered.
239
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
240

241
        // NotifyWhenOffline is a function that allows the gossiper to be
242
        // notified when a certain peer disconnects, allowing it to request a
243
        // notification for when it reconnects.
244
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
245

246
        // FetchSelfAnnouncement retrieves our current node announcement, for
247
        // use when determining whether we should update our peers about our
248
        // presence in the network.
249
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement1
250

251
        // UpdateSelfAnnouncement produces a new announcement for our node with
252
        // an updated timestamp which can be broadcast to our peers.
253
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement1, error)
254

255
        // ProofMatureDelta is the number of confirmations needed before
256
        // exchanging the channel announcement proofs.
257
        ProofMatureDelta uint32
258

259
        // TrickleDelay is the period of the trickle timer which flushes to the
260
        // network the pending batch of new announcements we've received since
261
        // the last trickle tick.
262
        TrickleDelay time.Duration
263

264
        // RetransmitTicker is a ticker that ticks with a period which
265
        // indicates that we should check if we need to re-broadcast any of our
266
        // personal channels.
267
        RetransmitTicker ticker.Ticker
268

269
        // RebroadcastInterval is the maximum time we wait between sending out
270
        // channel updates for our active channels and our own node
271
        // announcement. We do this to ensure our active presence on the
272
        // network is known, and we are not being considered a zombie node or
273
        // having zombie channels.
274
        RebroadcastInterval time.Duration
275

276
        // WaitingProofStore is a persistent storage of partial channel proof
277
        // announcement messages. We use it to buffer half of the material
278
        // needed to reconstruct a full authenticated channel announcement.
279
        // Once we receive the other half of the channel proof, we'll be able to
280
        // properly validate it and re-broadcast it out to the network.
281
        //
282
        // TODO(wilmer): make interface to prevent channeldb dependency.
283
        WaitingProofStore *channeldb.WaitingProofStore
284

285
        // MessageStore is a persistent storage of gossip messages which we will
286
        // use to determine which messages need to be resent for a given peer.
287
        MessageStore GossipMessageStore
288

289
        // AnnSigner is an instance of the MessageSigner interface which will
290
        // be used to manually sign any outgoing channel updates. The signer
291
        // implementation should be backed by the public key of the backing
292
        // Lightning node.
293
        //
294
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
295
        // here?
296
        AnnSigner lnwallet.MessageSigner
297

298
        // ScidCloser is an instance of ClosedChannelTracker that helps the
299
        // gossiper cut down on spam channel announcements for already closed
300
        // channels.
301
        ScidCloser ClosedChannelTracker
302

303
        // NumActiveSyncers is the number of peers for which we should have
304
        // active syncers. After reaching NumActiveSyncers, any future
305
        // gossip syncers will be passive.
306
        NumActiveSyncers int
307

308
        // NoTimestampQueries will prevent the GossipSyncer from querying
309
        // timestamps of announcement messages from the peer and from replying
310
        // to timestamp queries.
311
        NoTimestampQueries bool
312

313
        // RotateTicker is a ticker responsible for notifying the SyncManager
314
        // when it should rotate its active syncers. A single active syncer with
315
        // a chansSynced state will be exchanged for a passive syncer in order
316
        // to ensure we don't keep syncing with the same peers.
317
        RotateTicker ticker.Ticker
318

319
        // HistoricalSyncTicker is a ticker responsible for notifying the
320
        // syncManager when it should attempt a historical sync with a gossip
321
        // sync peer.
322
        HistoricalSyncTicker ticker.Ticker
323

324
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
325
        // syncManager when it should attempt to start the next pending
326
        // activeSyncer due to the current one not completing its state machine
327
        // within the timeout.
328
        ActiveSyncerTimeoutTicker ticker.Ticker
329

330
        // MinimumBatchSize is the minimum size of a sub batch of announcement
331
        // messages.
332
        MinimumBatchSize int
333

334
        // SubBatchDelay is the delay between sending sub batches of
335
        // gossip messages.
336
        SubBatchDelay time.Duration
337

338
        // IgnoreHistoricalFilters will prevent syncers from replying with
339
        // historical data when the remote peer sets a gossip_timestamp_range.
340
        // This prevents ranges with old start times from causing us to dump the
341
        // graph on connect.
342
        IgnoreHistoricalFilters bool
343

344
        // PinnedSyncers is a set of peers that will always transition to
345
        // ActiveSync upon connection. These peers will never transition to
346
        // PassiveSync.
347
        PinnedSyncers PinnedSyncers
348

349
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
350
        // specific channel and direction that we'll accept over an interval.
351
        MaxChannelUpdateBurst int
352

353
        // ChannelUpdateInterval specifies the interval we'll use to determine
354
        // how often we should allow a new update for a specific channel and
355
        // direction.
356
        ChannelUpdateInterval time.Duration
357

358
        // IsAlias returns true if a given ShortChannelID is an alias for
359
        // option_scid_alias channels.
360
        IsAlias func(scid lnwire.ShortChannelID) bool
361

362
        // SignAliasUpdate is used to re-sign a channel update using the
363
        // remote's alias if the option-scid-alias feature bit was negotiated.
364
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
365
                error)
366

367
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
368
        // This is used for channels that have negotiated the option-scid-alias
369
        // feature bit.
370
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
371
                lnwire.ShortChannelID, error)
372

373
        // GetAlias allows the gossiper to look up the peer's alias for a given
374
        // ChannelID. This is used to sign updates for them if the channel has
375
        // no AuthProof and the option-scid-alias feature bit was negotiated.
376
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
377

378
        // FindChannel allows the gossiper to find a channel that we're party
379
        // to without iterating over the entire set of open channels.
380
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
381
                *channeldb.OpenChannel, error)
382

383
        // IsStillZombieChannel takes the timestamps of the latest channel
384
        // updates for a channel and returns true if the channel should be
385
        // considered a zombie based on these timestamps.
386
        IsStillZombieChannel func(time.Time, time.Time) bool
387

388
        // AssumeChannelValid toggles whether the gossiper will check for
389
        // spent-ness of channel outpoints. For neutrino, this saves long
390
        // rescans from blocking initial usage of the daemon.
391
        AssumeChannelValid bool
392

393
        // MsgRateBytes is the rate limit for the number of bytes per second
394
        // that we'll allocate to outbound gossip messages.
395
        MsgRateBytes uint64
396

397
        // MsgBurstBytes is the allotted burst amount in bytes. This is the
398
        // number of starting tokens in our token bucket algorithm.
399
        MsgBurstBytes uint64
400

401
        // FilterConcurrency is the maximum number of concurrent gossip filter
402
        // applications that can be processed.
403
        FilterConcurrency int
404

405
        // BanThreshold is the score used to decide whether a given peer is
406
        // banned or not.
407
        BanThreshold uint64
408

409
        // PeerMsgRateBytes is the rate limit for the number of bytes per second
410
        // that we'll allocate to outbound gossip messages for a single peer.
411
        PeerMsgRateBytes uint64
412
}
413

414
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
415
// used to let the caller of the lru.Cache know if a message has already been
416
// processed or not.
417
type processedNetworkMsg struct {
418
        processed bool
419
        msg       *networkMsg
420
}
421

422
// cachedNetworkMsg is a wrapper around a network message that can be used with
423
// *lru.Cache.
424
//
425
// NOTE: This struct is not thread safe which means you need to assure no
426
// concurrent read write access to it and all its contents which are pointers
427
// as well.
428
type cachedNetworkMsg struct {
429
        msgs []*processedNetworkMsg
430
}
431

432
// Size returns the "size" of an entry. We return the number of items as we
433
// just want to limit the total amount of entries rather than do accurate size
434
// accounting.
435
func (c *cachedNetworkMsg) Size() (uint64, error) {
4✔
436
        return uint64(len(c.msgs)), nil
4✔
437
}
4✔
438

439
// rejectCacheKey is the cache key that we'll use to track announcements we've
440
// recently rejected.
441
type rejectCacheKey struct {
442
        gossipVersion lnwire.GossipVersion
443
        pubkey        [33]byte
444
        chanID        uint64
445
}
446

447
// newRejectCacheKey returns a new cache key for the reject cache.
448
func newRejectCacheKey(v lnwire.GossipVersion, cid uint64,
449
        pub [33]byte) rejectCacheKey {
476✔
450

476✔
451
        k := rejectCacheKey{
476✔
452
                gossipVersion: v,
476✔
453
                chanID:        cid,
476✔
454
                pubkey:        pub,
476✔
455
        }
476✔
456

476✔
457
        return k
476✔
458
}
476✔
459

460
// sourceToPub returns a serialized-compressed public key for use in the reject
461
// cache.
462
func sourceToPub(pk *btcec.PublicKey) [33]byte {
491✔
463
        var pub [33]byte
491✔
464
        copy(pub[:], pk.SerializeCompressed())
491✔
465
        return pub
491✔
466
}
491✔
467

468
// cachedReject is the empty value stored in the reject cache. It carries no
// data of its own.
type cachedReject struct{}

// Size reports the "size" of a reject entry. Every entry counts as 1, since
// only the total number of entries is meant to be limited. The returned error
// is always nil.
func (*cachedReject) Size() (uint64, error) {
	const entrySize uint64 = 1

	return entrySize, nil
}
206✔
477

478
// AuthenticatedGossiper is a subsystem which is responsible for receiving
479
// announcements, validating them and applying the changes to router, syncing
480
// lightning network with newly connected nodes, broadcasting announcements
481
// after validation, negotiating the channel announcement proofs exchange and
482
// handling the premature announcements. All outgoing announcements are
483
// expected to be properly signed as dictated in BOLT#7, additionally, all
484
// incoming messages are expected to be well formed and signed. Invalid messages
485
// will be rejected by this struct.
486
type AuthenticatedGossiper struct {
487
        // Parameters which are needed to properly handle the start and stop of
488
        // the service.
489
        started sync.Once
490
        stopped sync.Once
491

492
        // bestHeight is the height of the block at the tip of the main chain
493
        // as we know it. Accesses *MUST* be done with the gossiper's lock
494
        // held.
495
        bestHeight uint32
496

497
        // cfg is a copy of the configuration struct that the gossiper service
498
        // was initialized with.
499
        cfg *Config
500

501
        // blockEpochs encapsulates a stream of block epochs that are sent at
502
        // every new block height.
503
        blockEpochs *chainntnfs.BlockEpochEvent
504

505
        // prematureChannelUpdates is a map of ChannelUpdates we have received
506
        // that weren't associated with any channel we know about. We store
507
        // them temporarily, such that we can reprocess them when a
508
        // ChannelAnnouncement for the channel is received.
509
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
510

511
        // banman tracks our peer's ban status.
512
        banman *banman
513

514
        // networkMsgs is a channel that carries new network broadcasted
515
        // message from outside the gossiper service to be processed by the
516
        // networkHandler.
517
        networkMsgs chan *networkMsg
518

519
        // futureMsgs is a list of premature network messages that have a block
520
        // height specified in the future. We will save them and resend them to
521
        // the chan networkMsgs once the block height is reached. The cached
522
        // map format is,
523
        //   {msgID1: msg1, msgID2: msg2, ...}
524
        futureMsgs *futureMsgCache
525

526
        // chanPolicyUpdates is a channel over which requests to update the
527
        // forwarding policy of a set of channels are sent.
528
        chanPolicyUpdates chan *chanPolicyUpdateRequest
529

530
        // selfKey is the identity public key of the backing Lightning node.
531
        selfKey *btcec.PublicKey
532

533
        // selfKeyLoc is the locator for the identity public key of the backing
534
        // Lightning node.
535
        selfKeyLoc keychain.KeyLocator
536

537
        // channelMtx is used to restrict the database access to one
538
        // goroutine per channel ID. This is done to ensure that when
539
        // the gossiper is handling an announcement, the db state stays
540
        // consistent between when the DB is first read until it's written.
541
        channelMtx *multimutex.Mutex[uint64]
542

543
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
544

545
        // syncMgr is a subsystem responsible for managing the gossip syncers
546
        // for peers currently connected. When a new peer is connected, the
547
        // manager will create its accompanying gossip syncer and determine
548
        // whether it should have an activeSync or passiveSync sync type based
549
        // on how many other gossip syncers are currently active. Any activeSync
550
        // gossip syncers are started in a round-robin manner to ensure we're
551
        // not syncing with multiple peers at the same time.
552
        syncMgr *SyncManager
553

554
        // reliableSender is a subsystem responsible for handling reliable
555
        // message send requests to peers. This should only be used for channels
556
        // that are unadvertised at the time of handling the message since if it
557
        // is advertised, then peers should be able to get the message from the
558
        // network.
559
        reliableSender *reliableSender
560

561
        // chanUpdateRateLimiter contains rate limiters for each direction of
562
        // a channel update we've processed. We'll use these to determine
563
        // whether we should accept a new update for a specific channel and
564
        // direction.
565
        //
566
        // NOTE: This map must be synchronized with the main
567
        // AuthenticatedGossiper lock.
568
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
569

570
        // vb is used to enforce job dependency ordering of gossip messages.
571
        vb *ValidationBarrier
572

573
        sync.Mutex
574

575
        cancel fn.Option[context.CancelFunc]
576
        quit   chan struct{}
577
        wg     sync.WaitGroup
578
}
579

580
// New creates a new AuthenticatedGossiper instance, initialized with the
581
// passed configuration parameters.
582
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
40✔
583
        gossiper := &AuthenticatedGossiper{
40✔
584
                selfKey:           selfKeyDesc.PubKey,
40✔
585
                selfKeyLoc:        selfKeyDesc.KeyLocator,
40✔
586
                cfg:               &cfg,
40✔
587
                networkMsgs:       make(chan *networkMsg),
40✔
588
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
40✔
589
                quit:              make(chan struct{}),
40✔
590
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
40✔
591
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
40✔
592
                        maxPrematureUpdates,
40✔
593
                ),
40✔
594
                channelMtx: multimutex.NewMutex[uint64](),
40✔
595
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
40✔
596
                        maxRejectedUpdates,
40✔
597
                ),
40✔
598
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
40✔
599
                banman:                newBanman(cfg.BanThreshold),
40✔
600
        }
40✔
601

40✔
602
        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
40✔
603

40✔
604
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
40✔
605
                ChainHash:                *cfg.ChainParams.GenesisHash,
40✔
606
                ChanSeries:               cfg.ChanSeries,
40✔
607
                RotateTicker:             cfg.RotateTicker,
40✔
608
                HistoricalSyncTicker:     cfg.HistoricalSyncTicker,
40✔
609
                NumActiveSyncers:         cfg.NumActiveSyncers,
40✔
610
                NoTimestampQueries:       cfg.NoTimestampQueries,
40✔
611
                IgnoreHistoricalFilters:  cfg.IgnoreHistoricalFilters,
40✔
612
                BestHeight:               gossiper.latestHeight,
40✔
613
                PinnedSyncers:            cfg.PinnedSyncers,
40✔
614
                IsStillZombieChannel:     cfg.IsStillZombieChannel,
40✔
615
                AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
40✔
616
                AllotedMsgBytesBurst:     cfg.MsgBurstBytes,
40✔
617
                FilterConcurrency:        cfg.FilterConcurrency,
40✔
618
                PeerMsgBytesPerSecond:    cfg.PeerMsgRateBytes,
40✔
619
        })
40✔
620

40✔
621
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
40✔
622
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
40✔
623
                NotifyWhenOffline: cfg.NotifyWhenOffline,
40✔
624
                MessageStore:      cfg.MessageStore,
40✔
625
                IsMsgStale:        gossiper.isMsgStale,
40✔
626
        })
40✔
627

40✔
628
        return gossiper
40✔
629
}
40✔
630

631
// EdgeWithInfo contains the information that is required to update an edge.
632
type EdgeWithInfo struct {
633
        // Info describes the channel.
634
        Info *models.ChannelEdgeInfo
635

636
        // Edge describes the policy in one direction of the channel.
637
        Edge *models.ChannelEdgePolicy
638
}
639

640
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
641
// specified edge updates. Updates are done in two stages: first, the
642
// AuthenticatedGossiper ensures the update has been committed by dependent
643
// sub-systems, then it signs and broadcasts new updates to the network. A
644
// mapping between outpoints and updated channel policies is returned, which is
645
// used to update the forwarding policies of the underlying links.
646
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
647
        edgesToUpdate []EdgeWithInfo) error {
3✔
648

3✔
649
        errChan := make(chan error, 1)
3✔
650
        policyUpdate := &chanPolicyUpdateRequest{
3✔
651
                edgesToUpdate: edgesToUpdate,
3✔
652
                errChan:       errChan,
3✔
653
        }
3✔
654

3✔
655
        select {
3✔
656
        case d.chanPolicyUpdates <- policyUpdate:
3✔
657
                err := <-errChan
3✔
658
                return err
3✔
UNCOV
659
        case <-d.quit:
×
UNCOV
660
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
661
        }
662
}
663

664
// Start spawns network messages handler goroutine and registers on new block
665
// notifications in order to properly handle the premature announcements.
666
func (d *AuthenticatedGossiper) Start() error {
40✔
667
        var err error
40✔
668
        d.started.Do(func() {
80✔
669
                ctx, cancel := context.WithCancel(context.Background())
40✔
670
                d.cancel = fn.Some(cancel)
40✔
671

40✔
672
                log.Info("Authenticated Gossiper starting")
40✔
673
                err = d.start(ctx)
40✔
674
        })
40✔
675
        return err
40✔
676
}
677

678
func (d *AuthenticatedGossiper) start(ctx context.Context) error {
40✔
679
        // First we register for new notifications of newly discovered blocks.
40✔
680
        // We do this immediately so we'll later be able to consume any/all
40✔
681
        // blocks which were discovered.
40✔
682
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
40✔
683
        if err != nil {
40✔
UNCOV
684
                return err
×
UNCOV
685
        }
×
686
        d.blockEpochs = blockEpochs
40✔
687

40✔
688
        height, err := d.cfg.Graph.CurrentBlockHeight()
40✔
689
        if err != nil {
40✔
UNCOV
690
                return err
×
UNCOV
691
        }
×
692
        d.bestHeight = height
40✔
693

40✔
694
        // Start the reliable sender. In case we had any pending messages ready
40✔
695
        // to be sent when the gossiper was last shut down, we must continue on
40✔
696
        // our quest to deliver them to their respective peers.
40✔
697
        if err := d.reliableSender.Start(); err != nil {
40✔
UNCOV
698
                return err
×
UNCOV
699
        }
×
700

701
        d.syncMgr.Start()
40✔
702

40✔
703
        d.banman.start()
40✔
704

40✔
705
        // Start receiving blocks in its dedicated goroutine.
40✔
706
        d.wg.Add(2)
40✔
707
        go d.syncBlockHeight()
40✔
708
        go d.networkHandler(ctx)
40✔
709

40✔
710
        return nil
40✔
711
}
712

713
// syncBlockHeight syncs the best block height for the gossiper by reading
714
// blockEpochs.
715
//
716
// NOTE: must be run as a goroutine.
717
func (d *AuthenticatedGossiper) syncBlockHeight() {
40✔
718
        defer d.wg.Done()
40✔
719

40✔
720
        for {
80✔
721
                select {
40✔
722
                // A new block has arrived, so we can re-process the previously
723
                // premature announcements.
724
                case newBlock, ok := <-d.blockEpochs.Epochs:
2✔
725
                        // If the channel has been closed, then this indicates
2✔
726
                        // the daemon is shutting down, so we exit ourselves.
2✔
727
                        if !ok {
4✔
728
                                return
2✔
729
                        }
2✔
730

731
                        // Once a new block arrives, we update our running
732
                        // track of the height of the chain tip.
733
                        d.Lock()
2✔
734
                        blockHeight := uint32(newBlock.Height)
2✔
735
                        d.bestHeight = blockHeight
2✔
736
                        d.Unlock()
2✔
737

2✔
738
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
2✔
739
                                newBlock.Hash)
2✔
740

2✔
741
                        // Resend future messages, if any.
2✔
742
                        d.resendFutureMessages(blockHeight)
2✔
743

744
                case <-d.quit:
38✔
745
                        return
38✔
746
                }
747
        }
748
}
749

750
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
751
// the unique ID when saving the message.
752
type futureMsgCache struct {
753
        *lru.Cache[uint64, *cachedFutureMsg]
754

755
        // msgID is a monotonically increased integer.
756
        msgID atomic.Uint64
757
}
758

759
// nextMsgID returns a unique message ID.
760
func (f *futureMsgCache) nextMsgID() uint64 {
5✔
761
        return f.msgID.Add(1)
5✔
762
}
5✔
763

764
// newFutureMsgCache creates a new future message cache with the underlying lru
765
// cache being initialized with the specified capacity.
766
func newFutureMsgCache(capacity uint64) *futureMsgCache {
41✔
767
        // Create a new cache.
41✔
768
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
41✔
769

41✔
770
        return &futureMsgCache{
41✔
771
                Cache: cache,
41✔
772
        }
41✔
773
}
41✔
774

775
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
776
type cachedFutureMsg struct {
777
        // msg is the network message.
778
        msg *networkMsg
779

780
        // height is the block height.
781
        height uint32
782
}
783

784
// Size returns the size of the message.
785
func (c *cachedFutureMsg) Size() (uint64, error) {
6✔
786
        // Return a constant 1.
6✔
787
        return 1, nil
6✔
788
}
6✔
789

790
// resendFutureMessages takes a block height, resends all the future messages
791
// found below and equal to that height and deletes those messages found in the
792
// gossiper's futureMsgs.
793
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
2✔
794
        var (
2✔
795
                // msgs are the target messages.
2✔
796
                msgs []*networkMsg
2✔
797

2✔
798
                // keys are the target messages' caching keys.
2✔
799
                keys []uint64
2✔
800
        )
2✔
801

2✔
802
        // filterMsgs is the visitor used when iterating the future cache.
2✔
803
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
4✔
804
                if cmsg.height <= height {
4✔
805
                        msgs = append(msgs, cmsg.msg)
2✔
806
                        keys = append(keys, k)
2✔
807
                }
2✔
808

809
                return true
2✔
810
        }
811

812
        // Filter out the target messages.
813
        d.futureMsgs.Range(filterMsgs)
2✔
814

2✔
815
        // Return early if no messages found.
2✔
816
        if len(msgs) == 0 {
4✔
817
                return
2✔
818
        }
2✔
819

820
        // Remove the filtered messages.
821
        for _, key := range keys {
4✔
822
                d.futureMsgs.Delete(key)
2✔
823
        }
2✔
824

825
        log.Debugf("Resending %d network messages at height %d",
2✔
826
                len(msgs), height)
2✔
827

2✔
828
        for _, msg := range msgs {
4✔
829
                select {
2✔
830
                case d.networkMsgs <- msg:
2✔
UNCOV
831
                case <-d.quit:
×
UNCOV
832
                        msg.err <- ErrGossiperShuttingDown
×
833
                }
834
        }
835
}
836

837
// Stop signals any active goroutines for a graceful closure.
838
func (d *AuthenticatedGossiper) Stop() error {
41✔
839
        d.stopped.Do(func() {
81✔
840
                log.Info("Authenticated gossiper shutting down...")
40✔
841
                defer log.Debug("Authenticated gossiper shutdown complete")
40✔
842

40✔
843
                d.stop()
40✔
844
        })
40✔
845
        return nil
41✔
846
}
847

848
func (d *AuthenticatedGossiper) stop() {
40✔
849
        log.Debug("Authenticated Gossiper is stopping")
40✔
850
        defer log.Debug("Authenticated Gossiper stopped")
40✔
851

40✔
852
        // `blockEpochs` is only initialized in the start routine so we make
40✔
853
        // sure we don't panic here in the case where the `Stop` method is
40✔
854
        // called when the `Start` method does not complete.
40✔
855
        if d.blockEpochs != nil {
80✔
856
                d.blockEpochs.Cancel()
40✔
857
        }
40✔
858

859
        d.syncMgr.Stop()
40✔
860

40✔
861
        d.banman.stop()
40✔
862

40✔
863
        d.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
80✔
864
        close(d.quit)
40✔
865
        d.wg.Wait()
40✔
866

40✔
867
        // We'll stop our reliable sender after all of the gossiper's goroutines
40✔
868
        // have exited to ensure nothing can cause it to continue executing.
40✔
869
        d.reliableSender.Stop()
40✔
870
}
871

872
// TODO(roasbeef): need method to get current gossip timestamp?
873
//  * using mtx, check time rotate forward is needed?
874

875
// ProcessRemoteAnnouncement sends a new remote announcement message along with
876
// the peer that sent the routing message. The announcement will be processed
877
// then added to a queue for batched trickled announcement to all connected
878
// peers.  Remote channel announcements should contain the announcement proof
879
// and be fully validated.
880
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
881
        msg lnwire.Message, peer lnpeer.Peer) chan error {
296✔
882

296✔
883
        errChan := make(chan error, 1)
296✔
884

296✔
885
        // For messages in the known set of channel series queries, we'll
296✔
886
        // dispatch the message directly to the GossipSyncer, and skip the main
296✔
887
        // processing loop.
296✔
888
        switch m := msg.(type) {
296✔
889
        case *lnwire.QueryShortChanIDs,
890
                *lnwire.QueryChannelRange,
891
                *lnwire.ReplyChannelRange,
892
                *lnwire.ReplyShortChanIDsEnd:
2✔
893

2✔
894
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
2✔
895
                if !ok {
2✔
896
                        log.Warnf("Gossip syncer for peer=%x not found",
×
897
                                peer.PubKey())
×
898

×
899
                        errChan <- ErrGossipSyncerNotFound
×
UNCOV
900
                        return errChan
×
UNCOV
901
                }
×
902

903
                // If we've found the message target, then we'll dispatch the
904
                // message directly to it.
905
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
2✔
906
                if err != nil {
2✔
907
                        log.Errorf("Process query msg from peer %x got %v",
×
UNCOV
908
                                peer.PubKey(), err)
×
UNCOV
909
                }
×
910

911
                errChan <- err
2✔
912
                return errChan
2✔
913

914
        // If a peer is updating its current update horizon, then we'll dispatch
915
        // that directly to the proper GossipSyncer.
916
        case *lnwire.GossipTimestampRange:
2✔
917
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
2✔
918
                if !ok {
2✔
919
                        log.Warnf("Gossip syncer for peer=%x not found",
×
920
                                peer.PubKey())
×
921

×
922
                        errChan <- ErrGossipSyncerNotFound
×
UNCOV
923
                        return errChan
×
UNCOV
924
                }
×
925

926
                // Queue the message for asynchronous processing to prevent
927
                // blocking the gossiper when rate limiting is active.
928
                if !syncer.QueueTimestampRange(m) {
2✔
929
                        log.Warnf("Unable to queue gossip filter for peer=%x: "+
×
930
                                "queue full", peer.PubKey())
×
931

×
932
                        // Return nil to indicate we've handled the message,
×
933
                        // even though it was dropped. This prevents the peer
×
934
                        // from being disconnected.
×
935
                        errChan <- nil
×
UNCOV
936
                        return errChan
×
UNCOV
937
                }
×
938

939
                errChan <- nil
2✔
940
                return errChan
2✔
941

942
        // To avoid inserting edges in the graph for our own channels that we
943
        // have already closed, we ignore such channel announcements coming
944
        // from the remote.
945
        case *lnwire.ChannelAnnouncement1:
223✔
946
                ownKey := d.selfKey.SerializeCompressed()
223✔
947
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
223✔
948
                        "for own channel")
223✔
949

223✔
950
                if bytes.Equal(m.NodeID1[:], ownKey) ||
223✔
951
                        bytes.Equal(m.NodeID2[:], ownKey) {
227✔
952

4✔
953
                        log.Warn(ownErr)
4✔
954
                        errChan <- ownErr
4✔
955
                        return errChan
4✔
956
                }
4✔
957
        }
958

959
        nMsg := &networkMsg{
294✔
960
                msg:      msg,
294✔
961
                isRemote: true,
294✔
962
                peer:     peer,
294✔
963
                source:   peer.IdentityKey(),
294✔
964
                err:      errChan,
294✔
965
        }
294✔
966

294✔
967
        select {
294✔
968
        case d.networkMsgs <- nMsg:
294✔
969

970
        // If the peer that sent us this error is quitting, then we don't need
971
        // to send back an error and can return immediately.
972
        // TODO(elle): the peer should now just rely on canceling the passed
973
        //  context.
974
        case <-peer.QuitSignal():
×
975
                return nil
×
976
        case <-ctx.Done():
×
977
                return nil
×
UNCOV
978
        case <-d.quit:
×
UNCOV
979
                nMsg.err <- ErrGossiperShuttingDown
×
980
        }
981

982
        return nMsg.err
294✔
983
}
984

985
// ProcessLocalAnnouncement sends a new remote announcement message along with
986
// the peer that sent the routing message. The announcement will be processed
987
// then added to a queue for batched trickled announcement to all connected
988
// peers.  Local channel announcements don't contain the announcement proof and
989
// will not be fully validated. Once the channel proofs are received, the
990
// entire channel announcement and update messages will be re-constructed and
991
// broadcast to the rest of the network.
992
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
993
        optionalFields ...OptionalMsgField) chan error {
49✔
994

49✔
995
        optionalMsgFields := &optionalMsgFields{}
49✔
996
        optionalMsgFields.apply(optionalFields...)
49✔
997

49✔
998
        nMsg := &networkMsg{
49✔
999
                msg:               msg,
49✔
1000
                optionalMsgFields: optionalMsgFields,
49✔
1001
                isRemote:          false,
49✔
1002
                source:            d.selfKey,
49✔
1003
                err:               make(chan error, 1),
49✔
1004
        }
49✔
1005

49✔
1006
        select {
49✔
1007
        case d.networkMsgs <- nMsg:
49✔
UNCOV
1008
        case <-d.quit:
×
UNCOV
1009
                nMsg.err <- ErrGossiperShuttingDown
×
1010
        }
1011

1012
        return nMsg.err
49✔
1013
}
1014

1015
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
1016
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
1017
// tuple.
1018
type channelUpdateID struct {
1019
        // channelID represents the set of data which is needed to
1020
        // retrieve all necessary data to validate the channel existence.
1021
        channelID lnwire.ShortChannelID
1022

1023
        // Flags least-significant bit must be set to 0 if the creating node
1024
        // corresponds to the first node in the previously sent channel
1025
        // announcement and 1 otherwise.
1026
        flags lnwire.ChanUpdateChanFlags
1027
}
1028

1029
// msgWithSenders is a wrapper struct around a message, and the set of peers
1030
// that originally sent us this message. Using this struct, we can ensure that
1031
// we don't re-send a message to the peer that sent it to us in the first
1032
// place.
1033
type msgWithSenders struct {
1034
        // msg is the wire message itself.
1035
        msg lnwire.Message
1036

1037
        // isLocal is true if this was a message that originated locally. We'll
1038
        // use this to bypass our normal checks to ensure we prioritize sending
1039
        // out our own updates.
1040
        isLocal bool
1041

1042
        // sender is the set of peers that sent us this message.
1043
        senders map[route.Vertex]struct{}
1044
}
1045

1046
// mergeSyncerMap is used to merge the set of senders of a particular message
1047
// with peers that we have an active GossipSyncer with. We do this to ensure
1048
// that we don't broadcast messages to any peers that we have active gossip
1049
// syncers for.
1050
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
31✔
1051
        for peerPub := range syncers {
33✔
1052
                m.senders[peerPub] = struct{}{}
2✔
1053
        }
2✔
1054
}
1055

1056
// deDupedAnnouncements de-duplicates announcements that have been added to the
1057
// batch. Internally, announcements are stored in three maps
1058
// (one each for channel announcements, channel updates, and node
1059
// announcements). These maps keep track of unique announcements and ensure no
1060
// announcements are duplicated. We keep the three message types separate, such
1061
// that we can send channel announcements first, then channel updates, and
1062
// finally node announcements when it's time to broadcast them.
1063
type deDupedAnnouncements struct {
1064
        // channelAnnouncements are identified by the short channel id field.
1065
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
1066

1067
        // channelUpdates are identified by the channel update id field.
1068
        channelUpdates map[channelUpdateID]msgWithSenders
1069

1070
        // nodeAnnouncements are identified by the Vertex field.
1071
        nodeAnnouncements map[route.Vertex]msgWithSenders
1072

1073
        sync.Mutex
1074
}
1075

1076
// Reset operates on deDupedAnnouncements to reset the storage of
1077
// announcements.
1078
func (d *deDupedAnnouncements) Reset() {
42✔
1079
        d.Lock()
42✔
1080
        defer d.Unlock()
42✔
1081

42✔
1082
        d.reset()
42✔
1083
}
42✔
1084

1085
// reset is the private version of the Reset method. We have this so we can
1086
// call this method within method that are already holding the lock.
1087
func (d *deDupedAnnouncements) reset() {
337✔
1088
        // Storage of each type of announcement (channel announcements, channel
337✔
1089
        // updates, node announcements) is set to an empty map where the
337✔
1090
        // appropriate key points to the corresponding lnwire.Message.
337✔
1091
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
337✔
1092
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
337✔
1093
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
337✔
1094
}
337✔
1095

1096
// addMsg adds a new message to the current batch. If the message is already
1097
// present in the current batch, then this new instance replaces the latter,
1098
// and the set of senders is updated to reflect which node sent us this
1099
// message.
1100
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
94✔
1101
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
94✔
1102

94✔
1103
        // Depending on the message type (channel announcement, channel update,
94✔
1104
        // or node announcement), the message is added to the corresponding map
94✔
1105
        // in deDupedAnnouncements. Because each identifying key can have at
94✔
1106
        // most one value, the announcements are de-duplicated, with newer ones
94✔
1107
        // replacing older ones.
94✔
1108
        switch msg := message.msg.(type) {
94✔
1109

1110
        // Channel announcements are identified by the short channel id field.
1111
        case *lnwire.ChannelAnnouncement1:
26✔
1112
                deDupKey := msg.ShortChannelID
26✔
1113
                sender := route.NewVertex(message.source)
26✔
1114

26✔
1115
                mws, ok := d.channelAnnouncements[deDupKey]
26✔
1116
                if !ok {
51✔
1117
                        mws = msgWithSenders{
25✔
1118
                                msg:     msg,
25✔
1119
                                isLocal: !message.isRemote,
25✔
1120
                                senders: make(map[route.Vertex]struct{}),
25✔
1121
                        }
25✔
1122
                        mws.senders[sender] = struct{}{}
25✔
1123

25✔
1124
                        d.channelAnnouncements[deDupKey] = mws
25✔
1125

25✔
1126
                        return
25✔
1127
                }
25✔
1128

1129
                mws.msg = msg
1✔
1130
                mws.senders[sender] = struct{}{}
1✔
1131
                d.channelAnnouncements[deDupKey] = mws
1✔
1132

1133
        // Channel updates are identified by the (short channel id,
1134
        // channelflags) tuple.
1135
        case *lnwire.ChannelUpdate1:
48✔
1136
                sender := route.NewVertex(message.source)
48✔
1137
                deDupKey := channelUpdateID{
48✔
1138
                        msg.ShortChannelID,
48✔
1139
                        msg.ChannelFlags,
48✔
1140
                }
48✔
1141

48✔
1142
                oldTimestamp := uint32(0)
48✔
1143
                mws, ok := d.channelUpdates[deDupKey]
48✔
1144
                if ok {
51✔
1145
                        // If we already have seen this message, record its
3✔
1146
                        // timestamp.
3✔
1147
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
3✔
1148
                        if !ok {
3✔
1149
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1150
                                        "got: %T", mws.msg)
×
1151

×
UNCOV
1152
                                return
×
UNCOV
1153
                        }
×
1154

1155
                        oldTimestamp = update.Timestamp
3✔
1156
                }
1157

1158
                // If we already had this message with a strictly newer
1159
                // timestamp, then we'll just discard the message we got.
1160
                if oldTimestamp > msg.Timestamp {
49✔
1161
                        log.Debugf("Ignored outdated network message: "+
1✔
1162
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
1✔
1163
                        return
1✔
1164
                }
1✔
1165

1166
                // If the message we just got is newer than what we previously
1167
                // have seen, or this is the first time we see it, then we'll
1168
                // add it to our map of announcements.
1169
                if oldTimestamp < msg.Timestamp {
93✔
1170
                        mws = msgWithSenders{
46✔
1171
                                msg:     msg,
46✔
1172
                                isLocal: !message.isRemote,
46✔
1173
                                senders: make(map[route.Vertex]struct{}),
46✔
1174
                        }
46✔
1175

46✔
1176
                        // We'll mark the sender of the message in the
46✔
1177
                        // senders map.
46✔
1178
                        mws.senders[sender] = struct{}{}
46✔
1179

46✔
1180
                        d.channelUpdates[deDupKey] = mws
46✔
1181

46✔
1182
                        return
46✔
1183
                }
46✔
1184

1185
                // Lastly, if we had seen this exact message from before, with
1186
                // the same timestamp, we'll add the sender to the map of
1187
                // senders, such that we can skip sending this message back in
1188
                // the next batch.
1189
                mws.msg = msg
1✔
1190
                mws.senders[sender] = struct{}{}
1✔
1191
                d.channelUpdates[deDupKey] = mws
1✔
1192

1193
        // Node announcements are identified by the Vertex field.  Use the
1194
        // NodeID to create the corresponding Vertex.
1195
        case *lnwire.NodeAnnouncement1:
24✔
1196
                sender := route.NewVertex(message.source)
24✔
1197
                deDupKey := route.Vertex(msg.NodeID)
24✔
1198

24✔
1199
                // We do the same for node announcements as we did for channel
24✔
1200
                // updates, as they also carry a timestamp.
24✔
1201
                oldTimestamp := uint32(0)
24✔
1202
                mws, ok := d.nodeAnnouncements[deDupKey]
24✔
1203
                if ok {
31✔
1204
                        ann, _ := mws.msg.(*lnwire.NodeAnnouncement1)
7✔
1205
                        oldTimestamp = ann.Timestamp
7✔
1206
                }
7✔
1207

1208
                // Discard the message if it's old.
1209
                if oldTimestamp > msg.Timestamp {
26✔
1210
                        return
2✔
1211
                }
2✔
1212

1213
                // Replace if it's newer.
1214
                if oldTimestamp < msg.Timestamp {
44✔
1215
                        mws = msgWithSenders{
20✔
1216
                                msg:     msg,
20✔
1217
                                isLocal: !message.isRemote,
20✔
1218
                                senders: make(map[route.Vertex]struct{}),
20✔
1219
                        }
20✔
1220

20✔
1221
                        mws.senders[sender] = struct{}{}
20✔
1222

20✔
1223
                        d.nodeAnnouncements[deDupKey] = mws
20✔
1224

20✔
1225
                        return
20✔
1226
                }
20✔
1227

1228
                // Add to senders map if it's the same as we had.
1229
                mws.msg = msg
6✔
1230
                mws.senders[sender] = struct{}{}
6✔
1231
                d.nodeAnnouncements[deDupKey] = mws
6✔
1232
        }
1233
}
1234

1235
// AddMsgs is a helper method to add multiple messages to the announcement
1236
// batch.
1237
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
62✔
1238
        d.Lock()
62✔
1239
        defer d.Unlock()
62✔
1240

62✔
1241
        for _, msg := range msgs {
156✔
1242
                d.addMsg(msg)
94✔
1243
        }
94✔
1244
}
1245

1246
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1247
// to broadcast next into messages that are locally sourced and those that are
1248
// sourced remotely.
1249
type msgsToBroadcast struct {
1250
        // localMsgs is the set of messages we created locally.
1251
        localMsgs []msgWithSenders
1252

1253
        // remoteMsgs is the set of messages that we received from a remote
1254
        // party.
1255
        remoteMsgs []msgWithSenders
1256
}
1257

1258
// addMsg adds a new message to the appropriate sub-slice.
1259
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
78✔
1260
        if msg.isLocal {
127✔
1261
                m.localMsgs = append(m.localMsgs, msg)
49✔
1262
        } else {
80✔
1263
                m.remoteMsgs = append(m.remoteMsgs, msg)
31✔
1264
        }
31✔
1265
}
1266

1267
// isEmpty returns true if the batch is empty.
1268
func (m *msgsToBroadcast) isEmpty() bool {
296✔
1269
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
296✔
1270
}
296✔
1271

1272
// length returns the length of the combined message set.
1273
func (m *msgsToBroadcast) length() int {
1✔
1274
        return len(m.localMsgs) + len(m.remoteMsgs)
1✔
1275
}
1✔
1276

1277
// Emit returns the set of de-duplicated announcements to be sent out during
1278
// the next announcement epoch, in the order of channel announcements, channel
1279
// updates, and node announcements. Each message emitted, contains the set of
1280
// peers that sent us the message. This way, we can ensure that we don't waste
1281
// bandwidth by re-sending a message to the peer that sent it to us in the
1282
// first place. Additionally, the set of stored messages are reset.
1283
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
297✔
1284
        d.Lock()
297✔
1285
        defer d.Unlock()
297✔
1286

297✔
1287
        // Get the total number of announcements.
297✔
1288
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
297✔
1289
                len(d.nodeAnnouncements)
297✔
1290

297✔
1291
        // Create an empty array of lnwire.Messages with a length equal to
297✔
1292
        // the total number of announcements.
297✔
1293
        msgs := msgsToBroadcast{
297✔
1294
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
297✔
1295
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
297✔
1296
        }
297✔
1297

297✔
1298
        // Add the channel announcements to the array first.
297✔
1299
        for _, message := range d.channelAnnouncements {
318✔
1300
                msgs.addMsg(message)
21✔
1301
        }
21✔
1302

1303
        // Then add the channel updates.
1304
        for _, message := range d.channelUpdates {
339✔
1305
                msgs.addMsg(message)
42✔
1306
        }
42✔
1307

1308
        // Finally add the node announcements.
1309
        for _, message := range d.nodeAnnouncements {
316✔
1310
                msgs.addMsg(message)
19✔
1311
        }
19✔
1312

1313
        d.reset()
297✔
1314

297✔
1315
        // Return the array of lnwire.messages.
297✔
1316
        return msgs
297✔
1317
}
1318

1319
// calculateSubBatchSize is a helper function that calculates the size to
// break down the batchSize into.
//
// If subBatchDelay exceeds totalDelay, or totalDelay is not positive, the
// whole batch forms a single sub batch and batchSize is returned. Otherwise
// the result is batchSize scaled by subBatchDelay/totalDelay and rounded up,
// but never less than minimumBatchSize.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
	minimumBatchSize, batchSize int) int {

	// A non-positive total delay leaves nothing to spread the batch over,
	// and a zero one would make us divide by zero below.
	if subBatchDelay > totalDelay || totalDelay <= 0 {
		return batchSize
	}

	// The delays are nanosecond counts that don't fit into an int on
	// 32-bit platforms, so the arithmetic is done in int64. As
	// subBatchDelay doesn't exceed totalDelay here, the quotient doesn't
	// exceed batchSize and converting it back to int is safe.
	total := int64(totalDelay)
	subBatchSize := int(
		(int64(batchSize)*int64(subBatchDelay) + total - 1) / total,
	)

	if subBatchSize < minimumBatchSize {
		return minimumBatchSize
	}

	return subBatchSize
}
1336

1337
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1338
// this variable so the function can be mocked in our test.
1339
var batchSizeCalculator = calculateSubBatchSize
1340

1341
// splitAnnouncementBatches takes an exiting list of announcements and
1342
// decomposes it into sub batches controlled by the `subBatchSize`.
1343
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1344
        announcementBatch []msgWithSenders) [][]msgWithSenders {
77✔
1345

77✔
1346
        subBatchSize := batchSizeCalculator(
77✔
1347
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
77✔
1348
                d.cfg.MinimumBatchSize, len(announcementBatch),
77✔
1349
        )
77✔
1350

77✔
1351
        var splitAnnouncementBatch [][]msgWithSenders
77✔
1352

77✔
1353
        for subBatchSize < len(announcementBatch) {
200✔
1354
                // For slicing with minimal allocation
123✔
1355
                // https://github.com/golang/go/wiki/SliceTricks
123✔
1356
                announcementBatch, splitAnnouncementBatch =
123✔
1357
                        announcementBatch[subBatchSize:],
123✔
1358
                        append(splitAnnouncementBatch,
123✔
1359
                                announcementBatch[0:subBatchSize:subBatchSize])
123✔
1360
        }
123✔
1361
        splitAnnouncementBatch = append(
77✔
1362
                splitAnnouncementBatch, announcementBatch,
77✔
1363
        )
77✔
1364

77✔
1365
        return splitAnnouncementBatch
77✔
1366
}
1367

1368
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1369
// split size, and then sends out all items to the set of target peers. Locally
1370
// generated announcements are always sent before remotely generated
1371
// announcements.
1372
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(ctx context.Context,
1373
        annBatch msgsToBroadcast) {
36✔
1374

36✔
1375
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
36✔
1376
        // duration to delay the sending of next announcement batch.
36✔
1377
        delayNextBatch := func() {
106✔
1378
                select {
70✔
1379
                case <-time.After(d.cfg.SubBatchDelay):
53✔
1380
                case <-d.quit:
17✔
1381
                        return
17✔
1382
                }
1383
        }
1384

1385
        // Fetch the local and remote announcements.
1386
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
36✔
1387
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
36✔
1388

36✔
1389
        d.wg.Add(1)
36✔
1390
        go func() {
72✔
1391
                defer d.wg.Done()
36✔
1392

36✔
1393
                log.Debugf("Broadcasting %v new local announcements in %d "+
36✔
1394
                        "sub batches", len(annBatch.localMsgs),
36✔
1395
                        len(localBatches))
36✔
1396

36✔
1397
                // Send out the local announcements first.
36✔
1398
                for _, annBatch := range localBatches {
72✔
1399
                        d.sendLocalBatch(annBatch)
36✔
1400
                        delayNextBatch()
36✔
1401
                }
36✔
1402

1403
                log.Debugf("Broadcasting %v new remote announcements in %d "+
36✔
1404
                        "sub batches", len(annBatch.remoteMsgs),
36✔
1405
                        len(remoteBatches))
36✔
1406

36✔
1407
                // Now send the remote announcements.
36✔
1408
                for _, annBatch := range remoteBatches {
72✔
1409
                        d.sendRemoteBatch(ctx, annBatch)
36✔
1410
                        delayNextBatch()
36✔
1411
                }
36✔
1412
        }()
1413
}
1414

1415
// sendLocalBatch broadcasts a list of locally generated announcements to our
1416
// peers. For local announcements, we skip the filter and dedup logic and just
1417
// send the announcements out to all our coonnected peers.
1418
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
36✔
1419
        msgsToSend := lnutils.Map(
36✔
1420
                annBatch, func(m msgWithSenders) lnwire.Message {
81✔
1421
                        return m.msg
45✔
1422
                },
45✔
1423
        )
1424

1425
        err := d.cfg.Broadcast(nil, msgsToSend...)
36✔
1426
        if err != nil {
36✔
UNCOV
1427
                log.Errorf("Unable to send local batch announcements: %v", err)
×
UNCOV
1428
        }
×
1429
}
1430

1431
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1432
// peers.
1433
func (d *AuthenticatedGossiper) sendRemoteBatch(ctx context.Context,
1434
        annBatch []msgWithSenders) {
36✔
1435

36✔
1436
        syncerPeers := d.syncMgr.GossipSyncers()
36✔
1437

36✔
1438
        // We'll first attempt to filter out this new message for all peers
36✔
1439
        // that have active gossip syncers active.
36✔
1440
        for pub, syncer := range syncerPeers {
38✔
1441
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
2✔
1442
                syncer.FilterGossipMsgs(ctx, annBatch...)
2✔
1443
        }
2✔
1444

1445
        for _, msgChunk := range annBatch {
67✔
1446
                msgChunk := msgChunk
31✔
1447

31✔
1448
                // With the syncers taken care of, we'll merge the sender map
31✔
1449
                // with the set of syncers, so we don't send out duplicate
31✔
1450
                // messages.
31✔
1451
                msgChunk.mergeSyncerMap(syncerPeers)
31✔
1452

31✔
1453
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
31✔
1454
                if err != nil {
31✔
1455
                        log.Errorf("Unable to send batch "+
×
UNCOV
1456
                                "announcements: %v", err)
×
UNCOV
1457
                        continue
×
1458
                }
1459
        }
1460
}
1461

1462
// networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine include answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically
// broadcasting our latest topology state to all connected peers.
//
// The loop multiplexes five event sources: channel policy updates, incoming
// network messages, the trickle ticker (flushes the pending announcement
// batch), the retransmit ticker (re-broadcasts stale announcements) and the
// quit signal. It decrements d.wg when it returns.
//
// NOTE: This MUST be run as a goroutine.
func (d *AuthenticatedGossiper) networkHandler(ctx context.Context) {
	defer d.wg.Done()

	// Initialize empty deDupedAnnouncements to store announcement batch.
	announcements := deDupedAnnouncements{}
	announcements.Reset()

	// The retransmit ticker is only active while this goroutine is
	// running.
	d.cfg.RetransmitTicker.Resume()
	defer d.cfg.RetransmitTicker.Stop()

	// trickleTimer fires every TrickleDelay and triggers a flush of the
	// announcements collected since the previous tick.
	trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
	defer trickleTimer.Stop()

	// To start, we'll first check to see if there are any stale channel or
	// node announcements that we need to re-transmit. A failure here is
	// only logged, the handler still starts.
	if err := d.retransmitStaleAnns(ctx, time.Now()); err != nil {
		log.Errorf("Unable to rebroadcast stale announcements: %v", err)
	}

	for {
		select {
		// A new policy update has arrived. We'll commit it to the
		// sub-systems below us, then craft, sign, and broadcast a new
		// ChannelUpdate for the set of affected clients.
		case policyUpdate := <-d.chanPolicyUpdates:
			log.Tracef("Received channel %d policy update requests",
				len(policyUpdate.edgesToUpdate))

			// First, we'll now create new fully signed updates for
			// the affected channels and also update the underlying
			// graph with the new state.
			newChanUpdates, err := d.processChanPolicyUpdate(
				ctx, policyUpdate.edgesToUpdate,
			)

			// The result (nil on success) is always reported back
			// to the requester before we act on it.
			policyUpdate.errChan <- err
			if err != nil {
				log.Errorf("Unable to craft policy updates: %v",
					err)
				continue
			}

			// Finally, with the updates committed, we'll now add
			// them to the announcement batch to be flushed at the
			// start of the next epoch.
			announcements.AddMsgs(newChanUpdates...)

		case announcement := <-d.networkMsgs:
			log.Tracef("Received network message: "+
				"peer=%v, msg=%s, is_remote=%v",
				announcement.peer, announcement.msg.MsgType(),
				announcement.isRemote)

			switch announcement.msg.(type) {
			// Channel announcement signatures are amongst the only
			// messages that we'll process serially.
			case *lnwire.AnnounceSignatures1:
				// Process in an anonymous function so we can
				// recover from any panics without crashing the
				// main networkHandler goroutine. We pass nil
				// for jobID since AnnounceSignatures bypass the
				// validation barrier.
				func() {
					defer d.finalizeGossipProcessing(
						ctx, "processing",
						announcement, nil,
					)

					//nolint:ll
					emittedAnnouncements, _ := d.processNetworkAnnouncement(
						ctx, announcement,
					)
					log.Debugf("Processed network "+
						"message %s, returned "+
						"len(announcements)=%v",
						announcement.msg.MsgType(),
						len(emittedAnnouncements))

					// Queue anything that was emitted for
					// the next trickle flush.
					if emittedAnnouncements != nil {
						announcements.AddMsgs(
							emittedAnnouncements...,
						)
					}
				}()
				continue
			}

			// If this message was recently rejected, then we won't
			// attempt to re-process it. Only remote messages are
			// subject to this check.
			if announcement.isRemote && d.isRecentlyRejectedMsg(
				announcement.msg,
				sourceToPub(announcement.source),
			) {

				announcement.err <- fmt.Errorf("recently " +
					"rejected")
				continue
			}

			// We'll set up any dependent, and wait until a free
			// slot for this job opens up, this allows us to not
			// have thousands of goroutines active.
			annJobID, err := d.vb.InitJobDependencies(
				announcement.msg,
			)
			if err != nil {
				announcement.err <- err
				continue
			}

			// All other messages are processed concurrently in
			// their own goroutine, which is tracked by d.wg and
			// shares the announcement batch with this loop.
			d.wg.Add(1)
			go d.handleNetworkMessages(
				ctx, announcement, &announcements, annJobID,
			)

		// The trickle timer has ticked, which indicates we should
		// flush to the network the pending batch of new announcements
		// we've received since the last trickle tick.
		case <-trickleTimer.C:
			// Emit the current batch of announcements from
			// deDupedAnnouncements.
			announcementBatch := announcements.Emit()

			// If the current announcements batch is empty, then we
			// have no further work here.
			if announcementBatch.isEmpty() {
				continue
			}

			// At this point, we have the set of local and remote
			// announcements we want to send out. We'll do the
			// batching as normal for both, but for local
			// announcements, we'll blast them out w/o regard for
			// our peer's policies so we ensure they propagate
			// properly.
			d.splitAndSendAnnBatch(ctx, announcementBatch)

		// The retransmission timer has ticked which indicates that we
		// should check if we need to prune or re-broadcast any of our
		// personal channels or node announcement. This addresses the
		// case of "zombie" channels and channel advertisements that
		// have been dropped, or not properly propagated through the
		// network.
		case tick := <-d.cfg.RetransmitTicker.Ticks():
			if err := d.retransmitStaleAnns(ctx, tick); err != nil {
				log.Errorf("unable to rebroadcast stale "+
					"announcements: %v", err)
			}

		// The gossiper has been signalled to exit, so we exit our
		// main loop so the wait group can be decremented.
		case <-d.quit:
			return
		}
	}
}
1623

1624
// handleNetworkMessages is responsible for waiting for dependencies for a
1625
// given network message and processing the message. Once processed, it will
1626
// signal its dependants and add the new announcements to the announce batch.
1627
//
1628
// NOTE: must be run as a goroutine.
1629
func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context,
1630
        nMsg *networkMsg, deDuped *deDupedAnnouncements, jobID JobID) {
321✔
1631

321✔
1632
        defer d.wg.Done()
321✔
1633
        defer d.finalizeGossipProcessing(ctx, "processing", nMsg, &jobID)
321✔
1634

321✔
1635
        // We should only broadcast this message forward if it originated from
321✔
1636
        // us or it wasn't received as part of our initial historical sync.
321✔
1637
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
321✔
1638

321✔
1639
        // If this message has an existing dependency, then we'll wait until
321✔
1640
        // that has been fully validated before we proceed.
321✔
1641
        err := d.vb.WaitForParents(jobID, nMsg.msg)
321✔
1642
        if err != nil {
321✔
UNCOV
1643
                log.Debugf("Validating network message %s got err: %v",
×
UNCOV
1644
                        nMsg.msg.MsgType(), err)
×
UNCOV
1645

×
UNCOV
1646
                if errors.Is(err, ErrVBarrierShuttingDown) {
×
UNCOV
1647
                        log.Warnf("unexpected error during validation "+
×
UNCOV
1648
                                "barrier shutdown: %v", err)
×
UNCOV
1649
                }
×
UNCOV
1650
                nMsg.err <- err
×
UNCOV
1651

×
1652
                return
×
1653
        }
1654

1655
        // Process the network announcement to determine if this is either a
1656
        // new announcement from our PoV or an edges to a prior vertex/edge we
1657
        // previously proceeded.
1658
        newAnns, allow := d.processNetworkAnnouncement(ctx, nMsg)
321✔
1659

321✔
1660
        log.Tracef("Processed network message %s, returned "+
321✔
1661
                "len(announcements)=%v, allowDependents=%v",
321✔
1662
                nMsg.msg.MsgType(), len(newAnns), allow)
321✔
1663

321✔
1664
        // If this message had any dependencies, then we can now signal them to
321✔
1665
        // continue.
321✔
1666
        err = d.vb.SignalDependents(nMsg.msg, jobID)
321✔
1667
        if err != nil {
321✔
UNCOV
1668
                // Something is wrong if SignalDependents returns an error.
×
UNCOV
1669
                log.Errorf("SignalDependents returned error for msg=%v with "+
×
UNCOV
1670
                        "JobID=%v", lnutils.SpewLogClosure(nMsg.msg), jobID)
×
UNCOV
1671

×
UNCOV
1672
                nMsg.err <- err
×
UNCOV
1673

×
UNCOV
1674
                return
×
UNCOV
1675
        }
×
1676

1677
        // If the announcement was accepted, then add the emitted announcements
1678
        // to our announce batch to be broadcast once the trickle timer ticks
1679
        // gain.
1680
        if newAnns != nil && shouldBroadcast {
361✔
1681
                // TODO(roasbeef): exclude peer that sent.
40✔
1682
                deDuped.AddMsgs(newAnns...)
40✔
1683
        } else if newAnns != nil {
326✔
1684
                log.Trace("Skipping broadcast of announcements received " +
3✔
1685
                        "during initial graph sync")
3✔
1686
        }
3✔
1687
}
1688

1689
// finalizeGossipProcessing handles cleanup for gossip message processing,
1690
// including job completion and panic recovery. It guards gossip goroutines
1691
// against panics to keep the daemon alive. On panic, it logs the error,
1692
// signals dependents, and reports back to the caller if possible.
1693
//
1694
// NOTE: This function MUST be called via defer to recover from panics.
1695
func (d *AuthenticatedGossiper) finalizeGossipProcessing(logCtx context.Context,
1696
        ctxStr string, nMsg *networkMsg, jobID *JobID) {
348✔
1697

348✔
1698
        // Always complete the job when provided, regardless of panic state.
348✔
1699
        // This ensures job slots are returned even if callers forget or
348✔
1700
        // misordering occurs.
348✔
1701
        if jobID != nil {
673✔
1702
                d.vb.CompleteJob()
325✔
1703
        }
325✔
1704

1705
        r := recover()
348✔
1706
        if r == nil {
690✔
1707
                return
342✔
1708
        }
342✔
1709

1710
        msgType := "unknown"
6✔
1711
        if nMsg != nil && nMsg.msg != nil {
11✔
1712
                msgType = nMsg.msg.MsgType().String()
5✔
1713
        }
5✔
1714

1715
        var peerPub string
6✔
1716
        if nMsg != nil && nMsg.peer != nil {
10✔
1717
                peerPub = route.NewVertex(nMsg.peer.IdentityKey()).String()
4✔
1718
        } else {
6✔
1719
                peerPub = "unknown"
2✔
1720
        }
2✔
1721

1722
        log.ErrorS(logCtx, "Panic during gossip message processing",
6✔
1723
                fmt.Errorf("%v", r),
6✔
1724
                slog.String("context", ctxStr),
6✔
1725
                slog.String("msg_type", msgType),
6✔
1726
                slog.String("peer", peerPub),
6✔
1727
        )
6✔
1728
        // Truncate the stack trace to avoid filling up disk space if an
6✔
1729
        // attacker repeatedly triggers panics.
6✔
1730
        const maxStackSize = 8192
6✔
1731
        stack := debug.Stack()
6✔
1732
        if len(stack) > maxStackSize {
6✔
UNCOV
1733
                stack = stack[:maxStackSize]
×
UNCOV
1734
        }
×
1735
        log.DebugS(logCtx, "Panic stack trace",
6✔
1736
                slog.String("stack", string(stack)),
6✔
1737
        )
6✔
1738

6✔
1739
        // Signal any dependents waiting on this message so they don't block
6✔
1740
        // forever.
6✔
1741
        if nMsg != nil && nMsg.msg != nil && jobID != nil {
10✔
1742
                if err := d.vb.SignalDependents(
4✔
1743
                        nMsg.msg, *jobID,
4✔
1744
                ); err != nil {
4✔
UNCOV
1745
                        log.ErrorS(logCtx, "SignalDependents after panic failed",
×
UNCOV
1746
                                err,
×
UNCOV
1747
                                slog.String("msg_type", nMsg.msg.MsgType().String()),
×
UNCOV
1748
                        )
×
UNCOV
1749
                }
×
1750
        }
1751

1752
        // Send an error back to the caller if possible.
1753
        if nMsg != nil && nMsg.err != nil {
11✔
1754
                select {
5✔
1755
                case nMsg.err <- fmt.Errorf("panic while %s gossip "+
1756
                        "message %s: %v", ctxStr, msgType, r):
4✔
1757
                default:
1✔
1758
                        log.WarnS(logCtx, "Unable to send panic error, "+
1✔
1759
                                "error channel blocked", nil,
1✔
1760
                                slog.String("msg_type", msgType),
1✔
1761
                        )
1✔
1762
                }
1763
        }
1764
}
1765

1766
// TODO(roasbeef): d/c peers that send updates not on our chain
1767

1768
// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
// needed to handle new queries.
//
// The call is forwarded as-is to the gossiper's sync manager.
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
	d.syncMgr.InitSyncState(syncPeer)
}
2✔
1775

1776
// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
//
// The call is forwarded as-is to the gossiper's sync manager.
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
	d.syncMgr.PruneSyncState(peer)
}
2✔
1782

1783
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1784
// false otherwise, This avoids expensive reprocessing of the message.
1785
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1786
        peerPub [33]byte) bool {
285✔
1787

285✔
1788
        // We only cache rejections for gossip messages. So if it is not
285✔
1789
        // a gossip message, we return false.
285✔
1790
        gMsg, ok := msg.(lnwire.GossipMessage)
285✔
1791
        if !ok {
285✔
UNCOV
1792
                return false
×
UNCOV
1793
        }
×
1794

1795
        var scid uint64
285✔
1796
        switch m := gMsg.(type) {
285✔
1797
        case *lnwire.ChannelUpdate1:
51✔
1798
                scid = m.ShortChannelID.ToUint64()
51✔
1799

1800
        case *lnwire.ChannelAnnouncement1:
221✔
1801
                scid = m.ShortChannelID.ToUint64()
221✔
1802

1803
        default:
17✔
1804
                return false
17✔
1805
        }
1806

1807
        _, err := d.recentRejects.Get(newRejectCacheKey(
270✔
1808
                gMsg.GossipVersion(), scid, peerPub,
270✔
1809
        ))
270✔
1810

270✔
1811
        return !errors.Is(err, cache.ErrElementNotFound)
270✔
1812
}
1813

1814
// retransmitStaleAnns examines all outgoing channels that the source node is
1815
// known to maintain to check to see if any of them are "stale". A channel is
1816
// stale iff, the last timestamp of its rebroadcast is older than the
1817
// RebroadcastInterval. We also check if a refreshed node announcement should
1818
// be resent.
1819
func (d *AuthenticatedGossiper) retransmitStaleAnns(ctx context.Context,
1820
        now time.Time) error {
41✔
1821

41✔
1822
        // Iterate over all of our channels and check if any of them fall
41✔
1823
        // within the prune interval or re-broadcast interval.
41✔
1824
        type updateTuple struct {
41✔
1825
                info *models.ChannelEdgeInfo
41✔
1826
                edge *models.ChannelEdgePolicy
41✔
1827
        }
41✔
1828

41✔
1829
        var (
41✔
1830
                havePublicChannels bool
41✔
1831
                edgesToUpdate      []updateTuple
41✔
1832
        )
41✔
1833
        err := d.cfg.Graph.ForAllOutgoingChannels(ctx, func(
41✔
1834
                info *models.ChannelEdgeInfo,
41✔
1835
                edge *models.ChannelEdgePolicy) error {
45✔
1836

4✔
1837
                // If there's no auth proof attached to this edge, it means
4✔
1838
                // that it is a private channel not meant to be announced to
4✔
1839
                // the greater network, so avoid sending channel updates for
4✔
1840
                // this channel to not leak its
4✔
1841
                // existence.
4✔
1842
                if info.AuthProof == nil {
7✔
1843
                        log.Debugf("Skipping retransmission of channel "+
3✔
1844
                                "without AuthProof: %v", info.ChannelID)
3✔
1845
                        return nil
3✔
1846
                }
3✔
1847

1848
                // We make a note that we have at least one public channel. We
1849
                // use this to determine whether we should send a node
1850
                // announcement below.
1851
                havePublicChannels = true
3✔
1852

3✔
1853
                // If this edge has a ChannelUpdate that was created before the
3✔
1854
                // introduction of the MaxHTLC field, then we'll update this
3✔
1855
                // edge to propagate this information in the network.
3✔
1856
                if !edge.MessageFlags.HasMaxHtlc() {
3✔
UNCOV
1857
                        // We'll make sure we support the new max_htlc field if
×
UNCOV
1858
                        // not already present.
×
UNCOV
1859
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
UNCOV
1860
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
UNCOV
1861

×
1862
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1863
                                info: info,
×
UNCOV
1864
                                edge: edge,
×
UNCOV
1865
                        })
×
UNCOV
1866
                        return nil
×
UNCOV
1867
                }
×
1868

1869
                timeElapsed := now.Sub(edge.LastUpdate)
3✔
1870

3✔
1871
                // If it's been longer than RebroadcastInterval since we've
3✔
1872
                // re-broadcasted the channel, add the channel to the set of
3✔
1873
                // edges we need to update.
3✔
1874
                if timeElapsed >= d.cfg.RebroadcastInterval {
4✔
1875
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
1✔
1876
                                info: info,
1✔
1877
                                edge: edge,
1✔
1878
                        })
1✔
1879
                }
1✔
1880

1881
                return nil
3✔
1882
        }, func() {
2✔
1883
                havePublicChannels = false
2✔
1884
                edgesToUpdate = nil
2✔
1885
        })
2✔
1886
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
41✔
UNCOV
1887
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
UNCOV
1888
                        err)
×
UNCOV
1889
        }
×
1890

1891
        var signedUpdates []lnwire.Message
41✔
1892
        for _, chanToUpdate := range edgesToUpdate {
42✔
1893
                // Re-sign and update the channel on disk and retrieve our
1✔
1894
                // ChannelUpdate to broadcast.
1✔
1895
                chanAnn, chanUpdate, err := d.updateChannel(
1✔
1896
                        ctx, chanToUpdate.info, chanToUpdate.edge,
1✔
1897
                )
1✔
1898
                if err != nil {
1✔
UNCOV
1899
                        return fmt.Errorf("unable to update channel: %w", err)
×
UNCOV
1900
                }
×
1901

1902
                // If we have a valid announcement to transmit, then we'll send
1903
                // that along with the update.
1904
                if chanAnn != nil {
2✔
1905
                        signedUpdates = append(signedUpdates, chanAnn)
1✔
1906
                }
1✔
1907

1908
                signedUpdates = append(signedUpdates, chanUpdate)
1✔
1909
        }
1910

1911
        // If we don't have any public channels, we return as we don't want to
1912
        // broadcast anything that would reveal our existence.
1913
        if !havePublicChannels {
81✔
1914
                return nil
40✔
1915
        }
40✔
1916

1917
        // We'll also check that our NodeAnnouncement1 is not too old.
1918
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
3✔
1919
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
3✔
1920
        timeElapsed := now.Sub(timestamp)
3✔
1921

3✔
1922
        // If it's been a full day since we've re-broadcasted the
3✔
1923
        // node announcement, refresh it and resend it.
3✔
1924
        nodeAnnStr := ""
3✔
1925
        if timeElapsed >= d.cfg.RebroadcastInterval {
4✔
1926
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
1✔
1927
                if err != nil {
1✔
1928
                        return fmt.Errorf("unable to get refreshed node "+
×
1929
                                "announcement: %v", err)
×
1930
                }
×
1931

1932
                signedUpdates = append(signedUpdates, &newNodeAnn)
1✔
1933
                nodeAnnStr = " and our refreshed node announcement"
1✔
1934

1✔
1935
                // Before broadcasting the refreshed node announcement, add it
1✔
1936
                // to our own graph.
1✔
1937
                if err := d.addNode(ctx, &newNodeAnn); err != nil {
2✔
1938
                        log.Errorf("Unable to add refreshed node announcement "+
1✔
1939
                                "to graph: %v", err)
1✔
1940
                }
1✔
1941
        }
1942

1943
        // If we don't have any updates to re-broadcast, then we'll exit
1944
        // early.
1945
        if len(signedUpdates) == 0 {
5✔
1946
                return nil
2✔
1947
        }
2✔
1948

1949
        log.Infof("Retransmitting %v outgoing channels%v",
1✔
1950
                len(edgesToUpdate), nodeAnnStr)
1✔
1951

1✔
1952
        // With all the wire announcements properly crafted, we'll broadcast
1✔
1953
        // our known outgoing channels to all our immediate peers.
1✔
1954
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
1✔
UNCOV
1955
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
UNCOV
1956
        }
×
1957

1958
        return nil
1✔
1959
}
1960

1961
// processChanPolicyUpdate generates a new set of channel updates for the
1962
// provided list of edges and updates the backing ChannelGraphSource.
1963
func (d *AuthenticatedGossiper) processChanPolicyUpdate(ctx context.Context,
1964
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
3✔
1965

3✔
1966
        var chanUpdates []networkMsg
3✔
1967
        for _, edgeInfo := range edgesToUpdate {
8✔
1968
                // Now that we've collected all the channels we need to update,
5✔
1969
                // we'll re-sign and update the backing ChannelGraphSource, and
5✔
1970
                // retrieve our ChannelUpdate to broadcast.
5✔
1971
                _, chanUpdate, err := d.updateChannel(
5✔
1972
                        ctx, edgeInfo.Info, edgeInfo.Edge,
5✔
1973
                )
5✔
1974
                if err != nil {
5✔
UNCOV
1975
                        return nil, err
×
UNCOV
1976
                }
×
1977

1978
                // We'll avoid broadcasting any updates for private channels to
1979
                // avoid directly giving away their existence. Instead, we'll
1980
                // send the update directly to the remote party.
1981
                if edgeInfo.Info.AuthProof == nil {
8✔
1982
                        // If AuthProof is nil and an alias was found for this
3✔
1983
                        // ChannelID (meaning the option-scid-alias feature was
3✔
1984
                        // negotiated), we'll replace the ShortChannelID in the
3✔
1985
                        // update with the peer's alias. We do this after
3✔
1986
                        // updateChannel so that the alias isn't persisted to
3✔
1987
                        // the database.
3✔
1988
                        chanID := lnwire.NewChanIDFromOutPoint(
3✔
1989
                                edgeInfo.Info.ChannelPoint,
3✔
1990
                        )
3✔
1991

3✔
1992
                        var defaultAlias lnwire.ShortChannelID
3✔
1993
                        foundAlias, _ := d.cfg.GetAlias(chanID)
3✔
1994
                        if foundAlias != defaultAlias {
5✔
1995
                                chanUpdate.ShortChannelID = foundAlias
2✔
1996

2✔
1997
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
2✔
1998
                                if err != nil {
2✔
UNCOV
1999
                                        log.Errorf("Unable to sign alias "+
×
UNCOV
2000
                                                "update: %v", err)
×
2001
                                        continue
×
2002
                                }
2003

2004
                                lnSig, err := lnwire.NewSigFromSignature(sig)
2✔
2005
                                if err != nil {
2✔
2006
                                        log.Errorf("Unable to create sig: %v",
×
2007
                                                err)
×
2008
                                        continue
×
2009
                                }
2010

2011
                                chanUpdate.Signature = lnSig
2✔
2012
                        }
2013

2014
                        remotePubKey := remotePubFromChanInfo(
3✔
2015
                                edgeInfo.Info, chanUpdate.ChannelFlags,
3✔
2016
                        )
3✔
2017
                        err := d.reliableSender.sendMessage(
3✔
2018
                                ctx, chanUpdate, remotePubKey,
3✔
2019
                        )
3✔
2020
                        if err != nil {
3✔
2021
                                log.Errorf("Unable to reliably send %v for "+
×
2022
                                        "channel=%v to peer=%x: %v",
×
2023
                                        chanUpdate.MsgType(),
×
2024
                                        chanUpdate.ShortChannelID,
×
UNCOV
2025
                                        remotePubKey, err)
×
UNCOV
2026
                        }
×
2027
                        continue
3✔
2028
                }
2029

2030
                // We set ourselves as the source of this message to indicate
2031
                // that we shouldn't skip any peers when sending this message.
2032
                chanUpdates = append(chanUpdates, networkMsg{
4✔
2033
                        source:   d.selfKey,
4✔
2034
                        isRemote: false,
4✔
2035
                        msg:      chanUpdate,
4✔
2036
                })
4✔
2037
        }
2038

2039
        return chanUpdates, nil
3✔
2040
}
2041

2042
// remotePubFromChanInfo returns the public key of the remote peer given a
2043
// ChannelEdgeInfo that describe a channel we have with them.
2044
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
2045
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
14✔
2046

14✔
2047
        var remotePubKey [33]byte
14✔
2048
        switch {
14✔
2049
        case chanFlags&lnwire.ChanUpdateDirection == 0:
14✔
2050
                remotePubKey = chanInfo.NodeKey2Bytes
14✔
2051
        case chanFlags&lnwire.ChanUpdateDirection == 1:
2✔
2052
                remotePubKey = chanInfo.NodeKey1Bytes
2✔
2053
        }
2054

2055
        return remotePubKey
14✔
2056
}
2057

2058
// processRejectedEdge examines a rejected edge to see if we can extract any
2059
// new announcements from it.  An edge will get rejected if we already added
2060
// the same edge without AuthProof to the graph. If the received announcement
2061
// contains a proof, we can add this proof to our edge.  We can end up in this
2062
// situation in the case where we create a channel, but for some reason fail
2063
// to receive the remote peer's proof, while the remote peer is able to fully
2064
// assemble the proof and craft the ChannelAnnouncement.
2065
func (d *AuthenticatedGossiper) processRejectedEdge(_ context.Context,
2066
        chanAnnMsg *lnwire.ChannelAnnouncement1,
2067
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
2✔
2068

2✔
2069
        // First, we'll fetch the state of the channel as we know if from the
2✔
2070
        // database.
2✔
2071
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
2✔
2072
                chanAnnMsg.ShortChannelID,
2✔
2073
        )
2✔
2074
        if err != nil {
2✔
UNCOV
2075
                return nil, err
×
UNCOV
2076
        }
×
2077

2078
        // The edge is in the graph, and has a proof attached, then we'll just
2079
        // reject it as normal.
2080
        if chanInfo.AuthProof != nil {
4✔
2081
                return nil, nil
2✔
2082
        }
2✔
2083

2084
        // Otherwise, this means that the edge is within the graph, but it
2085
        // doesn't yet have a proper proof attached. If we did not receive
2086
        // the proof such that we now can add it, there's nothing more we
2087
        // can do.
UNCOV
2088
        if proof == nil {
×
UNCOV
2089
                return nil, nil
×
UNCOV
2090
        }
×
2091

2092
        // We'll then create then validate the new fully assembled
2093
        // announcement.
UNCOV
2094
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
UNCOV
2095
                proof, chanInfo, e1, e2,
×
UNCOV
2096
        )
×
UNCOV
2097
        if err != nil {
×
UNCOV
2098
                return nil, err
×
UNCOV
2099
        }
×
UNCOV
2100
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
UNCOV
2101
        if err != nil {
×
UNCOV
2102
                err := fmt.Errorf("assembled channel announcement proof "+
×
UNCOV
2103
                        "for shortChanID=%v isn't valid: %v",
×
UNCOV
2104
                        chanAnnMsg.ShortChannelID, err)
×
UNCOV
2105
                log.Error(err)
×
UNCOV
2106
                return nil, err
×
UNCOV
2107
        }
×
2108

2109
        // If everything checks out, then we'll add the fully assembled proof
2110
        // to the database.
UNCOV
2111
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
UNCOV
2112
        if err != nil {
×
UNCOV
2113
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
UNCOV
2114
                        chanAnnMsg.ShortChannelID, err)
×
UNCOV
2115
                log.Error(err)
×
UNCOV
2116
                return nil, err
×
UNCOV
2117
        }
×
2118

2119
        // As we now have a complete channel announcement for this channel,
2120
        // we'll construct the announcement so they can be broadcast out to all
2121
        // our peers.
UNCOV
2122
        announcements := make([]networkMsg, 0, 3)
×
UNCOV
2123
        announcements = append(announcements, networkMsg{
×
UNCOV
2124
                source: d.selfKey,
×
UNCOV
2125
                msg:    chanAnn,
×
UNCOV
2126
        })
×
UNCOV
2127
        if e1Ann != nil {
×
UNCOV
2128
                announcements = append(announcements, networkMsg{
×
UNCOV
2129
                        source: d.selfKey,
×
UNCOV
2130
                        msg:    e1Ann,
×
UNCOV
2131
                })
×
UNCOV
2132
        }
×
UNCOV
2133
        if e2Ann != nil {
×
UNCOV
2134
                announcements = append(announcements, networkMsg{
×
UNCOV
2135
                        source: d.selfKey,
×
UNCOV
2136
                        msg:    e2Ann,
×
UNCOV
2137
                })
×
UNCOV
2138

×
UNCOV
2139
        }
×
2140

2141
        return announcements, nil
×
2142
}
2143

2144
// fetchPKScript fetches the output script for the given SCID.
2145
func (d *AuthenticatedGossiper) fetchPKScript(chanID lnwire.ShortChannelID) (
UNCOV
2146
        txscript.ScriptClass, btcutil.Address, error) {
×
UNCOV
2147

×
UNCOV
2148
        pkScript, err := lnwallet.FetchPKScriptWithQuit(
×
UNCOV
2149
                d.cfg.ChainIO, chanID, d.quit,
×
UNCOV
2150
        )
×
UNCOV
2151
        if err != nil {
×
UNCOV
2152
                return txscript.WitnessUnknownTy, nil, err
×
UNCOV
2153
        }
×
2154

UNCOV
2155
        scriptClass, addrs, _, err := txscript.ExtractPkScriptAddrs(
×
UNCOV
2156
                pkScript, d.cfg.ChainParams,
×
UNCOV
2157
        )
×
UNCOV
2158
        if err != nil {
×
UNCOV
2159
                return txscript.WitnessUnknownTy, nil, err
×
UNCOV
2160
        }
×
2161

UNCOV
2162
        if len(addrs) != 1 {
×
UNCOV
2163
                return txscript.WitnessUnknownTy, nil, fmt.Errorf("expected "+
×
UNCOV
2164
                        "1 address, got: %d", len(addrs))
×
UNCOV
2165
        }
×
2166

UNCOV
2167
        return scriptClass, addrs[0], nil
×
2168
}
2169

2170
// addNode processes the given node announcement, and adds it to our channel
2171
// graph.
2172
func (d *AuthenticatedGossiper) addNode(ctx context.Context,
2173
        msg *lnwire.NodeAnnouncement1, op ...batch.SchedulerOption) error {
19✔
2174

19✔
2175
        if err := netann.ValidateNodeAnn(msg); err != nil {
20✔
2176
                return fmt.Errorf("unable to validate node announcement: %w",
1✔
2177
                        err)
1✔
2178
        }
1✔
2179

2180
        return d.cfg.Graph.AddNode(
18✔
2181
                ctx, models.NodeFromWireAnnouncement(msg), op...,
18✔
2182
        )
18✔
2183
}
2184

2185
// isPremature decides whether a given network message has a block height+delta
2186
// value specified in the future. If so, the message will be added to the
2187
// future message map and be processed when the block height as reached.
2188
//
2189
// NOTE: must be used inside a lock.
2190
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
2191
        delta uint32, msg *networkMsg) bool {
293✔
2192

293✔
2193
        // The channel is already confirmed at chanID.BlockHeight so we minus
293✔
2194
        // one block. For instance, if the required confirmation for this
293✔
2195
        // channel announcement is 6, we then only need to wait for 5 more
293✔
2196
        // blocks once the funding tx is confirmed.
293✔
2197
        if delta > 0 {
295✔
2198
                delta--
2✔
2199
        }
2✔
2200

2201
        msgHeight := chanID.BlockHeight + delta
293✔
2202

293✔
2203
        // The message height is smaller or equal to our best known height,
293✔
2204
        // thus the message is mature.
293✔
2205
        if msgHeight <= d.bestHeight {
585✔
2206
                return false
292✔
2207
        }
292✔
2208

2209
        // Add the premature message to our future messages which will be
2210
        // resent once the block height has reached.
2211
        //
2212
        // Copy the networkMsgs since the old message's err chan will be
2213
        // consumed.
2214
        copied := &networkMsg{
3✔
2215
                peer:              msg.peer,
3✔
2216
                source:            msg.source,
3✔
2217
                msg:               msg.msg,
3✔
2218
                optionalMsgFields: msg.optionalMsgFields,
3✔
2219
                isRemote:          msg.isRemote,
3✔
2220
                err:               make(chan error, 1),
3✔
2221
        }
3✔
2222

3✔
2223
        // Create the cached message.
3✔
2224
        cachedMsg := &cachedFutureMsg{
3✔
2225
                msg:    copied,
3✔
2226
                height: msgHeight,
3✔
2227
        }
3✔
2228

3✔
2229
        // Increment the msg ID and add it to the cache.
3✔
2230
        nextMsgID := d.futureMsgs.nextMsgID()
3✔
2231
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
3✔
2232
        if err != nil {
3✔
UNCOV
2233
                log.Errorf("Adding future message got error: %v", err)
×
UNCOV
2234
        }
×
2235

2236
        log.Debugf("Network message: %v added to future messages for "+
3✔
2237
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
3✔
2238
                msgHeight, d.bestHeight)
3✔
2239

3✔
2240
        return true
3✔
2241
}
2242

2243
// processNetworkAnnouncement processes a new network relate authenticated
2244
// channel or node announcement or announcements proofs. If the announcement
2245
// didn't affect the internal state due to either being out of date, invalid,
2246
// or redundant, then nil is returned. Otherwise, the set of announcements will
2247
// be returned which should be broadcasted to the rest of the network. The
2248
// boolean returned indicates whether any dependents of the announcement should
2249
// attempt to be processed as well.
2250
func (d *AuthenticatedGossiper) processNetworkAnnouncement(ctx context.Context,
2251
        nMsg *networkMsg) ([]networkMsg, bool) {
342✔
2252

342✔
2253
        // If this is a remote update, we set the scheduler option to lazily
342✔
2254
        // add it to the graph.
342✔
2255
        var schedulerOp []batch.SchedulerOption
342✔
2256
        if nMsg.isRemote {
637✔
2257
                schedulerOp = append(schedulerOp, batch.LazyAdd())
295✔
2258
        }
295✔
2259

2260
        switch msg := nMsg.msg.(type) {
342✔
2261
        // A new node announcement has arrived which either presents new
2262
        // information about a node in one of the channels we know about, or a
2263
        // updating previously advertised information.
2264
        case *lnwire.NodeAnnouncement1:
27✔
2265
                return d.handleNodeAnnouncement(ctx, nMsg, msg, schedulerOp)
27✔
2266

2267
        // A new channel announcement has arrived, this indicates the
2268
        // *creation* of a new channel within the network. This only advertises
2269
        // the existence of a channel and not yet the routing policies in
2270
        // either direction of the channel.
2271
        case *lnwire.ChannelAnnouncement1:
234✔
2272
                return d.handleChanAnnouncement(ctx, nMsg, msg, schedulerOp...)
234✔
2273

2274
        // A new authenticated channel edge update has arrived. This indicates
2275
        // that the directional information for an already known channel has
2276
        // been updated.
2277
        case *lnwire.ChannelUpdate1:
64✔
2278
                return d.handleChanUpdate(ctx, nMsg, msg, schedulerOp)
64✔
2279

2280
        // A new signature announcement has been received. This indicates
2281
        // willingness of nodes involved in the funding of a channel to
2282
        // announce this new channel to the rest of the world.
2283
        case *lnwire.AnnounceSignatures1:
23✔
2284
                return d.handleAnnSig(ctx, nMsg, msg)
23✔
2285

UNCOV
2286
        default:
×
UNCOV
2287
                err := errors.New("wrong type of the announcement")
×
UNCOV
2288
                nMsg.err <- err
×
UNCOV
2289
                return nil, false
×
2290
        }
2291
}
2292

2293
// processZombieUpdate determines whether the provided channel update should
2294
// resurrect a given zombie edge.
2295
//
2296
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2297
// should be inspected.
2298
func (d *AuthenticatedGossiper) processZombieUpdate(_ context.Context,
2299
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
2300
        msg *lnwire.ChannelUpdate1) error {
3✔
2301

3✔
2302
        // The least-significant bit in the flag on the channel update tells us
3✔
2303
        // which edge is being updated.
3✔
2304
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
2305

3✔
2306
        // Since we've deemed the update as not stale above, before marking it
3✔
2307
        // live, we'll make sure it has been signed by the correct party. If we
3✔
2308
        // have both pubkeys, either party can resurrect the channel. If we've
3✔
2309
        // already marked this with the stricter, single-sided resurrection we
3✔
2310
        // will only have the pubkey of the node with the oldest timestamp.
3✔
2311
        var pubKey *btcec.PublicKey
3✔
2312
        switch {
3✔
UNCOV
2313
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
×
UNCOV
2314
                pubKey, _ = chanInfo.NodeKey1()
×
2315
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
2✔
2316
                pubKey, _ = chanInfo.NodeKey2()
2✔
2317
        }
2318
        if pubKey == nil {
4✔
2319
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
1✔
2320
                        "with chan_id=%v", msg.ShortChannelID)
1✔
2321
        }
1✔
2322

2323
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
2✔
2324
        if err != nil {
3✔
2325
                return fmt.Errorf("unable to verify channel "+
1✔
2326
                        "update signature: %v", err)
1✔
2327
        }
1✔
2328

2329
        // With the signature valid, we'll proceed to mark the
2330
        // edge as live and wait for the channel announcement to
2331
        // come through again.
2332
        err = d.cfg.Graph.MarkEdgeLive(scid)
1✔
2333
        switch {
1✔
UNCOV
2334
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
×
UNCOV
2335
                log.Errorf("edge with chan_id=%v was not found in the "+
×
UNCOV
2336
                        "zombie index: %v", err)
×
2337

×
2338
                return nil
×
2339

UNCOV
2340
        case err != nil:
×
UNCOV
2341
                return fmt.Errorf("unable to remove edge with "+
×
UNCOV
2342
                        "chan_id=%v from zombie index: %v",
×
2343
                        msg.ShortChannelID, err)
×
2344

2345
        default:
1✔
2346
        }
2347

2348
        log.Debugf("Removed edge with chan_id=%v from zombie "+
1✔
2349
                "index", msg.ShortChannelID)
1✔
2350

1✔
2351
        return nil
1✔
2352
}
2353

2354
// fetchNodeAnn fetches the latest signed node announcement from our point of
2355
// view for the node with the given public key. It also validates the node
2356
// announcement fields and returns an error if they are invalid to prevent
2357
// forwarding invalid node announcements to our peers.
2358
func (d *AuthenticatedGossiper) fetchNodeAnn(ctx context.Context,
2359
        pubKey [33]byte) (*lnwire.NodeAnnouncement1, error) {
22✔
2360

22✔
2361
        node, err := d.cfg.Graph.FetchNode(ctx, pubKey)
22✔
2362
        if err != nil {
28✔
2363
                return nil, err
6✔
2364
        }
6✔
2365

2366
        nodeAnn, err := node.NodeAnnouncement(true)
16✔
2367
        if err != nil {
18✔
2368
                return nil, err
2✔
2369
        }
2✔
2370

2371
        return nodeAnn, netann.ValidateNodeAnnFields(nodeAnn)
16✔
2372
}
2373

2374
// isMsgStale determines whether a message retrieved from the backing
2375
// MessageStore is seen as stale by the current graph.
2376
func (d *AuthenticatedGossiper) isMsgStale(_ context.Context,
2377
        msg lnwire.Message) bool {
14✔
2378

14✔
2379
        switch msg := msg.(type) {
14✔
2380
        case *lnwire.AnnounceSignatures1:
4✔
2381
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
4✔
2382
                        msg.ShortChannelID,
4✔
2383
                )
4✔
2384

4✔
2385
                // If the channel cannot be found, it is most likely a leftover
4✔
2386
                // message for a channel that was closed, so we can consider it
4✔
2387
                // stale.
4✔
2388
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
6✔
2389
                        return true
2✔
2390
                }
2✔
2391
                if err != nil {
4✔
UNCOV
2392
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
UNCOV
2393
                                "%v", msg.ShortChannelID, err)
×
UNCOV
2394
                        return false
×
UNCOV
2395
                }
×
2396

2397
                // If the proof exists in the graph, then we have successfully
2398
                // received the remote proof and assembled the full proof, so we
2399
                // can safely delete the local proof from the database.
2400
                return chanInfo.AuthProof != nil
4✔
2401

2402
        case *lnwire.ChannelUpdate1:
12✔
2403
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
12✔
2404

12✔
2405
                // If the channel cannot be found, it is most likely a leftover
12✔
2406
                // message for a channel that was closed, so we can consider it
12✔
2407
                // stale.
12✔
2408
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
14✔
2409
                        return true
2✔
2410
                }
2✔
2411
                if err != nil {
12✔
UNCOV
2412
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
UNCOV
2413
                                "%v", msg.ShortChannelID, err)
×
UNCOV
2414
                        return false
×
UNCOV
2415
                }
×
2416

2417
                // Otherwise, we'll retrieve the correct policy that we
2418
                // currently have stored within our graph to check if this
2419
                // message is stale by comparing its timestamp.
2420
                var p *models.ChannelEdgePolicy
12✔
2421
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
24✔
2422
                        p = p1
12✔
2423
                } else {
14✔
2424
                        p = p2
2✔
2425
                }
2✔
2426

2427
                // If the policy is still unknown, then we can consider this
2428
                // policy fresh.
2429
                if p == nil {
12✔
UNCOV
2430
                        return false
×
UNCOV
2431
                }
×
2432

2433
                timestamp := time.Unix(int64(msg.Timestamp), 0)
12✔
2434
                return p.LastUpdate.After(timestamp)
12✔
2435

UNCOV
2436
        default:
×
UNCOV
2437
                // We'll make sure to not mark any unsupported messages as stale
×
UNCOV
2438
                // to ensure they are not removed.
×
UNCOV
2439
                return false
×
2440
        }
2441
}
2442

2443
// updateChannel creates a new fully signed update for the channel, and updates
2444
// the underlying graph with the new state.
2445
func (d *AuthenticatedGossiper) updateChannel(ctx context.Context,
2446
        info *models.ChannelEdgeInfo,
2447
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2448
        *lnwire.ChannelUpdate1, error) {
6✔
2449

6✔
2450
        // Parse the unsigned edge into a channel update.
6✔
2451
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
6✔
2452

6✔
2453
        // We'll generate a new signature over a digest of the channel
6✔
2454
        // announcement itself and update the timestamp to ensure it propagate.
6✔
2455
        err := netann.SignChannelUpdate(
6✔
2456
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
6✔
2457
                netann.ChanUpdSetTimestamp,
6✔
2458
        )
6✔
2459
        if err != nil {
6✔
UNCOV
2460
                return nil, nil, err
×
UNCOV
2461
        }
×
2462

2463
        // Next, we'll set the new signature in place, and update the reference
2464
        // in the backing slice.
2465
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
6✔
2466
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
6✔
2467

6✔
2468
        // To ensure that our signature is valid, we'll verify it ourself
6✔
2469
        // before committing it to the slice returned.
6✔
2470
        err = netann.ValidateChannelUpdateAnn(
6✔
2471
                d.selfKey, info.Capacity, chanUpdate,
6✔
2472
        )
6✔
2473
        if err != nil {
6✔
2474
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
UNCOV
2475
                        "update sig: %v", err)
×
2476
        }
×
2477

2478
        // Finally, we'll write the new edge policy to disk.
2479
        if err := d.cfg.Graph.UpdateEdge(ctx, edge); err != nil {
6✔
2480
                return nil, nil, err
×
UNCOV
2481
        }
×
2482

2483
        // We'll also create the original channel announcement so the two can
2484
        // be broadcast along side each other (if necessary), but only if we
2485
        // have a full channel announcement for this channel.
2486
        var chanAnn *lnwire.ChannelAnnouncement1
6✔
2487
        if info.AuthProof != nil {
11✔
2488
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
5✔
2489
                chanAnn = &lnwire.ChannelAnnouncement1{
5✔
2490
                        ShortChannelID:  chanID,
5✔
2491
                        NodeID1:         info.NodeKey1Bytes,
5✔
2492
                        NodeID2:         info.NodeKey2Bytes,
5✔
2493
                        ChainHash:       info.ChainHash,
5✔
2494
                        BitcoinKey1:     info.BitcoinKey1Bytes,
5✔
2495
                        Features:        lnwire.NewRawFeatureVector(),
5✔
2496
                        BitcoinKey2:     info.BitcoinKey2Bytes,
5✔
2497
                        ExtraOpaqueData: info.ExtraOpaqueData,
5✔
2498
                }
5✔
2499
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
5✔
2500
                        info.AuthProof.NodeSig1Bytes,
5✔
2501
                )
5✔
2502
                if err != nil {
5✔
UNCOV
2503
                        return nil, nil, err
×
UNCOV
2504
                }
×
2505
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
5✔
2506
                        info.AuthProof.NodeSig2Bytes,
5✔
2507
                )
5✔
2508
                if err != nil {
5✔
UNCOV
2509
                        return nil, nil, err
×
UNCOV
2510
                }
×
2511
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
5✔
2512
                        info.AuthProof.BitcoinSig1Bytes,
5✔
2513
                )
5✔
2514
                if err != nil {
5✔
UNCOV
2515
                        return nil, nil, err
×
UNCOV
2516
                }
×
2517
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
5✔
2518
                        info.AuthProof.BitcoinSig2Bytes,
5✔
2519
                )
5✔
2520
                if err != nil {
5✔
UNCOV
2521
                        return nil, nil, err
×
2522
                }
×
2523
        }
2524

2525
        return chanAnn, chanUpdate, err
6✔
2526
}
2527

2528
// SyncManager returns the gossiper's SyncManager instance.
2529
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
2✔
2530
        return d.syncMgr
2✔
2531
}
2✔
2532

2533
// IsKeepAliveUpdate determines whether this channel update is considered a
2534
// keep-alive update based on the previous channel update processed for the same
2535
// direction.
2536
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2537
        prev *models.ChannelEdgePolicy) bool {
19✔
2538

19✔
2539
        // Both updates should be from the same direction.
19✔
2540
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
19✔
2541
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
19✔
UNCOV
2542

×
UNCOV
2543
                return false
×
UNCOV
2544
        }
×
2545

2546
        // The timestamp should always increase for a keep-alive update.
2547
        timestamp := time.Unix(int64(update.Timestamp), 0)
19✔
2548
        if !timestamp.After(prev.LastUpdate) {
19✔
UNCOV
2549
                return false
×
UNCOV
2550
        }
×
2551

2552
        // None of the remaining fields should change for a keep-alive update.
2553
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
21✔
2554
                return false
2✔
2555
        }
2✔
2556
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
36✔
2557
                return false
17✔
2558
        }
17✔
2559
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
6✔
2560
                return false
2✔
2561
        }
2✔
2562
        if update.TimeLockDelta != prev.TimeLockDelta {
4✔
UNCOV
2563
                return false
×
UNCOV
2564
        }
×
2565
        if update.HtlcMinimumMsat != prev.MinHTLC {
4✔
UNCOV
2566
                return false
×
UNCOV
2567
        }
×
2568
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
4✔
UNCOV
2569
                return false
×
UNCOV
2570
        }
×
2571
        if update.HtlcMaximumMsat != prev.MaxHTLC {
4✔
UNCOV
2572
                return false
×
UNCOV
2573
        }
×
2574
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
6✔
2575
                return false
2✔
2576
        }
2✔
2577
        return true
4✔
2578
}
2579

2580
// latestHeight returns the gossiper's latest height known of the chain.
2581
func (d *AuthenticatedGossiper) latestHeight() uint32 {
2✔
2582
        d.Lock()
2✔
2583
        defer d.Unlock()
2✔
2584
        return d.bestHeight
2✔
2585
}
2✔
2586

2587
// handleNodeAnnouncement processes a new node announcement.
2588
func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context,
2589
        nMsg *networkMsg, nodeAnn *lnwire.NodeAnnouncement1,
2590
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
27✔
2591

27✔
2592
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
27✔
2593

27✔
2594
        log.Debugf("Processing NodeAnnouncement1: peer=%v, timestamp=%v, "+
27✔
2595
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
27✔
2596
                nMsg.source.SerializeCompressed())
27✔
2597

27✔
2598
        // Although not explicitly required by BOLT 7 for node announcements
27✔
2599
        // (unlike channel updates), we still enforce non-zero timestamps as a
27✔
2600
        // sanity check. A timestamp of zero is likely indicative of a bug or
27✔
2601
        // uninitialized message.
27✔
2602
        if nodeAnn.Timestamp == 0 {
28✔
2603
                err := fmt.Errorf("rejecting node announcement with zero "+
1✔
2604
                        "timestamp for node %x", nodeAnn.NodeID)
1✔
2605

1✔
2606
                log.Warnf("Rejecting node announcement from peer=%v: %v",
1✔
2607
                        nMsg.peer, err)
1✔
2608

1✔
2609
                nMsg.err <- err
1✔
2610

1✔
2611
                return nil, false
1✔
2612
        }
1✔
2613

2614
        // We'll quickly ask the router if it already has a newer update for
2615
        // this node so we can skip validating signatures if not required.
2616
        if d.cfg.Graph.IsStaleNode(ctx, nodeAnn.NodeID, timestamp) {
36✔
2617
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
10✔
2618
                nMsg.err <- nil
10✔
2619
                return nil, true
10✔
2620
        }
10✔
2621

2622
        if err := d.addNode(ctx, nodeAnn, ops...); err != nil {
20✔
2623
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
2✔
2624
                        err)
2✔
2625

2✔
2626
                if !graph.IsError(
2✔
2627
                        err,
2✔
2628
                        graph.ErrOutdated,
2✔
2629
                        graph.ErrIgnored,
2✔
2630
                ) {
2✔
UNCOV
2631

×
UNCOV
2632
                        log.Error(err)
×
UNCOV
2633
                }
×
2634

2635
                nMsg.err <- err
2✔
2636
                return nil, false
2✔
2637
        }
2638

2639
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2640
        // quick check to ensure this node intends to publicly advertise itself
2641
        // to the network.
2642
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
18✔
2643
        if err != nil {
18✔
2644
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2645
                        nodeAnn.NodeID, err)
×
UNCOV
2646
                nMsg.err <- err
×
UNCOV
2647
                return nil, false
×
UNCOV
2648
        }
×
2649

2650
        var announcements []networkMsg
18✔
2651

18✔
2652
        // If it does, we'll add their announcement to our batch so that it can
18✔
2653
        // be broadcast to the rest of our peers.
18✔
2654
        if isPublic {
23✔
2655
                announcements = append(announcements, networkMsg{
5✔
2656
                        peer:     nMsg.peer,
5✔
2657
                        isRemote: nMsg.isRemote,
5✔
2658
                        source:   nMsg.source,
5✔
2659
                        msg:      nodeAnn,
5✔
2660
                })
5✔
2661
        } else {
20✔
2662
                log.Tracef("Skipping broadcasting node announcement for %x "+
15✔
2663
                        "due to being unadvertised", nodeAnn.NodeID)
15✔
2664
        }
15✔
2665

2666
        nMsg.err <- nil
18✔
2667
        // TODO(roasbeef): get rid of the above
18✔
2668

18✔
2669
        log.Debugf("Processed NodeAnnouncement1: peer=%v, timestamp=%v, "+
18✔
2670
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
18✔
2671
                nMsg.source.SerializeCompressed())
18✔
2672

18✔
2673
        return announcements, true
18✔
2674
}
2675

2676
// handleChanAnnouncement processes a new channel announcement.
2677
//
2678
//nolint:funlen
2679
func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
2680
        nMsg *networkMsg, ann *lnwire.ChannelAnnouncement1,
2681
        ops ...batch.SchedulerOption) ([]networkMsg, bool) {
237✔
2682

237✔
2683
        scid := ann.ShortChannelID
237✔
2684
        chainHash := d.cfg.ChainParams.GenesisHash
237✔
2685

237✔
2686
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
237✔
2687
                nMsg.peer, scid.ToUint64())
237✔
2688

237✔
2689
        // We'll ignore any channel announcements that target any chain other
237✔
2690
        // than the set of chains we know of.
237✔
2691
        if !bytes.Equal(ann.ChainHash[:], chainHash[:]) {
237✔
UNCOV
2692
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
×
UNCOV
2693
                        ", gossiper on chain=%v", ann.ChainHash, chainHash)
×
UNCOV
2694
                log.Errorf(err.Error())
×
UNCOV
2695

×
UNCOV
2696
                key := newRejectCacheKey(
×
UNCOV
2697
                        ann.GossipVersion(),
×
UNCOV
2698
                        scid.ToUint64(),
×
UNCOV
2699
                        sourceToPub(nMsg.source),
×
UNCOV
2700
                )
×
UNCOV
2701
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2702

×
UNCOV
2703
                nMsg.err <- err
×
UNCOV
2704
                return nil, false
×
UNCOV
2705
        }
×
2706

2707
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2708
        // reject the announcement. Since the router accepts alias SCIDs,
2709
        // not erroring out would be a DoS vector.
2710
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
237✔
UNCOV
2711
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
UNCOV
2712
                log.Errorf(err.Error())
×
UNCOV
2713

×
UNCOV
2714
                key := newRejectCacheKey(
×
UNCOV
2715
                        ann.GossipVersion(),
×
UNCOV
2716
                        scid.ToUint64(),
×
UNCOV
2717
                        sourceToPub(nMsg.source),
×
UNCOV
2718
                )
×
UNCOV
2719
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2720

×
UNCOV
2721
                nMsg.err <- err
×
UNCOV
2722
                return nil, false
×
UNCOV
2723
        }
×
2724

2725
        // If the advertised inclusionary block is beyond our knowledge of the
2726
        // chain tip, then we'll ignore it for now.
2727
        d.Lock()
237✔
2728
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
238✔
2729
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
1✔
2730
                        "advertises height %v, only height %v is known",
1✔
2731
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
1✔
2732
                d.Unlock()
1✔
2733
                nMsg.err <- nil
1✔
2734
                return nil, false
1✔
2735
        }
1✔
2736
        d.Unlock()
236✔
2737

236✔
2738
        // At this point, we'll now ask the router if this is a zombie/known
236✔
2739
        // edge. If so we can skip all the processing below.
236✔
2740
        if d.cfg.Graph.IsKnownEdge(scid) {
239✔
2741
                nMsg.err <- nil
3✔
2742
                return nil, true
3✔
2743
        }
3✔
2744

2745
        // Check if the channel is already closed in which case we can ignore
2746
        // it.
2747
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
235✔
2748
        if err != nil {
235✔
UNCOV
2749
                log.Errorf("failed to check if scid %v is closed: %v", scid,
×
UNCOV
2750
                        err)
×
UNCOV
2751
                nMsg.err <- err
×
UNCOV
2752

×
UNCOV
2753
                return nil, false
×
UNCOV
2754
        }
×
2755

2756
        if closed {
236✔
2757
                err = fmt.Errorf("ignoring closed channel %v", scid)
1✔
2758

1✔
2759
                // If this is an announcement from us, we'll just ignore it.
1✔
2760
                if !nMsg.isRemote {
1✔
UNCOV
2761
                        nMsg.err <- err
×
UNCOV
2762
                        return nil, false
×
UNCOV
2763
                }
×
2764

2765
                log.Warnf("Increasing ban score for peer=%v due to outdated "+
1✔
2766
                        "channel announcement for channel %v", nMsg.peer, scid)
1✔
2767

1✔
2768
                // Increment the peer's ban score if they are sending closed
1✔
2769
                // channel announcements.
1✔
2770
                dcErr := d.handleBadPeer(nMsg.peer)
1✔
2771
                if dcErr != nil {
1✔
UNCOV
2772
                        err = dcErr
×
UNCOV
2773
                }
×
2774

2775
                nMsg.err <- err
1✔
2776

1✔
2777
                return nil, false
1✔
2778
        }
2779

2780
        // If this is a remote channel announcement, then we'll validate all
2781
        // the signatures within the proof as it should be well formed.
2782
        var proof *models.ChannelAuthProof
234✔
2783
        if nMsg.isRemote {
454✔
2784
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
220✔
2785
                if err != nil {
220✔
2786
                        err := fmt.Errorf("unable to validate announcement: "+
×
2787
                                "%v", err)
×
2788

×
2789
                        key := newRejectCacheKey(
×
2790
                                ann.GossipVersion(),
×
2791
                                scid.ToUint64(),
×
UNCOV
2792
                                sourceToPub(nMsg.source),
×
2793
                        )
×
2794
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2795

×
2796
                        log.Error(err)
×
2797
                        nMsg.err <- err
×
2798
                        return nil, false
×
2799
                }
×
2800

2801
                // If the proof checks out, then we'll save the proof itself to
2802
                // the database so we can fetch it later when gossiping with
2803
                // other nodes.
2804
                proof = &models.ChannelAuthProof{
220✔
2805
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
220✔
2806
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
220✔
2807
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
220✔
2808
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
220✔
2809
                }
220✔
2810
        }
2811

2812
        // With the proof validated (if necessary), we can now store it within
2813
        // the database for our path finding and syncing needs.
2814
        edge := &models.ChannelEdgeInfo{
234✔
2815
                ChannelID:        scid.ToUint64(),
234✔
2816
                ChainHash:        ann.ChainHash,
234✔
2817
                NodeKey1Bytes:    ann.NodeID1,
234✔
2818
                NodeKey2Bytes:    ann.NodeID2,
234✔
2819
                BitcoinKey1Bytes: ann.BitcoinKey1,
234✔
2820
                BitcoinKey2Bytes: ann.BitcoinKey2,
234✔
2821
                AuthProof:        proof,
234✔
2822
                Features: lnwire.NewFeatureVector(
234✔
2823
                        ann.Features, lnwire.Features,
234✔
2824
                ),
234✔
2825
                ExtraOpaqueData: ann.ExtraOpaqueData,
234✔
2826
        }
234✔
2827

234✔
2828
        // If there were any optional message fields provided, we'll include
234✔
2829
        // them in its serialized disk representation now.
234✔
2830
        var tapscriptRoot fn.Option[chainhash.Hash]
234✔
2831
        if nMsg.optionalMsgFields != nil {
250✔
2832
                if nMsg.optionalMsgFields.capacity != nil {
19✔
2833
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
3✔
2834
                }
3✔
2835
                if nMsg.optionalMsgFields.channelPoint != nil {
22✔
2836
                        cp := *nMsg.optionalMsgFields.channelPoint
6✔
2837
                        edge.ChannelPoint = cp
6✔
2838
                }
6✔
2839

2840
                // Optional tapscript root for custom channels.
2841
                tapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
16✔
2842
        }
2843

2844
        // Before we start validation or add the edge to the database, we obtain
2845
        // the mutex for this channel ID. We do this to ensure no other
2846
        // goroutine has read the database and is now making decisions based on
2847
        // this DB state, before it writes to the DB. It also ensures that we
2848
        // don't perform the expensive validation check on the same channel
2849
        // announcement at the same time.
2850
        d.channelMtx.Lock(scid.ToUint64())
234✔
2851

234✔
2852
        // If AssumeChannelValid is present, then we are unable to perform any
234✔
2853
        // of the expensive checks below, so we'll short-circuit our path
234✔
2854
        // straight to adding the edge to our graph. If the passed
234✔
2855
        // ShortChannelID is an alias, then we'll skip validation as it will
234✔
2856
        // not map to a legitimate tx. This is not a DoS vector as only we can
234✔
2857
        // add an alias ChannelAnnouncement from the gossiper.
234✔
2858
        if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) {
466✔
2859
                op, capacity, script, err := d.validateFundingTransaction(
232✔
2860
                        ctx, ann, tapscriptRoot,
232✔
2861
                )
232✔
2862
                if err != nil {
436✔
2863
                        defer d.channelMtx.Unlock(scid.ToUint64())
204✔
2864

204✔
2865
                        switch {
204✔
2866
                        case errors.Is(err, ErrNoFundingTransaction),
2867
                                errors.Is(err, ErrInvalidFundingOutput):
202✔
2868

202✔
2869
                                key := newRejectCacheKey(
202✔
2870
                                        ann.GossipVersion(),
202✔
2871
                                        scid.ToUint64(),
202✔
2872
                                        sourceToPub(nMsg.source),
202✔
2873
                                )
202✔
2874
                                _, _ = d.recentRejects.Put(
202✔
2875
                                        key, &cachedReject{},
202✔
2876
                                )
202✔
2877

2878
                        case errors.Is(err, ErrChannelSpent):
2✔
2879
                                key := newRejectCacheKey(
2✔
2880
                                        ann.GossipVersion(),
2✔
2881
                                        scid.ToUint64(),
2✔
2882
                                        sourceToPub(nMsg.source),
2✔
2883
                                )
2✔
2884
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
2✔
2885

2✔
2886
                                // Since this channel has already been closed,
2✔
2887
                                // we'll add it to the graph's closed channel
2✔
2888
                                // index such that we won't attempt to do
2✔
2889
                                // expensive validation checks on it again.
2✔
2890
                                // TODO: Populate the ScidCloser by using closed
2✔
2891
                                // channel notifications.
2✔
2892
                                dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
2✔
2893
                                if dbErr != nil {
2✔
UNCOV
2894
                                        log.Errorf("failed to mark scid(%v) "+
×
UNCOV
2895
                                                "as closed: %v", scid, dbErr)
×
UNCOV
2896

×
2897
                                        nMsg.err <- dbErr
×
2898

×
2899
                                        return nil, false
×
2900
                                }
×
2901

2902
                        default:
×
UNCOV
2903
                                // Otherwise, this is just a regular rejected
×
UNCOV
2904
                                // edge. We won't increase the ban score for the
×
UNCOV
2905
                                // remote peer.
×
2906
                                key := newRejectCacheKey(
×
2907
                                        ann.GossipVersion(),
×
2908
                                        scid.ToUint64(),
×
2909
                                        sourceToPub(nMsg.source),
×
2910
                                )
×
2911
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2912

×
UNCOV
2913
                                nMsg.err <- err
×
2914

×
2915
                                return nil, false
×
2916
                        }
2917

2918
                        if !nMsg.isRemote {
204✔
UNCOV
2919
                                log.Errorf("failed to add edge for local "+
×
UNCOV
2920
                                        "channel: %v", err)
×
UNCOV
2921
                                nMsg.err <- err
×
UNCOV
2922

×
UNCOV
2923
                                return nil, false
×
UNCOV
2924
                        }
×
2925

2926
                        log.Warnf("Increasing ban score for peer=%v due to "+
204✔
2927
                                "invalid channel announcement for channel %v",
204✔
2928
                                nMsg.peer, scid)
204✔
2929

204✔
2930
                        // Increment the peer's ban score if they are sending
204✔
2931
                        // us invalid channel announcements.
204✔
2932
                        dcErr := d.handleBadPeer(nMsg.peer)
204✔
2933
                        if dcErr != nil {
204✔
UNCOV
2934
                                err = dcErr
×
UNCOV
2935
                        }
×
2936

2937
                        nMsg.err <- err
204✔
2938

204✔
2939
                        return nil, false
204✔
2940
                }
2941

2942
                edge.FundingScript = fn.Some(script)
28✔
2943

28✔
2944
                // TODO(roasbeef): this is a hack, needs to be removed after
28✔
2945
                //  commitment fees are dynamic.
28✔
2946
                edge.Capacity = capacity
28✔
2947
                edge.ChannelPoint = op
28✔
2948
        }
2949

2950
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
30✔
2951

30✔
2952
        // We will add the edge to the channel router. If the nodes present in
30✔
2953
        // this channel are not present in the database, a partial node will be
30✔
2954
        // added to represent each node while we wait for a node announcement.
30✔
2955
        err = d.cfg.Graph.AddEdge(ctx, edge, ops...)
30✔
2956
        if err != nil {
33✔
2957
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
3✔
2958
                        scid.ToUint64(), err)
3✔
2959

3✔
2960
                defer d.channelMtx.Unlock(scid.ToUint64())
3✔
2961

3✔
2962
                // If the edge was rejected due to already being known, then it
3✔
2963
                // may be the case that this new message has a fresh channel
3✔
2964
                // proof, so we'll check.
3✔
2965
                if graph.IsError(err, graph.ErrIgnored) {
5✔
2966
                        // Attempt to process the rejected message to see if we
2✔
2967
                        // get any new announcements.
2✔
2968
                        anns, rErr := d.processRejectedEdge(ctx, ann, proof)
2✔
2969
                        if rErr != nil {
2✔
UNCOV
2970
                                key := newRejectCacheKey(
×
UNCOV
2971
                                        ann.GossipVersion(),
×
UNCOV
2972
                                        scid.ToUint64(),
×
UNCOV
2973
                                        sourceToPub(nMsg.source),
×
2974
                                )
×
2975
                                cr := &cachedReject{}
×
2976
                                _, _ = d.recentRejects.Put(key, cr)
×
UNCOV
2977

×
UNCOV
2978
                                nMsg.err <- rErr
×
UNCOV
2979

×
UNCOV
2980
                                return nil, false
×
UNCOV
2981
                        }
×
2982

2983
                        log.Debugf("Extracted %v announcements from rejected "+
2✔
2984
                                "msgs", len(anns))
2✔
2985

2✔
2986
                        // If while processing this rejected edge, we realized
2✔
2987
                        // there's a set of announcements we could extract,
2✔
2988
                        // then we'll return those directly.
2✔
2989
                        //
2✔
2990
                        // NOTE: since this is an ErrIgnored, we can return
2✔
2991
                        // true here to signal "allow" to its dependants.
2✔
2992
                        nMsg.err <- nil
2✔
2993

2✔
2994
                        return anns, true
2✔
2995
                }
2996

2997
                // Otherwise, this is just a regular rejected edge.
2998
                key := newRejectCacheKey(
1✔
2999
                        ann.GossipVersion(),
1✔
3000
                        scid.ToUint64(),
1✔
3001
                        sourceToPub(nMsg.source),
1✔
3002
                )
1✔
3003
                _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
3004

1✔
3005
                if !nMsg.isRemote {
1✔
UNCOV
3006
                        log.Errorf("failed to add edge for local channel: %v",
×
UNCOV
3007
                                err)
×
UNCOV
3008
                        nMsg.err <- err
×
UNCOV
3009

×
UNCOV
3010
                        return nil, false
×
UNCOV
3011
                }
×
3012

3013
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
3014
                if dcErr != nil {
1✔
UNCOV
3015
                        log.Errorf("failed to check if we should disconnect "+
×
UNCOV
3016
                                "peer: %v", dcErr)
×
UNCOV
3017
                        nMsg.err <- dcErr
×
3018

×
3019
                        return nil, false
×
3020
                }
×
3021

3022
                if shouldDc {
1✔
3023
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
3024
                }
×
3025

3026
                nMsg.err <- err
1✔
3027

1✔
3028
                return nil, false
1✔
3029
        }
3030

3031
        // If err is nil, release the lock immediately.
3032
        d.channelMtx.Unlock(scid.ToUint64())
29✔
3033

29✔
3034
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
29✔
3035

29✔
3036
        // If we earlier received any ChannelUpdates for this channel, we can
29✔
3037
        // now process them, as the channel is added to the graph.
29✔
3038
        var channelUpdates []*processedNetworkMsg
29✔
3039

29✔
3040
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
29✔
3041
        if err == nil {
33✔
3042
                // There was actually an entry in the map, so we'll accumulate
4✔
3043
                // it. We don't worry about deletion, since it'll eventually
4✔
3044
                // fall out anyway.
4✔
3045
                chanMsgs := earlyChanUpdates
4✔
3046
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
4✔
3047
        }
4✔
3048

3049
        // Launch a new goroutine to handle each ChannelUpdate, this is to
3050
        // ensure we don't block here, as we can handle only one announcement
3051
        // at a time.
3052
        for _, cu := range channelUpdates {
33✔
3053
                // Skip if already processed.
4✔
3054
                if cu.processed {
5✔
3055
                        continue
1✔
3056
                }
3057

3058
                // Mark the ChannelUpdate as processed. This ensures that a
3059
                // subsequent announcement in the option-scid-alias case does
3060
                // not re-use an old ChannelUpdate.
3061
                cu.processed = true
4✔
3062

4✔
3063
                d.wg.Add(1)
4✔
3064
                go func(updMsg *networkMsg) {
8✔
3065
                        defer d.wg.Done()
4✔
3066

4✔
3067
                        switch msg := updMsg.msg.(type) {
4✔
3068
                        // Reprocess the message, making sure we return an
3069
                        // error to the original caller in case the gossiper
3070
                        // shuts down.
3071
                        case *lnwire.ChannelUpdate1:
4✔
3072
                                log.Debugf("Reprocessing ChannelUpdate for "+
4✔
3073
                                        "shortChanID=%v", scid.ToUint64())
4✔
3074

4✔
3075
                                select {
4✔
3076
                                case d.networkMsgs <- updMsg:
4✔
UNCOV
3077
                                case <-d.quit:
×
UNCOV
3078
                                        updMsg.err <- ErrGossiperShuttingDown
×
3079
                                }
3080

3081
                        // We don't expect any other message type than
3082
                        // ChannelUpdate to be in this cache.
UNCOV
3083
                        default:
×
UNCOV
3084
                                log.Errorf("Unsupported message type found "+
×
UNCOV
3085
                                        "among ChannelUpdates: %T", msg)
×
3086
                        }
3087
                }(cu.msg)
3088
        }
3089

3090
        // Channel announcement was successfully processed and now it might be
3091
        // broadcast to other connected nodes if it was an announcement with
3092
        // proof (remote).
3093
        var announcements []networkMsg
29✔
3094

29✔
3095
        if proof != nil {
44✔
3096
                announcements = append(announcements, networkMsg{
15✔
3097
                        peer:     nMsg.peer,
15✔
3098
                        isRemote: nMsg.isRemote,
15✔
3099
                        source:   nMsg.source,
15✔
3100
                        msg:      ann,
15✔
3101
                })
15✔
3102
        }
15✔
3103

3104
        nMsg.err <- nil
29✔
3105

29✔
3106
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
29✔
3107
                nMsg.peer, scid.ToUint64())
29✔
3108

29✔
3109
        return announcements, true
29✔
3110
}
3111

3112
// handleChanUpdate processes a new channel update.
3113
//
3114
//nolint:funlen
3115
func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
3116
        nMsg *networkMsg, upd *lnwire.ChannelUpdate1,
3117
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
64✔
3118

64✔
3119
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
64✔
3120
                nMsg.peer, upd.ShortChannelID.ToUint64())
64✔
3121

64✔
3122
        chainHash := d.cfg.ChainParams.GenesisHash
64✔
3123

64✔
3124
        // We'll ignore any channel updates that target any chain other than
64✔
3125
        // the set of chains we know of.
64✔
3126
        if !bytes.Equal(upd.ChainHash[:], chainHash[:]) {
64✔
UNCOV
3127
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
UNCOV
3128
                        "gossiper on chain=%v", upd.ChainHash, chainHash)
×
UNCOV
3129
                log.Errorf(err.Error())
×
UNCOV
3130

×
UNCOV
3131
                key := newRejectCacheKey(
×
UNCOV
3132
                        upd.GossipVersion(),
×
UNCOV
3133
                        upd.ShortChannelID.ToUint64(),
×
UNCOV
3134
                        sourceToPub(nMsg.source),
×
UNCOV
3135
                )
×
UNCOV
3136
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
3137

×
UNCOV
3138
                nMsg.err <- err
×
UNCOV
3139
                return nil, false
×
UNCOV
3140
        }
×
3141

3142
        blockHeight := upd.ShortChannelID.BlockHeight
64✔
3143
        shortChanID := upd.ShortChannelID.ToUint64()
64✔
3144

64✔
3145
        // If the advertised inclusionary block is beyond our knowledge of the
64✔
3146
        // chain tip, then we'll put the announcement in limbo to be fully
64✔
3147
        // verified once we advance forward in the chain. If the update has an
64✔
3148
        // alias SCID, we'll skip the isPremature check. This is necessary
64✔
3149
        // since aliases start at block height 16_000_000.
64✔
3150
        d.Lock()
64✔
3151
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
64✔
3152
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
65✔
3153

1✔
3154
                log.Warnf("Update announcement for short_chan_id(%v), is "+
1✔
3155
                        "premature: advertises height %v, only height %v is "+
1✔
3156
                        "known", shortChanID, blockHeight, d.bestHeight)
1✔
3157
                d.Unlock()
1✔
3158
                nMsg.err <- nil
1✔
3159
                return nil, false
1✔
3160
        }
1✔
3161
        d.Unlock()
64✔
3162

64✔
3163
        // Before we perform any of the expensive checks below, we'll check
64✔
3164
        // whether this update is stale or is for a zombie channel in order to
64✔
3165
        // quickly reject it.
64✔
3166
        timestamp := time.Unix(int64(upd.Timestamp), 0)
64✔
3167

64✔
3168
        // Per BOLT 7, the timestamp MUST be greater than 0.
64✔
3169
        if upd.Timestamp == 0 {
65✔
3170
                err := fmt.Errorf("rejecting channel update with zero "+
1✔
3171
                        "timestamp for short_chan_id(%v)", shortChanID)
1✔
3172

1✔
3173
                // Only increase ban score for remote peers.
1✔
3174
                if nMsg.isRemote {
2✔
3175
                        log.Warnf("Increasing ban score for peer=%v: %v",
1✔
3176
                                nMsg.peer, err)
1✔
3177

1✔
3178
                        dcErr := d.handleBadPeer(nMsg.peer)
1✔
3179
                        if dcErr != nil {
1✔
UNCOV
3180
                                err = dcErr
×
UNCOV
3181
                        }
×
3182
                }
3183

3184
                nMsg.err <- err
1✔
3185

1✔
3186
                return nil, false
1✔
3187
        }
3188

3189
        // Fetch the SCID we should be using to lock the channelMtx and make
3190
        // graph queries with.
3191
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
63✔
3192
        if err != nil {
126✔
3193
                // Fallback and set the graphScid to the peer-provided SCID.
63✔
3194
                // This will occur for non-option-scid-alias channels and for
63✔
3195
                // public option-scid-alias channels after 6 confirmations.
63✔
3196
                // Once public option-scid-alias channels have 6 confs, we'll
63✔
3197
                // ignore ChannelUpdates with one of their aliases.
63✔
3198
                graphScid = upd.ShortChannelID
63✔
3199
        }
63✔
3200

3201
        // We make sure to obtain the mutex for this channel ID before we access
3202
        // the database. This ensures the state we read from the database has
3203
        // not changed between this point and when we call UpdateEdge() later.
3204
        d.channelMtx.Lock(graphScid.ToUint64())
63✔
3205
        defer d.channelMtx.Unlock(graphScid.ToUint64())
63✔
3206

63✔
3207
        if d.cfg.Graph.IsStaleEdgePolicy(
63✔
3208
                graphScid, timestamp, upd.ChannelFlags,
63✔
3209
        ) {
68✔
3210

5✔
3211
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
5✔
3212
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
5✔
3213
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
5✔
3214
                )
5✔
3215

5✔
3216
                nMsg.err <- nil
5✔
3217
                return nil, true
5✔
3218
        }
5✔
3219

3220
        // Check that the ChanUpdate is not too far into the future, this could
3221
        // reveal some faulty implementation therefore we log an error.
3222
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
60✔
UNCOV
3223
                err := fmt.Errorf("skewed timestamp of edge policy, "+
×
UNCOV
3224
                        "timestamp too far in the future: %v", timestamp.Unix())
×
UNCOV
3225

×
UNCOV
3226
                // If this is a channel_update from us, we'll just ignore it.
×
UNCOV
3227
                if !nMsg.isRemote {
×
UNCOV
3228
                        nMsg.err <- err
×
UNCOV
3229
                        return nil, false
×
UNCOV
3230
                }
×
3231

UNCOV
3232
                log.Errorf("Increasing ban score for peer=%v due to bad "+
×
UNCOV
3233
                        "channel_update with short_chan_id(%v): timestamp(%v) "+
×
UNCOV
3234
                        "too far in the future", nMsg.peer, shortChanID,
×
UNCOV
3235
                        timestamp.Unix())
×
UNCOV
3236

×
UNCOV
3237
                // Increment the peer's ban score if they are skewed channel
×
UNCOV
3238
                // updates.
×
UNCOV
3239
                dcErr := d.handleBadPeer(nMsg.peer)
×
UNCOV
3240
                if dcErr != nil {
×
UNCOV
3241
                        err = dcErr
×
UNCOV
3242
                }
×
3243

UNCOV
3244
                nMsg.err <- err
×
UNCOV
3245

×
UNCOV
3246
                return nil, false
×
3247
        }
3248

3249
        // Get the node pub key as far since we don't have it in the channel
3250
        // update announcement message. We'll need this to properly verify the
3251
        // message's signature.
3252
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
60✔
3253
        switch {
60✔
3254
        // No error, break.
3255
        case err == nil:
56✔
3256
                break
56✔
3257

3258
        case errors.Is(err, graphdb.ErrZombieEdge):
3✔
3259
                err = d.processZombieUpdate(ctx, chanInfo, graphScid, upd)
3✔
3260
                if err != nil {
5✔
3261
                        log.Debug(err)
2✔
3262
                        nMsg.err <- err
2✔
3263
                        return nil, false
2✔
3264
                }
2✔
3265

3266
                // We'll fallthrough to ensure we stash the update until we
3267
                // receive its corresponding ChannelAnnouncement. This is
3268
                // needed to ensure the edge exists in the graph before
3269
                // applying the update.
3270
                fallthrough
1✔
3271
        case errors.Is(err, graphdb.ErrGraphNotFound):
1✔
3272
                fallthrough
1✔
3273
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
1✔
3274
                fallthrough
1✔
3275
        case errors.Is(err, graphdb.ErrEdgeNotFound):
4✔
3276
                // If the edge corresponding to this ChannelUpdate was not
4✔
3277
                // found in the graph, this might be a channel in the process
4✔
3278
                // of being opened, and we haven't processed our own
4✔
3279
                // ChannelAnnouncement yet, hence it is not not found in the
4✔
3280
                // graph. This usually gets resolved after the channel proofs
4✔
3281
                // are exchanged and the channel is broadcasted to the rest of
4✔
3282
                // the network, but in case this is a private channel this
4✔
3283
                // won't ever happen. This can also happen in the case of a
4✔
3284
                // zombie channel with a fresh update for which we don't have a
4✔
3285
                // ChannelAnnouncement for since we reject them. Because of
4✔
3286
                // this, we temporarily add it to a map, and reprocess it after
4✔
3287
                // our own ChannelAnnouncement has been processed.
4✔
3288
                //
4✔
3289
                // The shortChanID may be an alias, but it is fine to use here
4✔
3290
                // since we don't have an edge in the graph and if the peer is
4✔
3291
                // not buggy, we should be able to use it once the gossiper
4✔
3292
                // receives the local announcement.
4✔
3293
                pMsg := &processedNetworkMsg{msg: nMsg}
4✔
3294

4✔
3295
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
4✔
3296
                switch {
4✔
3297
                // Nothing in the cache yet, we can just directly insert this
3298
                // element.
3299
                case err == cache.ErrElementNotFound:
4✔
3300
                        _, _ = d.prematureChannelUpdates.Put(
4✔
3301
                                shortChanID, &cachedNetworkMsg{
4✔
3302
                                        msgs: []*processedNetworkMsg{pMsg},
4✔
3303
                                })
4✔
3304

3305
                // There's already something in the cache, so we'll combine the
3306
                // set of messages into a single value.
3307
                default:
2✔
3308
                        msgs := earlyMsgs.msgs
2✔
3309
                        msgs = append(msgs, pMsg)
2✔
3310
                        _, _ = d.prematureChannelUpdates.Put(
2✔
3311
                                shortChanID, &cachedNetworkMsg{
2✔
3312
                                        msgs: msgs,
2✔
3313
                                })
2✔
3314
                }
3315

3316
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
4✔
3317
                        "(shortChanID=%v), saving for reprocessing later",
4✔
3318
                        shortChanID)
4✔
3319

4✔
3320
                // NOTE: We don't return anything on the error channel for this
4✔
3321
                // message, as we expect that will be done when this
4✔
3322
                // ChannelUpdate is later reprocessed. This might never happen
4✔
3323
                // if the corresponding ChannelAnnouncement is never received
4✔
3324
                // or the LRU cache is filled up and the entry is evicted.
4✔
3325
                return nil, false
4✔
3326

3327
        default:
×
3328
                err := fmt.Errorf("unable to validate channel update "+
×
3329
                        "short_chan_id=%v: %v", shortChanID, err)
×
3330
                log.Error(err)
×
3331
                nMsg.err <- err
×
3332

×
3333
                key := newRejectCacheKey(
×
3334
                        upd.GossipVersion(),
×
3335
                        upd.ShortChannelID.ToUint64(),
×
3336
                        sourceToPub(nMsg.source),
×
3337
                )
×
3338
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3339

×
3340
                return nil, false
×
3341
        }
3342

3343
        // The least-significant bit in the flag on the channel update
3344
        // announcement tells us "which" side of the channels directed edge is
3345
        // being updated.
3346
        var (
56✔
3347
                pubKey       *btcec.PublicKey
56✔
3348
                edgeToUpdate *models.ChannelEdgePolicy
56✔
3349
        )
56✔
3350
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
56✔
3351
        switch direction {
56✔
3352
        case 0:
40✔
3353
                pubKey, _ = chanInfo.NodeKey1()
40✔
3354
                edgeToUpdate = e1
40✔
3355
        case 1:
18✔
3356
                pubKey, _ = chanInfo.NodeKey2()
18✔
3357
                edgeToUpdate = e2
18✔
3358
        }
3359

3360
        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
56✔
3361
                "edge policy=%v", chanInfo.ChannelID,
56✔
3362
                pubKey.SerializeCompressed(), edgeToUpdate != nil)
56✔
3363

56✔
3364
        // Validate the channel announcement with the expected public key and
56✔
3365
        // channel capacity. In the case of an invalid channel update, we'll
56✔
3366
        // return an error to the caller and exit early.
56✔
3367
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
56✔
3368
        if err != nil {
60✔
3369
                rErr := fmt.Errorf("unable to validate channel update "+
4✔
3370
                        "announcement for short_chan_id=%v: %v",
4✔
3371
                        lnutils.SpewLogClosure(upd.ShortChannelID), err)
4✔
3372

4✔
3373
                log.Error(rErr)
4✔
3374
                nMsg.err <- rErr
4✔
3375
                return nil, false
4✔
3376
        }
4✔
3377

3378
        // If we have a previous version of the edge being updated, we'll want
3379
        // to rate limit its updates to prevent spam throughout the network.
3380
        if nMsg.isRemote && edgeToUpdate != nil {
71✔
3381
                // If it's a keep-alive update, we'll only propagate one if
19✔
3382
                // it's been a day since the previous. This follows our own
19✔
3383
                // heuristic of sending keep-alive updates after the same
19✔
3384
                // duration (see retransmitStaleAnns).
19✔
3385
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
19✔
3386
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
23✔
3387
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
7✔
3388
                                log.Debugf("Ignoring keep alive update not "+
3✔
3389
                                        "within %v period for channel %v",
3✔
3390
                                        d.cfg.RebroadcastInterval, shortChanID)
3✔
3391
                                nMsg.err <- nil
3✔
3392
                                return nil, false
3✔
3393
                        }
3✔
3394
                } else {
17✔
3395
                        // If it's not, we'll allow an update per minute with a
17✔
3396
                        // maximum burst of 10. If we haven't seen an update
17✔
3397
                        // for this channel before, we'll need to initialize a
17✔
3398
                        // rate limiter for each direction.
17✔
3399
                        //
17✔
3400
                        // Since the edge exists in the graph, we'll create a
17✔
3401
                        // rate limiter for chanInfo.ChannelID rather then the
17✔
3402
                        // SCID the peer sent. This is because there may be
17✔
3403
                        // multiple aliases for a channel and we may otherwise
17✔
3404
                        // rate-limit only a single alias of the channel,
17✔
3405
                        // instead of the whole channel.
17✔
3406
                        baseScid := chanInfo.ChannelID
17✔
3407
                        d.Lock()
17✔
3408
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
17✔
3409
                        if !ok {
21✔
3410
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
4✔
3411
                                b := d.cfg.MaxChannelUpdateBurst
4✔
3412
                                rls = [2]*rate.Limiter{
4✔
3413
                                        rate.NewLimiter(r, b),
4✔
3414
                                        rate.NewLimiter(r, b),
4✔
3415
                                }
4✔
3416
                                d.chanUpdateRateLimiter[baseScid] = rls
4✔
3417
                        }
4✔
3418
                        d.Unlock()
17✔
3419

17✔
3420
                        if !rls[direction].Allow() {
25✔
3421
                                log.Debugf("Rate limiting update for channel "+
8✔
3422
                                        "%v from direction %x", shortChanID,
8✔
3423
                                        pubKey.SerializeCompressed())
8✔
3424
                                nMsg.err <- nil
8✔
3425
                                return nil, false
8✔
3426
                        }
8✔
3427
                }
3428
        }
3429

3430
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3431
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3432
        // lookup the stored SCID. If we're sending the update, we'll always
3433
        // use the SCID stored in the database rather than a potentially
3434
        // different alias. This might mean that SigBytes is incorrect as it
3435
        // signs a different SCID than the database SCID, but since there will
3436
        // only be a difference if AuthProof == nil, this is fine.
3437
        update := &models.ChannelEdgePolicy{
45✔
3438
                SigBytes:                  upd.Signature.ToSignatureBytes(),
45✔
3439
                ChannelID:                 chanInfo.ChannelID,
45✔
3440
                LastUpdate:                timestamp,
45✔
3441
                MessageFlags:              upd.MessageFlags,
45✔
3442
                ChannelFlags:              upd.ChannelFlags,
45✔
3443
                TimeLockDelta:             upd.TimeLockDelta,
45✔
3444
                MinHTLC:                   upd.HtlcMinimumMsat,
45✔
3445
                MaxHTLC:                   upd.HtlcMaximumMsat,
45✔
3446
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
45✔
3447
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
45✔
3448
                InboundFee:                upd.InboundFee.ValOpt(),
45✔
3449
                ExtraOpaqueData:           upd.ExtraOpaqueData,
45✔
3450
        }
45✔
3451

45✔
3452
        if err := d.cfg.Graph.UpdateEdge(ctx, update, ops...); err != nil {
45✔
UNCOV
3453
                if graph.IsError(
×
UNCOV
3454
                        err, graph.ErrOutdated,
×
UNCOV
3455
                        graph.ErrIgnored,
×
UNCOV
3456
                ) {
×
UNCOV
3457

×
UNCOV
3458
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
×
UNCOV
3459
                                shortChanID, err)
×
UNCOV
3460
                } else {
×
UNCOV
3461
                        // Since we know the stored SCID in the graph, we'll
×
UNCOV
3462
                        // cache that SCID.
×
UNCOV
3463
                        key := newRejectCacheKey(
×
UNCOV
3464
                                upd.GossipVersion(),
×
UNCOV
3465
                                chanInfo.ChannelID,
×
UNCOV
3466
                                sourceToPub(nMsg.source),
×
UNCOV
3467
                        )
×
UNCOV
3468
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
3469

×
UNCOV
3470
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
UNCOV
3471
                                shortChanID, err)
×
UNCOV
3472
                }
×
3473

UNCOV
3474
                nMsg.err <- err
×
UNCOV
3475
                return nil, false
×
3476
        }
3477

3478
        // If this is a local ChannelUpdate without an AuthProof, it means it
3479
        // is an update to a channel that is not (yet) supposed to be announced
3480
        // to the greater network. However, our channel counter party will need
3481
        // to be given the update, so we'll try sending the update directly to
3482
        // the remote peer.
3483
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
58✔
3484
                if nMsg.optionalMsgFields != nil {
26✔
3485
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
13✔
3486
                        if remoteAlias != nil {
15✔
3487
                                // The remoteAlias field was specified, meaning
2✔
3488
                                // that we should replace the SCID in the
2✔
3489
                                // update with the remote's alias. We'll also
2✔
3490
                                // need to re-sign the channel update. This is
2✔
3491
                                // required for option-scid-alias feature-bit
2✔
3492
                                // negotiated channels.
2✔
3493
                                upd.ShortChannelID = *remoteAlias
2✔
3494

2✔
3495
                                sig, err := d.cfg.SignAliasUpdate(upd)
2✔
3496
                                if err != nil {
2✔
UNCOV
3497
                                        log.Error(err)
×
UNCOV
3498
                                        nMsg.err <- err
×
UNCOV
3499
                                        return nil, false
×
UNCOV
3500
                                }
×
3501

3502
                                lnSig, err := lnwire.NewSigFromSignature(sig)
2✔
3503
                                if err != nil {
2✔
UNCOV
3504
                                        log.Error(err)
×
UNCOV
3505
                                        nMsg.err <- err
×
UNCOV
3506
                                        return nil, false
×
UNCOV
3507
                                }
×
3508

3509
                                upd.Signature = lnSig
2✔
3510
                        }
3511
                }
3512

3513
                // Get our peer's public key.
3514
                remotePubKey := remotePubFromChanInfo(
13✔
3515
                        chanInfo, upd.ChannelFlags,
13✔
3516
                )
13✔
3517

13✔
3518
                log.Debugf("The message %v has no AuthProof, sending the "+
13✔
3519
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
13✔
3520

13✔
3521
                // Now we'll attempt to send the channel update message
13✔
3522
                // reliably to the remote peer in the background, so that we
13✔
3523
                // don't block if the peer happens to be offline at the moment.
13✔
3524
                err := d.reliableSender.sendMessage(ctx, upd, remotePubKey)
13✔
3525
                if err != nil {
13✔
UNCOV
3526
                        err := fmt.Errorf("unable to reliably send %v for "+
×
UNCOV
3527
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
UNCOV
3528
                                upd.ShortChannelID, remotePubKey, err)
×
UNCOV
3529
                        nMsg.err <- err
×
UNCOV
3530
                        return nil, false
×
UNCOV
3531
                }
×
3532
        }
3533

3534
        // Channel update announcement was successfully processed and now it
3535
        // can be broadcast to the rest of the network. However, we'll only
3536
        // broadcast the channel update announcement if it has an attached
3537
        // authentication proof. We also won't broadcast the update if it
3538
        // contains an alias because the network would reject this.
3539
        var announcements []networkMsg
45✔
3540
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
70✔
3541
                announcements = append(announcements, networkMsg{
25✔
3542
                        peer:     nMsg.peer,
25✔
3543
                        source:   nMsg.source,
25✔
3544
                        isRemote: nMsg.isRemote,
25✔
3545
                        msg:      upd,
25✔
3546
                })
25✔
3547
        }
25✔
3548

3549
        nMsg.err <- nil
45✔
3550

45✔
3551
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
45✔
3552
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
45✔
3553
                timestamp)
45✔
3554
        return announcements, true
45✔
3555
}
3556

3557
// handleAnnSig processes a new announcement signatures message.
3558
//
3559
//nolint:funlen
3560
func (d *AuthenticatedGossiper) handleAnnSig(ctx context.Context,
3561
        nMsg *networkMsg, ann *lnwire.AnnounceSignatures1) ([]networkMsg,
3562
        bool) {
23✔
3563

23✔
3564
        needBlockHeight := ann.ShortChannelID.BlockHeight +
23✔
3565
                d.cfg.ProofMatureDelta
23✔
3566
        shortChanID := ann.ShortChannelID.ToUint64()
23✔
3567

23✔
3568
        prefix := "local"
23✔
3569
        if nMsg.isRemote {
36✔
3570
                prefix = "remote"
13✔
3571
        }
13✔
3572

3573
        log.Infof("Received new %v announcement signature for %v", prefix,
23✔
3574
                ann.ShortChannelID)
23✔
3575

23✔
3576
        // By the specification, channel announcement proofs should be sent
23✔
3577
        // after some number of confirmations after channel was registered in
23✔
3578
        // bitcoin blockchain. Therefore, we check if the proof is mature.
23✔
3579
        d.Lock()
23✔
3580
        premature := d.isPremature(
23✔
3581
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
23✔
3582
        )
23✔
3583
        if premature {
25✔
3584
                log.Warnf("Premature proof announcement, current block height"+
2✔
3585
                        "lower than needed: %v < %v", d.bestHeight,
2✔
3586
                        needBlockHeight)
2✔
3587
                d.Unlock()
2✔
3588
                nMsg.err <- nil
2✔
3589
                return nil, false
2✔
3590
        }
2✔
3591
        d.Unlock()
23✔
3592

23✔
3593
        // Ensure that we know of a channel with the target channel ID before
23✔
3594
        // proceeding further.
23✔
3595
        //
23✔
3596
        // We must acquire the mutex for this channel ID before getting the
23✔
3597
        // channel from the database, to ensure what we read does not change
23✔
3598
        // before we call AddProof() later.
23✔
3599
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
23✔
3600
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
23✔
3601

23✔
3602
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
23✔
3603
                ann.ShortChannelID,
23✔
3604
        )
23✔
3605
        if err != nil {
26✔
3606
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
3✔
3607
                if err != nil {
5✔
3608
                        err := fmt.Errorf("unable to store the proof for "+
2✔
3609
                                "short_chan_id=%v: %v", shortChanID, err)
2✔
3610
                        log.Error(err)
2✔
3611
                        nMsg.err <- err
2✔
3612

2✔
3613
                        return nil, false
2✔
3614
                }
2✔
3615

3616
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
3✔
3617
                err := d.cfg.WaitingProofStore.Add(proof)
3✔
3618
                if err != nil {
3✔
UNCOV
3619
                        err := fmt.Errorf("unable to store the proof for "+
×
UNCOV
3620
                                "short_chan_id=%v: %v", shortChanID, err)
×
UNCOV
3621
                        log.Error(err)
×
UNCOV
3622
                        nMsg.err <- err
×
UNCOV
3623
                        return nil, false
×
UNCOV
3624
                }
×
3625

3626
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
3✔
3627
                        ", adding to waiting batch", prefix, shortChanID)
3✔
3628
                nMsg.err <- nil
3✔
3629
                return nil, false
3✔
3630
        }
3631

3632
        nodeID := nMsg.source.SerializeCompressed()
22✔
3633
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
22✔
3634
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
22✔
3635

22✔
3636
        // Ensure that channel that was retrieved belongs to the peer which
22✔
3637
        // sent the proof announcement.
22✔
3638
        if !(isFirstNode || isSecondNode) {
22✔
UNCOV
3639
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3640
                        "to the peer which sent the proof, short_chan_id=%v",
×
3641
                        shortChanID)
×
3642
                log.Error(err)
×
3643
                nMsg.err <- err
×
UNCOV
3644
                return nil, false
×
UNCOV
3645
        }
×
3646

3647
        // If proof was sent by a local sub-system, then we'll send the
3648
        // announcement signature to the remote node so they can also
3649
        // reconstruct the full channel announcement.
3650
        if !nMsg.isRemote {
34✔
3651
                var remotePubKey [33]byte
12✔
3652
                if isFirstNode {
24✔
3653
                        remotePubKey = chanInfo.NodeKey2Bytes
12✔
3654
                } else {
14✔
3655
                        remotePubKey = chanInfo.NodeKey1Bytes
2✔
3656
                }
2✔
3657

3658
                // Since the remote peer might not be online we'll call a
3659
                // method that will attempt to deliver the proof when it comes
3660
                // online.
3661
                err := d.reliableSender.sendMessage(ctx, ann, remotePubKey)
12✔
3662
                if err != nil {
12✔
UNCOV
3663
                        err := fmt.Errorf("unable to reliably send %v for "+
×
UNCOV
3664
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3665
                                ann.ShortChannelID, remotePubKey, err)
×
3666
                        nMsg.err <- err
×
3667
                        return nil, false
×
3668
                }
×
3669
        }
3670

3671
        // Check if we already have the full proof for this channel.
3672
        if chanInfo.AuthProof != nil {
25✔
3673
                // If we already have the fully assembled proof, then the peer
3✔
3674
                // sending us their proof has probably not received our local
3✔
3675
                // proof yet. So be kind and send them the full proof.
3✔
3676
                if nMsg.isRemote {
6✔
3677
                        peerID := nMsg.source.SerializeCompressed()
3✔
3678
                        log.Debugf("Got AnnounceSignatures for channel with " +
3✔
3679
                                "full proof.")
3✔
3680

3✔
3681
                        d.wg.Add(1)
3✔
3682
                        go func() {
6✔
3683
                                defer d.wg.Done()
3✔
3684

3✔
3685
                                log.Debugf("Received half proof for channel "+
3✔
3686
                                        "%v with existing full proof. Sending"+
3✔
3687
                                        " full proof to peer=%x",
3✔
3688
                                        ann.ChannelID, peerID)
3✔
3689

3✔
3690
                                ca, _, _, err := netann.CreateChanAnnouncement(
3✔
3691
                                        chanInfo.AuthProof, chanInfo, e1, e2,
3✔
3692
                                )
3✔
3693
                                if err != nil {
3✔
UNCOV
3694
                                        log.Errorf("unable to gen ann: %v",
×
UNCOV
3695
                                                err)
×
UNCOV
3696
                                        return
×
UNCOV
3697
                                }
×
3698

3699
                                err = nMsg.peer.SendMessage(false, ca)
3✔
3700
                                if err != nil {
3✔
UNCOV
3701
                                        log.Errorf("Failed sending full proof"+
×
UNCOV
3702
                                                " to peer=%x: %v", peerID, err)
×
UNCOV
3703
                                        return
×
UNCOV
3704
                                }
×
3705

3706
                                log.Debugf("Full proof sent to peer=%x for "+
3✔
3707
                                        "chanID=%v", peerID, ann.ChannelID)
3✔
3708
                        }()
3709
                }
3710

3711
                log.Debugf("Already have proof for channel with chanID=%v",
3✔
3712
                        ann.ChannelID)
3✔
3713
                nMsg.err <- nil
3✔
3714
                return nil, true
3✔
3715
        }
3716

3717
        // Check that we received the opposite proof. If so, then we're now
3718
        // able to construct the full proof, and create the channel
3719
        // announcement. If we didn't receive the opposite half of the proof
3720
        // then we should store this one, and wait for the opposite to be
3721
        // received.
3722
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
21✔
3723
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
21✔
3724
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
21✔
UNCOV
3725
                err := fmt.Errorf("unable to get the opposite proof for "+
×
UNCOV
3726
                        "short_chan_id=%v: %v", shortChanID, err)
×
UNCOV
3727
                log.Error(err)
×
UNCOV
3728
                nMsg.err <- err
×
UNCOV
3729
                return nil, false
×
UNCOV
3730
        }
×
3731

3732
        if err == channeldb.ErrWaitingProofNotFound {
32✔
3733
                err := d.cfg.WaitingProofStore.Add(proof)
11✔
3734
                if err != nil {
11✔
UNCOV
3735
                        err := fmt.Errorf("unable to store the proof for "+
×
UNCOV
3736
                                "short_chan_id=%v: %v", shortChanID, err)
×
UNCOV
3737
                        log.Error(err)
×
UNCOV
3738
                        nMsg.err <- err
×
UNCOV
3739
                        return nil, false
×
UNCOV
3740
                }
×
3741

3742
                log.Infof("1/2 of channel ann proof received for "+
11✔
3743
                        "short_chan_id=%v, waiting for other half",
11✔
3744
                        shortChanID)
11✔
3745

11✔
3746
                nMsg.err <- nil
11✔
3747
                return nil, false
11✔
3748
        }
3749

3750
        // We now have both halves of the channel announcement proof, then
3751
        // we'll reconstruct the initial announcement so we can validate it
3752
        // shortly below.
3753
        var dbProof models.ChannelAuthProof
12✔
3754
        if isFirstNode {
15✔
3755
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
3✔
3756
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
3✔
3757
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
3✔
3758
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
3✔
3759
        } else {
14✔
3760
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
11✔
3761
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
11✔
3762
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
11✔
3763
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
11✔
3764
        }
11✔
3765

3766
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
12✔
3767
                &dbProof, chanInfo, e1, e2,
12✔
3768
        )
12✔
3769
        if err != nil {
12✔
3770
                log.Error(err)
×
UNCOV
3771
                nMsg.err <- err
×
UNCOV
3772
                return nil, false
×
UNCOV
3773
        }
×
3774

3775
        // With all the necessary components assembled validate the full
3776
        // channel announcement proof.
3777
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
12✔
3778
        if err != nil {
12✔
UNCOV
3779
                err := fmt.Errorf("channel announcement proof for "+
×
UNCOV
3780
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
UNCOV
3781

×
UNCOV
3782
                log.Error(err)
×
UNCOV
3783
                nMsg.err <- err
×
UNCOV
3784
                return nil, false
×
UNCOV
3785
        }
×
3786

3787
        // If the channel was returned by the router it means that existence of
3788
        // funding point and inclusion of nodes bitcoin keys in it already
3789
        // checked by the router. In this stage we should check that node keys
3790
        // attest to the bitcoin keys by validating the signatures of
3791
        // announcement. If proof is valid then we'll populate the channel edge
3792
        // with it, so we can announce it on peer connect.
3793
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
12✔
3794
        if err != nil {
12✔
UNCOV
3795
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
UNCOV
3796
                        " %v", ann.ChannelID, err)
×
UNCOV
3797
                log.Error(err)
×
UNCOV
3798
                nMsg.err <- err
×
UNCOV
3799
                return nil, false
×
UNCOV
3800
        }
×
3801

3802
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
12✔
3803
        if err != nil {
12✔
UNCOV
3804
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
UNCOV
3805
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
UNCOV
3806
                log.Error(err)
×
UNCOV
3807
                nMsg.err <- err
×
UNCOV
3808
                return nil, false
×
UNCOV
3809
        }
×
3810

3811
        // Proof was successfully created and now can announce the channel to
3812
        // the remain network.
3813
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
12✔
3814
                ", adding to next ann batch", shortChanID)
12✔
3815

12✔
3816
        // Assemble the necessary announcements to add to the next broadcasting
12✔
3817
        // batch.
12✔
3818
        var announcements []networkMsg
12✔
3819
        announcements = append(announcements, networkMsg{
12✔
3820
                peer:   nMsg.peer,
12✔
3821
                source: nMsg.source,
12✔
3822
                msg:    chanAnn,
12✔
3823
        })
12✔
3824
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
23✔
3825
                announcements = append(announcements, networkMsg{
11✔
3826
                        peer:   nMsg.peer,
11✔
3827
                        source: src,
11✔
3828
                        msg:    e1Ann,
11✔
3829
                })
11✔
3830
        }
11✔
3831
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
22✔
3832
                announcements = append(announcements, networkMsg{
10✔
3833
                        peer:   nMsg.peer,
10✔
3834
                        source: src,
10✔
3835
                        msg:    e2Ann,
10✔
3836
                })
10✔
3837
        }
10✔
3838

3839
        // We'll also send along the node announcements for each channel
3840
        // participant if we know of them. To ensure our node announcement
3841
        // propagates to our channel counterparty, we'll set the source for
3842
        // each announcement to the node it belongs to, otherwise we won't send
3843
        // it since the source gets skipped. This isn't necessary for channel
3844
        // updates and announcement signatures since we send those directly to
3845
        // our channel counterparty through the gossiper's reliable sender.
3846
        node1Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey1Bytes)
12✔
3847
        if err != nil {
16✔
3848
                log.Debugf("Unable to fetch node announcement for %x: %v",
4✔
3849
                        chanInfo.NodeKey1Bytes, err)
4✔
3850
        } else {
14✔
3851
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
20✔
3852
                        announcements = append(announcements, networkMsg{
10✔
3853
                                peer:   nMsg.peer,
10✔
3854
                                source: nodeKey1,
10✔
3855
                                msg:    node1Ann,
10✔
3856
                        })
10✔
3857
                }
10✔
3858
        }
3859

3860
        node2Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey2Bytes)
12✔
3861
        if err != nil {
18✔
3862
                log.Debugf("Unable to fetch node announcement for %x: %v",
6✔
3863
                        chanInfo.NodeKey2Bytes, err)
6✔
3864
        } else {
14✔
3865
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
16✔
3866
                        announcements = append(announcements, networkMsg{
8✔
3867
                                peer:   nMsg.peer,
8✔
3868
                                source: nodeKey2,
8✔
3869
                                msg:    node2Ann,
8✔
3870
                        })
8✔
3871
                }
8✔
3872
        }
3873

3874
        nMsg.err <- nil
12✔
3875
        return announcements, true
12✔
3876
}
3877

3878
// isBanned returns true if the peer identified by pubkey is banned for sending
3879
// invalid channel announcements.
3880
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
211✔
3881
        return d.banman.isBanned(pubkey)
211✔
3882
}
211✔
3883

3884
// ShouldDisconnect returns true if we should disconnect the peer identified by
3885
// pubkey.
3886
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
3887
        bool, error) {
209✔
3888

209✔
3889
        pubkeySer := pubkey.SerializeCompressed()
209✔
3890

209✔
3891
        var pubkeyBytes [33]byte
209✔
3892
        copy(pubkeyBytes[:], pubkeySer)
209✔
3893

209✔
3894
        // If the public key is banned, check whether or not this is a channel
209✔
3895
        // peer.
209✔
3896
        if d.isBanned(pubkeyBytes) {
211✔
3897
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
2✔
3898
                if err != nil {
2✔
3899
                        return false, err
×
3900
                }
×
3901

3902
                // We should only disconnect non-channel peers.
3903
                if !isChanPeer {
3✔
3904
                        return true, nil
1✔
3905
                }
1✔
3906
        }
3907

3908
        return false, nil
208✔
3909
}
3910

3911
// validateFundingTransaction fetches the channel announcements claimed funding
3912
// transaction from chain to ensure that it exists, is not spent and matches
3913
// the channel announcement proof. The transaction's outpoint and value are
3914
// returned if we can glean them from the work done in this method.
3915
func (d *AuthenticatedGossiper) validateFundingTransaction(_ context.Context,
3916
        ann *lnwire.ChannelAnnouncement1,
3917
        tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
3918
        []byte, error) {
232✔
3919

232✔
3920
        scid := ann.ShortChannelID
232✔
3921

232✔
3922
        // Before we can add the channel to the channel graph, we need to obtain
232✔
3923
        // the full funding outpoint that's encoded within the channel ID.
232✔
3924
        fundingTx, err := lnwallet.FetchFundingTxWrapper(
232✔
3925
                d.cfg.ChainIO, scid, d.quit,
232✔
3926
        )
232✔
3927
        if err != nil {
233✔
3928
                //nolint:ll
1✔
3929
                //
1✔
3930
                // In order to ensure we don't erroneously mark a channel as a
1✔
3931
                // zombie due to an RPC failure, we'll attempt to string match
1✔
3932
                // for the relevant errors.
1✔
3933
                //
1✔
3934
                // * btcd:
1✔
3935
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
1✔
3936
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
1✔
3937
                // * bitcoind:
1✔
3938
                //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
1✔
3939
                //     * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
1✔
3940
                switch {
1✔
3941
                case strings.Contains(err.Error(), "not found"):
1✔
3942
                        fallthrough
1✔
3943

3944
                case strings.Contains(err.Error(), "out of range"):
1✔
3945
                        // If the funding transaction isn't found at all, then
1✔
3946
                        // we'll mark the edge itself as a zombie so we don't
1✔
3947
                        // continue to request it. We use the "zero key" for
1✔
3948
                        // both node pubkeys so this edge can't be resurrected.
1✔
3949
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
1✔
3950
                        if zErr != nil {
1✔
UNCOV
3951
                                return wire.OutPoint{}, 0, nil, zErr
×
UNCOV
3952
                        }
×
3953

UNCOV
3954
                default:
×
3955
                }
3956

3957
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
1✔
3958
                        ErrNoFundingTransaction, err)
1✔
3959
        }
3960

3961
        // Recreate witness output to be sure that declared in channel edge
3962
        // bitcoin keys and channel value corresponds to the reality.
3963
        fundingPkScript, err := makeFundingScript(
231✔
3964
                ann.BitcoinKey1[:], ann.BitcoinKey2[:], ann.Features,
231✔
3965
                tapscriptRoot,
231✔
3966
        )
231✔
3967
        if err != nil {
231✔
UNCOV
3968
                return wire.OutPoint{}, 0, nil, err
×
UNCOV
3969
        }
×
3970

3971
        // Next we'll validate that this channel is actually well formed. If
3972
        // this check fails, then this channel either doesn't exist, or isn't
3973
        // the one that was meant to be created according to the passed channel
3974
        // proofs.
3975
        fundingPoint, err := chanvalidate.Validate(
231✔
3976
                &chanvalidate.Context{
231✔
3977
                        Locator: &chanvalidate.ShortChanIDChanLocator{
231✔
3978
                                ID: scid,
231✔
3979
                        },
231✔
3980
                        MultiSigPkScript: fundingPkScript,
231✔
3981
                        FundingTx:        fundingTx,
231✔
3982
                },
231✔
3983
        )
231✔
3984
        if err != nil {
432✔
3985
                // Mark the edge as a zombie so we won't try to re-validate it
201✔
3986
                // on start up.
201✔
3987
                zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
201✔
3988
                if zErr != nil {
201✔
UNCOV
3989
                        return wire.OutPoint{}, 0, nil, zErr
×
UNCOV
3990
                }
×
3991

3992
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
201✔
3993
                        ErrInvalidFundingOutput, err)
201✔
3994
        }
3995

3996
        // Now that we have the funding outpoint of the channel, ensure
3997
        // that it hasn't yet been spent. If so, then this channel has
3998
        // been closed so we'll ignore it.
3999
        chanUtxo, err := d.cfg.ChainIO.GetUtxo(
30✔
4000
                fundingPoint, fundingPkScript, scid.BlockHeight, d.quit,
30✔
4001
        )
30✔
4002
        if err != nil {
32✔
4003
                if errors.Is(err, btcwallet.ErrOutputSpent) {
4✔
4004
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
2✔
4005
                        if zErr != nil {
2✔
UNCOV
4006
                                return wire.OutPoint{}, 0, nil, zErr
×
UNCOV
4007
                        }
×
4008
                }
4009

4010
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: unable to "+
2✔
4011
                        "fetch utxo for chan_id=%v, chan_point=%v: %w",
2✔
4012
                        ErrChannelSpent, scid.ToUint64(), fundingPoint, err)
2✔
4013
        }
4014

4015
        return *fundingPoint, btcutil.Amount(chanUtxo.Value), fundingPkScript,
28✔
4016
                nil
28✔
4017
}
4018

4019
// handleBadPeer takes a misbehaving peer and increases its ban score. Once
4020
// increased, it will disconnect the peer if its ban score has reached
4021
// `banThreshold` and it doesn't have a channel with us.
4022
func (d *AuthenticatedGossiper) handleBadPeer(peer lnpeer.Peer) error {
206✔
4023
        // Increment the peer's ban score for misbehavior.
206✔
4024
        d.banman.incrementBanScore(peer.PubKey())
206✔
4025

206✔
4026
        // If the peer is banned and not a channel peer, we'll disconnect them.
206✔
4027
        shouldDc, dcErr := d.ShouldDisconnect(peer.IdentityKey())
206✔
4028
        if dcErr != nil {
206✔
UNCOV
4029
                log.Errorf("failed to check if we should disconnect peer: %v",
×
UNCOV
4030
                        dcErr)
×
UNCOV
4031

×
UNCOV
4032
                return dcErr
×
UNCOV
4033
        }
×
4034

4035
        if shouldDc {
207✔
4036
                peer.Disconnect(ErrPeerBanned)
1✔
4037
        }
1✔
4038

4039
        return nil
206✔
4040
}
4041

4042
// makeFundingScript is used to make the funding script for both segwit v0 and
4043
// segwit v1 (taproot) channels.
4044
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
4045
        features *lnwire.RawFeatureVector,
4046
        tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {
231✔
4047

231✔
4048
        legacyFundingScript := func() ([]byte, error) {
462✔
4049
                witnessScript, err := input.GenMultiSigScript(
231✔
4050
                        bitcoinKey1, bitcoinKey2,
231✔
4051
                )
231✔
4052
                if err != nil {
231✔
UNCOV
4053
                        return nil, err
×
UNCOV
4054
                }
×
4055
                pkScript, err := input.WitnessScriptHash(witnessScript)
231✔
4056
                if err != nil {
231✔
UNCOV
4057
                        return nil, err
×
UNCOV
4058
                }
×
4059

4060
                return pkScript, nil
231✔
4061
        }
4062

4063
        if features.IsEmpty() {
462✔
4064
                return legacyFundingScript()
231✔
4065
        }
231✔
4066

4067
        chanFeatureBits := lnwire.NewFeatureVector(features, lnwire.Features)
2✔
4068
        if chanFeatureBits.HasFeature(
2✔
4069
                lnwire.SimpleTaprootChannelsOptionalStaging,
2✔
4070
        ) {
4✔
4071

2✔
4072
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
2✔
4073
                if err != nil {
2✔
UNCOV
4074
                        return nil, err
×
UNCOV
4075
                }
×
4076
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
2✔
4077
                if err != nil {
2✔
UNCOV
4078
                        return nil, err
×
UNCOV
4079
                }
×
4080

4081
                fundingScript, _, err := input.GenTaprootFundingScript(
2✔
4082
                        pubKey1, pubKey2, 0, tapscriptRoot,
2✔
4083
                )
2✔
4084
                if err != nil {
2✔
UNCOV
4085
                        return nil, err
×
UNCOV
4086
                }
×
4087

4088
                // TODO(roasbeef): add tapscript root to gossip v1.5
4089

4090
                return fundingScript, nil
2✔
4091
        }
4092

UNCOV
4093
        return legacyFundingScript()
×
4094
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc