• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16200890897

10 Jul 2025 04:39PM UTC coverage: 67.437% (+0.02%) from 67.417%
16200890897

Pull #10015

github

web-flow
Merge 46d2623a2 into 04a2be29d
Pull Request #10015: graph/db: add zombie channels cleanup routine

58 of 63 new or added lines in 2 files covered. (92.06%)

86 existing lines in 18 files now uncovered.

135349 of 200705 relevant lines covered (67.44%)

21863.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.5
/graph/db/graph_cache.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "sync"
6
        "time"
7

8
        "github.com/btcsuite/btcd/btcutil"
9
        "github.com/lightningnetwork/lnd/graph/db/models"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// DirectedChannel is a type that stores the channel information as seen from
15
// one side of the channel.
16
type DirectedChannel struct {
17
        // ChannelID is the unique identifier of this channel.
18
        ChannelID uint64
19

20
        // IsNode1 indicates if this is the node with the smaller public key.
21
        IsNode1 bool
22

23
        // OtherNode is the public key of the node on the other end of this
24
        // channel.
25
        OtherNode route.Vertex
26

27
        // Capacity is the announced capacity of this channel in satoshis.
28
        Capacity btcutil.Amount
29

30
        // OutPolicySet is a boolean that indicates whether the node has an
31
        // outgoing policy set. For pathfinding only the existence of the policy
32
        // is important to know, not the actual content.
33
        OutPolicySet bool
34

35
        // InPolicy is the incoming policy *from* the other node to this node.
36
        // In path finding, we're walking backward from the destination to the
37
        // source, so we're always interested in the edge that arrives to us
38
        // from the other node.
39
        InPolicy *models.CachedEdgePolicy
40

41
        // Inbound fees of this node.
42
        InboundFee lnwire.Fee
43
}
44

45
// DeepCopy creates a deep copy of the channel, including the incoming policy.
46
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
1,597✔
47
        channelCopy := *c
1,597✔
48

1,597✔
49
        if channelCopy.InPolicy != nil {
3,168✔
50
                inPolicyCopy := *channelCopy.InPolicy
1,571✔
51
                channelCopy.InPolicy = &inPolicyCopy
1,571✔
52

1,571✔
53
                // The fields for the ToNode can be overwritten by the path
1,571✔
54
                // finding algorithm, which is why we need a deep copy in the
1,571✔
55
                // first place. So we always start out with nil values, just to
1,571✔
56
                // be sure they don't contain any old data.
1,571✔
57
                channelCopy.InPolicy.ToNodePubKey = nil
1,571✔
58
                channelCopy.InPolicy.ToNodeFeatures = nil
1,571✔
59
        }
1,571✔
60

61
        return &channelCopy
1,597✔
62
}
63

64
// GraphCache is a type that holds a minimal set of information of the public
65
// channel graph that can be used for pathfinding.
66
type GraphCache struct {
67
        nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
68
        nodeFeatures map[route.Vertex]*lnwire.FeatureVector
69

70
        // zombieIndex tracks channels that may be leaked during the removal
71
        // process. Since the remover could not have the node ID, these channels
72
        // are stored here and will be removed later in a separate loop.
73
        zombieIndex map[uint64]struct{}
74

75
        // zombieCleanerInterval is the interval at which the zombie cleaner
76
        // runs to clean up channels that are still missing their nodes.
77
        zombieCleanerInterval time.Duration
78

79
        mtx  sync.RWMutex
80
        quit chan struct{}
81
        wg   sync.WaitGroup
82
}
83

84
// NewGraphCache creates a new graphCache.
85
func NewGraphCache(preAllocNumNodes int) *GraphCache {
145✔
86
        return &GraphCache{
145✔
87
                nodeChannels: make(
145✔
88
                        map[route.Vertex]map[uint64]*DirectedChannel,
145✔
89
                        // A channel connects two nodes, so we can look it up
145✔
90
                        // from both sides, meaning we get double the number of
145✔
91
                        // entries.
145✔
92
                        preAllocNumNodes*2,
145✔
93
                ),
145✔
94
                nodeFeatures: make(
145✔
95
                        map[route.Vertex]*lnwire.FeatureVector,
145✔
96
                        preAllocNumNodes,
145✔
97
                ),
145✔
98
                zombieIndex:           make(map[uint64]struct{}),
145✔
99
                zombieCleanerInterval: 24 * time.Hour,
145✔
100
                quit:                  make(chan struct{}),
145✔
101
        }
145✔
102
}
145✔
103

104
// Stats returns statistics about the current cache size.
105
func (c *GraphCache) Stats() string {
386✔
106
        c.mtx.RLock()
386✔
107
        defer c.mtx.RUnlock()
386✔
108

386✔
109
        numChannels := 0
386✔
110
        for node := range c.nodeChannels {
796✔
111
                numChannels += len(c.nodeChannels[node])
410✔
112
        }
410✔
113
        return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
386✔
114
                "num_channels=%d", len(c.nodeFeatures), len(c.nodeChannels),
386✔
115
                numChannels)
386✔
116
}
117

118
// Start launches the background goroutine that periodically cleans up zombie
119
// channels.
120
func (c *GraphCache) Start() {
142✔
121
        c.wg.Add(1)
142✔
122
        go c.zombieCleaner()
142✔
123
}
142✔
124

125
// Stop signals the background cleaner to shut down and waits for it to exit.
126
func (c *GraphCache) Stop() {
140✔
127
        close(c.quit)
140✔
128
        c.wg.Wait()
140✔
129
}
140✔
130

131
// zombieCleaner periodically iterates over the zombie index and removes
132
// channels that are still missing their nodes.
133
func (c *GraphCache) zombieCleaner() {
142✔
134
        ticker := time.NewTicker(c.zombieCleanerInterval)
142✔
135
        defer func() {
282✔
136
                ticker.Stop()
140✔
137
                c.wg.Done()
140✔
138
        }()
140✔
139

140
        for {
285✔
141
                select {
143✔
142
                case <-ticker.C:
1✔
143
                        c.cleanupZombies()
1✔
144
                case <-c.quit:
140✔
145
                        return
140✔
146
                }
147
        }
148
}
149

150
// cleanupZombies attempts to prune channels tracked in the zombie index. If the
151
// nodes for a channel still cannot be resolved, the channel is deleted from the
152
// cache.
153
func (c *GraphCache) cleanupZombies() {
2✔
154
        c.mtx.Lock()
2✔
155
        defer c.mtx.Unlock()
2✔
156

2✔
157
        if len(c.zombieIndex) == 0 {
2✔
NEW
158
                return
×
NEW
159
        }
×
160

161
        // Go through all nodes and their channels once to check if any are
162
        // marked as zombies. This is faster than checking every node for each
163
        // zombie channel, since there are usually many more nodes than zombie
164
        // channels.
165
        for node, chans := range c.nodeChannels {
5✔
166
                for cid, ch := range chans {
6✔
167
                        // if the channel isn't a zombie, we can skip it.
3✔
168
                        if _, ok := c.zombieIndex[cid]; !ok {
5✔
169
                                continue
2✔
170
                        }
171

172
                        // delete peer's side channel if it exists.
173
                        c.removeChannelIfFound(ch.OtherNode, cid)
1✔
174

1✔
175
                        // delete the channel from our side.
1✔
176
                        delete(chans, cid)
1✔
177
                }
178

179
                // If all channels were deleted for this node, clean up the map
180
                // entry entirely.
181
                if len(chans) == 0 {
4✔
182
                        delete(c.nodeChannels, node)
1✔
183
                }
1✔
184
        }
185

186
        // Now that we have removed all channels that were zombies, we can
187
        // clear the zombie index.
188
        c.zombieIndex = make(map[uint64]struct{})
2✔
189
}
190

191
// AddNodeFeatures adds a graph node and its features to the cache.
192
func (c *GraphCache) AddNodeFeatures(node route.Vertex,
193
        features *lnwire.FeatureVector) {
663✔
194

663✔
195
        c.mtx.Lock()
663✔
196
        defer c.mtx.Unlock()
663✔
197

663✔
198
        c.nodeFeatures[node] = features
663✔
199
}
663✔
200

201
// AddChannel adds a non-directed channel, meaning that the order of policy 1
202
// and policy 2 does not matter, the directionality is extracted from the info
203
// and policy flags automatically. The policy will be set as the outgoing policy
204
// on one node and the incoming policy on the peer's side.
205
func (c *GraphCache) AddChannel(info *models.CachedEdgeInfo,
206
        policy1, policy2 *models.CachedEdgePolicy) {
1,702✔
207

1,702✔
208
        if info == nil {
1,702✔
209
                return
×
210
        }
×
211

212
        if policy1 != nil && policy1.IsDisabled() &&
1,702✔
213
                policy2 != nil && policy2.IsDisabled() {
1,705✔
214

3✔
215
                return
3✔
216
        }
3✔
217

218
        // Create the edge entry for both nodes.
219
        c.mtx.Lock()
1,702✔
220
        c.updateOrAddEdge(info.NodeKey1Bytes, &DirectedChannel{
1,702✔
221
                ChannelID: info.ChannelID,
1,702✔
222
                IsNode1:   true,
1,702✔
223
                OtherNode: info.NodeKey2Bytes,
1,702✔
224
                Capacity:  info.Capacity,
1,702✔
225
        })
1,702✔
226
        c.updateOrAddEdge(info.NodeKey2Bytes, &DirectedChannel{
1,702✔
227
                ChannelID: info.ChannelID,
1,702✔
228
                IsNode1:   false,
1,702✔
229
                OtherNode: info.NodeKey1Bytes,
1,702✔
230
                Capacity:  info.Capacity,
1,702✔
231
        })
1,702✔
232
        c.mtx.Unlock()
1,702✔
233

1,702✔
234
        // The policy's node is always the to_node. So if policy 1 has to_node
1,702✔
235
        // of node 2 then we have the policy 1 as seen from node 1.
1,702✔
236
        if policy1 != nil {
2,103✔
237
                fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
401✔
238
                if !policy1.IsNode1() {
402✔
239
                        fromNode, toNode = toNode, fromNode
1✔
240
                }
1✔
241
                c.UpdatePolicy(policy1, fromNode, toNode)
401✔
242
        }
243
        if policy2 != nil {
2,103✔
244
                fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
401✔
245
                if policy2.IsNode1() {
402✔
246
                        fromNode, toNode = toNode, fromNode
1✔
247
                }
1✔
248
                c.UpdatePolicy(policy2, fromNode, toNode)
401✔
249
        }
250
}
251

252
// updateOrAddEdge makes sure the edge information for a node is either updated
253
// if it already exists or is added to that node's list of channels.
254
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
3,401✔
255
        if len(c.nodeChannels[node]) == 0 {
4,252✔
256
                c.nodeChannels[node] = make(map[uint64]*DirectedChannel)
851✔
257
        }
851✔
258

259
        c.nodeChannels[node][edge.ChannelID] = edge
3,401✔
260
}
261

262
// UpdatePolicy updates a single policy on both the from and to node. The order
263
// of the from and to node is not strictly important. But we assume that a
264
// channel edge was added beforehand so that the directed channel struct already
265
// exists in the cache.
266
func (c *GraphCache) UpdatePolicy(policy *models.CachedEdgePolicy, fromNode,
267
        toNode route.Vertex) {
3,081✔
268

3,081✔
269
        c.mtx.Lock()
3,081✔
270
        defer c.mtx.Unlock()
3,081✔
271

3,081✔
272
        updatePolicy := func(nodeKey route.Vertex) {
9,240✔
273
                if len(c.nodeChannels[nodeKey]) == 0 {
6,159✔
274
                        log.Warnf("Node=%v not found in graph cache", nodeKey)
×
275

×
276
                        return
×
277
                }
×
278

279
                channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
6,159✔
280
                if !ok {
6,159✔
281
                        log.Warnf("Channel=%v not found in graph cache",
×
282
                                policy.ChannelID)
×
283

×
284
                        return
×
285
                }
×
286

287
                // Edge 1 is defined as the policy for the direction of node1 to
288
                // node2.
289
                switch {
6,159✔
290
                // This is node 1, and it is edge 1, so this is the outgoing
291
                // policy for node 1.
292
                case channel.IsNode1 && policy.IsNode1():
1,543✔
293
                        channel.OutPolicySet = true
1,543✔
294
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
1,708✔
295
                                channel.InboundFee = fee
165✔
296
                        })
165✔
297

298
                // This is node 2, and it is edge 2, so this is the outgoing
299
                // policy for node 2.
300
                case !channel.IsNode1 && !policy.IsNode1():
1,541✔
301
                        channel.OutPolicySet = true
1,541✔
302
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
1,705✔
303
                                channel.InboundFee = fee
164✔
304
                        })
164✔
305

306
                // The other two cases left mean it's the inbound policy for the
307
                // node.
308
                default:
3,081✔
309
                        channel.InPolicy = policy
3,081✔
310
                }
311
        }
312

313
        updatePolicy(fromNode)
3,081✔
314
        updatePolicy(toNode)
3,081✔
315
}
316

317
// RemoveNode completely removes a node and all its channels (including the
318
// peer's side).
319
func (c *GraphCache) RemoveNode(node route.Vertex) {
69✔
320
        c.mtx.Lock()
69✔
321
        defer c.mtx.Unlock()
69✔
322

69✔
323
        delete(c.nodeFeatures, node)
69✔
324

69✔
325
        // First remove all channels from the other nodes' lists.
69✔
326
        for _, channel := range c.nodeChannels[node] {
71✔
327
                c.removeChannelIfFound(channel.OtherNode, channel.ChannelID)
2✔
328
        }
2✔
329

330
        // Then remove our whole node completely.
331
        delete(c.nodeChannels, node)
69✔
332
}
333

334
// RemoveChannel removes a single channel between two nodes.
335
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
268✔
336
        c.mtx.Lock()
268✔
337
        defer c.mtx.Unlock()
268✔
338

268✔
339
        // Remove that one channel from both sides.
268✔
340
        c.removeChannelIfFound(node1, chanID)
268✔
341
        c.removeChannelIfFound(node2, chanID)
268✔
342

268✔
343
        zeroVertex := route.Vertex{}
268✔
344
        if node1 == zeroVertex || node2 == zeroVertex {
272✔
345
                // If one of the nodes is the zero vertex, it means that we will
4✔
346
                // leak the channel in the memory cache, since we don't have the
4✔
347
                // node ID to remove, so we add it to the zombie index to post
4✔
348
                // removal.
4✔
349
                c.zombieIndex[chanID] = struct{}{}
4✔
350
        }
4✔
351
}
352

353
// removeChannelIfFound removes a single channel from one side.
354
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
536✔
355
        if len(c.nodeChannels[node]) == 0 {
721✔
356
                return
185✔
357
        }
185✔
358

359
        delete(c.nodeChannels[node], chanID)
354✔
360
}
361

362
// getChannels returns a copy of the passed node's channels or nil if there
363
// isn't any.
364
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
548✔
365
        c.mtx.RLock()
548✔
366
        defer c.mtx.RUnlock()
548✔
367

548✔
368
        channels, ok := c.nodeChannels[node]
548✔
369
        if !ok {
558✔
370
                return nil
10✔
371
        }
10✔
372

373
        features, ok := c.nodeFeatures[node]
541✔
374
        if !ok {
696✔
375
                // If the features were set to nil explicitly, that's fine here.
155✔
376
                // The router will overwrite the features of the destination
155✔
377
                // node with those found in the invoice if necessary. But if we
155✔
378
                // didn't yet get a node announcement we want to mimic the
155✔
379
                // behavior of the old DB based code that would always set an
155✔
380
                // empty feature vector instead of leaving it nil.
155✔
381
                features = lnwire.EmptyFeatureVector()
155✔
382
        }
155✔
383

384
        toNodeCallback := func() route.Vertex {
992✔
385
                return node
451✔
386
        }
451✔
387

388
        i := 0
541✔
389
        channelsCopy := make([]*DirectedChannel, len(channels))
541✔
390
        for cid, channel := range channels {
2,134✔
391
                if _, ok := c.zombieIndex[cid]; ok {
1,593✔
NEW
392
                        // If this channel is a zombie, we don't want to return
×
NEW
393
                        // it, so we skip it.
×
NEW
394
                        continue
×
395
                }
396

397
                // We need to copy the channel and policy to avoid it being
398
                // updated in the cache if the path finding algorithm sets
399
                // fields on it (currently only the ToNodeFeatures of the
400
                // policy).
401
                channelCopy := channel.DeepCopy()
1,593✔
402
                if channelCopy.InPolicy != nil {
3,161✔
403
                        channelCopy.InPolicy.ToNodePubKey = toNodeCallback
1,568✔
404
                        channelCopy.InPolicy.ToNodeFeatures = features
1,568✔
405
                }
1,568✔
406

407
                channelsCopy[i] = channelCopy
1,593✔
408
                i++
1,593✔
409
        }
410

411
        // Copy the slice to clean up the unused pre allocated tail entries.
412
        copy(channelsCopy, channelsCopy[:i])
541✔
413

541✔
414
        return channelsCopy
541✔
415
}
416

417
// ForEachChannel invokes the given callback for each channel of the given node.
418
func (c *GraphCache) ForEachChannel(node route.Vertex,
419
        cb func(channel *DirectedChannel) error) error {
548✔
420

548✔
421
        // Obtain a copy of the node's channels. We need do this in order to
548✔
422
        // avoid deadlocks caused by interaction with the graph cache, channel
548✔
423
        // state and the graph database from multiple goroutines. This snapshot
548✔
424
        // is only used for path finding where being stale is acceptable since
548✔
425
        // the real world graph and our representation may always become
548✔
426
        // slightly out of sync for a short time and the actual channel state
548✔
427
        // is stored separately.
548✔
428
        channels := c.getChannels(node)
548✔
429
        for _, channel := range channels {
2,124✔
430
                if err := cb(channel); err != nil {
1,590✔
431
                        return err
14✔
432
                }
14✔
433
        }
434

435
        return nil
537✔
436
}
437

438
// ForEachNode iterates over the adjacency list of the graph, executing the
439
// call back for each node and the set of channels that emanate from the given
440
// node.
441
//
442
// NOTE: This method should be considered _read only_, the channels or nodes
443
// passed in MUST NOT be modified.
444
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
445
        channels map[uint64]*DirectedChannel) error) error {
2✔
446

2✔
447
        c.mtx.RLock()
2✔
448
        defer c.mtx.RUnlock()
2✔
449

2✔
450
        for node, channels := range c.nodeChannels {
6✔
451
                // We don't make a copy here since this is a read-only RPC
4✔
452
                // call. We also don't need the node features either for this
4✔
453
                // call.
4✔
454
                if err := cb(node, channels); err != nil {
4✔
455
                        return err
×
456
                }
×
457
        }
458

459
        return nil
2✔
460
}
461

462
// GetFeatures returns the features of the node with the given ID. If no
463
// features are known for the node, an empty feature vector is returned.
464
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
472✔
465
        c.mtx.RLock()
472✔
466
        defer c.mtx.RUnlock()
472✔
467

472✔
468
        features, ok := c.nodeFeatures[node]
472✔
469
        if !ok || features == nil {
555✔
470
                // The router expects the features to never be nil, so we return
83✔
471
                // an empty feature set instead.
83✔
472
                return lnwire.EmptyFeatureVector()
83✔
473
        }
83✔
474

475
        return features
392✔
476
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc