• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15760549439

19 Jun 2025 02:42PM UTC coverage: 68.141% (-0.02%) from 68.161%
15760549439

push

github

web-flow
Merge pull request #9959 from ellemouton/chanGraphContext2

multi: add context.Context param to more graphdb.V1Store methods

49 of 72 new or added lines in 13 files covered. (68.06%)

118 existing lines in 28 files now uncovered.

134468 of 197337 relevant lines covered (68.14%)

22222.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/graph/db/sql_store.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "context"
6
        "database/sql"
7
        "encoding/hex"
8
        "errors"
9
        "fmt"
10
        "math"
11
        "net"
12
        "strconv"
13
        "sync"
14
        "time"
15

16
        "github.com/btcsuite/btcd/btcec/v2"
17
        "github.com/btcsuite/btcd/btcutil"
18
        "github.com/btcsuite/btcd/chaincfg/chainhash"
19
        "github.com/btcsuite/btcd/wire"
20
        "github.com/lightningnetwork/lnd/batch"
21
        "github.com/lightningnetwork/lnd/fn/v2"
22
        "github.com/lightningnetwork/lnd/graph/db/models"
23
        "github.com/lightningnetwork/lnd/lnwire"
24
        "github.com/lightningnetwork/lnd/routing/route"
25
        "github.com/lightningnetwork/lnd/sqldb"
26
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
27
        "github.com/lightningnetwork/lnd/tlv"
28
        "github.com/lightningnetwork/lnd/tor"
29
)
30

31
// pageSize is the limit for the number of records that can be returned
32
// in a paginated query. This can be tuned after some benchmarks.
33
const pageSize = 2000
34

35
// ProtocolVersion is an enum that defines the gossip protocol version of a
36
// message.
37
type ProtocolVersion uint8
38

39
const (
40
        // ProtocolV1 is the gossip protocol version defined in BOLT #7.
41
        ProtocolV1 ProtocolVersion = 1
42
)
43

44
// String returns a string representation of the protocol version.
45
func (v ProtocolVersion) String() string {
×
46
        return fmt.Sprintf("V%d", v)
×
47
}
×
48

49
// SQLQueries is a subset of the sqlc.Querier interface that can be used to
50
// execute queries against the SQL graph tables.
51
//
52
//nolint:ll,interfacebloat
53
type SQLQueries interface {
54
        /*
55
                Node queries.
56
        */
57
        UpsertNode(ctx context.Context, arg sqlc.UpsertNodeParams) (int64, error)
58
        GetNodeByPubKey(ctx context.Context, arg sqlc.GetNodeByPubKeyParams) (sqlc.Node, error)
59
        GetNodeIDByPubKey(ctx context.Context, arg sqlc.GetNodeIDByPubKeyParams) (int64, error)
60
        GetNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetNodesByLastUpdateRangeParams) ([]sqlc.Node, error)
61
        ListNodesPaginated(ctx context.Context, arg sqlc.ListNodesPaginatedParams) ([]sqlc.Node, error)
62
        ListNodeIDsAndPubKeys(ctx context.Context, arg sqlc.ListNodeIDsAndPubKeysParams) ([]sqlc.ListNodeIDsAndPubKeysRow, error)
63
        DeleteNodeByPubKey(ctx context.Context, arg sqlc.DeleteNodeByPubKeyParams) (sql.Result, error)
64

65
        GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]sqlc.NodeExtraType, error)
66
        UpsertNodeExtraType(ctx context.Context, arg sqlc.UpsertNodeExtraTypeParams) error
67
        DeleteExtraNodeType(ctx context.Context, arg sqlc.DeleteExtraNodeTypeParams) error
68

69
        InsertNodeAddress(ctx context.Context, arg sqlc.InsertNodeAddressParams) error
70
        GetNodeAddressesByPubKey(ctx context.Context, arg sqlc.GetNodeAddressesByPubKeyParams) ([]sqlc.GetNodeAddressesByPubKeyRow, error)
71
        DeleteNodeAddresses(ctx context.Context, nodeID int64) error
72

73
        InsertNodeFeature(ctx context.Context, arg sqlc.InsertNodeFeatureParams) error
74
        GetNodeFeatures(ctx context.Context, nodeID int64) ([]sqlc.NodeFeature, error)
75
        GetNodeFeaturesByPubKey(ctx context.Context, arg sqlc.GetNodeFeaturesByPubKeyParams) ([]int32, error)
76
        DeleteNodeFeature(ctx context.Context, arg sqlc.DeleteNodeFeatureParams) error
77

78
        /*
79
                Source node queries.
80
        */
81
        AddSourceNode(ctx context.Context, nodeID int64) error
82
        GetSourceNodesByVersion(ctx context.Context, version int16) ([]sqlc.GetSourceNodesByVersionRow, error)
83

84
        /*
85
                Channel queries.
86
        */
87
        CreateChannel(ctx context.Context, arg sqlc.CreateChannelParams) (int64, error)
88
        GetChannelBySCID(ctx context.Context, arg sqlc.GetChannelBySCIDParams) (sqlc.Channel, error)
89
        GetChannelAndNodesBySCID(ctx context.Context, arg sqlc.GetChannelAndNodesBySCIDParams) (sqlc.GetChannelAndNodesBySCIDRow, error)
90
        GetChannelFeaturesAndExtras(ctx context.Context, channelID int64) ([]sqlc.GetChannelFeaturesAndExtrasRow, error)
91
        HighestSCID(ctx context.Context, version int16) ([]byte, error)
92
        ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error)
93

94
        CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
95
        InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
96

97
        /*
98
                Channel Policy table queries.
99
        */
100
        UpsertEdgePolicy(ctx context.Context, arg sqlc.UpsertEdgePolicyParams) (int64, error)
101

102
        InsertChanPolicyExtraType(ctx context.Context, arg sqlc.InsertChanPolicyExtraTypeParams) error
103
        GetChannelPolicyExtraTypes(ctx context.Context, arg sqlc.GetChannelPolicyExtraTypesParams) ([]sqlc.GetChannelPolicyExtraTypesRow, error)
104
        DeleteChannelPolicyExtraTypes(ctx context.Context, channelPolicyID int64) error
105
}
106

107
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
108
// database operations.
109
type BatchedSQLQueries interface {
110
        SQLQueries
111
        sqldb.BatchedTx[SQLQueries]
112
}
113

114
// SQLStore is an implementation of the V1Store interface that uses a SQL
115
// database as the backend.
116
//
117
// NOTE: currently, this temporarily embeds the KVStore struct so that we can
118
// implement the V1Store interface incrementally. For any method not
119
// implemented,  things will fall back to the KVStore. This is ONLY the case
120
// for the time being while this struct is purely used in unit tests only.
121
type SQLStore struct {
122
        cfg *SQLStoreConfig
123
        db  BatchedSQLQueries
124

125
        // cacheMu guards all caches (rejectCache and chanCache). If
126
        // this mutex will be acquired at the same time as the DB mutex then
127
        // the cacheMu MUST be acquired first to prevent deadlock.
128
        cacheMu     sync.RWMutex
129
        rejectCache *rejectCache
130
        chanCache   *channelCache
131

132
        chanScheduler batch.Scheduler[SQLQueries]
133
        nodeScheduler batch.Scheduler[SQLQueries]
134

135
        srcNodes  map[ProtocolVersion]*srcNodeInfo
136
        srcNodeMu sync.Mutex
137

138
        // Temporary fall-back to the KVStore so that we can implement the
139
        // interface incrementally.
140
        *KVStore
141
}
142

143
// A compile-time assertion to ensure that SQLStore implements the V1Store
144
// interface.
145
var _ V1Store = (*SQLStore)(nil)
146

147
// SQLStoreConfig holds the configuration for the SQLStore.
148
type SQLStoreConfig struct {
149
        // ChainHash is the genesis hash for the chain that all the gossip
150
        // messages in this store are aimed at.
151
        ChainHash chainhash.Hash
152
}
153

154
// NewSQLStore creates a new SQLStore instance given an open BatchedSQLQueries
155
// storage backend.
156
func NewSQLStore(cfg *SQLStoreConfig, db BatchedSQLQueries, kvStore *KVStore,
157
        options ...StoreOptionModifier) (*SQLStore, error) {
×
158

×
159
        opts := DefaultOptions()
×
160
        for _, o := range options {
×
161
                o(opts)
×
162
        }
×
163

164
        if opts.NoMigration {
×
165
                return nil, fmt.Errorf("the NoMigration option is not yet " +
×
166
                        "supported for SQL stores")
×
167
        }
×
168

169
        s := &SQLStore{
×
170
                cfg:         cfg,
×
171
                db:          db,
×
172
                KVStore:     kvStore,
×
173
                rejectCache: newRejectCache(opts.RejectCacheSize),
×
174
                chanCache:   newChannelCache(opts.ChannelCacheSize),
×
175
                srcNodes:    make(map[ProtocolVersion]*srcNodeInfo),
×
176
        }
×
177

×
178
        s.chanScheduler = batch.NewTimeScheduler(
×
179
                db, &s.cacheMu, opts.BatchCommitInterval,
×
180
        )
×
181
        s.nodeScheduler = batch.NewTimeScheduler(
×
182
                db, nil, opts.BatchCommitInterval,
×
183
        )
×
184

×
185
        return s, nil
×
186
}
187

188
// AddLightningNode adds a vertex/node to the graph database. If the node is not
189
// in the database from before, this will add a new, unconnected one to the
190
// graph. If it is present from before, this will update that node's
191
// information.
192
//
193
// NOTE: part of the V1Store interface.
194
func (s *SQLStore) AddLightningNode(ctx context.Context,
195
        node *models.LightningNode, opts ...batch.SchedulerOption) error {
×
196

×
197
        r := &batch.Request[SQLQueries]{
×
198
                Opts: batch.NewSchedulerOptions(opts...),
×
199
                Do: func(queries SQLQueries) error {
×
200
                        _, err := upsertNode(ctx, queries, node)
×
201
                        return err
×
202
                },
×
203
        }
204

205
        return s.nodeScheduler.Execute(ctx, r)
×
206
}
207

208
// FetchLightningNode attempts to look up a target node by its identity public
209
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
210
// returned.
211
//
212
// NOTE: part of the V1Store interface.
213
func (s *SQLStore) FetchLightningNode(ctx context.Context,
214
        pubKey route.Vertex) (*models.LightningNode, error) {
×
215

×
216
        var node *models.LightningNode
×
217
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
218
                var err error
×
219
                _, node, err = getNodeByPubKey(ctx, db, pubKey)
×
220

×
221
                return err
×
222
        }, sqldb.NoOpReset)
×
223
        if err != nil {
×
224
                return nil, fmt.Errorf("unable to fetch node: %w", err)
×
225
        }
×
226

227
        return node, nil
×
228
}
229

230
// HasLightningNode determines if the graph has a vertex identified by the
231
// target node identity public key. If the node exists in the database, a
232
// timestamp of when the data for the node was lasted updated is returned along
233
// with a true boolean. Otherwise, an empty time.Time is returned with a false
234
// boolean.
235
//
236
// NOTE: part of the V1Store interface.
237
func (s *SQLStore) HasLightningNode(ctx context.Context,
238
        pubKey [33]byte) (time.Time, bool, error) {
×
239

×
240
        var (
×
241
                exists     bool
×
242
                lastUpdate time.Time
×
243
        )
×
244
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
245
                dbNode, err := db.GetNodeByPubKey(
×
246
                        ctx, sqlc.GetNodeByPubKeyParams{
×
247
                                Version: int16(ProtocolV1),
×
248
                                PubKey:  pubKey[:],
×
249
                        },
×
250
                )
×
251
                if errors.Is(err, sql.ErrNoRows) {
×
252
                        return nil
×
253
                } else if err != nil {
×
254
                        return fmt.Errorf("unable to fetch node: %w", err)
×
255
                }
×
256

257
                exists = true
×
258

×
259
                if dbNode.LastUpdate.Valid {
×
260
                        lastUpdate = time.Unix(dbNode.LastUpdate.Int64, 0)
×
261
                }
×
262

263
                return nil
×
264
        }, sqldb.NoOpReset)
265
        if err != nil {
×
266
                return time.Time{}, false,
×
267
                        fmt.Errorf("unable to fetch node: %w", err)
×
268
        }
×
269

270
        return lastUpdate, exists, nil
×
271
}
272

273
// AddrsForNode returns all known addresses for the target node public key
274
// that the graph DB is aware of. The returned boolean indicates if the
275
// given node is unknown to the graph DB or not.
276
//
277
// NOTE: part of the V1Store interface.
278
func (s *SQLStore) AddrsForNode(ctx context.Context,
279
        nodePub *btcec.PublicKey) (bool, []net.Addr, error) {
×
280

×
281
        var (
×
282
                addresses []net.Addr
×
283
                known     bool
×
284
        )
×
285
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
286
                var err error
×
287
                known, addresses, err = getNodeAddresses(
×
288
                        ctx, db, nodePub.SerializeCompressed(),
×
289
                )
×
290
                if err != nil {
×
291
                        return fmt.Errorf("unable to fetch node addresses: %w",
×
292
                                err)
×
293
                }
×
294

295
                return nil
×
296
        }, sqldb.NoOpReset)
297
        if err != nil {
×
298
                return false, nil, fmt.Errorf("unable to get addresses for "+
×
299
                        "node(%x): %w", nodePub.SerializeCompressed(), err)
×
300
        }
×
301

302
        return known, addresses, nil
×
303
}
304

305
// DeleteLightningNode starts a new database transaction to remove a vertex/node
306
// from the database according to the node's public key.
307
//
308
// NOTE: part of the V1Store interface.
309
func (s *SQLStore) DeleteLightningNode(ctx context.Context,
310
        pubKey route.Vertex) error {
×
311

×
312
        err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
×
313
                res, err := db.DeleteNodeByPubKey(
×
314
                        ctx, sqlc.DeleteNodeByPubKeyParams{
×
315
                                Version: int16(ProtocolV1),
×
316
                                PubKey:  pubKey[:],
×
317
                        },
×
318
                )
×
319
                if err != nil {
×
320
                        return err
×
321
                }
×
322

323
                rows, err := res.RowsAffected()
×
324
                if err != nil {
×
325
                        return err
×
326
                }
×
327

328
                if rows == 0 {
×
329
                        return ErrGraphNodeNotFound
×
330
                } else if rows > 1 {
×
331
                        return fmt.Errorf("deleted %d rows, expected 1", rows)
×
332
                }
×
333

334
                return err
×
335
        }, sqldb.NoOpReset)
336
        if err != nil {
×
337
                return fmt.Errorf("unable to delete node: %w", err)
×
338
        }
×
339

340
        return nil
×
341
}
342

343
// FetchNodeFeatures returns the features of the given node. If no features are
344
// known for the node, an empty feature vector is returned.
345
//
346
// NOTE: this is part of the graphdb.NodeTraverser interface.
347
func (s *SQLStore) FetchNodeFeatures(nodePub route.Vertex) (
348
        *lnwire.FeatureVector, error) {
×
349

×
350
        ctx := context.TODO()
×
351

×
352
        return fetchNodeFeatures(ctx, s.db, nodePub)
×
353
}
×
354

355
// LookupAlias attempts to return the alias as advertised by the target node.
356
//
357
// NOTE: part of the V1Store interface.
358
func (s *SQLStore) LookupAlias(ctx context.Context,
NEW
359
        pub *btcec.PublicKey) (string, error) {
×
NEW
360

×
NEW
361
        var alias string
×
362
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
363
                dbNode, err := db.GetNodeByPubKey(
×
364
                        ctx, sqlc.GetNodeByPubKeyParams{
×
365
                                Version: int16(ProtocolV1),
×
366
                                PubKey:  pub.SerializeCompressed(),
×
367
                        },
×
368
                )
×
369
                if errors.Is(err, sql.ErrNoRows) {
×
370
                        return ErrNodeAliasNotFound
×
371
                } else if err != nil {
×
372
                        return fmt.Errorf("unable to fetch node: %w", err)
×
373
                }
×
374

375
                if !dbNode.Alias.Valid {
×
376
                        return ErrNodeAliasNotFound
×
377
                }
×
378

379
                alias = dbNode.Alias.String
×
380

×
381
                return nil
×
382
        }, sqldb.NoOpReset)
383
        if err != nil {
×
384
                return "", fmt.Errorf("unable to look up alias: %w", err)
×
385
        }
×
386

387
        return alias, nil
×
388
}
389

390
// SourceNode returns the source node of the graph. The source node is treated
391
// as the center node within a star-graph. This method may be used to kick off
392
// a path finding algorithm in order to explore the reachability of another
393
// node based off the source node.
394
//
395
// NOTE: part of the V1Store interface.
396
func (s *SQLStore) SourceNode(ctx context.Context) (*models.LightningNode,
NEW
397
        error) {
×
398

×
399
        var node *models.LightningNode
×
400
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
401
                _, nodePub, err := s.getSourceNode(ctx, db, ProtocolV1)
×
402
                if err != nil {
×
403
                        return fmt.Errorf("unable to fetch V1 source node: %w",
×
404
                                err)
×
405
                }
×
406

407
                _, node, err = getNodeByPubKey(ctx, db, nodePub)
×
408

×
409
                return err
×
410
        }, sqldb.NoOpReset)
411
        if err != nil {
×
412
                return nil, fmt.Errorf("unable to fetch source node: %w", err)
×
413
        }
×
414

415
        return node, nil
×
416
}
417

418
// SetSourceNode sets the source node within the graph database. The source
419
// node is to be used as the center of a star-graph within path finding
420
// algorithms.
421
//
422
// NOTE: part of the V1Store interface.
423
func (s *SQLStore) SetSourceNode(ctx context.Context,
NEW
424
        node *models.LightningNode) error {
×
425

×
426
        return s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
×
427
                id, err := upsertNode(ctx, db, node)
×
428
                if err != nil {
×
429
                        return fmt.Errorf("unable to upsert source node: %w",
×
430
                                err)
×
431
                }
×
432

433
                // Make sure that if a source node for this version is already
434
                // set, then the ID is the same as the one we are about to set.
435
                dbSourceNodeID, _, err := s.getSourceNode(ctx, db, ProtocolV1)
×
436
                if err != nil && !errors.Is(err, ErrSourceNodeNotSet) {
×
437
                        return fmt.Errorf("unable to fetch source node: %w",
×
438
                                err)
×
439
                } else if err == nil {
×
440
                        if dbSourceNodeID != id {
×
441
                                return fmt.Errorf("v1 source node already "+
×
442
                                        "set to a different node: %d vs %d",
×
443
                                        dbSourceNodeID, id)
×
444
                        }
×
445

446
                        return nil
×
447
                }
448

449
                return db.AddSourceNode(ctx, id)
×
450
        }, sqldb.NoOpReset)
451
}
452

453
// NodeUpdatesInHorizon returns all the known lightning node which have an
454
// update timestamp within the passed range. This method can be used by two
455
// nodes to quickly determine if they have the same set of up to date node
456
// announcements.
457
//
458
// NOTE: This is part of the V1Store interface.
459
func (s *SQLStore) NodeUpdatesInHorizon(startTime,
460
        endTime time.Time) ([]models.LightningNode, error) {
×
461

×
462
        ctx := context.TODO()
×
463

×
464
        var nodes []models.LightningNode
×
465
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
466
                dbNodes, err := db.GetNodesByLastUpdateRange(
×
467
                        ctx, sqlc.GetNodesByLastUpdateRangeParams{
×
468
                                StartTime: sqldb.SQLInt64(startTime.Unix()),
×
469
                                EndTime:   sqldb.SQLInt64(endTime.Unix()),
×
470
                        },
×
471
                )
×
472
                if err != nil {
×
473
                        return fmt.Errorf("unable to fetch nodes: %w", err)
×
474
                }
×
475

476
                for _, dbNode := range dbNodes {
×
477
                        node, err := buildNode(ctx, db, &dbNode)
×
478
                        if err != nil {
×
479
                                return fmt.Errorf("unable to build node: %w",
×
480
                                        err)
×
481
                        }
×
482

483
                        nodes = append(nodes, *node)
×
484
                }
485

486
                return nil
×
487
        }, sqldb.NoOpReset)
488
        if err != nil {
×
489
                return nil, fmt.Errorf("unable to fetch nodes: %w", err)
×
490
        }
×
491

492
        return nodes, nil
×
493
}
494

495
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
496
// undirected edge from the two target nodes are created. The information stored
497
// denotes the static attributes of the channel, such as the channelID, the keys
498
// involved in creation of the channel, and the set of features that the channel
499
// supports. The chanPoint and chanID are used to uniquely identify the edge
500
// globally within the database.
501
//
502
// NOTE: part of the V1Store interface.
503
func (s *SQLStore) AddChannelEdge(ctx context.Context,
NEW
504
        edge *models.ChannelEdgeInfo, opts ...batch.SchedulerOption) error {
×
505

×
506
        var alreadyExists bool
×
507
        r := &batch.Request[SQLQueries]{
×
508
                Opts: batch.NewSchedulerOptions(opts...),
×
509
                Reset: func() {
×
510
                        alreadyExists = false
×
511
                },
×
512
                Do: func(tx SQLQueries) error {
×
513
                        err := insertChannel(ctx, tx, edge)
×
514

×
515
                        // Silence ErrEdgeAlreadyExist so that the batch can
×
516
                        // succeed, but propagate the error via local state.
×
517
                        if errors.Is(err, ErrEdgeAlreadyExist) {
×
518
                                alreadyExists = true
×
519
                                return nil
×
520
                        }
×
521

522
                        return err
×
523
                },
524
                OnCommit: func(err error) error {
×
525
                        switch {
×
526
                        case err != nil:
×
527
                                return err
×
528
                        case alreadyExists:
×
529
                                return ErrEdgeAlreadyExist
×
530
                        default:
×
531
                                s.rejectCache.remove(edge.ChannelID)
×
532
                                s.chanCache.remove(edge.ChannelID)
×
533
                                return nil
×
534
                        }
535
                },
536
        }
537

538
        return s.chanScheduler.Execute(ctx, r)
×
539
}
540

541
// HighestChanID returns the "highest" known channel ID in the channel graph.
542
// This represents the "newest" channel from the PoV of the chain. This method
543
// can be used by peers to quickly determine if their graphs are in sync.
544
//
545
// NOTE: This is part of the V1Store interface.
NEW
546
func (s *SQLStore) HighestChanID(ctx context.Context) (uint64, error) {
×
547
        var highestChanID uint64
×
548
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
549
                chanID, err := db.HighestSCID(ctx, int16(ProtocolV1))
×
550
                if errors.Is(err, sql.ErrNoRows) {
×
551
                        return nil
×
552
                } else if err != nil {
×
553
                        return fmt.Errorf("unable to fetch highest chan ID: %w",
×
554
                                err)
×
555
                }
×
556

557
                highestChanID = byteOrder.Uint64(chanID)
×
558

×
559
                return nil
×
560
        }, sqldb.NoOpReset)
561
        if err != nil {
×
562
                return 0, fmt.Errorf("unable to fetch highest chan ID: %w", err)
×
563
        }
×
564

565
        return highestChanID, nil
×
566
}
567

568
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
569
// within the database for the referenced channel. The `flags` attribute within
570
// the ChannelEdgePolicy determines which of the directed edges are being
571
// updated. If the flag is 1, then the first node's information is being
572
// updated, otherwise it's the second node's information. The node ordering is
573
// determined by the lexicographical ordering of the identity public keys of the
574
// nodes on either side of the channel.
575
//
576
// NOTE: part of the V1Store interface.
577
func (s *SQLStore) UpdateEdgePolicy(ctx context.Context,
578
        edge *models.ChannelEdgePolicy,
579
        opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) {
×
580

×
581
        var (
×
582
                isUpdate1    bool
×
583
                edgeNotFound bool
×
584
                from, to     route.Vertex
×
585
        )
×
586

×
587
        r := &batch.Request[SQLQueries]{
×
588
                Opts: batch.NewSchedulerOptions(opts...),
×
589
                Reset: func() {
×
590
                        isUpdate1 = false
×
591
                        edgeNotFound = false
×
592
                },
×
593
                Do: func(tx SQLQueries) error {
×
594
                        var err error
×
595
                        from, to, isUpdate1, err = updateChanEdgePolicy(
×
596
                                ctx, tx, edge,
×
597
                        )
×
598
                        if err != nil {
×
599
                                log.Errorf("UpdateEdgePolicy faild: %v", err)
×
600
                        }
×
601

602
                        // Silence ErrEdgeNotFound so that the batch can
603
                        // succeed, but propagate the error via local state.
604
                        if errors.Is(err, ErrEdgeNotFound) {
×
605
                                edgeNotFound = true
×
606
                                return nil
×
607
                        }
×
608

609
                        return err
×
610
                },
611
                OnCommit: func(err error) error {
×
612
                        switch {
×
613
                        case err != nil:
×
614
                                return err
×
615
                        case edgeNotFound:
×
616
                                return ErrEdgeNotFound
×
617
                        default:
×
618
                                s.updateEdgeCache(edge, isUpdate1)
×
619
                                return nil
×
620
                        }
621
                },
622
        }
623

624
        err := s.chanScheduler.Execute(ctx, r)
×
625

×
626
        return from, to, err
×
627
}
628

629
// updateEdgeCache updates our reject and channel caches with the new
630
// edge policy information.
631
func (s *SQLStore) updateEdgeCache(e *models.ChannelEdgePolicy,
632
        isUpdate1 bool) {
×
633

×
634
        // If an entry for this channel is found in reject cache, we'll modify
×
635
        // the entry with the updated timestamp for the direction that was just
×
636
        // written. If the edge doesn't exist, we'll load the cache entry lazily
×
637
        // during the next query for this edge.
×
638
        if entry, ok := s.rejectCache.get(e.ChannelID); ok {
×
639
                if isUpdate1 {
×
640
                        entry.upd1Time = e.LastUpdate.Unix()
×
641
                } else {
×
642
                        entry.upd2Time = e.LastUpdate.Unix()
×
643
                }
×
644
                s.rejectCache.insert(e.ChannelID, entry)
×
645
        }
646

647
        // If an entry for this channel is found in channel cache, we'll modify
648
        // the entry with the updated policy for the direction that was just
649
        // written. If the edge doesn't exist, we'll defer loading the info and
650
        // policies and lazily read from disk during the next query.
651
        if channel, ok := s.chanCache.get(e.ChannelID); ok {
×
652
                if isUpdate1 {
×
653
                        channel.Policy1 = e
×
654
                } else {
×
655
                        channel.Policy2 = e
×
656
                }
×
657
                s.chanCache.insert(e.ChannelID, channel)
×
658
        }
659
}
660

661
// ForEachSourceNodeChannel iterates through all channels of the source node,
662
// executing the passed callback on each. The call-back is provided with the
663
// channel's outpoint, whether we have a policy for the channel and the channel
664
// peer's node information.
665
//
666
// NOTE: part of the V1Store interface.
667
func (s *SQLStore) ForEachSourceNodeChannel(cb func(chanPoint wire.OutPoint,
668
        havePolicy bool, otherNode *models.LightningNode) error) error {
×
669

×
670
        var ctx = context.TODO()
×
671

×
672
        return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
673
                nodeID, nodePub, err := s.getSourceNode(ctx, db, ProtocolV1)
×
674
                if err != nil {
×
675
                        return fmt.Errorf("unable to fetch source node: %w",
×
676
                                err)
×
677
                }
×
678

679
                return forEachNodeChannel(
×
680
                        ctx, db, s.cfg.ChainHash, nodeID,
×
681
                        func(info *models.ChannelEdgeInfo,
×
682
                                outPolicy *models.ChannelEdgePolicy,
×
683
                                _ *models.ChannelEdgePolicy) error {
×
684

×
685
                                // Fetch the other node.
×
686
                                var (
×
687
                                        otherNodePub [33]byte
×
688
                                        node1        = info.NodeKey1Bytes
×
689
                                        node2        = info.NodeKey2Bytes
×
690
                                )
×
691
                                switch {
×
692
                                case bytes.Equal(node1[:], nodePub[:]):
×
693
                                        otherNodePub = node2
×
694
                                case bytes.Equal(node2[:], nodePub[:]):
×
695
                                        otherNodePub = node1
×
696
                                default:
×
697
                                        return fmt.Errorf("node not " +
×
698
                                                "participating in this channel")
×
699
                                }
700

701
                                _, otherNode, err := getNodeByPubKey(
×
702
                                        ctx, db, otherNodePub,
×
703
                                )
×
704
                                if err != nil {
×
705
                                        return fmt.Errorf("unable to fetch "+
×
706
                                                "other node(%x): %w",
×
707
                                                otherNodePub, err)
×
708
                                }
×
709

710
                                return cb(
×
711
                                        info.ChannelPoint, outPolicy != nil,
×
712
                                        otherNode,
×
713
                                )
×
714
                        },
715
                )
716
        }, sqldb.NoOpReset)
717
}
718

719
// ForEachNode iterates through all the stored vertices/nodes in the graph,
720
// executing the passed callback with each node encountered. If the callback
721
// returns an error, then the transaction is aborted and the iteration stops
722
// early. Any operations performed on the NodeTx passed to the call-back are
723
// executed under the same read transaction and so, methods on the NodeTx object
724
// _MUST_ only be called from within the call-back.
725
//
726
// NOTE: part of the V1Store interface.
727
func (s *SQLStore) ForEachNode(cb func(tx NodeRTx) error) error {
×
728
        var (
×
729
                ctx          = context.TODO()
×
730
                lastID int64 = 0
×
731
        )
×
732

×
733
        handleNode := func(db SQLQueries, dbNode sqlc.Node) error {
×
734
                node, err := buildNode(ctx, db, &dbNode)
×
735
                if err != nil {
×
736
                        return fmt.Errorf("unable to build node(id=%d): %w",
×
737
                                dbNode.ID, err)
×
738
                }
×
739

740
                err = cb(
×
741
                        newSQLGraphNodeTx(db, s.cfg.ChainHash, dbNode.ID, node),
×
742
                )
×
743
                if err != nil {
×
744
                        return fmt.Errorf("callback failed for node(id=%d): %w",
×
745
                                dbNode.ID, err)
×
746
                }
×
747

748
                return nil
×
749
        }
750

751
        return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
752
                for {
×
753
                        nodes, err := db.ListNodesPaginated(
×
754
                                ctx, sqlc.ListNodesPaginatedParams{
×
755
                                        Version: int16(ProtocolV1),
×
756
                                        ID:      lastID,
×
757
                                        Limit:   pageSize,
×
758
                                },
×
759
                        )
×
760
                        if err != nil {
×
761
                                return fmt.Errorf("unable to fetch nodes: %w",
×
762
                                        err)
×
763
                        }
×
764

765
                        if len(nodes) == 0 {
×
766
                                break
×
767
                        }
768

769
                        for _, dbNode := range nodes {
×
770
                                err = handleNode(db, dbNode)
×
771
                                if err != nil {
×
772
                                        return err
×
773
                                }
×
774

775
                                lastID = dbNode.ID
×
776
                        }
777
                }
778

779
                return nil
×
780
        }, sqldb.NoOpReset)
781
}
782

783
// sqlGraphNodeTx is an implementation of the NodeRTx interface backed by the
// SQLStore and a SQL transaction.
type sqlGraphNodeTx struct {
	// db is the set of queries bound to the read transaction under which
	// this node was fetched; all methods must run against it.
	db SQLQueries

	// id is the node's row ID in the database.
	id int64

	// node is the node record this transaction view wraps.
	node *models.LightningNode

	// chain is the chain hash of the graph this node belongs to.
	chain chainhash.Hash
}

// A compile-time constraint to ensure sqlGraphNodeTx implements the NodeRTx
// interface.
var _ NodeRTx = (*sqlGraphNodeTx)(nil)
795

796
func newSQLGraphNodeTx(db SQLQueries, chain chainhash.Hash,
797
        id int64, node *models.LightningNode) *sqlGraphNodeTx {
×
798

×
799
        return &sqlGraphNodeTx{
×
800
                db:    db,
×
801
                chain: chain,
×
802
                id:    id,
×
803
                node:  node,
×
804
        }
×
805
}
×
806

807
// Node returns the raw information of the node.
//
// NOTE: This is a part of the NodeRTx interface.
func (s *sqlGraphNodeTx) Node() *models.LightningNode {
	// The node was hydrated when this NodeRTx was created; simply hand
	// back the cached record.
	return s.node
}
×
813

814
// ForEachChannel can be used to iterate over the node's channels under the same
// transaction used to fetch the node.
//
// NOTE: This is a part of the NodeRTx interface.
func (s *sqlGraphNodeTx) ForEachChannel(cb func(*models.ChannelEdgeInfo,
	*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {

	ctx := context.TODO()

	// Delegate to the shared helper, re-using the query set bound to the
	// transaction this node was fetched under.
	return forEachNodeChannel(ctx, s.db, s.chain, s.id, cb)
}
×
825

826
// FetchNode fetches the node with the given pub key under the same transaction
827
// used to fetch the current node. The returned node is also a NodeRTx and any
828
// operations on that NodeRTx will also be done under the same transaction.
829
//
830
// NOTE: This is a part of the NodeRTx interface.
831
func (s *sqlGraphNodeTx) FetchNode(nodePub route.Vertex) (NodeRTx, error) {
×
832
        ctx := context.TODO()
×
833

×
834
        id, node, err := getNodeByPubKey(ctx, s.db, nodePub)
×
835
        if err != nil {
×
836
                return nil, fmt.Errorf("unable to fetch V1 node(%x): %w",
×
837
                        nodePub, err)
×
838
        }
×
839

840
        return newSQLGraphNodeTx(s.db, s.chain, id, node), nil
×
841
}
842

843
// ForEachNodeDirectedChannel iterates through all channels of a given node,
844
// executing the passed callback on the directed edge representing the channel
845
// and its incoming policy. If the callback returns an error, then the iteration
846
// is halted with the error propagated back up to the caller.
847
//
848
// Unknown policies are passed into the callback as nil values.
849
//
850
// NOTE: this is part of the graphdb.NodeTraverser interface.
851
func (s *SQLStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
852
        cb func(channel *DirectedChannel) error) error {
×
853

×
854
        var ctx = context.TODO()
×
855

×
856
        return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
857
                return forEachNodeDirectedChannel(ctx, db, nodePub, cb)
×
858
        }, sqldb.NoOpReset)
×
859
}
860

861
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the
862
// graph, executing the passed callback with each node encountered. If the
863
// callback returns an error, then the transaction is aborted and the iteration
864
// stops early.
865
//
866
// NOTE: This is a part of the V1Store interface.
867
func (s *SQLStore) ForEachNodeCacheable(cb func(route.Vertex,
868
        *lnwire.FeatureVector) error) error {
×
869

×
870
        ctx := context.TODO()
×
871

×
872
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
873
                return forEachNodeCacheable(ctx, db, func(nodeID int64,
×
874
                        nodePub route.Vertex) error {
×
875

×
876
                        features, err := getNodeFeatures(ctx, db, nodeID)
×
877
                        if err != nil {
×
878
                                return fmt.Errorf("unable to fetch node "+
×
879
                                        "features: %w", err)
×
880
                        }
×
881

882
                        return cb(nodePub, features)
×
883
                })
884
        }, sqldb.NoOpReset)
885
        if err != nil {
×
886
                return fmt.Errorf("unable to fetch nodes: %w", err)
×
887
        }
×
888

889
        return nil
×
890
}
891

892
// ForEachNodeChannel iterates through all channels of the given node,
893
// executing the passed callback with an edge info structure and the policies
894
// of each end of the channel. The first edge policy is the outgoing edge *to*
895
// the connecting node, while the second is the incoming edge *from* the
896
// connecting node. If the callback returns an error, then the iteration is
897
// halted with the error propagated back up to the caller.
898
//
899
// Unknown policies are passed into the callback as nil values.
900
//
901
// NOTE: part of the V1Store interface.
902
func (s *SQLStore) ForEachNodeChannel(nodePub route.Vertex,
903
        cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
904
                *models.ChannelEdgePolicy) error) error {
×
905

×
906
        var ctx = context.TODO()
×
907

×
908
        return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
909
                dbNode, err := db.GetNodeByPubKey(
×
910
                        ctx, sqlc.GetNodeByPubKeyParams{
×
911
                                Version: int16(ProtocolV1),
×
912
                                PubKey:  nodePub[:],
×
913
                        },
×
914
                )
×
915
                if errors.Is(err, sql.ErrNoRows) {
×
916
                        return nil
×
917
                } else if err != nil {
×
918
                        return fmt.Errorf("unable to fetch node: %w", err)
×
919
                }
×
920

921
                return forEachNodeChannel(
×
922
                        ctx, db, s.cfg.ChainHash, dbNode.ID, cb,
×
923
                )
×
924
        }, sqldb.NoOpReset)
925
}
926

927
// forEachNodeDirectedChannel iterates through all channels of a given
// node, executing the passed callback on the directed edge representing the
// channel and its incoming policy. If the node is not found, no error is
// returned.
func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries,
	nodePub route.Vertex, cb func(channel *DirectedChannel) error) error {

	// All in-policies handed to the callback resolve their "to" node to
	// the node we are iterating for.
	toNodeCallback := func() route.Vertex {
		return nodePub
	}

	// Resolve the node's DB ID; an unknown node yields a silent no-op.
	dbID, err := db.GetNodeIDByPubKey(
		ctx, sqlc.GetNodeIDByPubKeyParams{
			Version: int16(ProtocolV1),
			PubKey:  nodePub[:],
		},
	)
	if errors.Is(err, sql.ErrNoRows) {
		return nil
	} else if err != nil {
		return fmt.Errorf("unable to fetch node: %w", err)
	}

	rows, err := db.ListChannelsByNodeID(
		ctx, sqlc.ListChannelsByNodeIDParams{
			Version: int16(ProtocolV1),
			NodeID1: dbID,
		},
	)
	if err != nil {
		return fmt.Errorf("unable to fetch channels: %w", err)
	}

	// Exit early if there are no channels for this node so we don't
	// do the unnecessary feature fetching.
	if len(rows) == 0 {
		return nil
	}

	// The feature vector is shared by every in-policy we emit below.
	features, err := getNodeFeatures(ctx, db, dbID)
	if err != nil {
		return fmt.Errorf("unable to fetch node features: %w", err)
	}

	for _, row := range rows {
		node1, node2, err := buildNodeVertices(
			row.Node1Pubkey, row.Node2Pubkey,
		)
		if err != nil {
			return fmt.Errorf("unable to build node vertices: %w",
				err)
		}

		edge, err := buildCacheableChannelInfo(
			row.Channel, node1, node2,
		)
		if err != nil {
			return err
		}

		// dbPol1/dbPol2 are the raw per-direction policies; either
		// may be nil if that direction's policy is unknown.
		dbPol1, dbPol2, err := extractChannelPolicies(row)
		if err != nil {
			return err
		}

		var p1, p2 *models.CachedEdgePolicy
		if dbPol1 != nil {
			// Policy 1 points towards node2 (isNode1 == true).
			policy1, err := buildChanPolicy(
				*dbPol1, edge.ChannelID, nil, node2, true,
			)
			if err != nil {
				return err
			}

			p1 = models.NewCachedPolicy(policy1)
		}
		if dbPol2 != nil {
			// Policy 2 points towards node1 (isNode1 == false).
			policy2, err := buildChanPolicy(
				*dbPol2, edge.ChannelID, nil, node1, false,
			)
			if err != nil {
				return err
			}

			p2 = models.NewCachedPolicy(policy2)
		}

		// Determine the outgoing and incoming policy for this
		// channel and node combo.
		outPolicy, inPolicy := p1, p2
		if p1 != nil && node2 == nodePub {
			outPolicy, inPolicy = p2, p1
		} else if p2 != nil && node1 != nodePub {
			outPolicy, inPolicy = p2, p1
		}

		// Attach the shared to-node info to the in-policy, if one is
		// known. Note this mutates the freshly-built policy in place.
		var cachedInPolicy *models.CachedEdgePolicy
		if inPolicy != nil {
			cachedInPolicy = inPolicy
			cachedInPolicy.ToNodePubKey = toNodeCallback
			cachedInPolicy.ToNodeFeatures = features
		}

		directedChannel := &DirectedChannel{
			ChannelID:    edge.ChannelID,
			IsNode1:      nodePub == edge.NodeKey1Bytes,
			OtherNode:    edge.NodeKey2Bytes,
			Capacity:     edge.Capacity,
			OutPolicySet: outPolicy != nil,
			InPolicy:     cachedInPolicy,
		}
		// Surface the out-policy's inbound fee on the directed
		// channel when it is set.
		if outPolicy != nil {
			outPolicy.InboundFee.WhenSome(func(fee lnwire.Fee) {
				directedChannel.InboundFee = fee
			})
		}

		// OtherNode defaults to node2 above; flip it when we are in
		// fact node2 of this channel.
		if nodePub == edge.NodeKey2Bytes {
			directedChannel.OtherNode = edge.NodeKey1Bytes
		}

		if err := cb(directedChannel); err != nil {
			return err
		}
	}

	return nil
}
1055

1056
// forEachNodeCacheable fetches all V1 node IDs and pub keys from the database,
1057
// and executes the provided callback for each node.
1058
func forEachNodeCacheable(ctx context.Context, db SQLQueries,
1059
        cb func(nodeID int64, nodePub route.Vertex) error) error {
×
1060

×
1061
        var lastID int64
×
1062

×
1063
        for {
×
1064
                nodes, err := db.ListNodeIDsAndPubKeys(
×
1065
                        ctx, sqlc.ListNodeIDsAndPubKeysParams{
×
1066
                                Version: int16(ProtocolV1),
×
1067
                                ID:      lastID,
×
1068
                                Limit:   pageSize,
×
1069
                        },
×
1070
                )
×
1071
                if err != nil {
×
1072
                        return fmt.Errorf("unable to fetch nodes: %w", err)
×
1073
                }
×
1074

1075
                if len(nodes) == 0 {
×
1076
                        break
×
1077
                }
1078

1079
                for _, node := range nodes {
×
1080
                        var pub route.Vertex
×
1081
                        copy(pub[:], node.PubKey)
×
1082

×
1083
                        if err := cb(node.ID, pub); err != nil {
×
1084
                                return fmt.Errorf("forEachNodeCacheable "+
×
1085
                                        "callback failed for node(id=%d): %w",
×
1086
                                        node.ID, err)
×
1087
                        }
×
1088

1089
                        lastID = node.ID
×
1090
                }
1091
        }
1092

1093
        return nil
×
1094
}
1095

1096
// forEachNodeChannel iterates through all channels of a node, executing
// the passed callback on each. The call-back is provided with the channel's
// edge information, the outgoing policy and the incoming policy for the
// channel and node combo.
func forEachNodeChannel(ctx context.Context, db SQLQueries,
	chain chainhash.Hash, id int64, cb func(*models.ChannelEdgeInfo,
		*models.ChannelEdgePolicy,
		*models.ChannelEdgePolicy) error) error {

	// Get all the V1 channels for this node.
	rows, err := db.ListChannelsByNodeID(
		ctx, sqlc.ListChannelsByNodeIDParams{
			Version: int16(ProtocolV1),
			NodeID1: id,
		},
	)
	if err != nil {
		return fmt.Errorf("unable to fetch channels: %w", err)
	}

	// Call the call-back for each channel and its known policies.
	for _, row := range rows {
		node1, node2, err := buildNodeVertices(
			row.Node1Pubkey, row.Node2Pubkey,
		)
		if err != nil {
			return fmt.Errorf("unable to build node vertices: %w",
				err)
		}

		edge, err := getAndBuildEdgeInfo(
			ctx, db, chain, row.Channel.ID, row.Channel, node1,
			node2,
		)
		if err != nil {
			return fmt.Errorf("unable to build channel info: %w",
				err)
		}

		// Either policy may come back nil if that direction's policy
		// is not yet known.
		dbPol1, dbPol2, err := extractChannelPolicies(row)
		if err != nil {
			return fmt.Errorf("unable to extract channel "+
				"policies: %w", err)
		}

		p1, p2, err := getAndBuildChanPolicies(
			ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
		)
		if err != nil {
			return fmt.Errorf("unable to build channel "+
				"policies: %w", err)
		}

		// Determine the outgoing and incoming policy for this
		// channel and node combo. Policy 1 points towards node 2 and
		// policy 2 towards node 1, so when our node is this channel's
		// node 2 the pair must be swapped.
		p1ToNode := row.Channel.NodeID2
		p2ToNode := row.Channel.NodeID1
		outPolicy, inPolicy := p1, p2
		if (p1 != nil && p1ToNode == id) ||
			(p2 != nil && p2ToNode != id) {

			outPolicy, inPolicy = p2, p1
		}

		if err := cb(edge, outPolicy, inPolicy); err != nil {
			return err
		}
	}

	return nil
}
1167

1168
// updateChanEdgePolicy upserts the channel policy info we have stored for
// a channel we already know of. It returns the public keys of the channel's
// two nodes, whether the policy came from node 1 (as signalled by the
// lnwire.ChanUpdateDirection bit of the policy's channel flags), and an
// error if any. If the referenced channel is unknown, ErrEdgeNotFound is
// returned.
func updateChanEdgePolicy(ctx context.Context, tx SQLQueries,
	edge *models.ChannelEdgePolicy) (route.Vertex, route.Vertex, bool,
	error) {

	var (
		node1Pub, node2Pub route.Vertex
		isNode1            bool
		chanIDB            [8]byte
	)
	byteOrder.PutUint64(chanIDB[:], edge.ChannelID)

	// Check that this edge policy refers to a channel that we already
	// know of. We do this explicitly so that we can return the appropriate
	// ErrEdgeNotFound error if the channel doesn't exist, rather than
	// abort the transaction which would abort the entire batch.
	dbChan, err := tx.GetChannelAndNodesBySCID(
		ctx, sqlc.GetChannelAndNodesBySCIDParams{
			Scid:    chanIDB[:],
			Version: int16(ProtocolV1),
		},
	)
	if errors.Is(err, sql.ErrNoRows) {
		return node1Pub, node2Pub, false, ErrEdgeNotFound
	} else if err != nil {
		return node1Pub, node2Pub, false, fmt.Errorf("unable to "+
			"fetch channel(%v): %w", edge.ChannelID, err)
	}

	copy(node1Pub[:], dbChan.Node1PubKey)
	copy(node2Pub[:], dbChan.Node2PubKey)

	// Figure out which node this edge is from.
	isNode1 = edge.ChannelFlags&lnwire.ChanUpdateDirection == 0
	nodeID := dbChan.NodeID1
	if !isNode1 {
		nodeID = dbChan.NodeID2
	}

	// The inbound fee fields stay NULL unless the policy carries an
	// inbound fee record.
	var (
		inboundBase sql.NullInt64
		inboundRate sql.NullInt64
	)
	edge.InboundFee.WhenSome(func(fee lnwire.Fee) {
		inboundRate = sqldb.SQLInt64(fee.FeeRate)
		inboundBase = sqldb.SQLInt64(fee.BaseFee)
	})

	id, err := tx.UpsertEdgePolicy(ctx, sqlc.UpsertEdgePolicyParams{
		Version:     int16(ProtocolV1),
		ChannelID:   dbChan.ID,
		NodeID:      nodeID,
		Timelock:    int32(edge.TimeLockDelta),
		FeePpm:      int64(edge.FeeProportionalMillionths),
		BaseFeeMsat: int64(edge.FeeBaseMSat),
		MinHtlcMsat: int64(edge.MinHTLC),
		LastUpdate:  sqldb.SQLInt64(edge.LastUpdate.Unix()),
		Disabled: sql.NullBool{
			Valid: true,
			Bool:  edge.IsDisabled(),
		},
		// MaxHtlcMsat is only meaningful when the policy's message
		// flags advertise a max-HTLC value.
		MaxHtlcMsat: sql.NullInt64{
			Valid: edge.MessageFlags.HasMaxHtlc(),
			Int64: int64(edge.MaxHTLC),
		},
		InboundBaseFeeMsat:      inboundBase,
		InboundFeeRateMilliMsat: inboundRate,
		Signature:               edge.SigBytes,
	})
	if err != nil {
		return node1Pub, node2Pub, isNode1,
			fmt.Errorf("unable to upsert edge policy: %w", err)
	}

	// Convert the flat extra opaque data into a map of TLV types to
	// values.
	extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
	if err != nil {
		return node1Pub, node2Pub, false, fmt.Errorf("unable to "+
			"marshal extra opaque data: %w", err)
	}

	// Update the channel policy's extra signed fields.
	err = upsertChanPolicyExtraSignedFields(ctx, tx, id, extra)
	if err != nil {
		return node1Pub, node2Pub, false, fmt.Errorf("inserting chan "+
			"policy extra TLVs: %w", err)
	}

	return node1Pub, node2Pub, isNode1, nil
}
1260

1261
// getNodeByPubKey attempts to look up a target node by its public key.
1262
func getNodeByPubKey(ctx context.Context, db SQLQueries,
1263
        pubKey route.Vertex) (int64, *models.LightningNode, error) {
×
1264

×
1265
        dbNode, err := db.GetNodeByPubKey(
×
1266
                ctx, sqlc.GetNodeByPubKeyParams{
×
1267
                        Version: int16(ProtocolV1),
×
1268
                        PubKey:  pubKey[:],
×
1269
                },
×
1270
        )
×
1271
        if errors.Is(err, sql.ErrNoRows) {
×
1272
                return 0, nil, ErrGraphNodeNotFound
×
1273
        } else if err != nil {
×
1274
                return 0, nil, fmt.Errorf("unable to fetch node: %w", err)
×
1275
        }
×
1276

1277
        node, err := buildNode(ctx, db, &dbNode)
×
1278
        if err != nil {
×
1279
                return 0, nil, fmt.Errorf("unable to build node: %w", err)
×
1280
        }
×
1281

1282
        return dbNode.ID, node, nil
×
1283
}
1284

1285
// buildCacheableChannelInfo builds a models.CachedEdgeInfo instance from the
1286
// provided database channel row and the public keys of the two nodes
1287
// involved in the channel.
1288
func buildCacheableChannelInfo(dbChan sqlc.Channel, node1Pub,
1289
        node2Pub route.Vertex) (*models.CachedEdgeInfo, error) {
×
1290

×
1291
        return &models.CachedEdgeInfo{
×
1292
                ChannelID:     byteOrder.Uint64(dbChan.Scid),
×
1293
                NodeKey1Bytes: node1Pub,
×
1294
                NodeKey2Bytes: node2Pub,
×
1295
                Capacity:      btcutil.Amount(dbChan.Capacity.Int64),
×
1296
        }, nil
×
1297
}
×
1298

1299
// buildNode constructs a LightningNode instance from the given database node
// record. The node's features, addresses and extra signed fields are also
// fetched from the database and set on the node.
func buildNode(ctx context.Context, db SQLQueries, dbNode *sqlc.Node) (
	*models.LightningNode, error) {

	if dbNode.Version != int16(ProtocolV1) {
		return nil, fmt.Errorf("unsupported node version: %d",
			dbNode.Version)
	}

	var pub [33]byte
	copy(pub[:], dbNode.PubKey)

	node := &models.LightningNode{
		PubKeyBytes: pub,
		Features:    lnwire.EmptyFeatureVector(),
		LastUpdate:  time.Unix(0, 0),
	}

	// Without a signature we only know of the node's existence (e.g. via
	// one of its channels), not its announcement; return the shell record.
	if len(dbNode.Signature) == 0 {
		return node, nil
	}

	node.HaveNodeAnnouncement = true
	node.AuthSigBytes = dbNode.Signature
	node.Alias = dbNode.Alias.String
	node.LastUpdate = time.Unix(dbNode.LastUpdate.Int64, 0)

	// The color is stored as a hex string; decode it if present.
	var err error
	if dbNode.Color.Valid {
		node.Color, err = DecodeHexColor(dbNode.Color.String)
		if err != nil {
			return nil, fmt.Errorf("unable to decode color: %w",
				err)
		}
	}

	// Fetch the node's features.
	node.Features, err = getNodeFeatures(ctx, db, dbNode.ID)
	if err != nil {
		return nil, fmt.Errorf("unable to fetch node(%d) "+
			"features: %w", dbNode.ID, err)
	}

	// Fetch the node's addresses.
	_, node.Addresses, err = getNodeAddresses(ctx, db, pub[:])
	if err != nil {
		return nil, fmt.Errorf("unable to fetch node(%d) "+
			"addresses: %w", dbNode.ID, err)
	}

	// Fetch the node's extra signed fields.
	extraTLVMap, err := getNodeExtraSignedFields(ctx, db, dbNode.ID)
	if err != nil {
		return nil, fmt.Errorf("unable to fetch node(%d) "+
			"extra signed fields: %w", dbNode.ID, err)
	}

	// Re-serialize the TLV map into the flat ExtraOpaqueData form.
	recs, err := lnwire.CustomRecords(extraTLVMap).Serialize()
	if err != nil {
		return nil, fmt.Errorf("unable to serialize extra signed "+
			"fields: %w", err)
	}

	if len(recs) != 0 {
		node.ExtraOpaqueData = recs
	}

	return node, nil
}
1370

1371
// getNodeFeatures fetches the feature bits and constructs the feature vector
1372
// for a node with the given DB ID.
1373
func getNodeFeatures(ctx context.Context, db SQLQueries,
1374
        nodeID int64) (*lnwire.FeatureVector, error) {
×
1375

×
1376
        rows, err := db.GetNodeFeatures(ctx, nodeID)
×
1377
        if err != nil {
×
1378
                return nil, fmt.Errorf("unable to get node(%d) features: %w",
×
1379
                        nodeID, err)
×
1380
        }
×
1381

1382
        features := lnwire.EmptyFeatureVector()
×
1383
        for _, feature := range rows {
×
1384
                features.Set(lnwire.FeatureBit(feature.FeatureBit))
×
1385
        }
×
1386

1387
        return features, nil
×
1388
}
1389

1390
// getNodeExtraSignedFields fetches the extra signed fields for a node with the
1391
// given DB ID.
1392
func getNodeExtraSignedFields(ctx context.Context, db SQLQueries,
1393
        nodeID int64) (map[uint64][]byte, error) {
×
1394

×
1395
        fields, err := db.GetExtraNodeTypes(ctx, nodeID)
×
1396
        if err != nil {
×
1397
                return nil, fmt.Errorf("unable to get node(%d) extra "+
×
1398
                        "signed fields: %w", nodeID, err)
×
1399
        }
×
1400

1401
        extraFields := make(map[uint64][]byte)
×
1402
        for _, field := range fields {
×
1403
                extraFields[uint64(field.Type)] = field.Value
×
1404
        }
×
1405

1406
        return extraFields, nil
×
1407
}
1408

1409
// upsertNode upserts the node record into the database. If the node already
// exists, then the node's information is updated. If the node doesn't exist,
// then a new node is created. The node's features, addresses and extra TLV
// types are also updated. The node's DB ID is returned.
func upsertNode(ctx context.Context, db SQLQueries,
	node *models.LightningNode) (int64, error) {

	params := sqlc.UpsertNodeParams{
		Version: int16(ProtocolV1),
		PubKey:  node.PubKeyBytes[:],
	}

	// Announcement-derived fields (last update, color, alias, signature)
	// are only persisted once we actually have the node announcement.
	if node.HaveNodeAnnouncement {
		params.LastUpdate = sqldb.SQLInt64(node.LastUpdate.Unix())
		params.Color = sqldb.SQLStr(EncodeHexColor(node.Color))
		params.Alias = sqldb.SQLStr(node.Alias)
		params.Signature = node.AuthSigBytes
	}

	nodeID, err := db.UpsertNode(ctx, params)
	if err != nil {
		return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
			err)
	}

	// We can exit here if we don't have the announcement yet.
	if !node.HaveNodeAnnouncement {
		return nodeID, nil
	}

	// Update the node's features.
	err = upsertNodeFeatures(ctx, db, nodeID, node.Features)
	if err != nil {
		return 0, fmt.Errorf("inserting node features: %w", err)
	}

	// Update the node's addresses.
	err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
	if err != nil {
		return 0, fmt.Errorf("inserting node addresses: %w", err)
	}

	// Convert the flat extra opaque data into a map of TLV types to
	// values.
	extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
	if err != nil {
		return 0, fmt.Errorf("unable to marshal extra opaque data: %w",
			err)
	}

	// Update the node's extra signed fields.
	err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
	if err != nil {
		return 0, fmt.Errorf("inserting node extra TLVs: %w", err)
	}

	return nodeID, nil
}
1467

1468
// upsertNodeFeatures updates the node's features node_features table. This
1469
// includes deleting any feature bits no longer present and inserting any new
1470
// feature bits. If the feature bit does not yet exist in the features table,
1471
// then an entry is created in that table first.
1472
func upsertNodeFeatures(ctx context.Context, db SQLQueries, nodeID int64,
1473
        features *lnwire.FeatureVector) error {
×
1474

×
1475
        // Get any existing features for the node.
×
1476
        existingFeatures, err := db.GetNodeFeatures(ctx, nodeID)
×
1477
        if err != nil && !errors.Is(err, sql.ErrNoRows) {
×
1478
                return err
×
1479
        }
×
1480

1481
        // Copy the nodes latest set of feature bits.
1482
        newFeatures := make(map[int32]struct{})
×
1483
        if features != nil {
×
1484
                for feature := range features.Features() {
×
1485
                        newFeatures[int32(feature)] = struct{}{}
×
1486
                }
×
1487
        }
1488

1489
        // For any current feature that already exists in the DB, remove it from
1490
        // the in-memory map. For any existing feature that does not exist in
1491
        // the in-memory map, delete it from the database.
1492
        for _, feature := range existingFeatures {
×
1493
                // The feature is still present, so there are no updates to be
×
1494
                // made.
×
1495
                if _, ok := newFeatures[feature.FeatureBit]; ok {
×
1496
                        delete(newFeatures, feature.FeatureBit)
×
1497
                        continue
×
1498
                }
1499

1500
                // The feature is no longer present, so we remove it from the
1501
                // database.
1502
                err := db.DeleteNodeFeature(ctx, sqlc.DeleteNodeFeatureParams{
×
1503
                        NodeID:     nodeID,
×
1504
                        FeatureBit: feature.FeatureBit,
×
1505
                })
×
1506
                if err != nil {
×
1507
                        return fmt.Errorf("unable to delete node(%d) "+
×
1508
                                "feature(%v): %w", nodeID, feature.FeatureBit,
×
1509
                                err)
×
1510
                }
×
1511
        }
1512

1513
        // Any remaining entries in newFeatures are new features that need to be
1514
        // added to the database for the first time.
1515
        for feature := range newFeatures {
×
1516
                err = db.InsertNodeFeature(ctx, sqlc.InsertNodeFeatureParams{
×
1517
                        NodeID:     nodeID,
×
1518
                        FeatureBit: feature,
×
1519
                })
×
1520
                if err != nil {
×
1521
                        return fmt.Errorf("unable to insert node(%d) "+
×
1522
                                "feature(%v): %w", nodeID, feature, err)
×
1523
                }
×
1524
        }
1525

1526
        return nil
×
1527
}
1528

1529
// fetchNodeFeatures fetches the features for a node with the given public key.
1530
func fetchNodeFeatures(ctx context.Context, queries SQLQueries,
1531
        nodePub route.Vertex) (*lnwire.FeatureVector, error) {
×
1532

×
1533
        rows, err := queries.GetNodeFeaturesByPubKey(
×
1534
                ctx, sqlc.GetNodeFeaturesByPubKeyParams{
×
1535
                        PubKey:  nodePub[:],
×
1536
                        Version: int16(ProtocolV1),
×
1537
                },
×
1538
        )
×
1539
        if err != nil {
×
1540
                return nil, fmt.Errorf("unable to get node(%s) features: %w",
×
1541
                        nodePub, err)
×
1542
        }
×
1543

1544
        features := lnwire.EmptyFeatureVector()
×
1545
        for _, bit := range rows {
×
1546
                features.Set(lnwire.FeatureBit(bit))
×
1547
        }
×
1548

1549
        return features, nil
×
1550
}
1551

1552
// dbAddressType is an enum type that represents the different address types
// that we store in the node_addresses table. The address type determines how
// the address is to be serialised/deserialized.
type dbAddressType uint8

const (
	// addressTypeIPv4 denotes a TCP address with an IPv4 host.
	addressTypeIPv4 dbAddressType = 1

	// addressTypeIPv6 denotes a TCP address with an IPv6 host.
	addressTypeIPv6 dbAddressType = 2

	// addressTypeTorV2 denotes a v2 Tor onion service address.
	addressTypeTorV2 dbAddressType = 3

	// addressTypeTorV3 denotes a v3 Tor onion service address.
	addressTypeTorV3 dbAddressType = 4

	// addressTypeOpaque denotes an address we do not further interpret;
	// its payload is stored hex-encoded (see getNodeAddresses which
	// hex-decodes it on read).
	addressTypeOpaque dbAddressType = math.MaxInt8
)
1564

1565
// upsertNodeAddresses updates the node's addresses in the database. This
// includes deleting any existing addresses and inserting the new set of
// addresses. The deletion is necessary since the ordering of the addresses may
// change, and we need to ensure that the database reflects the latest set of
// addresses so that at the time of reconstructing the node announcement, the
// order is preserved and the signature over the message remains valid.
func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64,
	addresses []net.Addr) error {

	// Delete any existing addresses for the node. This is required since
	// even if the new set of addresses is the same, the ordering may have
	// changed for a given address type.
	err := db.DeleteNodeAddresses(ctx, nodeID)
	if err != nil {
		return fmt.Errorf("unable to delete node(%d) addresses: %w",
			nodeID, err)
	}

	// Copy the nodes latest set of addresses, bucketed by database
	// address type.
	newAddresses := map[dbAddressType][]string{
		addressTypeIPv4:   {},
		addressTypeIPv6:   {},
		addressTypeTorV2:  {},
		addressTypeTorV3:  {},
		addressTypeOpaque: {},
	}
	addAddr := func(t dbAddressType, addr net.Addr) {
		newAddresses[t] = append(newAddresses[t], addr.String())
	}

	// Classify each address. Unknown address types are rejected outright
	// so that we never persist something we cannot later deserialize.
	for _, address := range addresses {
		switch addr := address.(type) {
		case *net.TCPAddr:
			if ip4 := addr.IP.To4(); ip4 != nil {
				addAddr(addressTypeIPv4, addr)
			} else if ip6 := addr.IP.To16(); ip6 != nil {
				addAddr(addressTypeIPv6, addr)
			} else {
				return fmt.Errorf("unhandled IP address: %v",
					addr)
			}

		case *tor.OnionAddr:
			// The length of the onion service string determines
			// whether this is a v2 or v3 onion address.
			switch len(addr.OnionService) {
			case tor.V2Len:
				addAddr(addressTypeTorV2, addr)
			case tor.V3Len:
				addAddr(addressTypeTorV3, addr)
			default:
				return fmt.Errorf("invalid length for a tor " +
					"address")
			}

		case *lnwire.OpaqueAddrs:
			addAddr(addressTypeOpaque, addr)

		default:
			return fmt.Errorf("unhandled address type: %T", addr)
		}
	}

	// Any remaining entries in newAddresses are new addresses that need to
	// be added to the database for the first time.
	//
	// NOTE: the insertion position within each type is persisted so that
	// the original ordering can be reproduced on read.
	for addrType, addrList := range newAddresses {
		for position, addr := range addrList {
			err := db.InsertNodeAddress(
				ctx, sqlc.InsertNodeAddressParams{
					NodeID:   nodeID,
					Type:     int16(addrType),
					Address:  addr,
					Position: int32(position),
				},
			)
			if err != nil {
				return fmt.Errorf("unable to insert "+
					"node(%d) address(%v): %w", nodeID,
					addr, err)
			}
		}
	}

	return nil
}
1648

1649
// getNodeAddresses fetches the addresses for a node with the given public key.
1650
func getNodeAddresses(ctx context.Context, db SQLQueries, nodePub []byte) (bool,
1651
        []net.Addr, error) {
×
1652

×
1653
        // GetNodeAddressesByPubKey ensures that the addresses for a given type
×
1654
        // are returned in the same order as they were inserted.
×
1655
        rows, err := db.GetNodeAddressesByPubKey(
×
1656
                ctx, sqlc.GetNodeAddressesByPubKeyParams{
×
1657
                        Version: int16(ProtocolV1),
×
1658
                        PubKey:  nodePub,
×
1659
                },
×
1660
        )
×
1661
        if err != nil {
×
1662
                return false, nil, err
×
1663
        }
×
1664

1665
        // GetNodeAddressesByPubKey uses a left join so there should always be
1666
        // at least one row returned if the node exists even if it has no
1667
        // addresses.
1668
        if len(rows) == 0 {
×
1669
                return false, nil, nil
×
1670
        }
×
1671

1672
        addresses := make([]net.Addr, 0, len(rows))
×
1673
        for _, addr := range rows {
×
1674
                if !(addr.Type.Valid && addr.Address.Valid) {
×
1675
                        continue
×
1676
                }
1677

1678
                address := addr.Address.String
×
1679

×
1680
                switch dbAddressType(addr.Type.Int16) {
×
1681
                case addressTypeIPv4:
×
1682
                        tcp, err := net.ResolveTCPAddr("tcp4", address)
×
1683
                        if err != nil {
×
1684
                                return false, nil, nil
×
1685
                        }
×
1686
                        tcp.IP = tcp.IP.To4()
×
1687

×
1688
                        addresses = append(addresses, tcp)
×
1689

1690
                case addressTypeIPv6:
×
1691
                        tcp, err := net.ResolveTCPAddr("tcp6", address)
×
1692
                        if err != nil {
×
1693
                                return false, nil, nil
×
1694
                        }
×
1695
                        addresses = append(addresses, tcp)
×
1696

1697
                case addressTypeTorV3, addressTypeTorV2:
×
1698
                        service, portStr, err := net.SplitHostPort(address)
×
1699
                        if err != nil {
×
1700
                                return false, nil, fmt.Errorf("unable to "+
×
1701
                                        "split tor v3 address: %v",
×
1702
                                        addr.Address)
×
1703
                        }
×
1704

1705
                        port, err := strconv.Atoi(portStr)
×
1706
                        if err != nil {
×
1707
                                return false, nil, err
×
1708
                        }
×
1709

1710
                        addresses = append(addresses, &tor.OnionAddr{
×
1711
                                OnionService: service,
×
1712
                                Port:         port,
×
1713
                        })
×
1714

1715
                case addressTypeOpaque:
×
1716
                        opaque, err := hex.DecodeString(address)
×
1717
                        if err != nil {
×
1718
                                return false, nil, fmt.Errorf("unable to "+
×
1719
                                        "decode opaque address: %v", addr)
×
1720
                        }
×
1721

1722
                        addresses = append(addresses, &lnwire.OpaqueAddrs{
×
1723
                                Payload: opaque,
×
1724
                        })
×
1725

1726
                default:
×
1727
                        return false, nil, fmt.Errorf("unknown address "+
×
1728
                                "type: %v", addr.Type)
×
1729
                }
1730
        }
1731

1732
        return true, addresses, nil
×
1733
}
1734

1735
// upsertNodeExtraSignedFields updates the node's extra signed fields in the
1736
// database. This includes updating any existing types, inserting any new types,
1737
// and deleting any types that are no longer present.
1738
func upsertNodeExtraSignedFields(ctx context.Context, db SQLQueries,
1739
        nodeID int64, extraFields map[uint64][]byte) error {
×
1740

×
1741
        // Get any existing extra signed fields for the node.
×
1742
        existingFields, err := db.GetExtraNodeTypes(ctx, nodeID)
×
1743
        if err != nil {
×
1744
                return err
×
1745
        }
×
1746

1747
        // Make a lookup map of the existing field types so that we can use it
1748
        // to keep track of any fields we should delete.
1749
        m := make(map[uint64]bool)
×
1750
        for _, field := range existingFields {
×
1751
                m[uint64(field.Type)] = true
×
1752
        }
×
1753

1754
        // For all the new fields, we'll upsert them and remove them from the
1755
        // map of existing fields.
1756
        for tlvType, value := range extraFields {
×
1757
                err = db.UpsertNodeExtraType(
×
1758
                        ctx, sqlc.UpsertNodeExtraTypeParams{
×
1759
                                NodeID: nodeID,
×
1760
                                Type:   int64(tlvType),
×
1761
                                Value:  value,
×
1762
                        },
×
1763
                )
×
1764
                if err != nil {
×
1765
                        return fmt.Errorf("unable to upsert node(%d) extra "+
×
1766
                                "signed field(%v): %w", nodeID, tlvType, err)
×
1767
                }
×
1768

1769
                // Remove the field from the map of existing fields if it was
1770
                // present.
1771
                delete(m, tlvType)
×
1772
        }
1773

1774
        // For all the fields that are left in the map of existing fields, we'll
1775
        // delete them as they are no longer present in the new set of fields.
1776
        for tlvType := range m {
×
1777
                err = db.DeleteExtraNodeType(
×
1778
                        ctx, sqlc.DeleteExtraNodeTypeParams{
×
1779
                                NodeID: nodeID,
×
1780
                                Type:   int64(tlvType),
×
1781
                        },
×
1782
                )
×
1783
                if err != nil {
×
1784
                        return fmt.Errorf("unable to delete node(%d) extra "+
×
1785
                                "signed field(%v): %w", nodeID, tlvType, err)
×
1786
                }
×
1787
        }
1788

1789
        return nil
×
1790
}
1791

1792
// srcNodeInfo holds the information about the source node of the graph.
// Instances are cached per protocol version by the SQLStore (see
// getSourceNode) so that repeated look-ups can skip the database.
type srcNodeInfo struct {
	// id is the DB level ID of the source node entry in the "nodes" table.
	id int64

	// pub is the public key of the source node.
	pub route.Vertex
}
1800

1801
// getSourceNode returns the DB node ID and pub key of the source node for the
// specified protocol version. The result is cached on the SQLStore under
// srcNodeMu, so only the first call per version hits the database.
//
// Exactly one source node per protocol version is expected: zero rows map to
// ErrSourceNodeNotSet, and multiple rows are reported as an error.
func (s *SQLStore) getSourceNode(ctx context.Context, db SQLQueries,
	version ProtocolVersion) (int64, route.Vertex, error) {

	s.srcNodeMu.Lock()
	defer s.srcNodeMu.Unlock()

	// If we already have the source node ID and pub key cached, then
	// return them.
	if info, ok := s.srcNodes[version]; ok {
		return info.id, info.pub, nil
	}

	var pubKey route.Vertex

	nodes, err := db.GetSourceNodesByVersion(ctx, int16(version))
	if err != nil {
		return 0, pubKey, fmt.Errorf("unable to fetch source node: %w",
			err)
	}

	if len(nodes) == 0 {
		return 0, pubKey, ErrSourceNodeNotSet
	} else if len(nodes) > 1 {
		return 0, pubKey, fmt.Errorf("multiple source nodes for "+
			"protocol %s found", version)
	}

	copy(pubKey[:], nodes[0].PubKey)

	// Cache the result for subsequent calls.
	s.srcNodes[version] = &srcNodeInfo{
		id:  nodes[0].NodeID,
		pub: pubKey,
	}

	return nodes[0].NodeID, pubKey, nil
}
1839

1840
// marshalExtraOpaqueData takes a flat byte slice parses it as a TLV stream.
1841
// This then produces a map from TLV type to value. If the input is not a
1842
// valid TLV stream, then an error is returned.
1843
func marshalExtraOpaqueData(data []byte) (map[uint64][]byte, error) {
×
1844
        r := bytes.NewReader(data)
×
1845

×
1846
        tlvStream, err := tlv.NewStream()
×
1847
        if err != nil {
×
1848
                return nil, err
×
1849
        }
×
1850

1851
        // Since ExtraOpaqueData is provided by a potentially malicious peer,
1852
        // pass it into the P2P decoding variant.
1853
        parsedTypes, err := tlvStream.DecodeWithParsedTypesP2P(r)
×
1854
        if err != nil {
×
1855
                return nil, fmt.Errorf("%w: %w", ErrParsingExtraTLVBytes, err)
×
1856
        }
×
1857
        if len(parsedTypes) == 0 {
×
1858
                return nil, nil
×
1859
        }
×
1860

1861
        records := make(map[uint64][]byte)
×
1862
        for k, v := range parsedTypes {
×
1863
                records[uint64(k)] = v
×
1864
        }
×
1865

1866
        return records, nil
×
1867
}
1868

1869
// insertChannel inserts a new channel record into the database. It also
// ensures shell entries exist for both endpoint nodes, and persists the
// channel's feature bits and any extra signed TLV fields alongside the main
// record. ErrEdgeAlreadyExist is returned if the channel is already present.
func insertChannel(ctx context.Context, db SQLQueries,
	edge *models.ChannelEdgeInfo) error {

	// Serialize the SCID with the package byte order so look-ups use a
	// consistent key encoding.
	var chanIDB [8]byte
	byteOrder.PutUint64(chanIDB[:], edge.ChannelID)

	// Make sure that the channel doesn't already exist. We do this
	// explicitly instead of relying on catching a unique constraint error
	// because relying on SQL to throw that error would abort the entire
	// batch of transactions.
	_, err := db.GetChannelBySCID(
		ctx, sqlc.GetChannelBySCIDParams{
			Scid:    chanIDB[:],
			Version: int16(ProtocolV1),
		},
	)
	if err == nil {
		return ErrEdgeAlreadyExist
	} else if !errors.Is(err, sql.ErrNoRows) {
		return fmt.Errorf("unable to fetch channel: %w", err)
	}

	// Make sure that at least a "shell" entry for each node is present in
	// the nodes table.
	node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
	if err != nil {
		return fmt.Errorf("unable to create shell node: %w", err)
	}

	node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
	if err != nil {
		return fmt.Errorf("unable to create shell node: %w", err)
	}

	// A zero capacity is stored as NULL rather than 0.
	var capacity sql.NullInt64
	if edge.Capacity != 0 {
		capacity = sqldb.SQLInt64(int64(edge.Capacity))
	}

	createParams := sqlc.CreateChannelParams{
		Version:     int16(ProtocolV1),
		Scid:        chanIDB[:],
		NodeID1:     node1DBID,
		NodeID2:     node2DBID,
		Outpoint:    edge.ChannelPoint.String(),
		Capacity:    capacity,
		BitcoinKey1: edge.BitcoinKey1Bytes[:],
		BitcoinKey2: edge.BitcoinKey2Bytes[:],
	}

	// Attach the announcement signatures when an auth proof is present.
	if edge.AuthProof != nil {
		proof := edge.AuthProof

		createParams.Node1Signature = proof.NodeSig1Bytes
		createParams.Node2Signature = proof.NodeSig2Bytes
		createParams.Bitcoin1Signature = proof.BitcoinSig1Bytes
		createParams.Bitcoin2Signature = proof.BitcoinSig2Bytes
	}

	// Insert the new channel record.
	dbChanID, err := db.CreateChannel(ctx, createParams)
	if err != nil {
		return err
	}

	// Insert any channel features. The raw feature bytes are decoded into
	// a vector so each bit can be stored as its own row.
	if len(edge.Features) != 0 {
		chanFeatures := lnwire.NewRawFeatureVector()
		err := chanFeatures.Decode(bytes.NewReader(edge.Features))
		if err != nil {
			return err
		}

		fv := lnwire.NewFeatureVector(chanFeatures, lnwire.Features)
		for feature := range fv.Features() {
			err = db.InsertChannelFeature(
				ctx, sqlc.InsertChannelFeatureParams{
					ChannelID:  dbChanID,
					FeatureBit: int32(feature),
				},
			)
			if err != nil {
				return fmt.Errorf("unable to insert "+
					"channel(%d) feature(%v): %w", dbChanID,
					feature, err)
			}
		}
	}

	// Finally, insert any extra TLV fields in the channel announcement.
	extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
	if err != nil {
		return fmt.Errorf("unable to marshal extra opaque data: %w",
			err)
	}

	for tlvType, value := range extra {
		err := db.CreateChannelExtraType(
			ctx, sqlc.CreateChannelExtraTypeParams{
				ChannelID: dbChanID,
				Type:      int64(tlvType),
				Value:     value,
			},
		)
		if err != nil {
			return fmt.Errorf("unable to upsert channel(%d) extra "+
				"signed field(%v): %w", edge.ChannelID,
				tlvType, err)
		}
	}

	return nil
}
1983

1984
// maybeCreateShellNode checks if a shell node entry exists for the
1985
// given public key. If it does not exist, then a new shell node entry is
1986
// created. The ID of the node is returned. A shell node only has a protocol
1987
// version and public key persisted.
1988
func maybeCreateShellNode(ctx context.Context, db SQLQueries,
1989
        pubKey route.Vertex) (int64, error) {
×
1990

×
1991
        dbNode, err := db.GetNodeByPubKey(
×
1992
                ctx, sqlc.GetNodeByPubKeyParams{
×
1993
                        PubKey:  pubKey[:],
×
1994
                        Version: int16(ProtocolV1),
×
1995
                },
×
1996
        )
×
1997
        // The node exists. Return the ID.
×
1998
        if err == nil {
×
1999
                return dbNode.ID, nil
×
2000
        } else if !errors.Is(err, sql.ErrNoRows) {
×
2001
                return 0, err
×
2002
        }
×
2003

2004
        // Otherwise, the node does not exist, so we create a shell entry for
2005
        // it.
2006
        id, err := db.UpsertNode(ctx, sqlc.UpsertNodeParams{
×
2007
                Version: int16(ProtocolV1),
×
2008
                PubKey:  pubKey[:],
×
2009
        })
×
2010
        if err != nil {
×
2011
                return 0, fmt.Errorf("unable to create shell node: %w", err)
×
2012
        }
×
2013

2014
        return id, nil
×
2015
}
2016

2017
// upsertChanPolicyExtraSignedFields updates the policy's extra signed fields in
2018
// the database. This includes deleting any existing types and then inserting
2019
// the new types.
2020
func upsertChanPolicyExtraSignedFields(ctx context.Context, db SQLQueries,
2021
        chanPolicyID int64, extraFields map[uint64][]byte) error {
×
2022

×
2023
        // Delete all existing extra signed fields for the channel policy.
×
2024
        err := db.DeleteChannelPolicyExtraTypes(ctx, chanPolicyID)
×
2025
        if err != nil {
×
2026
                return fmt.Errorf("unable to delete "+
×
2027
                        "existing policy extra signed fields for policy %d: %w",
×
2028
                        chanPolicyID, err)
×
2029
        }
×
2030

2031
        // Insert all new extra signed fields for the channel policy.
2032
        for tlvType, value := range extraFields {
×
2033
                err = db.InsertChanPolicyExtraType(
×
2034
                        ctx, sqlc.InsertChanPolicyExtraTypeParams{
×
2035
                                ChannelPolicyID: chanPolicyID,
×
2036
                                Type:            int64(tlvType),
×
2037
                                Value:           value,
×
2038
                        },
×
2039
                )
×
2040
                if err != nil {
×
2041
                        return fmt.Errorf("unable to insert "+
×
2042
                                "channel_policy(%d) extra signed field(%v): %w",
×
2043
                                chanPolicyID, tlvType, err)
×
2044
                }
×
2045
        }
2046

2047
        return nil
×
2048
}
2049

2050
// getAndBuildEdgeInfo builds a models.ChannelEdgeInfo instance from the
// provided dbChanRow and also fetches any other required information
// to construct the edge info (feature bits and extra TLV records).
func getAndBuildEdgeInfo(ctx context.Context, db SQLQueries,
	chain chainhash.Hash, dbChanID int64, dbChan sqlc.Channel, node1,
	node2 route.Vertex) (*models.ChannelEdgeInfo, error) {

	fv, extras, err := getChanFeaturesAndExtras(
		ctx, db, dbChanID,
	)
	if err != nil {
		return nil, err
	}

	op, err := wire.NewOutPointFromString(dbChan.Outpoint)
	if err != nil {
		return nil, err
	}

	// Re-encode the feature vector into the raw byte form carried on the
	// edge info.
	var featureBuf bytes.Buffer
	if err := fv.Encode(&featureBuf); err != nil {
		return nil, fmt.Errorf("unable to encode features: %w", err)
	}

	recs, err := lnwire.CustomRecords(extras).Serialize()
	if err != nil {
		return nil, fmt.Errorf("unable to serialize extra signed "+
			"fields: %w", err)
	}
	// Normalize a nil serialization result to an empty slice.
	if recs == nil {
		recs = make([]byte, 0)
	}

	var btcKey1, btcKey2 route.Vertex
	copy(btcKey1[:], dbChan.BitcoinKey1)
	copy(btcKey2[:], dbChan.BitcoinKey2)

	channel := &models.ChannelEdgeInfo{
		ChainHash:        chain,
		ChannelID:        byteOrder.Uint64(dbChan.Scid),
		NodeKey1Bytes:    node1,
		NodeKey2Bytes:    node2,
		BitcoinKey1Bytes: btcKey1,
		BitcoinKey2Bytes: btcKey2,
		ChannelPoint:     *op,
		Capacity:         btcutil.Amount(dbChan.Capacity.Int64),
		Features:         featureBuf.Bytes(),
		ExtraOpaqueData:  recs,
	}

	// We always set all the signatures at the same time, so we can
	// safely check if one signature is present to determine if we have the
	// rest of the signatures for the auth proof.
	if len(dbChan.Bitcoin1Signature) > 0 {
		channel.AuthProof = &models.ChannelAuthProof{
			NodeSig1Bytes:    dbChan.Node1Signature,
			NodeSig2Bytes:    dbChan.Node2Signature,
			BitcoinSig1Bytes: dbChan.Bitcoin1Signature,
			BitcoinSig2Bytes: dbChan.Bitcoin2Signature,
		}
	}

	return channel, nil
}
2114

2115
// buildNodeVertices is a helper that converts raw node public keys
2116
// into route.Vertex instances.
2117
func buildNodeVertices(node1Pub, node2Pub []byte) (route.Vertex,
2118
        route.Vertex, error) {
×
2119

×
2120
        node1Vertex, err := route.NewVertexFromBytes(node1Pub)
×
2121
        if err != nil {
×
2122
                return route.Vertex{}, route.Vertex{}, fmt.Errorf("unable to "+
×
2123
                        "create vertex from node1 pubkey: %w", err)
×
2124
        }
×
2125

2126
        node2Vertex, err := route.NewVertexFromBytes(node2Pub)
×
2127
        if err != nil {
×
2128
                return route.Vertex{}, route.Vertex{}, fmt.Errorf("unable to "+
×
2129
                        "create vertex from node2 pubkey: %w", err)
×
2130
        }
×
2131

2132
        return node1Vertex, node2Vertex, nil
×
2133
}
2134

2135
// getChanFeaturesAndExtras fetches the channel features and extra TLV types
// for a channel with the given ID. Both record kinds come back from a single
// query; each row's IsFeature flag distinguishes them.
func getChanFeaturesAndExtras(ctx context.Context, db SQLQueries,
	id int64) (*lnwire.FeatureVector, map[uint64][]byte, error) {

	rows, err := db.GetChannelFeaturesAndExtras(ctx, id)
	if err != nil {
		return nil, nil, fmt.Errorf("unable to fetch channel "+
			"features and extras: %w", err)
	}

	var (
		fv     = lnwire.EmptyFeatureVector()
		extras = make(map[uint64][]byte)
	)
	for _, row := range rows {
		if row.IsFeature {
			fv.Set(lnwire.FeatureBit(row.FeatureBit))

			continue
		}

		// Extra TLV rows carry their key/value as generic columns, so
		// the concrete types must be asserted before use.
		tlvType, ok := row.ExtraKey.(int64)
		if !ok {
			return nil, nil, fmt.Errorf("unexpected type for "+
				"TLV type: %T", row.ExtraKey)
		}

		valueBytes, ok := row.Value.([]byte)
		if !ok {
			return nil, nil, fmt.Errorf("unexpected type for "+
				"Value: %T", row.Value)
		}

		extras[uint64(tlvType)] = valueBytes
	}

	return fv, extras, nil
}
2174

2175
// getAndBuildChanPolicies uses the given sqlc.ChannelPolicy and also retrieves
2176
// all the extra info required to build the complete models.ChannelEdgePolicy
2177
// types. It returns two policies, which may be nil if the provided
2178
// sqlc.ChannelPolicy records are nil.
2179
func getAndBuildChanPolicies(ctx context.Context, db SQLQueries,
2180
        dbPol1, dbPol2 *sqlc.ChannelPolicy, channelID uint64, node1,
2181
        node2 route.Vertex) (*models.ChannelEdgePolicy,
2182
        *models.ChannelEdgePolicy, error) {
×
2183

×
2184
        if dbPol1 == nil && dbPol2 == nil {
×
2185
                return nil, nil, nil
×
2186
        }
×
2187

2188
        var (
×
2189
                policy1ID int64
×
2190
                policy2ID int64
×
2191
        )
×
2192
        if dbPol1 != nil {
×
2193
                policy1ID = dbPol1.ID
×
2194
        }
×
2195
        if dbPol2 != nil {
×
2196
                policy2ID = dbPol2.ID
×
2197
        }
×
2198
        rows, err := db.GetChannelPolicyExtraTypes(
×
2199
                ctx, sqlc.GetChannelPolicyExtraTypesParams{
×
2200
                        ID:   policy1ID,
×
2201
                        ID_2: policy2ID,
×
2202
                },
×
2203
        )
×
2204
        if err != nil {
×
2205
                return nil, nil, err
×
2206
        }
×
2207

2208
        var (
×
2209
                dbPol1Extras = make(map[uint64][]byte)
×
2210
                dbPol2Extras = make(map[uint64][]byte)
×
2211
        )
×
2212
        for _, row := range rows {
×
2213
                switch row.PolicyID {
×
2214
                case policy1ID:
×
2215
                        dbPol1Extras[uint64(row.Type)] = row.Value
×
2216
                case policy2ID:
×
2217
                        dbPol2Extras[uint64(row.Type)] = row.Value
×
2218
                default:
×
2219
                        return nil, nil, fmt.Errorf("unexpected policy ID %d "+
×
2220
                                "in row: %v", row.PolicyID, row)
×
2221
                }
2222
        }
2223

2224
        var pol1, pol2 *models.ChannelEdgePolicy
×
2225
        if dbPol1 != nil {
×
2226
                pol1, err = buildChanPolicy(
×
2227
                        *dbPol1, channelID, dbPol1Extras, node2, true,
×
2228
                )
×
2229
                if err != nil {
×
2230
                        return nil, nil, err
×
2231
                }
×
2232
        }
2233
        if dbPol2 != nil {
×
2234
                pol2, err = buildChanPolicy(
×
2235
                        *dbPol2, channelID, dbPol2Extras, node1, false,
×
2236
                )
×
2237
                if err != nil {
×
2238
                        return nil, nil, err
×
2239
                }
×
2240
        }
2241

2242
        return pol1, pol2, nil
×
2243
}
2244

2245
// buildChanPolicy builds a models.ChannelEdgePolicy instance from the
2246
// provided sqlc.ChannelPolicy and other required information.
2247
func buildChanPolicy(dbPolicy sqlc.ChannelPolicy, channelID uint64,
2248
        extras map[uint64][]byte, toNode route.Vertex,
2249
        isNode1 bool) (*models.ChannelEdgePolicy, error) {
×
2250

×
2251
        recs, err := lnwire.CustomRecords(extras).Serialize()
×
2252
        if err != nil {
×
2253
                return nil, fmt.Errorf("unable to serialize extra signed "+
×
2254
                        "fields: %w", err)
×
2255
        }
×
2256

2257
        var msgFlags lnwire.ChanUpdateMsgFlags
×
2258
        if dbPolicy.MaxHtlcMsat.Valid {
×
2259
                msgFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
2260
        }
×
2261

2262
        var chanFlags lnwire.ChanUpdateChanFlags
×
2263
        if !isNode1 {
×
2264
                chanFlags |= lnwire.ChanUpdateDirection
×
2265
        }
×
2266
        if dbPolicy.Disabled.Bool {
×
2267
                chanFlags |= lnwire.ChanUpdateDisabled
×
2268
        }
×
2269

2270
        var inboundFee fn.Option[lnwire.Fee]
×
2271
        if dbPolicy.InboundFeeRateMilliMsat.Valid ||
×
2272
                dbPolicy.InboundBaseFeeMsat.Valid {
×
2273

×
2274
                inboundFee = fn.Some(lnwire.Fee{
×
2275
                        BaseFee: int32(dbPolicy.InboundBaseFeeMsat.Int64),
×
2276
                        FeeRate: int32(dbPolicy.InboundFeeRateMilliMsat.Int64),
×
2277
                })
×
2278
        }
×
2279

2280
        return &models.ChannelEdgePolicy{
×
2281
                SigBytes:  dbPolicy.Signature,
×
2282
                ChannelID: channelID,
×
2283
                LastUpdate: time.Unix(
×
2284
                        dbPolicy.LastUpdate.Int64, 0,
×
2285
                ),
×
2286
                MessageFlags:  msgFlags,
×
2287
                ChannelFlags:  chanFlags,
×
2288
                TimeLockDelta: uint16(dbPolicy.Timelock),
×
2289
                MinHTLC: lnwire.MilliSatoshi(
×
2290
                        dbPolicy.MinHtlcMsat,
×
2291
                ),
×
2292
                MaxHTLC: lnwire.MilliSatoshi(
×
2293
                        dbPolicy.MaxHtlcMsat.Int64,
×
2294
                ),
×
2295
                FeeBaseMSat: lnwire.MilliSatoshi(
×
2296
                        dbPolicy.BaseFeeMsat,
×
2297
                ),
×
2298
                FeeProportionalMillionths: lnwire.MilliSatoshi(dbPolicy.FeePpm),
×
2299
                ToNode:                    toNode,
×
2300
                InboundFee:                inboundFee,
×
2301
                ExtraOpaqueData:           recs,
×
2302
        }, nil
×
2303
}
2304

2305
// extractChannelPolicies extracts the sqlc.ChannelPolicy records from the give
2306
// row which is expected to be a sqlc type that contains channel policy
2307
// information. It returns two policies, which may be nil if the policy
2308
// information is not present in the row.
2309
//
2310
//nolint:ll
2311
func extractChannelPolicies(row any) (*sqlc.ChannelPolicy, *sqlc.ChannelPolicy,
2312
        error) {
×
2313

×
2314
        var policy1, policy2 *sqlc.ChannelPolicy
×
2315
        switch r := row.(type) {
×
2316
        case sqlc.ListChannelsByNodeIDRow:
×
2317
                if r.Policy1ID.Valid {
×
2318
                        policy1 = &sqlc.ChannelPolicy{
×
2319
                                ID:                      r.Policy1ID.Int64,
×
2320
                                Version:                 r.Policy1Version.Int16,
×
2321
                                ChannelID:               r.Channel.ID,
×
2322
                                NodeID:                  r.Policy1NodeID.Int64,
×
2323
                                Timelock:                r.Policy1Timelock.Int32,
×
2324
                                FeePpm:                  r.Policy1FeePpm.Int64,
×
2325
                                BaseFeeMsat:             r.Policy1BaseFeeMsat.Int64,
×
2326
                                MinHtlcMsat:             r.Policy1MinHtlcMsat.Int64,
×
2327
                                MaxHtlcMsat:             r.Policy1MaxHtlcMsat,
×
2328
                                LastUpdate:              r.Policy1LastUpdate,
×
2329
                                InboundBaseFeeMsat:      r.Policy1InboundBaseFeeMsat,
×
2330
                                InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat,
×
2331
                                Disabled:                r.Policy1Disabled,
×
2332
                                Signature:               r.Policy1Signature,
×
2333
                        }
×
2334
                }
×
2335
                if r.Policy2ID.Valid {
×
2336
                        policy2 = &sqlc.ChannelPolicy{
×
2337
                                ID:                      r.Policy2ID.Int64,
×
2338
                                Version:                 r.Policy2Version.Int16,
×
2339
                                ChannelID:               r.Channel.ID,
×
2340
                                NodeID:                  r.Policy2NodeID.Int64,
×
2341
                                Timelock:                r.Policy2Timelock.Int32,
×
2342
                                FeePpm:                  r.Policy2FeePpm.Int64,
×
2343
                                BaseFeeMsat:             r.Policy2BaseFeeMsat.Int64,
×
2344
                                MinHtlcMsat:             r.Policy2MinHtlcMsat.Int64,
×
2345
                                MaxHtlcMsat:             r.Policy2MaxHtlcMsat,
×
2346
                                LastUpdate:              r.Policy2LastUpdate,
×
2347
                                InboundBaseFeeMsat:      r.Policy2InboundBaseFeeMsat,
×
2348
                                InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat,
×
2349
                                Disabled:                r.Policy2Disabled,
×
2350
                                Signature:               r.Policy2Signature,
×
2351
                        }
×
2352
                }
×
2353

2354
                return policy1, policy2, nil
×
2355
        default:
×
2356
                return nil, nil, fmt.Errorf("unexpected row type in "+
×
2357
                        "extractChannelPolicies: %T", r)
×
2358
        }
2359
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc