• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 23185093231

17 Mar 2026 08:25AM UTC coverage: 62.303% (-0.03%) from 62.33%
23185093231

push

github

web-flow
Merge pull request #10582 from ellemouton/g175-db-8

[g175] graph/db: add versioned range queries and complete v2 graph query migration

210 of 471 new or added lines in 11 files covered. (44.59%)

87 existing lines in 22 files now uncovered.

140872 of 226108 relevant lines covered (62.3%)

19402.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.12
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "iter"
8
        "net"
9
        "sync"
10
        "sync/atomic"
11
        "testing"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btcd/chaincfg/chainhash"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/lightningnetwork/lnd/batch"
18
        "github.com/lightningnetwork/lnd/graph/db/models"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/routing/route"
21
        "github.com/stretchr/testify/require"
22
)
23

24
// ErrChanGraphShuttingDown indicates that the ChannelGraph has shutdown or is
25
// busy shutting down.
26
var ErrChanGraphShuttingDown = fmt.Errorf("ChannelGraph shutting down")
27

28
// ChannelGraph is a layer above the graph's CRUD layer.
29
type ChannelGraph struct {
30
        started atomic.Bool
31
        stopped atomic.Bool
32

33
        graphCache *GraphCache
34

35
        db Store
36
        *topologyManager
37

38
        quit chan struct{}
39
        wg   sync.WaitGroup
40
}
41

42
// NewChannelGraph creates a new ChannelGraph instance with the given backend.
43
func NewChannelGraph(v1Store Store,
44
        options ...ChanGraphOption) (*ChannelGraph, error) {
188✔
45

188✔
46
        opts := defaultChanGraphOptions()
188✔
47
        for _, o := range options {
282✔
48
                o(opts)
94✔
49
        }
94✔
50

51
        g := &ChannelGraph{
188✔
52
                db:              v1Store,
188✔
53
                topologyManager: newTopologyManager(),
188✔
54
                quit:            make(chan struct{}),
188✔
55
        }
188✔
56

188✔
57
        // The graph cache can be turned off (e.g. for mobile users) for a
188✔
58
        // speed/memory usage tradeoff.
188✔
59
        if opts.useGraphCache {
343✔
60
                g.graphCache = NewGraphCache(opts.preAllocCacheNumNodes)
155✔
61
        }
155✔
62

63
        return g, nil
188✔
64
}
65

66
// Start kicks off any goroutines required for the ChannelGraph to function.
67
// If the graph cache is enabled, then it will be populated with the contents of
68
// the database.
69
func (c *ChannelGraph) Start() error {
301✔
70
        if !c.started.CompareAndSwap(false, true) {
414✔
71
                return nil
113✔
72
        }
113✔
73
        log.Debugf("ChannelGraph starting")
188✔
74
        defer log.Debug("ChannelGraph started")
188✔
75

188✔
76
        ctx := context.TODO()
188✔
77

188✔
78
        if c.graphCache != nil {
343✔
79
                if err := c.populateCache(ctx); err != nil {
155✔
80
                        return fmt.Errorf("could not populate the graph "+
×
81
                                "cache: %w", err)
×
82
                }
×
83
        }
84

85
        c.wg.Add(1)
188✔
86
        go c.handleTopologySubscriptions(ctx)
188✔
87

188✔
88
        return nil
188✔
89
}
90

91
// Stop signals any active goroutines for a graceful closure.
92
func (c *ChannelGraph) Stop() error {
301✔
93
        if !c.stopped.CompareAndSwap(false, true) {
414✔
94
                return nil
113✔
95
        }
113✔
96

97
        log.Debugf("ChannelGraph shutting down...")
188✔
98
        defer log.Debug("ChannelGraph shutdown complete")
188✔
99

188✔
100
        close(c.quit)
188✔
101
        c.wg.Wait()
188✔
102

188✔
103
        return nil
188✔
104
}
105

106
// handleTopologySubscriptions ensures that topology client subscriptions,
107
// subscription cancellations and topology notifications are handled
108
// synchronously.
109
//
110
// NOTE: this MUST be run in a goroutine.
111
func (c *ChannelGraph) handleTopologySubscriptions(ctx context.Context) {
188✔
112
        defer c.wg.Done()
188✔
113

188✔
114
        for {
5,833✔
115
                select {
5,645✔
116
                // A new fully validated topology update has just arrived.
117
                // We'll notify any registered clients.
118
                case update := <-c.topologyUpdate:
5,455✔
119
                        // TODO(elle): change topology handling to be handled
5,455✔
120
                        // synchronously so that we can guarantee the order of
5,455✔
121
                        // notification delivery.
5,455✔
122
                        c.wg.Add(1)
5,455✔
123
                        go c.handleTopologyUpdate(ctx, update)
5,455✔
124

125
                        // TODO(roasbeef): remove all unconnected vertexes
126
                        // after N blocks pass with no corresponding
127
                        // announcements.
128

129
                // A new notification client update has arrived. We're either
130
                // gaining a new client, or cancelling notifications for an
131
                // existing client.
132
                case ntfnUpdate := <-c.ntfnClientUpdates:
8✔
133
                        clientID := ntfnUpdate.clientID
8✔
134

8✔
135
                        if ntfnUpdate.cancel {
12✔
136
                                client, ok := c.topologyClients.LoadAndDelete(
4✔
137
                                        clientID,
4✔
138
                                )
4✔
139
                                if ok {
8✔
140
                                        close(client.exit)
4✔
141
                                        client.wg.Wait()
4✔
142

4✔
143
                                        close(client.ntfnChan)
4✔
144
                                }
4✔
145

146
                                continue
4✔
147
                        }
148

149
                        c.topologyClients.Store(clientID, &topologyClient{
7✔
150
                                ntfnChan: ntfnUpdate.ntfnChan,
7✔
151
                                exit:     make(chan struct{}),
7✔
152
                        })
7✔
153

154
                case <-ctx.Done():
×
155
                        return
×
156

157
                case <-c.quit:
188✔
158
                        return
188✔
159
                }
160
        }
161
}
162

163
// populateCache loads the entire channel graph into the in-memory graph cache.
164
//
165
// NOTE: This should only be called if the graphCache has been constructed.
166
func (c *ChannelGraph) populateCache(ctx context.Context) error {
155✔
167
        startTime := time.Now()
155✔
168
        log.Info("Populating in-memory channel graph, this might take a " +
155✔
169
                "while...")
155✔
170

155✔
171
        for _, v := range []lnwire.GossipVersion{
155✔
172
                gossipV1, gossipV2,
155✔
173
        } {
462✔
174
                // TODO(elle): If we have both v1 and v2 entries for the same
307✔
175
                // node/channel, prefer v2 when merging.
307✔
176
                err := c.db.ForEachNodeCacheable(ctx, v,
307✔
177
                        func(node route.Vertex,
307✔
178
                                features *lnwire.FeatureVector) error {
410✔
179

103✔
180
                                c.graphCache.AddNodeFeatures(node, features)
103✔
181

103✔
182
                                return nil
103✔
183
                        }, func() {},
258✔
184
                )
185
                if err != nil && !errors.Is(
307✔
186
                        err, ErrVersionNotSupportedForKVDB,
307✔
187
                ) {
307✔
188

×
189
                        return err
×
190
                }
×
191

192
                err = c.db.ForEachChannelCacheable(
307✔
193
                        ctx, v, func(info *models.CachedEdgeInfo,
307✔
194
                                policy1,
307✔
195
                                policy2 *models.CachedEdgePolicy) error {
706✔
196

399✔
197
                                c.graphCache.AddChannel(info, policy1, policy2)
399✔
198

399✔
199
                                return nil
399✔
200
                        }, func() {},
554✔
201
                )
202
                if err != nil &&
307✔
203
                        !errors.Is(err, ErrVersionNotSupportedForKVDB) {
307✔
204

×
205
                        return err
×
206
                }
×
207
        }
208

209
        log.Infof("Finished populating in-memory channel graph (took %v, %s)",
155✔
210
                time.Since(startTime), c.graphCache.Stats())
155✔
211

155✔
212
        return nil
155✔
213
}
214

215
// ForEachNodeDirectedChannel iterates through all channels of a given node,
216
// executing the passed callback on the directed edge representing the channel
217
// and its incoming policy. If the callback returns an error, then the iteration
218
// is halted with the error propagated back up to the caller. If the graphCache
219
// is available, then it will be used to retrieve the node's channels instead
220
// of the database.
221
//
222
// Unknown policies are passed into the callback as nil values.
223
//
224
// NOTE: this is part of the graphdb.NodeTraverser interface.
225
func (c *ChannelGraph) ForEachNodeDirectedChannel(ctx context.Context,
226
        node route.Vertex, cb func(channel *DirectedChannel) error,
UNCOV
227
        reset func()) error {
×
UNCOV
228

×
UNCOV
229
        if c.graphCache != nil {
×
UNCOV
230
                return c.graphCache.ForEachChannel(node, cb)
×
UNCOV
231
        }
×
232

233
        // TODO(elle): once the no-cache path needs to support
234
        // pathfinding across gossip versions, this should iterate
235
        // across all versions rather than defaulting to v1.
UNCOV
236
        return c.db.ForEachNodeDirectedChannel(
×
UNCOV
237
                ctx, gossipV1, node, cb, reset,
×
UNCOV
238
        )
×
239
}
240

241
// FetchNodeFeatures returns the features of the given node. If no features are
242
// known for the node, an empty feature vector is returned.
243
// If the graphCache is available, then it will be used to retrieve the node's
244
// features instead of the database.
245
//
246
// NOTE: this is part of the graphdb.NodeTraverser interface.
247
func (c *ChannelGraph) FetchNodeFeatures(ctx context.Context,
UNCOV
248
        node route.Vertex) (*lnwire.FeatureVector, error) {
×
UNCOV
249

×
UNCOV
250
        if c.graphCache != nil {
×
UNCOV
251
                return c.graphCache.GetFeatures(node), nil
×
UNCOV
252
        }
×
253

UNCOV
254
        return c.db.FetchNodeFeatures(ctx, lnwire.GossipVersion1, node)
×
255
}
256

257
// GraphSession will provide the call-back with access to a NodeTraverser
258
// instance which can be used to perform queries against the channel graph. If
259
// the graph cache is not enabled, then the call-back will be provided with
260
// access to the graph via a consistent read-only transaction.
261
func (c *ChannelGraph) GraphSession(ctx context.Context,
UNCOV
262
        cb func(graph NodeTraverser) error, reset func()) error {
×
UNCOV
263

×
UNCOV
264
        if c.graphCache != nil {
×
UNCOV
265
                return cb(c)
×
UNCOV
266
        }
×
267

UNCOV
268
        return c.db.GraphSession(ctx, cb, reset)
×
269
}
270

271
// ForEachNodeCached iterates through all the stored vertices/nodes in the
272
// graph, executing the passed callback with each node encountered.
273
//
274
// NOTE: The callback contents MUST not be modified.
275
func (c *ChannelGraph) ForEachNodeCached(ctx context.Context,
276
        v lnwire.GossipVersion, withAddrs bool,
277
        cb func(ctx context.Context, node route.Vertex, addrs []net.Addr,
278
                chans map[uint64]*DirectedChannel) error, reset func()) error {
120✔
279

120✔
280
        if !withAddrs && c.graphCache != nil {
120✔
281
                return c.graphCache.ForEachNode(
×
282
                        func(node route.Vertex,
×
283
                                channels map[uint64]*DirectedChannel) error {
×
284

×
285
                                return cb(ctx, node, nil, channels)
×
286
                        },
×
287
                )
288
        }
289

290
        return c.db.ForEachNodeCached(ctx, v, withAddrs, cb, reset)
120✔
291
}
292

293
// AddNode adds a vertex/node to the graph database. If the node is not
294
// in the database from before, this will add a new, unconnected one to the
295
// graph. If it is present from before, this will update that node's
296
// information. Note that this method is expected to only be called to update an
297
// already present node from a node announcement, or to insert a node found in a
298
// channel update.
299
func (c *ChannelGraph) AddNode(ctx context.Context,
300
        node *models.Node, op ...batch.SchedulerOption) error {
977✔
301

977✔
302
        err := c.db.AddNode(ctx, node, op...)
977✔
303
        if err != nil {
977✔
304
                return err
×
305
        }
×
306

307
        if c.graphCache != nil {
1,800✔
308
                c.graphCache.AddNodeFeatures(
823✔
309
                        node.PubKeyBytes, node.Features,
823✔
310
                )
823✔
311
        }
823✔
312

313
        select {
977✔
314
        case c.topologyUpdate <- node:
977✔
315
        case <-c.quit:
×
316
                return ErrChanGraphShuttingDown
×
317
        }
318

319
        return nil
977✔
320
}
321

322
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
323
// undirected edge from the two target nodes are created. The information stored
324
// denotes the static attributes of the channel, such as the channelID, the keys
325
// involved in creation of the channel, and the set of features that the channel
326
// supports. The chanPoint and chanID are used to uniquely identify the edge
327
// globally within the database.
328
func (c *ChannelGraph) AddChannelEdge(ctx context.Context,
329
        edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error {
1,827✔
330

1,827✔
331
        err := c.db.AddChannelEdge(ctx, edge, op...)
1,827✔
332
        if err != nil {
2,064✔
333
                return err
237✔
334
        }
237✔
335

336
        if c.graphCache != nil {
2,990✔
337
                c.graphCache.AddChannel(models.NewCachedEdge(edge), nil, nil)
1,400✔
338
        }
1,400✔
339

340
        select {
1,590✔
341
        case c.topologyUpdate <- edge:
1,590✔
342
        case <-c.quit:
×
343
                return ErrChanGraphShuttingDown
×
344
        }
345

346
        return nil
1,590✔
347
}
348

349
// MarkEdgeLive clears an edge from our zombie index for the given gossip
350
// version, deeming it as live. If the cache is enabled, the edge will be added
351
// back to the graph cache if we still have a record of this channel in the DB.
352
func (c *ChannelGraph) MarkEdgeLive(ctx context.Context,
353
        v lnwire.GossipVersion, chanID uint64) error {
2✔
354

2✔
355
        err := c.db.MarkEdgeLive(ctx, v, chanID)
2✔
356
        if err != nil {
3✔
357
                return err
1✔
358
        }
1✔
359

360
        if c.graphCache != nil {
2✔
361
                // We need to add the channel back into our graph cache,
1✔
362
                // otherwise we won't use it for path finding.
1✔
363
                infos, err := c.db.FetchChanInfos(ctx, v, []uint64{chanID})
1✔
364
                if err != nil {
1✔
365
                        return err
×
366
                }
×
367

368
                if len(infos) == 0 {
2✔
369
                        return nil
1✔
370
                }
1✔
371

372
                info := infos[0]
×
373

×
374
                var policy1, policy2 *models.CachedEdgePolicy
×
375
                if info.Policy1 != nil {
×
376
                        policy1 = models.NewCachedPolicy(info.Policy1)
×
377
                }
×
378
                if info.Policy2 != nil {
×
379
                        policy2 = models.NewCachedPolicy(info.Policy2)
×
380
                }
×
381

382
                c.graphCache.AddChannel(
×
383
                        models.NewCachedEdge(info.Info), policy1, policy2,
×
384
                )
×
385
        }
386

387
        return nil
×
388
}
389

390
// DeleteChannelEdges removes edges with the given channel IDs from the
391
// database and marks them as zombies. This ensures that we're unable to re-add
392
// it to our database once again. If an edge does not exist within the
393
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
394
// true, then when we mark these edges as zombies, we'll set up the keys such
395
// that we require the node that failed to send the fresh update to be the one
396
// that resurrects the channel from its zombie state. The markZombie bool
397
// denotes whether to mark the channel as a zombie.
398
func (c *ChannelGraph) DeleteChannelEdges(ctx context.Context,
399
        v lnwire.GossipVersion, strictZombiePruning, markZombie bool,
400
        chanIDs ...uint64) error {
141✔
401

141✔
402
        infos, err := c.db.DeleteChannelEdges(
141✔
403
                ctx, v, strictZombiePruning, markZombie, chanIDs...,
141✔
404
        )
141✔
405
        if err != nil {
198✔
406
                return err
57✔
407
        }
57✔
408

409
        if c.graphCache != nil {
168✔
410
                for _, info := range infos {
111✔
411
                        c.graphCache.RemoveChannel(
27✔
412
                                info.NodeKey1Bytes, info.NodeKey2Bytes,
27✔
413
                                info.ChannelID,
27✔
414
                        )
27✔
415
                }
27✔
416
        }
417

418
        return err
84✔
419
}
420

421
// DisconnectBlockAtHeight is used to indicate that the block specified
422
// by the passed height has been disconnected from the main chain. This
423
// will "rewind" the graph back to the height below, deleting channels
424
// that are no longer confirmed from the graph. The prune log will be
425
// set to the last prune height valid for the remaining chain.
426
// Channels that were removed from the graph resulting from the
427
// disconnected block are returned.
428
func (c *ChannelGraph) DisconnectBlockAtHeight(ctx context.Context,
429
        height uint32) ([]*models.ChannelEdgeInfo, error) {
159✔
430

159✔
431
        edges, err := c.db.DisconnectBlockAtHeight(ctx, height)
159✔
432
        if err != nil {
159✔
433
                return nil, err
×
434
        }
×
435

436
        if c.graphCache != nil {
318✔
437
                for _, edge := range edges {
246✔
438
                        c.graphCache.RemoveChannel(
87✔
439
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
87✔
440
                                edge.ChannelID,
87✔
441
                        )
87✔
442
                }
87✔
443
        }
444

445
        return edges, nil
159✔
446
}
447

448
// PruneGraph prunes newly closed channels from the channel graph in response
449
// to a new block being solved on the network. Any transactions which spend the
450
// funding output of any known channels within he graph will be deleted.
451
// Additionally, the "prune tip", or the last block which has been used to
452
// prune the graph is stored so callers can ensure the graph is fully in sync
453
// with the current UTXO state. A slice of channels that have been closed by
454
// the target block are returned if the function succeeds without error.
455
func (c *ChannelGraph) PruneGraph(ctx context.Context,
456
        spentOutputs []*wire.OutPoint,
457
        blockHash *chainhash.Hash, blockHeight uint32) (
458
        []*models.ChannelEdgeInfo, error) {
252✔
459

252✔
460
        edges, nodes, err := c.db.PruneGraph(
252✔
461
                ctx, spentOutputs, blockHash, blockHeight,
252✔
462
        )
252✔
463
        if err != nil {
252✔
464
                return nil, err
×
465
        }
×
466

467
        if c.graphCache != nil {
504✔
468
                for _, edge := range edges {
281✔
469
                        c.graphCache.RemoveChannel(
29✔
470
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
29✔
471
                                edge.ChannelID,
29✔
472
                        )
29✔
473
                }
29✔
474

475
                for _, node := range nodes {
314✔
476
                        c.graphCache.RemoveNode(node)
62✔
477
                }
62✔
478

479
                log.Debugf("Pruned graph, cache now has %s",
252✔
480
                        c.graphCache.Stats())
252✔
481
        }
482

483
        if len(edges) != 0 {
276✔
484
                // Notify all currently registered clients of the newly closed
24✔
485
                // channels.
24✔
486
                closeSummaries := createCloseSummaries(
24✔
487
                        blockHeight, edges...,
24✔
488
                )
24✔
489

24✔
490
                select {
24✔
491
                case c.topologyUpdate <- closeSummaries:
24✔
492
                case <-c.quit:
×
493
                        return nil, ErrChanGraphShuttingDown
×
494
                }
495
        }
496

497
        return edges, nil
252✔
498
}
499

500
// PruneGraphNodes is a garbage collection method which attempts to prune out
501
// any nodes from the channel graph that are currently unconnected. This ensure
502
// that we only maintain a graph of reachable nodes. In the event that a pruned
503
// node gains more channels, it will be re-added back to the graph.
504
func (c *ChannelGraph) PruneGraphNodes(ctx context.Context) error {
26✔
505
        nodes, err := c.db.PruneGraphNodes(ctx)
26✔
506
        if err != nil {
26✔
507
                return err
×
508
        }
×
509

510
        if c.graphCache != nil {
52✔
511
                for _, node := range nodes {
33✔
512
                        c.graphCache.RemoveNode(node)
7✔
513
                }
7✔
514
        }
515

516
        return nil
26✔
517
}
518

519
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
520
// ID's that we don't know and are not known zombies of the passed set. In other
521
// words, we perform a set difference of our set of chan ID's and the ones
522
// passed in. This method can be used by callers to determine the set of
523
// channels another peer knows of that we don't.
524
func (c *ChannelGraph) FilterKnownChanIDs(ctx context.Context,
525
        chansInfo []ChannelUpdateInfo,
526
        isZombieChan func(ChannelUpdateInfo) bool) ([]uint64, error) {
129✔
527

129✔
528
        unknown, knownZombies, err := c.db.FilterKnownChanIDs(ctx, chansInfo)
129✔
529
        if err != nil {
129✔
530
                return nil, err
×
531
        }
×
532

533
        for _, info := range knownZombies {
172✔
534
                // TODO(ziggie): Make sure that for the strict pruning case we
43✔
535
                // compare the pubkeys and whether the right timestamp is not
43✔
536
                // older than the `ChannelPruneExpiry`.
43✔
537
                //
43✔
538
                // NOTE: The timestamp data has no verification attached to it
43✔
539
                // in the `ReplyChannelRange` msg so we are trusting this data
43✔
540
                // at this point. However it is not critical because we are just
43✔
541
                // removing the channel from the db when the timestamps are more
43✔
542
                // recent. During the querying of the gossip msg verification
43✔
543
                // happens as usual. However we should start punishing peers
43✔
544
                // when they don't provide us honest data ?
43✔
545
                if isZombieChan(info) {
67✔
546
                        continue
24✔
547
                }
548

549
                // If we have marked it as a zombie but the latest update
550
                // info could bring it back from the dead, then we mark it
551
                // alive, and we let it be added to the set of IDs to query our
552
                // peer for.
553
                err := c.db.MarkEdgeLive(
19✔
554
                        ctx, info.Version,
19✔
555
                        info.ShortChannelID.ToUint64(),
19✔
556
                )
19✔
557
                // Since there is a chance that the edge could have been marked
19✔
558
                // as "live" between the FilterKnownChanIDs call and the
19✔
559
                // MarkEdgeLive call, we ignore the error if the edge is already
19✔
560
                // marked as live.
19✔
561
                if err != nil && !errors.Is(err, ErrZombieEdgeNotFound) {
19✔
562
                        return nil, err
×
563
                }
×
564
        }
565

566
        return unknown, nil
129✔
567
}
568

569
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
570
// zombie for the given gossip version. This method is used on an ad-hoc basis,
571
// when channels need to be marked as zombies outside the normal pruning cycle.
572
func (c *ChannelGraph) MarkEdgeZombie(ctx context.Context,
573
        v lnwire.GossipVersion, chanID uint64,
574
        pubKey1, pubKey2 [33]byte) error {
127✔
575

127✔
576
        err := c.db.MarkEdgeZombie(ctx, v, chanID, pubKey1, pubKey2)
127✔
577
        if err != nil {
127✔
578
                return err
×
579
        }
×
580

581
        if c.graphCache != nil {
254✔
582
                c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
127✔
583
        }
127✔
584

585
        return nil
127✔
586
}
587

588
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
589
// within the database for the referenced channel. The `flags` attribute within
590
// the ChannelEdgePolicy determines which of the directed edges are being
591
// updated. If the flag is 1, then the first node's information is being
592
// updated, otherwise it's the second node's information. The node ordering is
593
// determined by the lexicographical ordering of the identity public keys of the
594
// nodes on either side of the channel.
595
func (c *ChannelGraph) UpdateEdgePolicy(ctx context.Context,
596
        edge *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error {
2,878✔
597

2,878✔
598
        from, to, err := c.db.UpdateEdgePolicy(ctx, edge, op...)
2,878✔
599
        if err != nil {
2,883✔
600
                return err
5✔
601
        }
5✔
602

603
        if c.graphCache != nil {
5,361✔
604
                c.graphCache.UpdatePolicy(
2,488✔
605
                        models.NewCachedPolicy(edge), from, to,
2,488✔
606
                )
2,488✔
607
        }
2,488✔
608

609
        select {
2,873✔
610
        case c.topologyUpdate <- edge:
2,873✔
611
        case <-c.quit:
×
612
                return ErrChanGraphShuttingDown
×
613
        }
614

615
        return nil
2,873✔
616
}
617

618
// ForEachNodeChannel iterates through all channels of the given node.
619
func (c *ChannelGraph) ForEachNodeChannel(ctx context.Context,
620
        v lnwire.GossipVersion, nodePub route.Vertex,
621
        cb func(*models.ChannelEdgeInfo,
622
                *models.ChannelEdgePolicy,
623
                *models.ChannelEdgePolicy) error, reset func()) error {
4✔
624

4✔
625
        return c.db.ForEachNodeChannel(ctx, v, nodePub, cb, reset)
4✔
626
}
4✔
627

628
// ForEachNodeCacheable iterates through all stored vertices/nodes in the graph.
629
func (c *ChannelGraph) ForEachNodeCacheable(ctx context.Context,
630
        v lnwire.GossipVersion, cb func(route.Vertex,
631
                *lnwire.FeatureVector) error, reset func()) error {
4✔
632

4✔
633
        return c.db.ForEachNodeCacheable(ctx, v, cb, reset)
4✔
634
}
4✔
635

636
// NodeUpdatesInHorizon returns all known lightning nodes with updates in the
637
// range.
638
func (c *ChannelGraph) NodeUpdatesInHorizon(ctx context.Context,
639
        startTime, endTime time.Time,
640
        opts ...IteratorOption) iter.Seq2[*models.Node, error] {
49✔
641

49✔
642
        return c.db.NodeUpdatesInHorizon(ctx, startTime, endTime, opts...)
49✔
643
}
49✔
644

645
// HasV1Node determines if the graph has a vertex identified by the target node
646
// in the V1 graph.
647
func (c *ChannelGraph) HasV1Node(ctx context.Context,
648
        nodePub [33]byte) (time.Time, bool, error) {
13✔
649

13✔
650
        return c.db.HasV1Node(ctx, nodePub)
13✔
651
}
13✔
652

653
// ForEachChannel iterates through all channel edges stored within the graph.
654
func (c *ChannelGraph) ForEachChannel(ctx context.Context,
655
        v lnwire.GossipVersion, cb func(*models.ChannelEdgeInfo,
656
                *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error,
657
        reset func()) error {
4✔
658

4✔
659
        return c.db.ForEachChannel(ctx, v, cb, reset)
4✔
660
}
4✔
661

662
// DisabledChannelIDs returns the channel ids of disabled channels.
663
func (c *ChannelGraph) DisabledChannelIDs(ctx context.Context,
664
        v lnwire.GossipVersion) (
665
        []uint64, error) {
2✔
666

2✔
667
        return c.db.DisabledChannelIDs(ctx, v)
2✔
668
}
2✔
669

670
// HasV1ChannelEdge returns true if the database knows of a channel edge.
671
func (c *ChannelGraph) HasV1ChannelEdge(ctx context.Context,
672
        chanID uint64) (time.Time, time.Time, bool, bool, error) {
15✔
673

15✔
674
        return c.db.HasV1ChannelEdge(ctx, chanID)
15✔
675
}
15✔
676

677
// HasChannelEdge returns true if the database knows of a channel edge.
678
func (c *ChannelGraph) HasChannelEdge(ctx context.Context,
679
        v lnwire.GossipVersion, chanID uint64) (bool, bool, error) {
136✔
680

136✔
681
        return c.db.HasChannelEdge(ctx, v, chanID)
136✔
682
}
136✔
683

684
// AddEdgeProof sets the proof of an existing edge in the graph database.
685
func (c *ChannelGraph) AddEdgeProof(ctx context.Context,
686
        chanID lnwire.ShortChannelID, proof *models.ChannelAuthProof) error {
5✔
687

5✔
688
        return c.db.AddEdgeProof(ctx, chanID, proof)
5✔
689
}
5✔
690

691
// HighestChanID returns the "highest" known channel ID in the channel graph.
692
func (c *ChannelGraph) HighestChanID(ctx context.Context,
693
        v lnwire.GossipVersion) (uint64, error) {
×
694

×
695
        return c.db.HighestChanID(ctx, v)
×
696
}
×
697

698
// ChanUpdatesInHorizon returns all known channel edges with updates in the
699
// horizon.
700
func (c *ChannelGraph) ChanUpdatesInHorizon(ctx context.Context,
701
        startTime, endTime time.Time,
702
        opts ...IteratorOption) iter.Seq2[ChannelEdge, error] {
147✔
703

147✔
704
        return c.db.ChanUpdatesInHorizon(ctx, startTime, endTime, opts...)
147✔
705
}
147✔
706

707
// FilterChannelRange returns channel IDs within the passed block height range
708
// for the given gossip version.
709
func (c *ChannelGraph) FilterChannelRange(ctx context.Context,
710
        v lnwire.GossipVersion, startHeight, endHeight uint32,
711
        withTimestamps bool) ([]BlockChannelRange, error) {
11✔
712

11✔
713
        return c.db.FilterChannelRange(
11✔
714
                ctx, v, startHeight, endHeight, withTimestamps,
11✔
715
        )
11✔
716
}
11✔
717

718
// FilterChannelRange returns channel IDs within the passed block height range
719
// for this graph's gossip version.
720
func (c *VersionedGraph) FilterChannelRange(ctx context.Context,
721
        startHeight, endHeight uint32,
722
        withTimestamps bool) ([]BlockChannelRange, error) {
3✔
723

3✔
724
        return c.db.FilterChannelRange(
3✔
725
                ctx, c.v, startHeight, endHeight, withTimestamps,
3✔
726
        )
3✔
727
}
3✔
728

729
// FetchChanInfos returns the set of channel edges for the passed channel IDs.
730
func (c *ChannelGraph) FetchChanInfos(ctx context.Context,
731
        v lnwire.GossipVersion, chanIDs []uint64) ([]ChannelEdge, error) {
×
732

×
733
        return c.db.FetchChanInfos(ctx, v, chanIDs)
×
734
}
×
735

736
// FetchChannelEdgesByOutpoint attempts to lookup directed edges by funding
737
// outpoint.
738
func (c *ChannelGraph) FetchChannelEdgesByOutpoint(ctx context.Context,
739
        op *wire.OutPoint) (
740
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
741
        *models.ChannelEdgePolicy, error) {
12✔
742

12✔
743
        return c.db.FetchChannelEdgesByOutpoint(
12✔
744
                ctx, lnwire.GossipVersion1, op,
12✔
745
        )
12✔
746
}
12✔
747

748
// FetchChannelEdgesByID attempts to lookup directed edges by channel ID.
749
func (c *ChannelGraph) FetchChannelEdgesByID(ctx context.Context,
750
        chanID uint64) (
751
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
752
        *models.ChannelEdgePolicy, error) {
2,889✔
753

2,889✔
754
        return c.db.FetchChannelEdgesByID(
2,889✔
755
                ctx, lnwire.GossipVersion1, chanID,
2,889✔
756
        )
2,889✔
757
}
2,889✔
758

759
// PutClosedScid stores a SCID for a closed channel in the database.
760
func (c *ChannelGraph) PutClosedScid(ctx context.Context,
761
        scid lnwire.ShortChannelID) error {
1✔
762

1✔
763
        return c.db.PutClosedScid(ctx, scid)
1✔
764
}
1✔
765

766
// IsClosedScid checks whether a channel identified by the scid is closed.
767
func (c *ChannelGraph) IsClosedScid(ctx context.Context,
768
        scid lnwire.ShortChannelID) (bool, error) {
5✔
769

5✔
770
        return c.db.IsClosedScid(ctx, scid)
5✔
771
}
5✔
772

773
// SetSourceNode sets the source node within the graph database.
774
func (c *ChannelGraph) SetSourceNode(ctx context.Context,
775
        node *models.Node) error {
120✔
776

120✔
777
        return c.db.SetSourceNode(ctx, node)
120✔
778
}
120✔
779

780
// PruneTip returns the block height and hash of the latest pruning block.
781
func (c *ChannelGraph) PruneTip(ctx context.Context) (*chainhash.Hash,
782
        uint32, error) {
56✔
783

56✔
784
        return c.db.PruneTip(ctx)
56✔
785
}
56✔
786

787
// VersionedGraph is a wrapper around ChannelGraph that will call underlying
788
// Store methods with a specific gossip version.
789
type VersionedGraph struct {
790
        *ChannelGraph
791
        v lnwire.GossipVersion
792
}
793

794
// NewVersionedGraph creates a new VersionedGraph.
795
func NewVersionedGraph(c *ChannelGraph,
796
        v lnwire.GossipVersion) *VersionedGraph {
189✔
797

189✔
798
        return &VersionedGraph{
189✔
799
                ChannelGraph: c,
189✔
800
                v:            v,
189✔
801
        }
189✔
802
}
189✔
803

804
// FetchNodeFeatures returns the features of the given node. If no features are
805
// known for the node, an empty feature vector is returned. If the graphCache is
806
// available, it will be used instead of the database.
807
//
808
// NOTE: This is part of the graphdb.NodeTraverser interface.
809
func (c *VersionedGraph) FetchNodeFeatures(ctx context.Context,
810
        node route.Vertex) (*lnwire.FeatureVector, error) {
466✔
811

466✔
812
        if c.graphCache != nil {
932✔
813
                return c.graphCache.GetFeatures(node), nil
466✔
814
        }
466✔
815

816
        return c.db.FetchNodeFeatures(ctx, c.v, node)
3✔
817
}
818

819
// ForEachNodeDirectedChannel iterates through all channels of a given node,
820
// executing the passed callback on the directed edge representing the channel
821
// and its incoming policy. If the graphCache is available, it will be used
822
// instead of the database.
823
//
824
// NOTE: This is part of the graphdb.NodeTraverser interface.
825
func (c *VersionedGraph) ForEachNodeDirectedChannel(ctx context.Context,
826
        node route.Vertex, cb func(channel *DirectedChannel) error,
827
        reset func()) error {
508✔
828

508✔
829
        if c.graphCache != nil {
1,016✔
830
                return c.graphCache.ForEachChannel(node, cb)
508✔
831
        }
508✔
832

833
        return c.db.ForEachNodeDirectedChannel(ctx, c.v, node, cb, reset)
3✔
834
}
835

836
// ForEachNodeCached iterates through all stored vertices/nodes in the graph,
837
// delegating to the embedded ChannelGraph.
838
func (c *VersionedGraph) ForEachNodeCached(ctx context.Context,
839
        withAddrs bool, cb func(ctx context.Context, node route.Vertex,
840
                addrs []net.Addr,
841
                chans map[uint64]*DirectedChannel) error,
842
        reset func()) error {
119✔
843

119✔
844
        return c.ChannelGraph.ForEachNodeCached(ctx, c.v, withAddrs, cb, reset)
119✔
845
}
119✔
846

847
// ForEachNode iterates through all stored vertices/nodes in the graph.
848
func (c *VersionedGraph) ForEachNode(ctx context.Context,
849
        cb func(*models.Node) error, reset func()) error {
8✔
850

8✔
851
        return c.db.ForEachNode(ctx, c.v, cb, reset)
8✔
852
}
8✔
853

854
// NumZombies returns the current number of zombie channels in the graph.
855
func (c *VersionedGraph) NumZombies(ctx context.Context) (uint64, error) {
4✔
856
        return c.db.NumZombies(ctx, c.v)
4✔
857
}
4✔
858

859
// NodeUpdatesInHorizon returns all known lightning nodes which have an update
860
// timestamp within the passed range.
861
func (c *VersionedGraph) NodeUpdatesInHorizon(ctx context.Context,
862
        startTime, endTime time.Time,
863
        opts ...IteratorOption) iter.Seq2[*models.Node, error] {
5✔
864

5✔
865
        return c.db.NodeUpdatesInHorizon(ctx, startTime, endTime, opts...)
5✔
866
}
5✔
867

868
// ChannelView returns the verifiable edge information for each active channel.
869
func (c *VersionedGraph) ChannelView(ctx context.Context) ([]EdgePoint,
870
        error) {
27✔
871

27✔
872
        return c.db.ChannelView(ctx, c.v)
27✔
873
}
27✔
874

875
// GraphSession provides the callback with access to a NodeTraverser instance
876
// for performing queries against the channel graph. If the graph cache is
877
// enabled, the callback receives the VersionedGraph directly (which implements
878
// NodeTraverser using the cache). Otherwise a read-only database session is
879
// used.
880
func (c *VersionedGraph) GraphSession(ctx context.Context,
881
        cb func(graph NodeTraverser) error, reset func()) error {
136✔
882

136✔
883
        if c.graphCache != nil {
218✔
884
                return cb(c)
82✔
885
        }
82✔
886

887
        // TODO(elle): the underlying GraphSession currently creates a
888
        // NodeTraverser that is hardcoded to GossipVersion1. This needs to be
889
        // updated to pass the version through for v2 support.
890
        return c.db.GraphSession(ctx, cb, reset)
54✔
891
}
892

893
// FetchNode attempts to look up a target node by its identity public key.
894
func (c *VersionedGraph) FetchNode(ctx context.Context,
895
        nodePub route.Vertex) (*models.Node, error) {
159✔
896

159✔
897
        return c.db.FetchNode(ctx, c.v, nodePub)
159✔
898
}
159✔
899

900
// FetchChannelEdgesByID attempts to lookup directed edges by channel ID.
901
func (c *VersionedGraph) FetchChannelEdgesByID(ctx context.Context,
902
        chanID uint64) (
903
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
904
        *models.ChannelEdgePolicy, error) {
9✔
905

9✔
906
        return c.db.FetchChannelEdgesByID(ctx, c.v, chanID)
9✔
907
}
9✔
908

909
// FetchChannelEdgesByOutpoint attempts to lookup directed edges by funding
910
// outpoint.
911
func (c *VersionedGraph) FetchChannelEdgesByOutpoint(ctx context.Context,
912
        op *wire.OutPoint) (
913
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
914
        *models.ChannelEdgePolicy, error) {
3✔
915

3✔
916
        return c.db.FetchChannelEdgesByOutpoint(ctx, c.v, op)
3✔
917
}
3✔
918

919
// IsZombieEdge returns whether the edge is considered zombie for this version.
920
func (c *VersionedGraph) IsZombieEdge(ctx context.Context,
921
        chanID uint64) (bool, [33]byte, [33]byte, error) {
14✔
922

14✔
923
        return c.db.IsZombieEdge(ctx, c.v, chanID)
14✔
924
}
14✔
925

926
// AddrsForNode returns all known addresses for the target node public key.
927
func (c *VersionedGraph) AddrsForNode(ctx context.Context,
928
        nodePub *btcec.PublicKey) (bool, []net.Addr, error) {
6✔
929

6✔
930
        return c.db.AddrsForNode(ctx, c.v, nodePub)
6✔
931
}
6✔
932

933
// DeleteNode starts a new database transaction to remove a vertex/node
934
// from the database according to the node's public key.
935
func (c *VersionedGraph) DeleteNode(ctx context.Context,
936
        nodePub route.Vertex) error {
4✔
937

4✔
938
        err := c.db.DeleteNode(ctx, c.v, nodePub)
4✔
939
        if err != nil {
5✔
940
                return err
1✔
941
        }
1✔
942

943
        if c.graphCache != nil {
6✔
944
                c.graphCache.RemoveNode(nodePub)
3✔
945
        }
3✔
946

947
        return nil
3✔
948
}
949

950
// HasNode determines if the graph has a vertex identified by the target node
951
// in the V1 graph.
952
func (c *VersionedGraph) HasNode(ctx context.Context, nodePub [33]byte) (bool,
953
        error) {
10✔
954

10✔
955
        return c.db.HasNode(ctx, c.v, nodePub)
10✔
956
}
10✔
957

958
// LookupAlias attempts to return the alias as advertised by the target node.
959
func (c *VersionedGraph) LookupAlias(ctx context.Context,
960
        pub *btcec.PublicKey) (string, error) {
5✔
961

5✔
962
        return c.db.LookupAlias(ctx, c.v, pub)
5✔
963
}
5✔
964

965
// SourceNode returns the source node of the graph.
966
func (c *VersionedGraph) SourceNode(ctx context.Context) (*models.Node,
967
        error) {
243✔
968

243✔
969
        return c.db.SourceNode(ctx, c.v)
243✔
970
}
243✔
971

972
// DeleteChannelEdges removes edges with the given channel IDs from the
973
// database and marks them as zombies. This ensures that we're unable to re-add
974
// it to our database once again. If an edge does not exist within the
975
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
976
// true, then when we mark these edges as zombies, we'll set up the keys such
977
// that we require the node that failed to send the fresh update to be the one
978
// that resurrects the channel from its zombie state. The markZombie bool
979
// denotes whether to mark the channel as a zombie.
980
func (c *VersionedGraph) DeleteChannelEdges(ctx context.Context,
981
        strictZombiePruning, markZombie bool, chanIDs ...uint64) error {
16✔
982

16✔
983
        return c.ChannelGraph.DeleteChannelEdges(
16✔
984
                ctx, c.v, strictZombiePruning, markZombie, chanIDs...,
16✔
985
        )
16✔
986
}
16✔
987

988
// HasChannelEdge returns true if the database knows of a channel edge with the
989
// passed channel ID and this graph's gossip version, and false otherwise. If it
990
// is not found, then the zombie index is checked and its result is returned as
991
// the second boolean.
992
func (c *VersionedGraph) HasChannelEdge(ctx context.Context,
993
        chanID uint64) (bool, bool, error) {
67✔
994

67✔
995
        return c.db.HasChannelEdge(ctx, c.v, chanID)
67✔
996
}
67✔
997

998
// ForEachSourceNodeChannel iterates through all channels of the source node.
999
func (c *VersionedGraph) ForEachSourceNodeChannel(ctx context.Context,
1000
        cb func(chanPoint wire.OutPoint, havePolicy bool,
1001
                otherNode *models.Node) error, reset func()) error {
4✔
1002

4✔
1003
        return c.db.ForEachSourceNodeChannel(ctx, c.v, cb, reset)
4✔
1004
}
4✔
1005

1006
// ForEachNodeChannel iterates through all channels of the given node.
1007
func (c *VersionedGraph) ForEachNodeChannel(ctx context.Context,
1008
        nodePub route.Vertex, cb func(*models.ChannelEdgeInfo,
1009
                *models.ChannelEdgePolicy,
1010
                *models.ChannelEdgePolicy) error, reset func()) error {
8✔
1011

8✔
1012
        return c.db.ForEachNodeChannel(ctx, c.v, nodePub, cb, reset)
8✔
1013
}
8✔
1014

1015
// ForEachChannel iterates through all channel edges stored within the graph.
1016
func (c *VersionedGraph) ForEachChannel(ctx context.Context,
1017
        cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
1018
                *models.ChannelEdgePolicy) error, reset func()) error {
6✔
1019

6✔
1020
        return c.db.ForEachChannel(ctx, c.v, cb, reset)
6✔
1021
}
6✔
1022

1023
// ForEachNodeCacheable iterates through all stored vertices/nodes in the graph.
1024
func (c *VersionedGraph) ForEachNodeCacheable(ctx context.Context,
1025
        cb func(route.Vertex, *lnwire.FeatureVector) error,
1026
        reset func()) error {
1✔
1027

1✔
1028
        return c.db.ForEachNodeCacheable(ctx, c.v, cb, reset)
1✔
1029
}
1✔
1030

1031
// ForEachChannelCacheable iterates through all channel edges for the cache.
1032
func (c *VersionedGraph) ForEachChannelCacheable(ctx context.Context,
1033
        cb func(*models.CachedEdgeInfo, *models.CachedEdgePolicy,
1034
                *models.CachedEdgePolicy) error, reset func()) error {
×
1035

×
1036
        return c.db.ForEachChannelCacheable(ctx, c.v, cb, reset)
×
1037
}
×
1038

1039
// DisabledChannelIDs returns the channel ids of disabled channels.
1040
func (c *VersionedGraph) DisabledChannelIDs(
1041
        ctx context.Context) ([]uint64, error) {
4✔
1042

4✔
1043
        return c.db.DisabledChannelIDs(ctx, c.v)
4✔
1044
}
4✔
1045

1046
// FetchChanInfos returns the set of channel edges for the passed channel IDs.
1047
func (c *VersionedGraph) FetchChanInfos(ctx context.Context,
1048
        chanIDs []uint64) ([]ChannelEdge, error) {
6✔
1049

6✔
1050
        return c.db.FetchChanInfos(ctx, c.v, chanIDs)
6✔
1051
}
6✔
1052

1053
// HighestChanID returns the "highest" known channel ID in the channel graph.
1054
func (c *VersionedGraph) HighestChanID(ctx context.Context) (uint64, error) {
6✔
1055
        return c.db.HighestChanID(ctx, c.v)
6✔
1056
}
6✔
1057

1058
// ChannelID attempts to lookup the 8-byte compact channel ID.
1059
func (c *VersionedGraph) ChannelID(ctx context.Context,
1060
        chanPoint *wire.OutPoint) (uint64, error) {
4✔
1061

4✔
1062
        return c.db.ChannelID(ctx, c.v, chanPoint)
4✔
1063
}
4✔
1064

1065
// IsPublicNode determines whether the node is seen as public in the graph.
1066
func (c *VersionedGraph) IsPublicNode(ctx context.Context,
1067
        pubKey [33]byte) (bool, error) {
17✔
1068

17✔
1069
        return c.db.IsPublicNode(ctx, c.v, pubKey)
17✔
1070
}
17✔
1071

1072
// MakeTestGraph creates a new instance of the ChannelGraph for testing
1073
// purposes. The backing Store implementation depends on the version of
1074
// NewTestDB included in the current build.
1075
//
1076
// NOTE: this is currently unused, but is left here for future use to show how
1077
// NewTestDB can be used. As the SQL implementation of the Store is
1078
// implemented, unit tests will be switched to use this function instead of
1079
// the existing MakeTestGraph helper. Once only this function is used, the
1080
// existing MakeTestGraph function will be removed and this one will be renamed.
1081
func MakeTestGraph(t testing.TB,
1082
        opts ...ChanGraphOption) *ChannelGraph {
183✔
1083

183✔
1084
        t.Helper()
183✔
1085

183✔
1086
        store := NewTestDB(t)
183✔
1087

183✔
1088
        graph, err := NewChannelGraph(store, opts...)
183✔
1089
        require.NoError(t, err)
183✔
1090
        require.NoError(t, graph.Start())
183✔
1091

183✔
1092
        t.Cleanup(func() {
366✔
1093
                require.NoError(t, graph.Stop())
183✔
1094
        })
183✔
1095

1096
        return graph
183✔
1097
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc