lightningnetwork / lnd, build 16937581283
13 Aug 2025 12:43PM UTC. Coverage: 66.901% (-0.03%), down from 66.929%.

Pull Request #10148: graph/db+sqldb: different defaults for SQLite and Postgres query options
Merge b1deddec4 into c6a9116e3

11 of 81 new or added lines in 7 files covered (13.58%).
88 existing lines in 25 files are now uncovered.
135834 of 203036 relevant lines covered (66.9%).
21598.84 hits per line.

Source File

/graph/db/sql_migration.go (0.0% covered)

package graphdb

import (
        "bytes"
        "cmp"
        "context"
        "database/sql"
        "errors"
        "fmt"
        "net"
        "slices"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/sqldb"
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
        "golang.org/x/time/rate"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, cfg *SQLStoreConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        log.Infof("Starting migration of the graph store from KV to SQL")
        t0 := time.Now()

        // Check if there is a graph to migrate.
        graphExists, err := checkGraphExists(kvBackend)
        if err != nil {
                return fmt.Errorf("failed to check graph existence: %w", err)
        }
        if !graphExists {
                log.Infof("No graph found in KV store, skipping the migration")
                return nil
        }

        // 1) Migrate all the nodes.
        err = migrateNodes(ctx, cfg.QueryCfg, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        // 2) Migrate the source node.
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate source node: %w", err)
        }

        // 3) Migrate all the channels and channel policies.
        err = migrateChannelsAndPolicies(ctx, cfg, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        // 4) Migrate the Prune log.
        if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // 5) Migrate the closed SCID index.
        err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        // 6) Migrate the zombie index.
        if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Finished migration of the graph store from KV to SQL in %v",
                time.Since(t0))

        return nil
}

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
        // Check if there is even a graph to migrate.
        err := db.View(func(tx kvdb.RTx) error {
                // Check for the existence of the node bucket which is a top
                // level bucket that would have been created on the initial
                // creation of the graph store.
                nodes := tx.ReadBucket(nodeBucket)
                if nodes == nil {
                        return ErrGraphNotFound
                }

                return nil
        }, func() {})
        if errors.Is(err, ErrGraphNotFound) {
                return false, nil
        } else if err != nil {
                return false, err
        }

        return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        // Keep track of the number of nodes migrated and the number of
        // nodes skipped due to errors.
        var (
                count   uint64
                skipped uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // Loop through each node in the KV store and insert it into the SQL
        // database.
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
                node *models.LightningNode) error {

                pub := node.PubKeyBytes

                // Sanity check to ensure that the node has valid extra opaque
                // data. If it does not, we'll skip it. We need to do this
                // because previously we would just persist any TLV bytes that
                // we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a node with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        skipped++
                        log.Warnf("Skipping migration of node %x with invalid "+
                                "extra opaque data: %v", pub,
                                node.ExtraOpaqueData)

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra "+
                                "opaque data for node %x: %w", pub, err)
                }

                count++
                chunk++

                // TODO(elle): At this point, we should check the loaded node
                // to see if we should extract any DNS addresses from its
                // opaque type addresses. This is expected to be done in:
                // https://github.com/lightningnetwork/lnd/pull/9455.
                // This TODO is being tracked in
                //  https://github.com/lightningnetwork/lnd/issues/9795 as this
                // must be addressed before making this code path active in
                // production.

                // Write the node to the SQL database.
                id, err := upsertNode(ctx, sqlDB, node)
                if err != nil {
                        return fmt.Errorf("could not persist node(%x): %w", pub,
                                err)
                }

                // Fetch it from the SQL store and compare it against the
                // original node object to ensure the migration was successful.
                dbNode, err := sqlDB.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                PubKey:  node.PubKeyBytes[:],
                                Version: int16(ProtocolV1),
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not get node by pubkey (%x)"+
                                "after migration: %w", pub, err)
                }

                // Sanity check: ensure the migrated node ID matches the one we
                // just inserted.
                if dbNode.ID != id {
                        return fmt.Errorf("node ID mismatch for node (%x) "+
                                "after migration: expected %d, got %d",
                                pub, id, dbNode.ID)
                }

                migratedNode, err := buildNode(ctx, cfg, sqlDB, dbNode)
                if err != nil {
                        return fmt.Errorf("could not build migrated node "+
                                "from dbNode(db id: %d, node pub: %x): %w",
                                dbNode.ID, pub, err)
                }

                // Make sure that the node addresses are sorted before
                // comparing them to ensure that the order of addresses does
                // not affect the comparison.
                slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
                        return cmp.Compare(i.String(), j.String())
                })
                slices.SortFunc(
                        migratedNode.Addresses, func(i, j net.Addr) int {
                                return cmp.Compare(i.String(), j.String())
                        },
                )

                err = sqldb.CompareRecords(
                        node, migratedNode, fmt.Sprintf("node %x", pub),
                )
                if err != nil {
                        return fmt.Errorf("node mismatch after migration "+
                                "for node %x: %w", pub, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d nodes (%.2f nodes/sec)",
                                count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
                "invalid TLV streams)", count, skipped)

        return nil
}

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
        sqlDB SQLQueries) error {

        log.Debugf("Migrating source node from KV to SQL")

        sourceNode, err := sourceNode(kvdb)
        if errors.Is(err, ErrSourceNodeNotSet) {
                // If the source node has not been set yet, we can skip this
                // migration step.
                return nil
        } else if err != nil {
                return fmt.Errorf("could not get source node from kv "+
                        "store: %w", err)
        }

        pub := sourceNode.PubKeyBytes

        // Get the DB ID of the source node by its public key. This node must
        // already exist in the SQL database, as it should have been migrated
        // in the previous node-migration step.
        id, err := sqlDB.GetNodeIDByPubKey(
                ctx, sqlc.GetNodeIDByPubKeyParams{
                        PubKey:  pub[:],
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get source node ID: %w", err)
        }

        // Now we can add the source node to the SQL database.
        err = sqlDB.AddSourceNode(ctx, id)
        if err != nil {
                return fmt.Errorf("could not add source node to SQL store: %w",
                        err)
        }

        // Verify that the source node was added correctly by fetching it back
        // from the SQL database and checking that the expected DB ID and
        // pub key are returned. We don't need to do a whole node comparison
        // here, as this was already done in the previous migration step.
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
        if err != nil {
                return fmt.Errorf("could not get source nodes from SQL "+
                        "store: %w", err)
        }

        // The SQL store has support for multiple source nodes (for future
        // protocol versions) but this migration is purely aimed at the V1
        // store, and so we expect exactly one source node to be present.
        if len(srcNodes) != 1 {
                return fmt.Errorf("expected exactly one source node, "+
                        "got %d", len(srcNodes))
        }

        // Check that the source node ID and pub key match the original
        // source node.
        if srcNodes[0].NodeID != id {
                return fmt.Errorf("source node ID mismatch after migration: "+
                        "expected %d, got %d", id, srcNodes[0].NodeID)
        }
        err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
        if err != nil {
                return fmt.Errorf("source node pubkey mismatch after "+
                        "migration: %w", err)
        }

        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

        return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        var (
                channelCount       uint64
                skippedChanCount   uint64
                policyCount        uint64
                skippedPolicyCount uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
                // If the policy is nil, we can skip it.
                if policy == nil {
                        return nil
                }

                // Unlike the special case of invalid TLV bytes for node and
                // channel announcements, we don't need to handle the case for
                // channel policies here because it is already handled in the
                // `forEachChannel` function. If the policy has invalid TLV
                // bytes, then `nil` will be passed to this function.

                policyCount++

                _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
                if err != nil {
                        return fmt.Errorf("could not migrate channel "+
                                "policy %d: %w", policy.ChannelID, err)
                }

                return nil
        }

        // Iterate over each channel in the KV store and migrate it and its
        // policies to the SQL database.
        err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
                policy1 *models.ChannelEdgePolicy,
                policy2 *models.ChannelEdgePolicy) error {

                scid := channel.ChannelID

                // Here, we do a sanity check to ensure that the chain hash of
                // the channel returned by the KV store matches the expected
                // chain hash. This is important since in the SQL store, we will
                // no longer explicitly store the chain hash in the channel
                // info, but rather rely on the chain hash LND is running with.
                // So this is our way of ensuring that LND is running on the
                // correct network at migration time.
                if channel.ChainHash != cfg.ChainHash {
                        return fmt.Errorf("channel %d has chain hash %s, "+
                                "expected %s", scid, channel.ChainHash,
                                cfg.ChainHash)
                }

                // Sanity check to ensure that the channel has valid extra
                // opaque data. If it does not, we'll skip it. We need to do
                // this because previously we would just persist any TLV bytes
                // that we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a channel with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        log.Warnf("Skipping channel %d with invalid "+
                                "extra opaque data: %v", scid,
                                channel.ExtraOpaqueData)

                        skippedChanCount++

                        // If we skip a channel, we also skip its policies.
                        if policy1 != nil {
                                skippedPolicyCount++
                        }
                        if policy2 != nil {
                                skippedPolicyCount++
                        }

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra opaque "+
                                "data for channel %d (%v): %w", scid,
                                channel.ExtraOpaqueData, err)
                }

                channelCount++
                chunk++

                err = migrateSingleChannel(
                        ctx, cfg, sqlDB, channel, policy1, policy2,
                        migChanPolicy,
                )
                if err != nil {
                        return fmt.Errorf("could not migrate channel %d: %w",
                                scid, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d channels (%.2f channels/sec)",
                                channelCount, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        log.Infof("Migrated %d channels and %d policies from KV to SQL "+
                "(skipped %d channels and %d policies due to invalid TLV "+
                "streams)", channelCount, policyCount, skippedChanCount,
                skippedPolicyCount)

        return nil
}

func migrateSingleChannel(ctx context.Context, cfg *SQLStoreConfig,
        sqlDB SQLQueries, channel *models.ChannelEdgeInfo,
        policy1, policy2 *models.ChannelEdgePolicy,
        migChanPolicy func(*models.ChannelEdgePolicy) error) error {

        scid := channel.ChannelID

        // First, migrate the channel info along with its policies.
        dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
        if err != nil {
                return fmt.Errorf("could not insert record for channel %d "+
                        "in SQL store: %w", scid, err)
        }

        // Now, migrate the two channel policies.
        err = migChanPolicy(policy1)
        if err != nil {
                return fmt.Errorf("could not migrate policy1(%d): %w", scid,
                        err)
        }
        err = migChanPolicy(policy2)
        if err != nil {
                return fmt.Errorf("could not migrate policy2(%d): %w", scid,
                        err)
        }

        // Now, fetch the channel and its policies from the SQL DB.
        row, err := sqlDB.GetChannelBySCIDWithPolicies(
                ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
                        Scid:    channelIDToBytes(scid),
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
                        err)
        }

        // Assert that the DB IDs for the channel and nodes are as expected
        // given the inserted channel info.
        err = sqldb.CompareRecords(
                dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node1ID, row.GraphNode.ID, "node1 DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node2ID, row.GraphNode_2.ID, "node2 DB ID",
        )
        if err != nil {
                return err
        }

        migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
                ctx, cfg, sqlDB, row,
        )
        if err != nil {
                return fmt.Errorf("could not build migrated channel and "+
                        "policies: %w", err)
        }

        // Finally, compare the original channel info and
        // policies with the migrated ones to ensure they match.
        if len(channel.ExtraOpaqueData) == 0 {
                channel.ExtraOpaqueData = nil
        }
        if len(migChan.ExtraOpaqueData) == 0 {
                migChan.ExtraOpaqueData = nil
        }

        err = sqldb.CompareRecords(
                channel, migChan, fmt.Sprintf("channel %d", scid),
        )
        if err != nil {
                return err
        }

        checkPolicy := func(expPolicy,
                migPolicy *models.ChannelEdgePolicy) error {

                switch {
                // Both policies are nil, nothing to compare.
                case expPolicy == nil && migPolicy == nil:
                        return nil

                // One of the policies is nil, but the other is not.
                case expPolicy == nil || migPolicy == nil:
                        return fmt.Errorf("expected both policies to be "+
                                "non-nil. Got expPolicy: %v, "+
                                "migPolicy: %v", expPolicy, migPolicy)

                // Both policies are non-nil, we can compare them.
                default:
                }

                if len(expPolicy.ExtraOpaqueData) == 0 {
                        expPolicy.ExtraOpaqueData = nil
                }
                if len(migPolicy.ExtraOpaqueData) == 0 {
                        migPolicy.ExtraOpaqueData = nil
                }

                return sqldb.CompareRecords(
                        *expPolicy, *migPolicy, "channel policy",
                )
        }

        err = checkPolicy(policy1, migPol1)
        if err != nil {
                return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
                        err)
        }

        err = checkPolicy(policy2, migPol2)
        if err != nil {
                return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
                        err)
        }

        return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count          uint64
                pruneTipHeight uint32
                pruneTipHash   chainhash.Hash

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // migrateSinglePruneEntry is a helper function that inserts a single
        // prune log entry into the SQL database and verifies that it was
        // inserted correctly.
        migrateSinglePruneEntry := func(height uint32,
                hash *chainhash.Hash) error {

                count++
                chunk++

                // Keep track of the prune tip height and hash.
                if height > pruneTipHeight {
                        pruneTipHeight = height
                        pruneTipHash = *hash
                }

                err := sqlDB.UpsertPruneLogEntry(
                        ctx, sqlc.UpsertPruneLogEntryParams{
                                BlockHeight: int64(height),
                                BlockHash:   hash[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to insert prune log "+
                                "entry for height %d: %w", height, err)
                }

                // Now, check that the entry was inserted correctly.
                migratedHash, err := sqlDB.GetPruneHashByHeight(
                        ctx, int64(height),
                )
                if err != nil {
                        return fmt.Errorf("could not get prune hash "+
                                "for height %d: %w", height, err)
                }

                return sqldb.CompareRecords(
                        hash[:], migratedHash, "prune log entry",
                )
        }

        // Iterate over each prune log entry in the KV store and migrate it to
        // the SQL database.
        err := forEachPruneLogEntry(
                kvBackend, func(height uint32, hash *chainhash.Hash) error {
                        err := migrateSinglePruneEntry(height, hash)
                        if err != nil {
                                return fmt.Errorf("could not migrate "+
                                        "prune log entry at height %d: %w",
                                        height, err)
                        }

                        s.Do(func() {
                                elapsed := time.Since(t0).Seconds()
                                ratePerSec := float64(chunk) / elapsed
                                log.Debugf("Migrated %d prune log "+
                                        "entries (%.2f entries/sec)",
                                        count, ratePerSec)

                                t0 = time.Now()
                                chunk = 0
                        })

                        return nil
                },
        )
        if err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // Check that the prune tip is set correctly in the SQL
        // database.
        pruneTip, err := sqlDB.GetPruneTip(ctx)
        if errors.Is(err, sql.ErrNoRows) {
                // The ErrGraphNeverPruned error is expected if no prune log
                // entries were migrated from the kvdb store. Otherwise, it's
                // an unexpected error.
                if count == 0 {
                        log.Infof("No prune log entries found in KV store " +
                                "to migrate")
                        return nil
                }
                // Fall-through to the next error check.
        }
        if err != nil {
                return fmt.Errorf("could not get prune tip: %w", err)
        }

        if pruneTip.BlockHeight != int64(pruneTipHeight) ||
                !bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

                return fmt.Errorf("prune tip mismatch after migration: "+
                        "expected height %d, hash %s; got height %d, "+
                        "hash %s", pruneTipHeight, pruneTipHash,
                        pruneTip.BlockHeight,
                        chainhash.Hash(pruneTip.BlockHash))
        }

        log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
                "tip is: height %d, hash: %s", count, pruneTipHeight,
                pruneTipHash)

        return nil
}

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
        db SQLQueries,
        row sqlc.GetChannelBySCIDWithPoliciesRow) (*models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

        node1, node2, err := buildNodeVertices(
                row.GraphNode.PubKey, row.GraphNode_2.PubKey,
        )
        if err != nil {
                return nil, nil, nil, err
        }

        edge, err := getAndBuildEdgeInfo(
                ctx, cfg, db, row.GraphChannel, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "info: %w", err)
        }

        dbPol1, dbPol2, err := extractChannelPolicies(row)
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to extract channel "+
                        "policies: %w", err)
        }

        policy1, policy2, err := getAndBuildChanPolicies(
                ctx, cfg.QueryCfg, db, dbPol1, dbPol2, edge.ChannelID, node1,
                node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "policies: %w", err)
        }

        return edge, policy1, policy2, nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
        hash *chainhash.Hash) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                metaBucket := tx.ReadBucket(graphMetaBucket)
                if metaBucket == nil {
                        return ErrGraphNotFound
                }

                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
                if pruneBucket == nil {
                        // The graph has never been pruned and so, there are no
                        // entries to iterate over.
                        return nil
                }

                return pruneBucket.ForEach(func(k, v []byte) error {
                        blockHeight := byteOrder.Uint32(k)
                        var blockHash chainhash.Hash
                        copy(blockHash[:], v)

                        return cb(blockHeight, &blockHash)
                })
        }, func() {})
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
                count++
                chunk++

                chanIDB := channelIDToBytes(scid.ToUint64())
                err := sqlDB.InsertClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not insert closed channel "+
                                "with SCID %s: %w", scid, err)
                }

                // Now, verify that the channel with the given SCID is
                // seen as closed.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check if channel %s "+
                                "is closed: %w", scid, err)
                }

                if !isClosed {
                        return fmt.Errorf("channel %s should be closed, "+
                                "but is not", scid)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d closed scids "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }

        err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

        return nil
}

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
                pubKey2 [33]byte) error {

                chanIDB := channelIDToBytes(chanID)

                // If it is in the closed SCID index, we don't need to
                // add it to the zombie index.
                //
                // NOTE: this means that the resulting zombie index count in
                // the SQL store may well be less than the count of zombie
                // channels in the KV store.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check closed "+
                                "channel: %w", err)
                }
                if isClosed {
                        return nil
                }

                count++
                chunk++

                err = sqlDB.UpsertZombieChannel(
                        ctx, sqlc.UpsertZombieChannelParams{
                                Version:  int16(ProtocolV1),
                                Scid:     chanIDB,
                                NodeKey1: pubKey1[:],
                                NodeKey2: pubKey2[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not upsert zombie "+
                                "channel %d: %w", chanID, err)
                }

                // Finally, verify that the channel is indeed marked as a
                // zombie channel.
                isZombie, err := sqlDB.IsZombieChannel(
                        ctx, sqlc.IsZombieChannelParams{
                                Version: int16(ProtocolV1),
                                Scid:    chanIDB,
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not check if "+
                                "channel %d is zombie: %w", chanID, err)
                }

                if !isZombie {
                        return fmt.Errorf("channel %d should be "+
                                "a zombie, but is not", chanID)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d zombie index entries "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        })
        if err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Migrated %d zombie channels from KV to SQL", count)

        return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
        pubKey2 [33]byte) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                edges := tx.ReadBucket(edgeBucket)
                if edges == nil {
                        return ErrGraphNoEdgesFound
                }
                zombieIndex := edges.NestedReadBucket(zombieBucket)
                if zombieIndex == nil {
                        return nil
                }

                return zombieIndex.ForEach(func(k, v []byte) error {
                        var pubKey1, pubKey2 [33]byte
                        copy(pubKey1[:], v[:33])
                        copy(pubKey2[:], v[33:])

                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
                })
        }, func() {})
}

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
        cb func(lnwire.ShortChannelID) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                closedScids := tx.ReadBucket(closedScidBucket)
                if closedScids == nil {
                        return nil
                }

                return closedScids.ForEach(func(k, _ []byte) error {
                        return cb(lnwire.NewShortChanIDFromInt(
                                byteOrder.Uint64(k),
                        ))
                })
        }, func() {})
}