• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16585115487

29 Jul 2025 02:30AM UTC coverage: 67.231% (+0.004%) from 67.227%
16585115487

Pull #10015

github

web-flow
Merge 918e8713e into 2e36f9b8b
Pull Request #10015: graph/db: add zombie channels cleanup routine

58 of 64 new or added lines in 2 files covered. (90.63%)

76 existing lines in 21 files now uncovered.

135542 of 201606 relevant lines covered (67.23%)

21711.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.17
/graph/db/graph_cache.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "sync"
6
        "time"
7

8
        "github.com/btcsuite/btcd/btcutil"
9
        "github.com/lightningnetwork/lnd/graph/db/models"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// DirectedChannel is a type that stores the channel information as seen from
15
// one side of the channel.
16
type DirectedChannel struct {
17
        // ChannelID is the unique identifier of this channel.
18
        ChannelID uint64
19

20
        // IsNode1 indicates if this is the node with the smaller public key.
21
        IsNode1 bool
22

23
        // OtherNode is the public key of the node on the other end of this
24
        // channel.
25
        OtherNode route.Vertex
26

27
        // Capacity is the announced capacity of this channel in satoshis.
28
        Capacity btcutil.Amount
29

30
        // OutPolicySet is a boolean that indicates whether the node has an
31
        // outgoing policy set. For pathfinding only the existence of the policy
32
        // is important to know, not the actual content.
33
        OutPolicySet bool
34

35
        // InPolicy is the incoming policy *from* the other node to this node.
36
        // In path finding, we're walking backward from the destination to the
37
        // source, so we're always interested in the edge that arrives to us
38
        // from the other node.
39
        InPolicy *models.CachedEdgePolicy
40

41
        // Inbound fees of this node.
42
        InboundFee lnwire.Fee
43
}
44

45
// DeepCopy creates a deep copy of the channel, including the incoming policy.
46
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
1,597✔
47
        channelCopy := *c
1,597✔
48

1,597✔
49
        if channelCopy.InPolicy != nil {
3,168✔
50
                inPolicyCopy := *channelCopy.InPolicy
1,571✔
51
                channelCopy.InPolicy = &inPolicyCopy
1,571✔
52

1,571✔
53
                // The fields for the ToNode can be overwritten by the path
1,571✔
54
                // finding algorithm, which is why we need a deep copy in the
1,571✔
55
                // first place. So we always start out with nil values, just to
1,571✔
56
                // be sure they don't contain any old data.
1,571✔
57
                channelCopy.InPolicy.ToNodePubKey = nil
1,571✔
58
                channelCopy.InPolicy.ToNodeFeatures = nil
1,571✔
59
        }
1,571✔
60

61
        return &channelCopy
1,597✔
62
}
63

64
// GraphCache is a type that holds a minimal set of information of the public
65
// channel graph that can be used for pathfinding.
66
type GraphCache struct {
67
        nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
68
        nodeFeatures map[route.Vertex]*lnwire.FeatureVector
69

70
        // zombieIndex tracks channels that may be leaked during the removal
71
        // process. Since the remover could not have the node ID, these channels
72
        // are stored here and will be removed later in a separate loop.
73
        zombieIndex map[uint64]struct{}
74

75
        // zombieCleanerInterval is the interval at which the zombie cleaner
76
        // runs to clean up channels that are still missing their nodes.
77
        zombieCleanerInterval time.Duration
78

79
        mtx  sync.RWMutex
80
        quit chan struct{}
81
        wg   sync.WaitGroup
82
}
83

84
// NewGraphCache creates a new graphCache.
85
func NewGraphCache(preAllocNumNodes int) *GraphCache {
145✔
86
        return &GraphCache{
145✔
87
                nodeChannels: make(
145✔
88
                        map[route.Vertex]map[uint64]*DirectedChannel,
145✔
89
                        // A channel connects two nodes, so we can look it up
145✔
90
                        // from both sides, meaning we get double the number of
145✔
91
                        // entries.
145✔
92
                        preAllocNumNodes*2,
145✔
93
                ),
145✔
94
                nodeFeatures: make(
145✔
95
                        map[route.Vertex]*lnwire.FeatureVector,
145✔
96
                        preAllocNumNodes,
145✔
97
                ),
145✔
98
                zombieIndex:           make(map[uint64]struct{}),
145✔
99
                zombieCleanerInterval: time.Hour,
145✔
100
                quit:                  make(chan struct{}),
145✔
101
        }
145✔
102
}
145✔
103

104
// Stats returns statistics about the current cache size.
105
func (c *GraphCache) Stats() string {
375✔
106
        c.mtx.RLock()
375✔
107
        defer c.mtx.RUnlock()
375✔
108

375✔
109
        numChannels := 0
375✔
110
        for node := range c.nodeChannels {
765✔
111
                numChannels += len(c.nodeChannels[node])
390✔
112
        }
390✔
113
        return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
375✔
114
                "num_channels=%d", len(c.nodeFeatures), len(c.nodeChannels),
375✔
115
                numChannels)
375✔
116
}
117

118
// Start launches the background goroutine that periodically cleans up zombie
119
// channels.
120
func (c *GraphCache) Start() {
142✔
121
        c.wg.Add(1)
142✔
122
        go c.zombieCleaner()
142✔
123
}
142✔
124

125
// Stop signals the background cleaner to shut down and waits for it to exit.
126
func (c *GraphCache) Stop() {
140✔
127
        close(c.quit)
140✔
128
        c.wg.Wait()
140✔
129
}
140✔
130

131
// zombieCleaner periodically iterates over the zombie index and removes
132
// channels that are still missing their nodes.
133
//
134
// NOTE: must be run as a goroutine.
135
func (c *GraphCache) zombieCleaner() {
142✔
136
        defer c.wg.Done()
142✔
137

142✔
138
        ticker := time.NewTicker(c.zombieCleanerInterval)
142✔
139
        defer ticker.Stop()
142✔
140

142✔
141
        for {
285✔
142
                select {
143✔
143
                case <-ticker.C:
1✔
144
                        c.cleanupZombies()
1✔
145
                case <-c.quit:
140✔
146
                        return
140✔
147
                }
148
        }
149
}
150

151
// cleanupZombies attempts to prune channels tracked in the zombie index. If the
152
// nodes for a channel still cannot be resolved, the channel is deleted from the
153
// cache.
154
func (c *GraphCache) cleanupZombies() {
2✔
155
        c.mtx.Lock()
2✔
156
        defer c.mtx.Unlock()
2✔
157

2✔
158
        if len(c.zombieIndex) == 0 {
2✔
NEW
159
                log.Debug("no zombie channels to clean up")
×
NEW
160
                return
×
NEW
161
        }
×
162

163
        // Go through all nodes and their channels once to check if any are
164
        // marked as zombies. This is faster than checking every node for each
165
        // zombie channel, since there are usually many more nodes than zombie
166
        // channels.
167
        for node, chans := range c.nodeChannels {
5✔
168
                for cid, ch := range chans {
6✔
169
                        // if the channel isn't a zombie, we can skip it.
3✔
170
                        if _, ok := c.zombieIndex[cid]; !ok {
5✔
171
                                continue
2✔
172
                        }
173

174
                        // delete peer's side channel if it exists.
175
                        c.removeChannelIfFound(ch.OtherNode, cid)
1✔
176

1✔
177
                        // delete the channel from our side.
1✔
178
                        delete(chans, cid)
1✔
179
                }
180

181
                // If all channels were deleted for this node, clean up the map
182
                // entry entirely.
183
                if len(chans) == 0 {
4✔
184
                        delete(c.nodeChannels, node)
1✔
185
                }
1✔
186
        }
187

188
        // Now that we have removed all channels that were zombies, we can
189
        // clear the zombie index.
190
        c.zombieIndex = make(map[uint64]struct{})
2✔
191
}
192

193
// AddNodeFeatures adds a graph node and its features to the cache.
194
func (c *GraphCache) AddNodeFeatures(node route.Vertex,
195
        features *lnwire.FeatureVector) {
663✔
196

663✔
197
        c.mtx.Lock()
663✔
198
        defer c.mtx.Unlock()
663✔
199

663✔
200
        c.nodeFeatures[node] = features
663✔
201
}
663✔
202

203
// AddChannel adds a non-directed channel, meaning that the order of policy 1
204
// and policy 2 does not matter, the directionality is extracted from the info
205
// and policy flags automatically. The policy will be set as the outgoing policy
206
// on one node and the incoming policy on the peer's side.
207
func (c *GraphCache) AddChannel(info *models.CachedEdgeInfo,
208
        policy1, policy2 *models.CachedEdgePolicy) {
1,697✔
209

1,697✔
210
        if info == nil {
1,697✔
211
                return
×
212
        }
×
213

214
        if policy1 != nil && policy1.IsDisabled() &&
1,697✔
215
                policy2 != nil && policy2.IsDisabled() {
1,700✔
216

3✔
217
                return
3✔
218
        }
3✔
219

220
        // Create the edge entry for both nodes.
221
        c.mtx.Lock()
1,697✔
222
        c.updateOrAddEdge(info.NodeKey1Bytes, &DirectedChannel{
1,697✔
223
                ChannelID: info.ChannelID,
1,697✔
224
                IsNode1:   true,
1,697✔
225
                OtherNode: info.NodeKey2Bytes,
1,697✔
226
                Capacity:  info.Capacity,
1,697✔
227
        })
1,697✔
228
        c.updateOrAddEdge(info.NodeKey2Bytes, &DirectedChannel{
1,697✔
229
                ChannelID: info.ChannelID,
1,697✔
230
                IsNode1:   false,
1,697✔
231
                OtherNode: info.NodeKey1Bytes,
1,697✔
232
                Capacity:  info.Capacity,
1,697✔
233
        })
1,697✔
234
        c.mtx.Unlock()
1,697✔
235

1,697✔
236
        // The policy's node is always the to_node. So if policy 1 has to_node
1,697✔
237
        // of node 2 then we have the policy 1 as seen from node 1.
1,697✔
238
        if policy1 != nil {
2,098✔
239
                fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
401✔
240
                if !policy1.IsNode1() {
402✔
241
                        fromNode, toNode = toNode, fromNode
1✔
242
                }
1✔
243
                c.UpdatePolicy(policy1, fromNode, toNode)
401✔
244
        }
245
        if policy2 != nil {
2,098✔
246
                fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
401✔
247
                if policy2.IsNode1() {
402✔
248
                        fromNode, toNode = toNode, fromNode
1✔
249
                }
1✔
250
                c.UpdatePolicy(policy2, fromNode, toNode)
401✔
251
        }
252
}
253

254
// updateOrAddEdge makes sure the edge information for a node is either updated
255
// if it already exists or is added to that node's list of channels.
256
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
3,391✔
257
        if len(c.nodeChannels[node]) == 0 {
4,256✔
258
                c.nodeChannels[node] = make(map[uint64]*DirectedChannel)
865✔
259
        }
865✔
260

261
        c.nodeChannels[node][edge.ChannelID] = edge
3,391✔
262
}
263

264
// UpdatePolicy updates a single policy on both the from and to node. The order
265
// of the from and to node is not strictly important. But we assume that a
266
// channel edge was added beforehand so that the directed channel struct already
267
// exists in the cache.
268
func (c *GraphCache) UpdatePolicy(policy *models.CachedEdgePolicy, fromNode,
269
        toNode route.Vertex) {
3,081✔
270

3,081✔
271
        c.mtx.Lock()
3,081✔
272
        defer c.mtx.Unlock()
3,081✔
273

3,081✔
274
        updatePolicy := func(nodeKey route.Vertex) {
9,240✔
275
                if len(c.nodeChannels[nodeKey]) == 0 {
6,159✔
276
                        log.Warnf("Node=%v not found in graph cache", nodeKey)
×
277

×
278
                        return
×
279
                }
×
280

281
                channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
6,159✔
282
                if !ok {
6,159✔
283
                        log.Warnf("Channel=%v not found in graph cache",
×
284
                                policy.ChannelID)
×
285

×
286
                        return
×
287
                }
×
288

289
                // Edge 1 is defined as the policy for the direction of node1 to
290
                // node2.
291
                switch {
6,159✔
292
                // This is node 1, and it is edge 1, so this is the outgoing
293
                // policy for node 1.
294
                case channel.IsNode1 && policy.IsNode1():
1,544✔
295
                        channel.OutPolicySet = true
1,544✔
296
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
1,709✔
297
                                channel.InboundFee = fee
165✔
298
                        })
165✔
299

300
                // This is node 2, and it is edge 2, so this is the outgoing
301
                // policy for node 2.
302
                case !channel.IsNode1 && !policy.IsNode1():
1,540✔
303
                        channel.OutPolicySet = true
1,540✔
304
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
1,704✔
305
                                channel.InboundFee = fee
164✔
306
                        })
164✔
307

308
                // The other two cases left mean it's the inbound policy for the
309
                // node.
310
                default:
3,081✔
311
                        channel.InPolicy = policy
3,081✔
312
                }
313
        }
314

315
        updatePolicy(fromNode)
3,081✔
316
        updatePolicy(toNode)
3,081✔
317
}
318

319
// RemoveNode completely removes a node and all its channels (including the
320
// peer's side).
321
func (c *GraphCache) RemoveNode(node route.Vertex) {
71✔
322
        c.mtx.Lock()
71✔
323
        defer c.mtx.Unlock()
71✔
324

71✔
325
        delete(c.nodeFeatures, node)
71✔
326

71✔
327
        // First remove all channels from the other nodes' lists.
71✔
328
        for _, channel := range c.nodeChannels[node] {
73✔
329
                c.removeChannelIfFound(channel.OtherNode, channel.ChannelID)
2✔
330
        }
2✔
331

332
        // Then remove our whole node completely.
333
        delete(c.nodeChannels, node)
71✔
334
}
335

336
// RemoveChannel removes a single channel between two nodes.
337
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
272✔
338
        c.mtx.Lock()
272✔
339
        defer c.mtx.Unlock()
272✔
340

272✔
341
        // Remove that one channel from both sides.
272✔
342
        c.removeChannelIfFound(node1, chanID)
272✔
343
        c.removeChannelIfFound(node2, chanID)
272✔
344

272✔
345
        zeroVertex := route.Vertex{}
272✔
346
        if node1 == zeroVertex || node2 == zeroVertex {
276✔
347
                // If one of the nodes is the zero vertex, it means that we will
4✔
348
                // leak the channel in the memory cache, since we don't have the
4✔
349
                // node ID to remove, so we add it to the zombie index to post
4✔
350
                // removal.
4✔
351
                c.zombieIndex[chanID] = struct{}{}
4✔
352
        }
4✔
353
}
354

355
// removeChannelIfFound removes a single channel from one side.
356
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
544✔
357
        if len(c.nodeChannels[node]) == 0 {
737✔
358
                return
193✔
359
        }
193✔
360

361
        delete(c.nodeChannels[node], chanID)
354✔
362
}
363

364
// getChannels returns a copy of the passed node's channels or nil if there
365
// isn't any.
366
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
548✔
367
        c.mtx.RLock()
548✔
368
        defer c.mtx.RUnlock()
548✔
369

548✔
370
        channels, ok := c.nodeChannels[node]
548✔
371
        if !ok {
558✔
372
                return nil
10✔
373
        }
10✔
374

375
        features, ok := c.nodeFeatures[node]
541✔
376
        if !ok {
696✔
377
                // If the features were set to nil explicitly, that's fine here.
155✔
378
                // The router will overwrite the features of the destination
155✔
379
                // node with those found in the invoice if necessary. But if we
155✔
380
                // didn't yet get a node announcement we want to mimic the
155✔
381
                // behavior of the old DB based code that would always set an
155✔
382
                // empty feature vector instead of leaving it nil.
155✔
383
                features = lnwire.EmptyFeatureVector()
155✔
384
        }
155✔
385

386
        toNodeCallback := func() route.Vertex {
992✔
387
                return node
451✔
388
        }
451✔
389

390
        i := 0
541✔
391
        channelsCopy := make([]*DirectedChannel, len(channels))
541✔
392
        for cid, channel := range channels {
2,134✔
393
                if _, ok := c.zombieIndex[cid]; ok {
1,593✔
NEW
394
                        // If this channel is a zombie, we don't want to return
×
NEW
395
                        // it, so we skip it.
×
NEW
396
                        continue
×
397
                }
398

399
                // We need to copy the channel and policy to avoid it being
400
                // updated in the cache if the path finding algorithm sets
401
                // fields on it (currently only the ToNodeFeatures of the
402
                // policy).
403
                channelCopy := channel.DeepCopy()
1,593✔
404
                if channelCopy.InPolicy != nil {
3,161✔
405
                        channelCopy.InPolicy.ToNodePubKey = toNodeCallback
1,568✔
406
                        channelCopy.InPolicy.ToNodeFeatures = features
1,568✔
407
                }
1,568✔
408

409
                channelsCopy[i] = channelCopy
1,593✔
410
                i++
1,593✔
411
        }
412

413
        // Copy the slice to clean up the unused pre allocated tail entries.
414
        copy(channelsCopy, channelsCopy[:i])
541✔
415

541✔
416
        return channelsCopy
541✔
417
}
418

419
// ForEachChannel invokes the given callback for each channel of the given node.
420
func (c *GraphCache) ForEachChannel(node route.Vertex,
421
        cb func(channel *DirectedChannel) error) error {
548✔
422

548✔
423
        // Obtain a copy of the node's channels. We need do this in order to
548✔
424
        // avoid deadlocks caused by interaction with the graph cache, channel
548✔
425
        // state and the graph database from multiple goroutines. This snapshot
548✔
426
        // is only used for path finding where being stale is acceptable since
548✔
427
        // the real world graph and our representation may always become
548✔
428
        // slightly out of sync for a short time and the actual channel state
548✔
429
        // is stored separately.
548✔
430
        channels := c.getChannels(node)
548✔
431
        for _, channel := range channels {
2,125✔
432
                if err := cb(channel); err != nil {
1,591✔
433
                        return err
14✔
434
                }
14✔
435
        }
436

437
        return nil
537✔
438
}
439

440
// ForEachNode iterates over the adjacency list of the graph, executing the
441
// call back for each node and the set of channels that emanate from the given
442
// node.
443
//
444
// NOTE: This method should be considered _read only_, the channels or nodes
445
// passed in MUST NOT be modified.
446
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
447
        channels map[uint64]*DirectedChannel) error) error {
2✔
448

2✔
449
        c.mtx.RLock()
2✔
450
        defer c.mtx.RUnlock()
2✔
451

2✔
452
        for node, channels := range c.nodeChannels {
6✔
453
                // We don't make a copy here since this is a read-only RPC
4✔
454
                // call. We also don't need the node features either for this
4✔
455
                // call.
4✔
456
                if err := cb(node, channels); err != nil {
4✔
457
                        return err
×
458
                }
×
459
        }
460

461
        return nil
2✔
462
}
463

464
// GetFeatures returns the features of the node with the given ID. If no
465
// features are known for the node, an empty feature vector is returned.
466
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
472✔
467
        c.mtx.RLock()
472✔
468
        defer c.mtx.RUnlock()
472✔
469

472✔
470
        features, ok := c.nodeFeatures[node]
472✔
471
        if !ok || features == nil {
555✔
472
                // The router expects the features to never be nil, so we return
83✔
473
                // an empty feature set instead.
83✔
474
                return lnwire.EmptyFeatureVector()
83✔
475
        }
83✔
476

477
        return features
392✔
478
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc