
lightningnetwork / lnd / 15299382095

28 May 2025 11:50AM UTC coverage: 68.42% (+10.1%) from 58.327%

Pull Request #9869: sqldb+graph/db: add channel tables and implement some channel CRUD
Merge 7720c7d0f into bff2f2440 (committed via GitHub web-flow)

4 of 225 new or added lines in 3 files covered. (1.78%)

20 existing lines in 6 files now uncovered.

133998 of 195845 relevant lines covered (68.42%)

21953.24 hits per line
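(The headline figures follow directly from the line counts above: 133998 / 195845 ≈ 0.6842, i.e. 68.42% of relevant lines covered overall, and 4 / 225 ≈ 1.78% of the new or added lines in this pull request.)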

Source File

/graph/db/sql_store.go — 0.0% covered
1
package graphdb
2

3
import (
4
        "bytes"
5
        "context"
6
        "database/sql"
7
        "encoding/hex"
8
        "errors"
9
        "fmt"
10
        "math"
11
        "net"
12
        "strconv"
13
        "sync"
14
        "time"
15

16
        "github.com/btcsuite/btcd/btcec/v2"
17
        "github.com/lightningnetwork/lnd/batch"
18
        "github.com/lightningnetwork/lnd/graph/db/models"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/routing/route"
21
        "github.com/lightningnetwork/lnd/sqldb"
22
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
23
        "github.com/lightningnetwork/lnd/tlv"
24
        "github.com/lightningnetwork/lnd/tor"
25
)
26

27
// ProtocolVersion is an enum that defines the gossip protocol version of a
28
// message.
29
type ProtocolVersion uint8
30

31
const (
32
        // ProtocolV1 is the gossip protocol version defined in BOLT #7.
33
        ProtocolV1 ProtocolVersion = 1
34
)
35

36
// String returns a string representation of the protocol version.
37
func (v ProtocolVersion) String() string {
×
38
        return fmt.Sprintf("V%d", v)
×
39
}
×
40

41
// SQLQueries is a subset of the sqlc.Querier interface that can be used to
42
// execute queries against the SQL graph tables.
43
//
44
//nolint:ll,interfacebloat
45
type SQLQueries interface {
46
        /*
47
                Node queries.
48
        */
49
        UpsertNode(ctx context.Context, arg sqlc.UpsertNodeParams) (int64, error)
50
        GetNodeByPubKey(ctx context.Context, arg sqlc.GetNodeByPubKeyParams) (sqlc.Node, error)
51
        GetNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetNodesByLastUpdateRangeParams) ([]sqlc.Node, error)
52
        DeleteNodeByPubKey(ctx context.Context, arg sqlc.DeleteNodeByPubKeyParams) (sql.Result, error)
53

54
        GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]sqlc.NodeExtraType, error)
55
        UpsertNodeExtraType(ctx context.Context, arg sqlc.UpsertNodeExtraTypeParams) error
56
        DeleteExtraNodeType(ctx context.Context, arg sqlc.DeleteExtraNodeTypeParams) error
57

58
        InsertNodeAddress(ctx context.Context, arg sqlc.InsertNodeAddressParams) error
59
        GetNodeAddressesByPubKey(ctx context.Context, arg sqlc.GetNodeAddressesByPubKeyParams) ([]sqlc.GetNodeAddressesByPubKeyRow, error)
60
        DeleteNodeAddresses(ctx context.Context, nodeID int64) error
61

62
        InsertNodeFeature(ctx context.Context, arg sqlc.InsertNodeFeatureParams) error
63
        GetNodeFeatures(ctx context.Context, nodeID int64) ([]sqlc.NodeFeature, error)
64
        GetNodeFeaturesByPubKey(ctx context.Context, arg sqlc.GetNodeFeaturesByPubKeyParams) ([]int32, error)
65
        DeleteNodeFeature(ctx context.Context, arg sqlc.DeleteNodeFeatureParams) error
66

67
        /*
68
                Source node queries.
69
        */
70
        AddSourceNode(ctx context.Context, nodeID int64) error
71
        GetSourceNodesByVersion(ctx context.Context, version int16) ([]sqlc.GetSourceNodesByVersionRow, error)
72

73
        /*
74
                Channel queries.
75
        */
76
        CreateChannel(ctx context.Context, arg sqlc.CreateChannelParams) (int64, error)
77
        GetChannelBySCID(ctx context.Context, arg sqlc.GetChannelBySCIDParams) (sqlc.Channel, error)
78
        HighestSCID(ctx context.Context, version int16) ([]byte, error)
79

80
        CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
81
        InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
82
}
83

84
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
85
// database operations.
86
type BatchedSQLQueries interface {
87
        SQLQueries
88
        sqldb.BatchedTx[SQLQueries]
89
}
90

91
// SQLStore is an implementation of the V1Store interface that uses a SQL
92
// database as the backend.
93
//
94
// NOTE: currently, this temporarily embeds the KVStore struct so that we can
95
// implement the V1Store interface incrementally. For any method not
96
// implemented, things will fall back to the KVStore. This is ONLY the case
97
// for the time being while this struct is used in unit tests only.
98
type SQLStore struct {
99
        db BatchedSQLQueries
100

101
        // cacheMu guards all caches (rejectCache and chanCache). If this
102
        // mutex is to be acquired at the same time as the DB mutex, then
103
        // cacheMu MUST be acquired first to prevent deadlock.
104
        cacheMu     sync.RWMutex
105
        rejectCache *rejectCache
106
        chanCache   *channelCache
107

108
        chanScheduler batch.Scheduler[SQLQueries]
109
        nodeScheduler batch.Scheduler[SQLQueries]
110

111
        // Temporary fall-back to the KVStore so that we can implement the
112
        // interface incrementally.
113
        *KVStore
114
}
115

116
// A compile-time assertion to ensure that SQLStore implements the V1Store
117
// interface.
118
var _ V1Store = (*SQLStore)(nil)
119

120
// NewSQLStore creates a new SQLStore instance given an open BatchedSQLQueries
121
// storage backend.
122
func NewSQLStore(db BatchedSQLQueries, kvStore *KVStore,
123
        options ...StoreOptionModifier) (*SQLStore, error) {
×
124

×
125
        opts := DefaultOptions()
×
126
        for _, o := range options {
×
127
                o(opts)
×
128
        }
×
129

130
        if opts.NoMigration {
×
131
                return nil, fmt.Errorf("the NoMigration option is not yet " +
×
132
                        "supported for SQL stores")
×
133
        }
×
134

135
        s := &SQLStore{
×
136
                db:          db,
×
137
                KVStore:     kvStore,
×
138
                rejectCache: newRejectCache(opts.RejectCacheSize),
×
139
                chanCache:   newChannelCache(opts.ChannelCacheSize),
×
140
        }
×
141

×
142
        s.chanScheduler = batch.NewTimeScheduler(
×
143
                db, &s.cacheMu, opts.BatchCommitInterval,
×
144
        )
×
145
        s.nodeScheduler = batch.NewTimeScheduler(
×
146
                db, nil, opts.BatchCommitInterval,
×
147
        )
×
148

×
149
        return s, nil
×
150
}
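// Illustrative usage sketch (not part of this file): wiring up an SQLStore.
// It assumes the caller already has a BatchedSQLQueries backend and the
// temporary KVStore fall-back in hand; only the NewSQLStore call shape and
// the fact that DefaultOptions() applies when no StoreOptionModifier is
// passed are taken from the code above.
func newGraphSQLStore(db BatchedSQLQueries, kv *KVStore) (*SQLStore, error) {
        // With no options, the default reject/channel cache sizes and batch
        // commit interval are used.
        return NewSQLStore(db, kv)
}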
151

152
// AddLightningNode adds a vertex/node to the graph database. If the node is not
153
// in the database from before, this will add a new, unconnected one to the
154
// graph. If it is present from before, this will update that node's
155
// information.
156
//
157
// NOTE: part of the V1Store interface.
158
func (s *SQLStore) AddLightningNode(node *models.LightningNode,
159
        opts ...batch.SchedulerOption) error {
×
160

×
161
        ctx := context.TODO()
×
162

×
163
        r := &batch.Request[SQLQueries]{
×
164
                Opts: batch.NewSchedulerOptions(opts...),
×
165
                Do: func(queries SQLQueries) error {
×
166
                        _, err := upsertNode(ctx, queries, node)
×
167
                        return err
×
168
                },
×
169
        }
170

171
        return s.nodeScheduler.Execute(ctx, r)
×
172
}
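// Illustrative only: announcing a node through the scheduler. The pubKey is
// assumed to come from a validated node announcement; only fields that this
// file itself persists (PubKeyBytes, Features, LastUpdate) are shown.
func addNodeSketch(s *SQLStore, pubKey [33]byte) error {
        node := &models.LightningNode{
                PubKeyBytes: pubKey,
                Features:    lnwire.EmptyFeatureVector(),
                LastUpdate:  time.Unix(0, 0),
        }

        // The call is queued on nodeScheduler, so concurrent announcements
        // may be committed within a single database transaction.
        return s.AddLightningNode(node)
}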
173

174
// FetchLightningNode attempts to look up a target node by its identity public
175
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
176
// returned.
177
//
178
// NOTE: part of the V1Store interface.
179
func (s *SQLStore) FetchLightningNode(pubKey route.Vertex) (
180
        *models.LightningNode, error) {
×
181

×
182
        ctx := context.TODO()
×
183

×
184
        var node *models.LightningNode
×
185
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
186
                var err error
×
187
                _, node, err = getNodeByPubKey(ctx, db, pubKey)
×
188

×
189
                return err
×
190
        }, sqldb.NoOpReset)
×
191
        if err != nil {
×
192
                return nil, fmt.Errorf("unable to fetch node: %w", err)
×
193
        }
×
194

195
        return node, nil
×
196
}
197

198
// HasLightningNode determines if the graph has a vertex identified by the
199
// target node identity public key. If the node exists in the database, a
200
// timestamp of when the data for the node was last updated is returned along
201
// with a true boolean. Otherwise, an empty time.Time is returned with a false
202
// boolean.
203
//
204
// NOTE: part of the V1Store interface.
205
func (s *SQLStore) HasLightningNode(pubKey [33]byte) (time.Time, bool,
206
        error) {
×
207

×
208
        ctx := context.TODO()
×
209

×
210
        var (
×
211
                exists     bool
×
212
                lastUpdate time.Time
×
213
        )
×
214
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
215
                dbNode, err := db.GetNodeByPubKey(
×
216
                        ctx, sqlc.GetNodeByPubKeyParams{
×
217
                                Version: int16(ProtocolV1),
×
218
                                PubKey:  pubKey[:],
×
219
                        },
×
220
                )
×
221
                if errors.Is(err, sql.ErrNoRows) {
×
222
                        return nil
×
223
                } else if err != nil {
×
224
                        return fmt.Errorf("unable to fetch node: %w", err)
×
225
                }
×
226

227
                exists = true
×
228

×
229
                if dbNode.LastUpdate.Valid {
×
230
                        lastUpdate = time.Unix(dbNode.LastUpdate.Int64, 0)
×
231
                }
×
232

233
                return nil
×
234
        }, sqldb.NoOpReset)
235
        if err != nil {
×
236
                return time.Time{}, false,
×
237
                        fmt.Errorf("unable to fetch node: %w", err)
×
238
        }
×
239

240
        return lastUpdate, exists, nil
×
241
}
242

243
// AddrsForNode returns all known addresses for the target node public key
244
// that the graph DB is aware of. The returned boolean indicates whether the
245
// given node is known to the graph DB.
246
//
247
// NOTE: part of the V1Store interface.
248
func (s *SQLStore) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
249
        error) {
×
250

×
251
        ctx := context.TODO()
×
252

×
253
        var (
×
254
                addresses []net.Addr
×
255
                known     bool
×
256
        )
×
257
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
258
                var err error
×
259
                known, addresses, err = getNodeAddresses(
×
260
                        ctx, db, nodePub.SerializeCompressed(),
×
261
                )
×
262
                if err != nil {
×
263
                        return fmt.Errorf("unable to fetch node addresses: %w",
×
264
                                err)
×
265
                }
×
266

267
                return nil
×
268
        }, sqldb.NoOpReset)
269
        if err != nil {
×
270
                return false, nil, fmt.Errorf("unable to get addresses for "+
×
271
                        "node(%x): %w", nodePub.SerializeCompressed(), err)
×
272
        }
×
273

274
        return known, addresses, nil
×
275
}
276

277
// DeleteLightningNode starts a new database transaction to remove a vertex/node
278
// from the database according to the node's public key.
279
//
280
// NOTE: part of the V1Store interface.
281
func (s *SQLStore) DeleteLightningNode(pubKey route.Vertex) error {
×
282
        ctx := context.TODO()
×
283

×
284
        err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
×
285
                res, err := db.DeleteNodeByPubKey(
×
286
                        ctx, sqlc.DeleteNodeByPubKeyParams{
×
287
                                Version: int16(ProtocolV1),
×
288
                                PubKey:  pubKey[:],
×
289
                        },
×
290
                )
×
291
                if err != nil {
×
292
                        return err
×
293
                }
×
294

295
                rows, err := res.RowsAffected()
×
296
                if err != nil {
×
297
                        return err
×
298
                }
×
299

300
                if rows == 0 {
×
301
                        return ErrGraphNodeNotFound
×
302
                } else if rows > 1 {
×
303
                        return fmt.Errorf("deleted %d rows, expected 1", rows)
×
304
                }
×
305

306
                return err
×
307
        }, sqldb.NoOpReset)
308
        if err != nil {
×
309
                return fmt.Errorf("unable to delete node: %w", err)
×
310
        }
×
311

312
        return nil
×
313
}
314

315
// FetchNodeFeatures returns the features of the given node. If no features are
316
// known for the node, an empty feature vector is returned.
317
//
318
// NOTE: this is part of the graphdb.NodeTraverser interface.
319
func (s *SQLStore) FetchNodeFeatures(nodePub route.Vertex) (
320
        *lnwire.FeatureVector, error) {
×
321

×
322
        ctx := context.TODO()
×
323

×
324
        return fetchNodeFeatures(ctx, s.db, nodePub)
×
325
}
×
326

327
// LookupAlias attempts to return the alias as advertised by the target node.
328
//
329
// NOTE: part of the V1Store interface.
330
func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) {
×
331
        var (
×
332
                ctx   = context.TODO()
×
333
                alias string
×
334
        )
×
335
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
336
                dbNode, err := db.GetNodeByPubKey(
×
337
                        ctx, sqlc.GetNodeByPubKeyParams{
×
338
                                Version: int16(ProtocolV1),
×
339
                                PubKey:  pub.SerializeCompressed(),
×
340
                        },
×
341
                )
×
342
                if errors.Is(err, sql.ErrNoRows) {
×
343
                        return ErrNodeAliasNotFound
×
344
                } else if err != nil {
×
345
                        return fmt.Errorf("unable to fetch node: %w", err)
×
346
                }
×
347

348
                if !dbNode.Alias.Valid {
×
349
                        return ErrNodeAliasNotFound
×
350
                }
×
351

352
                alias = dbNode.Alias.String
×
353

×
354
                return nil
×
355
        }, sqldb.NoOpReset)
356
        if err != nil {
×
357
                return "", fmt.Errorf("unable to look up alias: %w", err)
×
358
        }
×
359

360
        return alias, nil
×
361
}
362

363
// SourceNode returns the source node of the graph. The source node is treated
364
// as the center node within a star-graph. This method may be used to kick off
365
// a path finding algorithm in order to explore the reachability of another
366
// node based off the source node.
367
//
368
// NOTE: part of the V1Store interface.
369
func (s *SQLStore) SourceNode() (*models.LightningNode, error) {
×
370
        ctx := context.TODO()
×
371

×
372
        var node *models.LightningNode
×
373
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
374
                _, nodePub, err := getSourceNode(ctx, db, ProtocolV1)
×
375
                if err != nil {
×
376
                        return fmt.Errorf("unable to fetch V1 source node: %w",
×
377
                                err)
×
378
                }
×
379

380
                _, node, err = getNodeByPubKey(ctx, db, nodePub)
×
381

×
382
                return err
×
383
        }, sqldb.NoOpReset)
384
        if err != nil {
×
385
                return nil, fmt.Errorf("unable to fetch source node: %w", err)
×
386
        }
×
387

388
        return node, nil
×
389
}
390

391
// SetSourceNode sets the source node within the graph database. The source
392
// node is to be used as the center of a star-graph within path finding
393
// algorithms.
394
//
395
// NOTE: part of the V1Store interface.
396
func (s *SQLStore) SetSourceNode(node *models.LightningNode) error {
×
397
        ctx := context.TODO()
×
398

×
399
        return s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
×
400
                id, err := upsertNode(ctx, db, node)
×
401
                if err != nil {
×
402
                        return fmt.Errorf("unable to upsert source node: %w",
×
403
                                err)
×
404
                }
×
405

406
                // Make sure that if a source node for this version is already
407
                // set, then the ID is the same as the one we are about to set.
408
                dbSourceNodeID, _, err := getSourceNode(ctx, db, ProtocolV1)
×
409
                if err != nil && !errors.Is(err, ErrSourceNodeNotSet) {
×
410
                        return fmt.Errorf("unable to fetch source node: %w",
×
411
                                err)
×
412
                } else if err == nil {
×
413
                        if dbSourceNodeID != id {
×
414
                                return fmt.Errorf("v1 source node already "+
×
415
                                        "set to a different node: %d vs %d",
×
416
                                        dbSourceNodeID, id)
×
417
                        }
×
418

419
                        return nil
×
420
                }
421

422
                return db.AddSourceNode(ctx, id)
×
423
        }, sqldb.NoOpReset)
424
}
425

426
// NodeUpdatesInHorizon returns all the known lightning nodes which have an
427
// update timestamp within the passed range. This method can be used by two
428
// nodes to quickly determine if they have the same set of up to date node
429
// announcements.
430
//
431
// NOTE: This is part of the V1Store interface.
432
func (s *SQLStore) NodeUpdatesInHorizon(startTime,
433
        endTime time.Time) ([]models.LightningNode, error) {
×
434

×
435
        ctx := context.TODO()
×
436

×
437
        var nodes []models.LightningNode
×
438
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
439
                dbNodes, err := db.GetNodesByLastUpdateRange(
×
440
                        ctx, sqlc.GetNodesByLastUpdateRangeParams{
×
441
                                StartTime: sqldb.SQLInt64(startTime.Unix()),
×
442
                                EndTime:   sqldb.SQLInt64(endTime.Unix()),
×
443
                        },
×
444
                )
×
445
                if err != nil {
×
446
                        return fmt.Errorf("unable to fetch nodes: %w", err)
×
447
                }
×
448

449
                for _, dbNode := range dbNodes {
×
450
                        node, err := buildNode(ctx, db, &dbNode)
×
451
                        if err != nil {
×
452
                                return fmt.Errorf("unable to build node: %w",
×
453
                                        err)
×
454
                        }
×
455

456
                        nodes = append(nodes, *node)
×
457
                }
458

459
                return nil
×
460
        }, sqldb.NoOpReset)
461
        if err != nil {
×
462
                return nil, fmt.Errorf("unable to fetch nodes: %w", err)
×
463
        }
×
464

465
        return nodes, nil
×
466
}
467

468
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
469
// undirected edge between the two target nodes is created. The information stored
470
// denotes the static attributes of the channel, such as the channelID, the keys
471
// involved in creation of the channel, and the set of features that the channel
472
// supports. The chanPoint and chanID are used to uniquely identify the edge
473
// globally within the database.
474
//
475
// NOTE: part of the V1Store interface.
476
func (s *SQLStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
NEW
477
        opts ...batch.SchedulerOption) error {
×
NEW
478

×
NEW
479
        ctx := context.TODO()
×
NEW
480

×
NEW
481
        var alreadyExists bool
×
NEW
482
        r := &batch.Request[SQLQueries]{
×
NEW
483
                Opts: batch.NewSchedulerOptions(opts...),
×
NEW
484
                Reset: func() {
×
NEW
485
                        alreadyExists = false
×
NEW
486
                },
×
NEW
487
                Do: func(tx SQLQueries) error {
×
NEW
488
                        err := insertChannel(ctx, tx, edge)
×
NEW
489

×
NEW
490
                        // Silence ErrEdgeAlreadyExist so that the batch can
×
NEW
491
                        // succeed, but propagate the error via local state.
×
NEW
492
                        if errors.Is(err, ErrEdgeAlreadyExist) {
×
NEW
493
                                alreadyExists = true
×
NEW
494
                                return nil
×
NEW
495
                        }
×
496

NEW
497
                        return err
×
498
                },
NEW
499
                OnCommit: func(err error) error {
×
NEW
500
                        switch {
×
NEW
501
                        case err != nil:
×
NEW
502
                                return err
×
NEW
503
                        case alreadyExists:
×
NEW
504
                                return ErrEdgeAlreadyExist
×
NEW
505
                        default:
×
NEW
506
                                s.rejectCache.remove(edge.ChannelID)
×
NEW
507
                                s.chanCache.remove(edge.ChannelID)
×
NEW
508
                                return nil
×
509
                        }
510
                },
511
        }
512

NEW
513
        return s.chanScheduler.Execute(ctx, r)
×
514
}
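// Illustrative only: persisting a channel announcement. The edge is assumed
// to have been validated upstream; ErrEdgeAlreadyExist is surfaced through
// the batch's OnCommit hook exactly as arranged above, so a duplicate can be
// treated as a no-op by the caller.
func addEdgeSketch(s *SQLStore, edge *models.ChannelEdgeInfo) error {
        err := s.AddChannelEdge(edge)
        if errors.Is(err, ErrEdgeAlreadyExist) {
                // The channel is already known; nothing further to do.
                return nil
        }

        return err
}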
515

516
// HighestChanID returns the "highest" known channel ID in the channel graph.
517
// This represents the "newest" channel from the PoV of the chain. This method
518
// can be used by peers to quickly determine if their graphs are in sync.
519
//
520
// NOTE: This is part of the V1Store interface.
NEW
521
func (s *SQLStore) HighestChanID() (uint64, error) {
×
NEW
522
        ctx := context.TODO()
×
NEW
523

×
NEW
524
        var highestChanID uint64
×
NEW
525
        err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
×
NEW
526
                chanID, err := db.HighestSCID(ctx, int16(ProtocolV1))
×
NEW
527
                if errors.Is(err, sql.ErrNoRows) {
×
NEW
528
                        return nil
×
NEW
529
                } else if err != nil {
×
NEW
530
                        return fmt.Errorf("unable to fetch highest chan ID: %w",
×
NEW
531
                                err)
×
NEW
532
                }
×
533

NEW
534
                highestChanID = byteOrder.Uint64(chanID)
×
NEW
535

×
NEW
536
                return nil
×
537
        }, sqldb.NoOpReset)
NEW
538
        if err != nil {
×
NEW
539
                return 0, fmt.Errorf("unable to fetch highest chan ID: %w", err)
×
NEW
540
        }
×
541

NEW
542
        return highestChanID, nil
×
543
}
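// Illustrative only: the uint64 returned by HighestChanID is a raw short
// channel ID (the byteOrder.Uint64 decoding above), so it can be unpacked
// with lnwire. The helper name here is hypothetical.
func highestChanHeight(s *SQLStore) (uint32, error) {
        scid, err := s.HighestChanID()
        if err != nil {
                return 0, err
        }

        // A zero value means no channel row exists yet for this version.
        return lnwire.NewShortChanIDFromInt(scid).BlockHeight, nil
}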
544

545
// getNodeByPubKey attempts to look up a target node by its public key.
546
func getNodeByPubKey(ctx context.Context, db SQLQueries,
547
        pubKey route.Vertex) (int64, *models.LightningNode, error) {
×
548

×
549
        dbNode, err := db.GetNodeByPubKey(
×
550
                ctx, sqlc.GetNodeByPubKeyParams{
×
551
                        Version: int16(ProtocolV1),
×
552
                        PubKey:  pubKey[:],
×
553
                },
×
554
        )
×
555
        if errors.Is(err, sql.ErrNoRows) {
×
556
                return 0, nil, ErrGraphNodeNotFound
×
557
        } else if err != nil {
×
558
                return 0, nil, fmt.Errorf("unable to fetch node: %w", err)
×
559
        }
×
560

561
        node, err := buildNode(ctx, db, &dbNode)
×
562
        if err != nil {
×
563
                return 0, nil, fmt.Errorf("unable to build node: %w", err)
×
564
        }
×
565

566
        return dbNode.ID, node, nil
×
567
}
568

569
// buildNode constructs a LightningNode instance from the given database node
570
// record. The node's features, addresses and extra signed fields are also
571
// fetched from the database and set on the node.
572
func buildNode(ctx context.Context, db SQLQueries, dbNode *sqlc.Node) (
573
        *models.LightningNode, error) {
×
574

×
575
        if dbNode.Version != int16(ProtocolV1) {
×
576
                return nil, fmt.Errorf("unsupported node version: %d",
×
577
                        dbNode.Version)
×
578
        }
×
579

580
        var pub [33]byte
×
581
        copy(pub[:], dbNode.PubKey)
×
582

×
583
        node := &models.LightningNode{
×
NEW
584
                PubKeyBytes: pub,
×
NEW
585
                Features:    lnwire.EmptyFeatureVector(),
×
NEW
586
                LastUpdate:  time.Unix(0, 0),
×
587
        }
×
588

×
589
        if len(dbNode.Signature) == 0 {
×
590
                return node, nil
×
591
        }
×
592

593
        node.HaveNodeAnnouncement = true
×
594
        node.AuthSigBytes = dbNode.Signature
×
595
        node.Alias = dbNode.Alias.String
×
596
        node.LastUpdate = time.Unix(dbNode.LastUpdate.Int64, 0)
×
597

×
598
        var err error
×
599
        node.Color, err = DecodeHexColor(dbNode.Color.String)
×
600
        if err != nil {
×
601
                return nil, fmt.Errorf("unable to decode color: %w", err)
×
602
        }
×
603

604
        // Fetch the node's features.
605
        node.Features, err = getNodeFeatures(ctx, db, dbNode.ID)
×
606
        if err != nil {
×
607
                return nil, fmt.Errorf("unable to fetch node(%d) "+
×
608
                        "features: %w", dbNode.ID, err)
×
609
        }
×
610

611
        // Fetch the node's addresses.
612
        _, node.Addresses, err = getNodeAddresses(ctx, db, pub[:])
×
613
        if err != nil {
×
614
                return nil, fmt.Errorf("unable to fetch node(%d) "+
×
615
                        "addresses: %w", dbNode.ID, err)
×
616
        }
×
617

618
        // Fetch the node's extra signed fields.
619
        extraTLVMap, err := getNodeExtraSignedFields(ctx, db, dbNode.ID)
×
620
        if err != nil {
×
621
                return nil, fmt.Errorf("unable to fetch node(%d) "+
×
622
                        "extra signed fields: %w", dbNode.ID, err)
×
623
        }
×
624

625
        recs, err := lnwire.CustomRecords(extraTLVMap).Serialize()
×
626
        if err != nil {
×
627
                return nil, fmt.Errorf("unable to serialize extra signed "+
×
628
                        "fields: %w", err)
×
629
        }
×
630

631
        if len(recs) != 0 {
×
632
                node.ExtraOpaqueData = recs
×
633
        }
×
634

635
        return node, nil
×
636
}
637

638
// getNodeFeatures fetches the feature bits and constructs the feature vector
639
// for a node with the given DB ID.
640
func getNodeFeatures(ctx context.Context, db SQLQueries,
641
        nodeID int64) (*lnwire.FeatureVector, error) {
×
642

×
643
        rows, err := db.GetNodeFeatures(ctx, nodeID)
×
644
        if err != nil {
×
645
                return nil, fmt.Errorf("unable to get node(%d) features: %w",
×
646
                        nodeID, err)
×
647
        }
×
648

649
        features := lnwire.EmptyFeatureVector()
×
650
        for _, feature := range rows {
×
651
                features.Set(lnwire.FeatureBit(feature.FeatureBit))
×
652
        }
×
653

654
        return features, nil
×
655
}
656

657
// getNodeExtraSignedFields fetches the extra signed fields for a node with the
658
// given DB ID.
659
func getNodeExtraSignedFields(ctx context.Context, db SQLQueries,
660
        nodeID int64) (map[uint64][]byte, error) {
×
661

×
662
        fields, err := db.GetExtraNodeTypes(ctx, nodeID)
×
663
        if err != nil {
×
664
                return nil, fmt.Errorf("unable to get node(%d) extra "+
×
665
                        "signed fields: %w", nodeID, err)
×
666
        }
×
667

668
        extraFields := make(map[uint64][]byte)
×
669
        for _, field := range fields {
×
670
                extraFields[uint64(field.Type)] = field.Value
×
671
        }
×
672

673
        return extraFields, nil
×
674
}
675

676
// upsertNode upserts the node record into the database. If the node already
677
// exists, then the node's information is updated. If the node doesn't exist,
678
// then a new node is created. The node's features, addresses and extra TLV
679
// types are also updated. The node's DB ID is returned.
680
func upsertNode(ctx context.Context, db SQLQueries,
681
        node *models.LightningNode) (int64, error) {
×
682

×
683
        params := sqlc.UpsertNodeParams{
×
684
                Version: int16(ProtocolV1),
×
685
                PubKey:  node.PubKeyBytes[:],
×
686
        }
×
687

×
688
        if node.HaveNodeAnnouncement {
×
689
                params.LastUpdate = sqldb.SQLInt64(node.LastUpdate.Unix())
×
690
                params.Color = sqldb.SQLStr(EncodeHexColor(node.Color))
×
691
                params.Alias = sqldb.SQLStr(node.Alias)
×
692
                params.Signature = node.AuthSigBytes
×
693
        }
×
694

695
        nodeID, err := db.UpsertNode(ctx, params)
×
696
        if err != nil {
×
697
                return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
×
698
                        err)
×
699
        }
×
700

701
        // We can exit here if we don't have the announcement yet.
702
        if !node.HaveNodeAnnouncement {
×
703
                return nodeID, nil
×
704
        }
×
705

706
        // Update the node's features.
707
        err = upsertNodeFeatures(ctx, db, nodeID, node.Features)
×
708
        if err != nil {
×
709
                return 0, fmt.Errorf("inserting node features: %w", err)
×
710
        }
×
711

712
        // Update the node's addresses.
713
        err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
×
714
        if err != nil {
×
715
                return 0, fmt.Errorf("inserting node addresses: %w", err)
×
716
        }
×
717

718
        // Convert the flat extra opaque data into a map of TLV types to
719
        // values.
720
        extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
×
721
        if err != nil {
×
722
                return 0, fmt.Errorf("unable to marshal extra opaque data: %w",
×
723
                        err)
×
724
        }
×
725

726
        // Update the node's extra signed fields.
727
        err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
×
728
        if err != nil {
×
729
                return 0, fmt.Errorf("inserting node extra TLVs: %w", err)
×
730
        }
×
731

732
        return nodeID, nil
×
733
}
734

735
// upsertNodeFeatures updates the node's features in the node_features table. This
736
// includes deleting any feature bits no longer present and inserting any new
737
// feature bits. If the feature bit does not yet exist in the features table,
738
// then an entry is created in that table first.
739
func upsertNodeFeatures(ctx context.Context, db SQLQueries, nodeID int64,
740
        features *lnwire.FeatureVector) error {
×
741

×
742
        // Get any existing features for the node.
×
743
        existingFeatures, err := db.GetNodeFeatures(ctx, nodeID)
×
744
        if err != nil && !errors.Is(err, sql.ErrNoRows) {
×
745
                return err
×
746
        }
×
747

748
        // Copy the node's latest set of feature bits.
749
        newFeatures := make(map[int32]struct{})
×
750
        if features != nil {
×
751
                for feature := range features.Features() {
×
752
                        newFeatures[int32(feature)] = struct{}{}
×
753
                }
×
754
        }
755

756
        // For any current feature that already exists in the DB, remove it from
757
        // the in-memory map. For any existing feature that does not exist in
758
        // the in-memory map, delete it from the database.
759
        for _, feature := range existingFeatures {
×
760
                // The feature is still present, so there are no updates to be
×
761
                // made.
×
762
                if _, ok := newFeatures[feature.FeatureBit]; ok {
×
763
                        delete(newFeatures, feature.FeatureBit)
×
764
                        continue
×
765
                }
766

767
                // The feature is no longer present, so we remove it from the
768
                // database.
769
                err := db.DeleteNodeFeature(ctx, sqlc.DeleteNodeFeatureParams{
×
770
                        NodeID:     nodeID,
×
771
                        FeatureBit: feature.FeatureBit,
×
772
                })
×
773
                if err != nil {
×
774
                        return fmt.Errorf("unable to delete node(%d) "+
×
775
                                "feature(%v): %w", nodeID, feature.FeatureBit,
×
776
                                err)
×
777
                }
×
778
        }
779

780
        // Any remaining entries in newFeatures are new features that need to be
781
        // added to the database for the first time.
782
        for feature := range newFeatures {
×
783
                err = db.InsertNodeFeature(ctx, sqlc.InsertNodeFeatureParams{
×
784
                        NodeID:     nodeID,
×
785
                        FeatureBit: feature,
×
786
                })
×
787
                if err != nil {
×
788
                        return fmt.Errorf("unable to insert node(%d) "+
×
789
                                "feature(%v): %w", nodeID, feature, err)
×
790
                }
×
791
        }
792

793
        return nil
×
794
}
795

796
// fetchNodeFeatures fetches the features for a node with the given public key.
797
func fetchNodeFeatures(ctx context.Context, queries SQLQueries,
798
        nodePub route.Vertex) (*lnwire.FeatureVector, error) {
×
799

×
800
        rows, err := queries.GetNodeFeaturesByPubKey(
×
801
                ctx, sqlc.GetNodeFeaturesByPubKeyParams{
×
802
                        PubKey:  nodePub[:],
×
803
                        Version: int16(ProtocolV1),
×
804
                },
×
805
        )
×
806
        if err != nil {
×
807
                return nil, fmt.Errorf("unable to get node(%s) features: %w",
×
808
                        nodePub, err)
×
809
        }
×
810

811
        features := lnwire.EmptyFeatureVector()
×
812
        for _, bit := range rows {
×
813
                features.Set(lnwire.FeatureBit(bit))
×
814
        }
×
815

816
        return features, nil
×
817
}
818

819
// dbAddressType is an enum type that represents the different address types
820
// that we store in the node_addresses table. The address type determines how
821
// the address is to be serialised/deserialised.
822
type dbAddressType uint8
823

824
const (
825
        addressTypeIPv4   dbAddressType = 1
826
        addressTypeIPv6   dbAddressType = 2
827
        addressTypeTorV2  dbAddressType = 3
828
        addressTypeTorV3  dbAddressType = 4
829
        addressTypeOpaque dbAddressType = math.MaxInt8
830
)
831

832
// upsertNodeAddresses updates the node's addresses in the database. This
833
// includes deleting any existing addresses and inserting the new set of
834
// addresses. The deletion is necessary since the ordering of the addresses may
835
// change, and we need to ensure that the database reflects the latest set of
836
// addresses so that at the time of reconstructing the node announcement, the
837
// order is preserved and the signature over the message remains valid.
838
func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64,
839
        addresses []net.Addr) error {
×
840

×
841
        // Delete any existing addresses for the node. This is required since
×
842
        // even if the new set of addresses is the same, the ordering may have
×
843
        // changed for a given address type.
×
844
        err := db.DeleteNodeAddresses(ctx, nodeID)
×
845
        if err != nil {
×
846
                return fmt.Errorf("unable to delete node(%d) addresses: %w",
×
847
                        nodeID, err)
×
848
        }
×
849

850
        // Copy the node's latest set of addresses.
851
        newAddresses := map[dbAddressType][]string{
×
852
                addressTypeIPv4:   {},
×
853
                addressTypeIPv6:   {},
×
854
                addressTypeTorV2:  {},
×
855
                addressTypeTorV3:  {},
×
856
                addressTypeOpaque: {},
×
857
        }
×
858
        addAddr := func(t dbAddressType, addr net.Addr) {
×
859
                newAddresses[t] = append(newAddresses[t], addr.String())
×
860
        }
×
861

862
        for _, address := range addresses {
×
863
                switch addr := address.(type) {
×
864
                case *net.TCPAddr:
×
865
                        if ip4 := addr.IP.To4(); ip4 != nil {
×
866
                                addAddr(addressTypeIPv4, addr)
×
867
                        } else if ip6 := addr.IP.To16(); ip6 != nil {
×
868
                                addAddr(addressTypeIPv6, addr)
×
869
                        } else {
×
870
                                return fmt.Errorf("unhandled IP address: %v",
×
871
                                        addr)
×
872
                        }
×
873

874
                case *tor.OnionAddr:
×
875
                        switch len(addr.OnionService) {
×
876
                        case tor.V2Len:
×
877
                                addAddr(addressTypeTorV2, addr)
×
878
                        case tor.V3Len:
×
879
                                addAddr(addressTypeTorV3, addr)
×
880
                        default:
×
881
                                return fmt.Errorf("invalid length for a tor " +
×
882
                                        "address")
×
883
                        }
884

885
                case *lnwire.OpaqueAddrs:
×
886
                        addAddr(addressTypeOpaque, addr)
×
887

888
                default:
×
889
                        return fmt.Errorf("unhandled address type: %T", addr)
×
890
                }
891
        }
892

893
        // Any remaining entries in newAddresses are new addresses that need to
894
        // be added to the database for the first time.
895
        for addrType, addrList := range newAddresses {
×
896
                for position, addr := range addrList {
×
897
                        err := db.InsertNodeAddress(
×
898
                                ctx, sqlc.InsertNodeAddressParams{
×
899
                                        NodeID:   nodeID,
×
900
                                        Type:     int16(addrType),
×
901
                                        Address:  addr,
×
902
                                        Position: int32(position),
×
903
                                },
×
904
                        )
×
905
                        if err != nil {
×
906
                                return fmt.Errorf("unable to insert "+
×
907
                                        "node(%d) address(%v): %w", nodeID,
×
908
                                        addr, err)
×
909
                        }
×
910
                }
911
        }
912

913
        return nil
×
914
}
915

916
// getNodeAddresses fetches the addresses for a node with the given public key.
917
func getNodeAddresses(ctx context.Context, db SQLQueries, nodePub []byte) (bool,
918
        []net.Addr, error) {
×
919

×
920
        // GetNodeAddressesByPubKey ensures that the addresses for a given type
×
921
        // are returned in the same order as they were inserted.
×
922
        rows, err := db.GetNodeAddressesByPubKey(
×
923
                ctx, sqlc.GetNodeAddressesByPubKeyParams{
×
924
                        Version: int16(ProtocolV1),
×
925
                        PubKey:  nodePub,
×
926
                },
×
927
        )
×
928
        if err != nil {
×
929
                return false, nil, err
×
930
        }
×
931

932
        // GetNodeAddressesByPubKey uses a left join so there should always be
933
        // at least one row returned if the node exists even if it has no
934
        // addresses.
935
        if len(rows) == 0 {
×
936
                return false, nil, nil
×
937
        }
×
938

939
        addresses := make([]net.Addr, 0, len(rows))
×
940
        for _, addr := range rows {
×
941
                if !(addr.Type.Valid && addr.Address.Valid) {
×
942
                        continue
×
943
                }
944

945
                address := addr.Address.String
×
946

×
947
                switch dbAddressType(addr.Type.Int16) {
×
948
                case addressTypeIPv4:
×
949
                        tcp, err := net.ResolveTCPAddr("tcp4", address)
×
950
                        if err != nil {
×
951
                                return false, nil, nil
×
952
                        }
×
953
                        tcp.IP = tcp.IP.To4()
×
954

×
955
                        addresses = append(addresses, tcp)
×
956

957
                case addressTypeIPv6:
×
958
                        tcp, err := net.ResolveTCPAddr("tcp6", address)
×
959
                        if err != nil {
×
960
                                return false, nil, nil
×
961
                        }
×
962
                        addresses = append(addresses, tcp)
×
963

964
                case addressTypeTorV3, addressTypeTorV2:
×
965
                        service, portStr, err := net.SplitHostPort(address)
×
966
                        if err != nil {
×
967
                                return false, nil, fmt.Errorf("unable to "+
×
968
                                        "split tor v3 address: %v",
×
969
                                        addr.Address)
×
970
                        }
×
971

972
                        port, err := strconv.Atoi(portStr)
×
973
                        if err != nil {
×
974
                                return false, nil, err
×
975
                        }
×
976

977
                        addresses = append(addresses, &tor.OnionAddr{
×
978
                                OnionService: service,
×
979
                                Port:         port,
×
980
                        })
×
981

982
                case addressTypeOpaque:
×
983
                        opaque, err := hex.DecodeString(address)
×
984
                        if err != nil {
×
985
                                return false, nil, fmt.Errorf("unable to "+
×
986
                                        "decode opaque address: %v", addr)
×
987
                        }
×
988

989
                        addresses = append(addresses, &lnwire.OpaqueAddrs{
×
990
                                Payload: opaque,
×
991
                        })
×
992

993
                default:
×
994
                        return false, nil, fmt.Errorf("unknown address "+
×
995
                                "type: %v", addr.Type)
×
996
                }
997
        }
998

999
        return true, addresses, nil
×
1000
}
1001

1002
// upsertNodeExtraSignedFields updates the node's extra signed fields in the
1003
// database. This includes updating any existing types, inserting any new types,
1004
// and deleting any types that are no longer present.
1005
func upsertNodeExtraSignedFields(ctx context.Context, db SQLQueries,
1006
        nodeID int64, extraFields map[uint64][]byte) error {
×
1007

×
1008
        // Get any existing extra signed fields for the node.
×
1009
        existingFields, err := db.GetExtraNodeTypes(ctx, nodeID)
×
1010
        if err != nil {
×
1011
                return err
×
1012
        }
×
1013

1014
        // Make a lookup map of the existing field types so that we can use it
1015
        // to keep track of any fields we should delete.
1016
        m := make(map[uint64]bool)
×
1017
        for _, field := range existingFields {
×
1018
                m[uint64(field.Type)] = true
×
1019
        }
×
1020

1021
        // For all the new fields, we'll upsert them and remove them from the
1022
        // map of existing fields.
1023
        for tlvType, value := range extraFields {
×
1024
                err = db.UpsertNodeExtraType(
×
1025
                        ctx, sqlc.UpsertNodeExtraTypeParams{
×
1026
                                NodeID: nodeID,
×
1027
                                Type:   int64(tlvType),
×
1028
                                Value:  value,
×
1029
                        },
×
1030
                )
×
1031
                if err != nil {
×
1032
                        return fmt.Errorf("unable to upsert node(%d) extra "+
×
1033
                                "signed field(%v): %w", nodeID, tlvType, err)
×
1034
                }
×
1035

1036
                // Remove the field from the map of existing fields if it was
1037
                // present.
1038
                delete(m, tlvType)
×
1039
        }
1040

1041
        // For all the fields that are left in the map of existing fields, we'll
1042
        // delete them as they are no longer present in the new set of fields.
1043
        for tlvType := range m {
×
1044
                err = db.DeleteExtraNodeType(
×
1045
                        ctx, sqlc.DeleteExtraNodeTypeParams{
×
1046
                                NodeID: nodeID,
×
1047
                                Type:   int64(tlvType),
×
1048
                        },
×
1049
                )
×
1050
                if err != nil {
×
1051
                        return fmt.Errorf("unable to delete node(%d) extra "+
×
1052
                                "signed field(%v): %w", nodeID, tlvType, err)
×
1053
                }
×
1054
        }
1055

1056
        return nil
×
1057
}
1058

1059
// getSourceNode returns the DB node ID and pub key of the source node for the
1060
// specified protocol version.
1061
func getSourceNode(ctx context.Context, db SQLQueries,
1062
        version ProtocolVersion) (int64, route.Vertex, error) {
×
1063

×
1064
        var pubKey route.Vertex
×
1065

×
1066
        nodes, err := db.GetSourceNodesByVersion(ctx, int16(version))
×
1067
        if err != nil {
×
1068
                return 0, pubKey, fmt.Errorf("unable to fetch source node: %w",
×
1069
                        err)
×
1070
        }
×
1071

1072
        if len(nodes) == 0 {
×
1073
                return 0, pubKey, ErrSourceNodeNotSet
×
1074
        } else if len(nodes) > 1 {
×
1075
                return 0, pubKey, fmt.Errorf("multiple source nodes for "+
×
1076
                        "protocol %s found", version)
×
1077
        }
×
1078

1079
        copy(pubKey[:], nodes[0].PubKey)
×
1080

×
1081
        return nodes[0].NodeID, pubKey, nil
×
1082
}
1083

1084
// marshalExtraOpaqueData takes a flat byte slice and parses it as a TLV stream.
1085
// This then produces a map from TLV type to value. If the input is not a
1086
// valid TLV stream, then an error is returned.
1087
func marshalExtraOpaqueData(data []byte) (map[uint64][]byte, error) {
×
1088
        r := bytes.NewReader(data)
×
1089

×
1090
        tlvStream, err := tlv.NewStream()
×
1091
        if err != nil {
×
1092
                return nil, err
×
1093
        }
×
1094

1095
        // Since ExtraOpaqueData is provided by a potentially malicious peer,
1096
        // pass it into the P2P decoding variant.
1097
        parsedTypes, err := tlvStream.DecodeWithParsedTypesP2P(r)
×
1098
        if err != nil {
×
1099
                return nil, err
×
1100
        }
×
1101
        if len(parsedTypes) == 0 {
×
1102
                return nil, nil
×
1103
        }
×
1104

1105
        records := make(map[uint64][]byte)
×
1106
        for k, v := range parsedTypes {
×
1107
                records[uint64(k)] = v
×
1108
        }
×
1109

1110
        return records, nil
×
1111
}
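// Illustrative only: round-tripping extra gossip data. A raw ExtraOpaqueData
// blob is parsed into a type=>value map by marshalExtraOpaqueData, and the
// map can be re-encoded with lnwire.CustomRecords, the same pairing that
// buildNode uses above.
func roundTripExtraOpaqueData(raw []byte) ([]byte, error) {
        extra, err := marshalExtraOpaqueData(raw)
        if err != nil {
                return nil, err
        }

        return lnwire.CustomRecords(extra).Serialize()
}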
1112

1113
// insertChannel inserts a new channel record into the database.
1114
func insertChannel(ctx context.Context, db SQLQueries,
NEW
1115
        edge *models.ChannelEdgeInfo) error {
×
NEW
1116

×
NEW
1117
        var chanIDB [8]byte
×
NEW
1118
        byteOrder.PutUint64(chanIDB[:], edge.ChannelID)
×
NEW
1119

×
NEW
1120
        // Make sure that the channel doesn't already exist. We do this
×
NEW
1121
        // explicitly instead of relying on catching a unique constraint error
×
NEW
1122
        // because relying on SQL to throw that error would abort the entire
×
NEW
1123
        // batch of transactions.
×
NEW
1124
        _, err := db.GetChannelBySCID(
×
NEW
1125
                ctx, sqlc.GetChannelBySCIDParams{
×
NEW
1126
                        Scid:    chanIDB[:],
×
NEW
1127
                        Version: int16(ProtocolV1),
×
NEW
1128
                },
×
NEW
1129
        )
×
NEW
1130
        if err == nil {
×
NEW
1131
                return ErrEdgeAlreadyExist
×
NEW
1132
        } else if !errors.Is(err, sql.ErrNoRows) {
×
NEW
1133
                return fmt.Errorf("unable to fetch channel: %w", err)
×
NEW
1134
        }
×
1135

1136
        // Make sure that at least a "shell" entry for each node is present in
1137
        // the nodes table.
NEW
1138
        node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
×
NEW
1139
        if err != nil {
×
NEW
1140
                return fmt.Errorf("unable to create shell node: %w", err)
×
NEW
1141
        }
×
1142

NEW
1143
        node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
×
NEW
1144
        if err != nil {
×
NEW
1145
                return fmt.Errorf("unable to create shell node: %w", err)
×
NEW
1146
        }
×
1147

NEW
1148
        var capacity sql.NullInt64
×
NEW
1149
        if edge.Capacity != 0 {
×
NEW
1150
                capacity = sqldb.SQLInt64(int64(edge.Capacity))
×
NEW
1151
        }
×
1152

NEW
1153
        createParams := sqlc.CreateChannelParams{
×
NEW
1154
                Version:     int16(ProtocolV1),
×
NEW
1155
                Scid:        chanIDB[:],
×
NEW
1156
                NodeID1:     node1DBID,
×
NEW
1157
                NodeID2:     node2DBID,
×
NEW
1158
                Outpoint:    edge.ChannelPoint.String(),
×
NEW
1159
                Capacity:    capacity,
×
NEW
1160
                BitcoinKey1: edge.BitcoinKey1Bytes[:],
×
NEW
1161
                BitcoinKey2: edge.BitcoinKey2Bytes[:],
×
NEW
1162
        }
×
NEW
1163

×
NEW
1164
        if edge.AuthProof != nil {
×
NEW
1165
                proof := edge.AuthProof
×
NEW
1166

×
NEW
1167
                createParams.Node1Signature = proof.NodeSig1Bytes
×
NEW
1168
                createParams.Node2Signature = proof.NodeSig2Bytes
×
NEW
1169
                createParams.Bitcoin1Signature = proof.BitcoinSig1Bytes
×
NEW
1170
                createParams.Bitcoin2Signature = proof.BitcoinSig2Bytes
×
NEW
1171
        }
×
1172

1173
        // Insert the new channel record.
NEW
1174
        dbChanID, err := db.CreateChannel(ctx, createParams)
×
NEW
1175
        if err != nil {
×
NEW
1176
                return err
×
NEW
1177
        }
×
1178

1179
        // Insert any channel features.
NEW
1180
        if len(edge.Features) != 0 {
×
NEW
1181
                chanFeatures := lnwire.NewRawFeatureVector()
×
NEW
1182
                err := chanFeatures.Decode(bytes.NewReader(edge.Features))
×
NEW
1183
                if err != nil {
×
NEW
1184
                        return err
×
NEW
1185
                }
×
1186

NEW
1187
                fv := lnwire.NewFeatureVector(chanFeatures, lnwire.Features)
×
NEW
1188
                for feature := range fv.Features() {
×
NEW
1189
                        err = db.InsertChannelFeature(
×
NEW
1190
                                ctx, sqlc.InsertChannelFeatureParams{
×
NEW
1191
                                        ChannelID:  dbChanID,
×
NEW
1192
                                        FeatureBit: int32(feature),
×
NEW
1193
                                },
×
NEW
1194
                        )
×
NEW
1195
                        if err != nil {
×
NEW
1196
                                return fmt.Errorf("unable to insert "+
×
NEW
1197
                                        "channel(%d) feature(%v): %w", dbChanID,
×
NEW
1198
                                        feature, err)
×
NEW
1199
                        }
×
1200
                }
1201
        }
1202

1203
        // Finally, insert any extra TLV fields in the channel announcement.
NEW
1204
        extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
×
NEW
1205
        if err != nil {
×
NEW
1206
                return fmt.Errorf("unable to marshal extra opaque data: %w",
×
NEW
1207
                        err)
×
NEW
1208
        }
×
1209

NEW
1210
        for tlvType, value := range extra {
×
NEW
1211
                err := db.CreateChannelExtraType(
×
NEW
1212
                        ctx, sqlc.CreateChannelExtraTypeParams{
×
NEW
1213
                                ChannelID: dbChanID,
×
NEW
1214
                                Type:      int64(tlvType),
×
NEW
1215
                                Value:     value,
×
NEW
1216
                        },
×
NEW
1217
                )
×
NEW
1218
                if err != nil {
×
NEW
1219
                        return fmt.Errorf("unable to upsert channel(%d) extra "+
×
NEW
1220
                                "signed field(%v): %w", edge.ChannelID,
×
NEW
1221
                                tlvType, err)
×
NEW
1222
                }
×
1223
        }
1224

NEW
1225
        return nil
×
1226
}
1227

1228
// maybeCreateShellNode checks if a shell node entry exists for the
1229
// given public key. If it does not exist, then a new shell node entry is
1230
// created. The ID of the node is returned. A shell node only has a protocol
1231
// version and public key persisted.
1232
func maybeCreateShellNode(ctx context.Context, db SQLQueries,
NEW
1233
        pubKey route.Vertex) (int64, error) {
×
NEW
1234

×
NEW
1235
        dbNode, err := db.GetNodeByPubKey(
×
NEW
1236
                ctx, sqlc.GetNodeByPubKeyParams{
×
NEW
1237
                        PubKey:  pubKey[:],
×
NEW
1238
                        Version: int16(ProtocolV1),
×
NEW
1239
                },
×
NEW
1240
        )
×
NEW
1241
        // The node exists. Return the ID.
×
NEW
1242
        if err == nil {
×
NEW
1243
                return dbNode.ID, nil
×
NEW
1244
        } else if !errors.Is(err, sql.ErrNoRows) {
×
NEW
1245
                return 0, err
×
NEW
1246
        }
×
1247

1248
        // Otherwise, the node does not exist, so we create a shell entry for
1249
        // it.
NEW
1250
        id, err := db.UpsertNode(ctx, sqlc.UpsertNodeParams{
×
NEW
1251
                Version: int16(ProtocolV1),
×
NEW
1252
                PubKey:  pubKey[:],
×
NEW
1253
        })
×
NEW
1254
        if err != nil {
×
NEW
1255
                return 0, fmt.Errorf("unable to create shell node: %w", err)
×
NEW
1256
        }
×
1257

NEW
1258
        return id, nil
×
1259
}