• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 10477144192

20 Aug 2024 06:27PM UTC coverage: 58.586% (-0.2%) from 58.764%
10477144192

push

github

web-flow
Merge pull request #9007 from Roasbeef/go-1-22

build: set min build version to Go 1.22.6

38 of 71 new or added lines in 9 files covered. (53.52%)

458 existing lines in 33 files now uncovered.

126108 of 215251 relevant lines covered (58.59%)

28481.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/lntest/node/harness_node.go
1
package node
2

3
import (
4
        "bytes"
5
        "context"
6
        "crypto/rand"
7
        "encoding/hex"
8
        "encoding/json"
9
        "fmt"
10
        "io"
11
        "os"
12
        "os/exec"
13
        "path/filepath"
14
        "strings"
15
        "testing"
16
        "time"
17

18
        "github.com/jackc/pgx/v4/pgxpool"
19
        "github.com/lightningnetwork/lnd"
20
        "github.com/lightningnetwork/lnd/lnrpc"
21
        "github.com/lightningnetwork/lnd/lntest/rpc"
22
        "github.com/lightningnetwork/lnd/lntest/wait"
23
        "github.com/lightningnetwork/lnd/macaroons"
24
        "google.golang.org/grpc"
25
        "google.golang.org/grpc/codes"
26
        "google.golang.org/grpc/credentials"
27
        "google.golang.org/grpc/status"
28
        "gopkg.in/macaroon.v2"
29
)
30

31
const (
32
        // logPubKeyBytes is the number of bytes of the node's PubKey that will
33
        // be appended to the log file name. The whole PubKey is too long and
34
        // not really necessary to quickly identify what node produced which
35
        // log file.
36
        logPubKeyBytes = 4
37

38
        // trickleDelay is the amount of time in milliseconds between each
39
        // release of announcements by AuthenticatedGossiper to the network.
40
        trickleDelay = 50
41

42
        postgresDsn = "postgres://postgres:postgres@localhost:" +
43
                "6432/%s?sslmode=disable"
44

45
        // commitInterval specifies the maximum interval the graph database
46
        // will wait between attempting to flush a batch of modifications to
47
        // disk(db.batch-commit-interval).
48
        commitInterval = 10 * time.Millisecond
49
)
50

51
// HarnessNode represents an instance of lnd running within our test network
52
// harness. It's responsible for managing the lnd process, grpc connection, and
53
// wallet auth. A HarnessNode is built upon its rpc clients, represented in
54
// `HarnessRPC`. It also has a `State` which holds its internal state, and a
55
// `Watcher` that keeps track of its topology updates.
56
type HarnessNode struct {
57
        *testing.T
58

59
        // Cfg holds the config values for the node.
60
        Cfg *BaseNodeConfig
61

62
        // RPC holds a list of RPC clients.
63
        RPC *rpc.HarnessRPC
64

65
        // State records the current state of the node.
66
        State *State
67

68
        // Watcher watches the node's topology updates.
69
        Watcher *nodeWatcher
70

71
        // PubKey is the serialized compressed identity public key of the node.
72
        // This field will only be populated once the node itself has been
73
        // started via the start() method.
74
        PubKey    [33]byte
75
        PubKeyStr string
76

77
        // conn is the underlying connection to the grpc endpoint of the node.
78
        conn *grpc.ClientConn
79

80
        // runCtx is a context with cancel method. It's used to signal when the
81
        // node needs to quit, and used as the parent context when spawning
82
        // children contexts for RPC requests.
83
        runCtx context.Context //nolint:containedctx
84
        cancel context.CancelFunc
85

86
        // filename is the log file's name.
87
        filename string
88

89
        cmd     *exec.Cmd
90
        logFile *os.File
91
}
92

93
// NewHarnessNode creates a new test lightning node instance from the passed
94
// config.
95
func NewHarnessNode(t *testing.T, cfg *BaseNodeConfig) (*HarnessNode, error) {
×
96
        if cfg.BaseDir == "" {
×
97
                var err error
×
98

×
99
                // Create a temporary directory for the node's data and logs.
×
100
                // Use dash suffix as a separator between base name and random
×
101
                // suffix.
×
102
                dirBaseName := fmt.Sprintf("lndtest-node-%s-", cfg.Name)
×
103
                cfg.BaseDir, err = os.MkdirTemp("", dirBaseName)
×
104
                if err != nil {
×
105
                        return nil, err
×
106
                }
×
107
        }
108
        cfg.DataDir = filepath.Join(cfg.BaseDir, "data")
×
109
        cfg.LogDir = filepath.Join(cfg.BaseDir, "logs")
×
110
        cfg.TLSCertPath = filepath.Join(cfg.BaseDir, "tls.cert")
×
111
        cfg.TLSKeyPath = filepath.Join(cfg.BaseDir, "tls.key")
×
112

×
113
        networkDir := filepath.Join(
×
114
                cfg.DataDir, "chain", lnd.BitcoinChainName, cfg.NetParams.Name,
×
115
        )
×
116
        cfg.AdminMacPath = filepath.Join(networkDir, "admin.macaroon")
×
117
        cfg.ReadMacPath = filepath.Join(networkDir, "readonly.macaroon")
×
118
        cfg.InvoiceMacPath = filepath.Join(networkDir, "invoice.macaroon")
×
119

×
120
        cfg.GenerateListeningPorts()
×
121

×
122
        // Create temporary database.
×
123
        var dbName string
×
124
        if cfg.DBBackend == BackendPostgres {
×
125
                var err error
×
126
                dbName, err = createTempPgDB()
×
127
                if err != nil {
×
128
                        return nil, err
×
129
                }
×
130
                cfg.PostgresDsn = postgresDatabaseDsn(dbName)
×
131
        }
132

133
        cfg.OriginalExtraArgs = cfg.ExtraArgs
×
134
        cfg.postgresDBName = dbName
×
135

×
136
        return &HarnessNode{
×
137
                T:   t,
×
138
                Cfg: cfg,
×
139
        }, nil
×
140
}
141

142
// Initialize creates a list of new RPC clients using the passed connection,
143
// initializes the node's internal state and creates a topology watcher.
144
func (hn *HarnessNode) Initialize(c *grpc.ClientConn) {
×
145
        hn.conn = c
×
146

×
147
        // Init all the rpc clients.
×
148
        hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, c, hn.Name())
×
149

×
150
        // Init the node's state.
×
151
        //
×
152
        // If we already have a state, it means we are restarting the node and
×
153
        // we will only reset its internal states. Otherwise we'll create a new
×
154
        // state.
×
155
        if hn.State != nil {
×
156
                hn.State.resetEphermalStates(hn.RPC)
×
157
        } else {
×
158
                hn.State = newState(hn.RPC)
×
159
        }
×
160

161
        // Init the topology watcher.
162
        hn.Watcher = newNodeWatcher(hn.RPC, hn.State)
×
163
}
164

165
// Name returns the name of this node set during initialization.
166
func (hn *HarnessNode) Name() string {
×
167
        return hn.Cfg.Name
×
168
}
×
169

170
// UpdateState updates the node's internal state.
171
func (hn *HarnessNode) UpdateState() {
×
172
        hn.State.updateState()
×
173
}
×
174

175
// String gives the internal state of the node which is useful for debugging.
176
func (hn *HarnessNode) String() string {
×
177
        type nodeCfg struct {
×
178
                LogFilenamePrefix string
×
179
                ExtraArgs         []string
×
180
                SkipUnlock        bool
×
181
                Password          []byte
×
182
                P2PPort           int
×
183
                RPCPort           int
×
184
                RESTPort          int
×
185
                AcceptKeySend     bool
×
186
                FeeURL            string
×
187
        }
×
188

×
189
        nodeState := struct {
×
190
                NodeID  uint32
×
191
                Name    string
×
192
                PubKey  string
×
193
                State   *State
×
194
                NodeCfg nodeCfg
×
195
        }{
×
196
                NodeID: hn.Cfg.NodeID,
×
197
                Name:   hn.Cfg.Name,
×
198
                PubKey: hn.PubKeyStr,
×
199
                State:  hn.State,
×
200
                NodeCfg: nodeCfg{
×
201
                        SkipUnlock:        hn.Cfg.SkipUnlock,
×
202
                        Password:          hn.Cfg.Password,
×
203
                        LogFilenamePrefix: hn.Cfg.LogFilenamePrefix,
×
204
                        ExtraArgs:         hn.Cfg.ExtraArgs,
×
205
                        P2PPort:           hn.Cfg.P2PPort,
×
206
                        RPCPort:           hn.Cfg.RPCPort,
×
207
                        RESTPort:          hn.Cfg.RESTPort,
×
208
                },
×
209
        }
×
210

×
211
        stateBytes, err := json.MarshalIndent(nodeState, "", "\t")
×
212
        if err != nil {
×
213
                return fmt.Sprintf("\n encode node state with err: %v", err)
×
214
        }
×
215

216
        return fmt.Sprintf("\nnode state: %s", stateBytes)
×
217
}
218

219
// WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START".
220
func (hn *HarnessNode) WaitUntilStarted() error {
×
221
        return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
×
222
                return s != lnrpc.WalletState_WAITING_TO_START
×
223
        })
×
224
}
225

226
// WaitUntilServerActive waits until the lnd daemon is fully started.
227
func (hn *HarnessNode) WaitUntilServerActive() error {
×
228
        return hn.waitTillServerState(func(s lnrpc.WalletState) bool {
×
229
                return s == lnrpc.WalletState_SERVER_ACTIVE
×
230
        })
×
231
}
232

233
// WaitUntilLeader attempts to finish the start procedure by initiating an RPC
234
// connection and setting up the wallet unlocker client. This is needed when
235
// a node that has recently been started was waiting to become the leader and
236
// we're at the point when we expect that it is the leader now (awaiting
237
// unlock).
238
func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
×
239
        var (
×
240
                conn    *grpc.ClientConn
×
241
                connErr error
×
242
        )
×
243

×
244
        if err := wait.NoError(func() error {
×
245
                conn, connErr = hn.ConnectRPCWithMacaroon(nil)
×
246
                return connErr
×
247
        }, timeout); err != nil {
×
248
                return err
×
249
        }
×
250

251
        // Since the conn is not authed, only the `WalletUnlocker` and `State`
252
        // clients can be inited from this conn.
253
        hn.conn = conn
×
254
        hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, conn, hn.Name())
×
255

×
256
        // Wait till the server is starting.
×
257
        return hn.WaitUntilStarted()
×
258
}
259

260
// Unlock attempts to unlock the wallet of the target HarnessNode. This method
261
// should be called after the restart of a HarnessNode that was created with a
262
// seed+password. Once this method returns, the HarnessNode will be ready to
263
// accept normal gRPC requests and harness command.
264
func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error {
×
265
        // Otherwise, we'll need to unlock the node before it's able to start
×
266
        // up properly.
×
267
        hn.RPC.UnlockWallet(unlockReq)
×
268

×
269
        // Now that the wallet has been unlocked, we'll wait for the RPC client
×
270
        // to be ready, then establish the normal gRPC connection.
×
271
        return hn.InitNode(nil)
×
272
}
×
273

274
// AddToLogf adds a line of choice to the node's logfile. This is useful
275
// to interleave test output with output from the node.
276
func (hn *HarnessNode) AddToLogf(format string, a ...interface{}) {
×
277
        // If this node was not set up with a log file, just return early.
×
278
        if hn.logFile == nil {
×
279
                return
×
280
        }
×
281

282
        desc := fmt.Sprintf("itest: %s\n", fmt.Sprintf(format, a...))
×
283
        if _, err := hn.logFile.WriteString(desc); err != nil {
×
284
                hn.printErrf("write to log err: %v", err)
×
285
        }
×
286
}
287

288
// ReadMacaroon waits a given duration for the macaroon file to be created. If
289
// the file is readable within the timeout, its content is de-serialized as a
290
// macaroon and returned.
291
func (hn *HarnessNode) ReadMacaroon(macPath string, timeout time.Duration) (
292
        *macaroon.Macaroon, error) {
×
293

×
294
        // Wait until macaroon file is created and has valid content before
×
295
        // using it.
×
296
        var mac *macaroon.Macaroon
×
297
        err := wait.NoError(func() error {
×
298
                macBytes, err := os.ReadFile(macPath)
×
299
                if err != nil {
×
300
                        return fmt.Errorf("error reading macaroon file: %w",
×
301
                                err)
×
302
                }
×
303

304
                newMac := &macaroon.Macaroon{}
×
305
                if err = newMac.UnmarshalBinary(macBytes); err != nil {
×
306
                        return fmt.Errorf("error unmarshalling macaroon "+
×
307
                                "file: %w", err)
×
308
                }
×
309
                mac = newMac
×
310

×
311
                return nil
×
312
        }, timeout)
313

314
        return mac, err
×
315
}
316

317
// ConnectRPCWithMacaroon uses the TLS certificate and given macaroon to
318
// create a gRPC client connection.
319
func (hn *HarnessNode) ConnectRPCWithMacaroon(mac *macaroon.Macaroon) (
320
        *grpc.ClientConn, error) {
×
321

×
322
        // Wait until TLS certificate is created and has valid content before
×
323
        // using it, up to 30 sec.
×
324
        var tlsCreds credentials.TransportCredentials
×
325
        err := wait.NoError(func() error {
×
326
                var err error
×
327
                tlsCreds, err = credentials.NewClientTLSFromFile(
×
328
                        hn.Cfg.TLSCertPath, "",
×
329
                )
×
330
                return err
×
331
        }, wait.DefaultTimeout)
×
332
        if err != nil {
×
333
                return nil, fmt.Errorf("error reading TLS cert: %w", err)
×
334
        }
×
335

336
        opts := []grpc.DialOption{
×
337
                grpc.WithBlock(),
×
338
                grpc.WithTransportCredentials(tlsCreds),
×
339
        }
×
340

×
341
        ctx, cancel := context.WithTimeout(hn.runCtx, wait.DefaultTimeout)
×
342
        defer cancel()
×
343

×
344
        if mac == nil {
×
345
                return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
×
346
        }
×
347
        macCred, err := macaroons.NewMacaroonCredential(mac)
×
348
        if err != nil {
×
349
                return nil, fmt.Errorf("error cloning mac: %w", err)
×
350
        }
×
351
        opts = append(opts, grpc.WithPerRPCCredentials(macCred))
×
352

×
353
        return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...)
×
354
}
355

356
// ConnectRPC uses the TLS certificate and admin macaroon files written by the
357
// lnd node to create a gRPC client connection.
358
func (hn *HarnessNode) ConnectRPC() (*grpc.ClientConn, error) {
×
359
        // If we should use a macaroon, always take the admin macaroon as a
×
360
        // default.
×
361
        mac, err := hn.ReadMacaroon(hn.Cfg.AdminMacPath, wait.DefaultTimeout)
×
362
        if err != nil {
×
363
                return nil, err
×
364
        }
×
365

366
        return hn.ConnectRPCWithMacaroon(mac)
×
367
}
368

369
// SetExtraArgs assigns the ExtraArgs field for the node's configuration. The
370
// changes will take effect on restart.
371
func (hn *HarnessNode) SetExtraArgs(extraArgs []string) {
×
372
        hn.Cfg.ExtraArgs = extraArgs
×
373
}
×
374

375
// StartLndCmd handles the startup of lnd, creating log files, and possibly
376
// kills the process when needed.
377
func (hn *HarnessNode) StartLndCmd(ctxb context.Context) error {
×
378
        // Init the run context.
×
379
        hn.runCtx, hn.cancel = context.WithCancel(ctxb)
×
380

×
381
        args := hn.Cfg.GenArgs()
×
382
        hn.cmd = exec.Command(hn.Cfg.LndBinary, args...)
×
383

×
384
        // Redirect stderr output to buffer
×
385
        var errb bytes.Buffer
×
386
        hn.cmd.Stderr = &errb
×
387

×
388
        // If the logoutput flag is passed, redirect output from the nodes to
×
389
        // log files.
×
390
        if *logOutput {
×
391
                err := addLogFile(hn)
×
392
                if err != nil {
×
393
                        return err
×
394
                }
×
395
        }
396

397
        // Start the process.
398
        if err := hn.cmd.Start(); err != nil {
×
399
                return err
×
400
        }
×
401

402
        return nil
×
403
}
404

405
// StartWithNoAuth will start the lnd process, creates the grpc connection
406
// without macaroon auth, and waits until the server is reported as waiting to
407
// start.
408
//
409
// NOTE: caller needs to take extra step to create and unlock the wallet.
410
func (hn *HarnessNode) StartWithNoAuth(ctxt context.Context) error {
×
411
        // Start lnd process and prepare logs.
×
412
        if err := hn.StartLndCmd(ctxt); err != nil {
×
413
                return fmt.Errorf("start lnd error: %w", err)
×
414
        }
×
415

416
        // Create an unauthed connection.
417
        conn, err := hn.ConnectRPCWithMacaroon(nil)
×
418
        if err != nil {
×
419
                return fmt.Errorf("ConnectRPCWithMacaroon err: %w", err)
×
420
        }
×
421

422
        // Since the conn is not authed, only the `WalletUnlocker` and `State`
423
        // clients can be inited from this conn.
424
        hn.conn = conn
×
425
        hn.RPC = rpc.NewHarnessRPC(hn.runCtx, hn.T, conn, hn.Name())
×
426

×
427
        // Wait till the server is starting.
×
428
        return hn.WaitUntilStarted()
×
429
}
430

431
// Start will start the lnd process, creates the grpc connection, and waits
432
// until the server is fully started.
433
func (hn *HarnessNode) Start(ctxt context.Context) error {
×
434
        // Start lnd process and prepare logs.
×
435
        if err := hn.StartLndCmd(ctxt); err != nil {
×
436
                return fmt.Errorf("start lnd error: %w", err)
×
437
        }
×
438

439
        // Since Stop uses the LightningClient to stop the node, if we fail to
440
        // get a connected client, we have to kill the process.
441
        conn, err := hn.ConnectRPC()
×
442
        if err != nil {
×
443
                err = fmt.Errorf("ConnectRPC err: %w", err)
×
444
                cmdErr := hn.Kill()
×
445
                if cmdErr != nil {
×
446
                        err = fmt.Errorf("kill process got err: %w: %v",
×
447
                                cmdErr, err)
×
448
                }
×
449
                return err
×
450
        }
451

452
        // Init the node by creating the RPC clients, initializing node's
453
        // internal state and watcher.
454
        hn.Initialize(conn)
×
455

×
456
        // Wait till the server is starting.
×
457
        if err := hn.WaitUntilStarted(); err != nil {
×
458
                return fmt.Errorf("waiting for start got: %w", err)
×
459
        }
×
460

461
        // Subscribe for topology updates.
462
        return hn.initLightningClient()
×
463
}
464

465
// InitNode waits until the main gRPC server is detected as active, then
466
// complete the normal HarnessNode gRPC connection creation. A non-nil
467
// `macBytes` indicates the node is initialized stateless, otherwise it will
468
// use the admin macaroon.
469
func (hn *HarnessNode) InitNode(macBytes []byte) error {
×
470
        var (
×
471
                conn *grpc.ClientConn
×
472
                err  error
×
473
        )
×
474

×
475
        // If the node has been initialized stateless, we need to pass the
×
476
        // macaroon to the client.
×
477
        if macBytes != nil {
×
478
                adminMac := &macaroon.Macaroon{}
×
479
                err := adminMac.UnmarshalBinary(macBytes)
×
480
                if err != nil {
×
481
                        return fmt.Errorf("unmarshal failed: %w", err)
×
482
                }
×
483
                conn, err = hn.ConnectRPCWithMacaroon(adminMac)
×
484
                if err != nil {
×
485
                        return err
×
486
                }
×
487
        } else {
×
488
                // Normal initialization, we expect a macaroon to be in the
×
489
                // file system.
×
490
                conn, err = hn.ConnectRPC()
×
491
                if err != nil {
×
492
                        return err
×
493
                }
×
494
        }
495

496
        // Init the node by creating the RPC clients, initializing node's
497
        // internal state and watcher.
498
        hn.Initialize(conn)
×
499

×
500
        // Wait till the server is starting.
×
501
        if err := hn.WaitUntilStarted(); err != nil {
×
502
                return fmt.Errorf("waiting for start got: %w", err)
×
503
        }
×
504

505
        return hn.initLightningClient()
×
506
}
507

508
// InitChangePassword initializes a harness node by passing the change password
509
// request via RPC. After the request is submitted, this method will block until
510
// a macaroon-authenticated RPC connection can be established to the harness
511
// node. Once established, the new connection is used to initialize the
512
// RPC clients and subscribes the HarnessNode to topology changes.
513
func (hn *HarnessNode) ChangePasswordAndInit(
514
        req *lnrpc.ChangePasswordRequest) (
515
        *lnrpc.ChangePasswordResponse, error) {
×
516

×
517
        response := hn.RPC.ChangePassword(req)
×
518
        return response, hn.InitNode(response.AdminMacaroon)
×
519
}
×
520

521
// waitTillServerState makes a subscription to the server's state change and
522
// blocks until the server is in the targeted state.
523
func (hn *HarnessNode) waitTillServerState(
524
        predicate func(state lnrpc.WalletState) bool) error {
×
525

×
526
        client := hn.RPC.SubscribeState()
×
527

×
528
        errChan := make(chan error, 1)
×
529
        done := make(chan struct{})
×
530
        go func() {
×
531
                for {
×
532
                        resp, err := client.Recv()
×
533
                        if err != nil {
×
534
                                errChan <- err
×
535
                                return
×
536
                        }
×
537

538
                        if predicate(resp.State) {
×
539
                                close(done)
×
540
                                return
×
541
                        }
×
542
                }
543
        }()
544

545
        for {
×
546
                select {
×
547
                case <-time.After(wait.NodeStartTimeout):
×
548
                        return fmt.Errorf("timeout waiting for server state")
×
549
                case err := <-errChan:
×
550
                        return fmt.Errorf("receive server state err: %w", err)
×
551

552
                case <-done:
×
553
                        return nil
×
554
                }
555
        }
556
}
557

558
// initLightningClient blocks until the lnd server is fully started and
559
// subscribes the harness node to graph topology updates. This method also
560
// spawns a lightning network watcher for this node, which watches for topology
561
// changes.
562
func (hn *HarnessNode) initLightningClient() error {
×
563
        // Wait until the server is fully started.
×
564
        if err := hn.WaitUntilServerActive(); err != nil {
×
565
                return fmt.Errorf("waiting for server active: %w", err)
×
566
        }
×
567

568
        // Set the harness node's pubkey to what the node claims in GetInfo.
569
        // The RPC must have been started at this point.
570
        if err := hn.attachPubKey(); err != nil {
×
571
                return err
×
572
        }
×
573

574
        // Launch the watcher that will hook into graph related topology change
575
        // from the PoV of this node.
576
        started := make(chan error, 1)
×
577
        go hn.Watcher.topologyWatcher(hn.runCtx, started)
×
578

×
579
        select {
×
580
        // First time reading the channel indicates the topology client is
581
        // started.
582
        case err := <-started:
×
583
                if err != nil {
×
584
                        return fmt.Errorf("create topology client stream "+
×
585
                                "got err: %v", err)
×
586
                }
×
587

588
        case <-time.After(wait.DefaultTimeout):
×
589
                return fmt.Errorf("timeout creating topology client stream")
×
590
        }
591

592
        // Catch topology client stream error inside a goroutine.
593
        go func() {
×
594
                select {
×
595
                case err := <-started:
×
596
                        hn.printErrf("topology client: %v", err)
×
597

598
                case <-hn.runCtx.Done():
×
599
                }
600
        }()
601

602
        return nil
×
603
}
604

605
// attachPubKey queries an unlocked node to retrieve its public key.
606
func (hn *HarnessNode) attachPubKey() error {
×
607
        // Obtain the lnid of this node for quick identification purposes.
×
608
        info := hn.RPC.GetInfo()
×
609
        hn.PubKeyStr = info.IdentityPubkey
×
610

×
611
        pubkey, err := hex.DecodeString(info.IdentityPubkey)
×
612
        if err != nil {
×
613
                return err
×
614
        }
×
615
        copy(hn.PubKey[:], pubkey)
×
616

×
617
        return nil
×
618
}
619

620
// cleanup cleans up all the temporary files created by the node's process.
621
func (hn *HarnessNode) cleanup() error {
×
622
        if hn.Cfg.backupDBDir != "" {
×
623
                err := os.RemoveAll(hn.Cfg.backupDBDir)
×
624
                if err != nil {
×
625
                        return fmt.Errorf("unable to remove backup dir: %w",
×
626
                                err)
×
627
                }
×
628
        }
629

630
        return os.RemoveAll(hn.Cfg.BaseDir)
×
631
}
632

633
// waitForProcessExit Launch a new goroutine which that bubbles up any
634
// potential fatal process errors to the goroutine running the tests.
635
func (hn *HarnessNode) WaitForProcessExit() error {
×
636
        var err error
×
637

×
638
        errChan := make(chan error, 1)
×
639
        go func() {
×
640
                err = hn.cmd.Wait()
×
641
                errChan <- err
×
642
        }()
×
643

644
        select {
×
645
        case err := <-errChan:
×
646
                if err == nil {
×
647
                        break
×
648
                }
649

650
                // If the process has already been canceled, we can exit early
651
                // as the logs have already been saved.
652
                if strings.Contains(err.Error(), "Wait was already called") {
×
653
                        return nil
×
654
                }
×
655

656
                // Otherwise, we print the error, break the select and save
657
                // logs.
658
                hn.printErrf("wait process exit got err: %v", err)
×
659

×
660
                break
×
661

662
        case <-time.After(wait.DefaultTimeout):
×
NEW
663
                hn.printErrf("timeout waiting for process to exit")
×
664
        }
665

666
        // Make sure log file is closed and renamed if necessary.
667
        finalizeLogfile(hn)
×
668

×
669
        // Rename the etcd.log file if the node was running on embedded
×
670
        // etcd.
×
671
        finalizeEtcdLog(hn)
×
672

×
673
        return err
×
674
}
675

676
// Stop attempts to stop the active lnd process.
677
func (hn *HarnessNode) Stop() error {
×
678
        // Do nothing if the process is not running.
×
679
        if hn.runCtx == nil {
×
680
                hn.printErrf("found nil run context")
×
681
                return nil
×
682
        }
×
683

684
        // Stop the runCtx.
685
        hn.cancel()
×
686

×
687
        // If we ever reaches the state where `Watcher` is initialized, it
×
688
        // means the node has an authed connection and all its RPC clients are
×
689
        // ready for use. Thus we will try to stop it via the RPC.
×
690
        if hn.Watcher != nil {
×
691
                // Don't watch for error because sometimes the RPC connection
×
692
                // gets closed before a response is returned.
×
693
                req := lnrpc.StopRequest{}
×
694

×
695
                ctxt, cancel := context.WithCancel(context.Background())
×
696
                defer cancel()
×
697

×
698
                err := wait.NoError(func() error {
×
699
                        _, err := hn.RPC.LN.StopDaemon(ctxt, &req)
×
700

×
701
                        switch {
×
702
                        case err == nil:
×
703
                                return nil
×
704

705
                        // Try again if a recovery/rescan is in progress.
706
                        case strings.Contains(
707
                                err.Error(), "recovery in progress",
708
                        ):
×
709
                                return err
×
710

711
                        default:
×
712
                                return nil
×
713
                        }
714
                }, wait.DefaultTimeout)
715
                if err != nil {
×
716
                        return err
×
717
                }
×
718

719
                // Wait for goroutines to be finished.
720
                done := make(chan struct{})
×
721
                go func() {
×
722
                        hn.Watcher.wg.Wait()
×
723
                        close(done)
×
724
                }()
×
725

726
                // If the goroutines fail to finish before timeout, we'll print
727
                // the error to console and continue.
728
                select {
×
729
                case <-time.After(wait.DefaultTimeout):
×
730
                        hn.printErrf("timeout on wait group")
×
731
                case <-done:
×
732
                }
733
        } else {
×
734
                // If the rpc clients are not initiated, we'd kill the process
×
735
                // manually.
×
736
                hn.printErrf("found nil RPC clients")
×
737
                if err := hn.Kill(); err != nil {
×
738
                        // Skip the error if the process is already dead.
×
739
                        if !strings.Contains(
×
740
                                err.Error(), "process already finished",
×
741
                        ) {
×
742

×
743
                                return fmt.Errorf("killing process got: %w",
×
744
                                        err)
×
745
                        }
×
746
                }
747
        }
748

749
        // Close any attempts at further grpc connections.
750
        if hn.conn != nil {
×
751
                if err := hn.CloseConn(); err != nil {
×
752
                        return err
×
753
                }
×
754
        }
755

756
        // Wait for lnd process to exit in the end.
757
        return hn.WaitForProcessExit()
×
758
}
759

760
// CloseConn closes the grpc connection.
761
func (hn *HarnessNode) CloseConn() error {
×
762
        err := status.Code(hn.conn.Close())
×
763
        switch err {
×
764
        case codes.OK:
×
765
                return nil
×
766

767
        // When the context is canceled above, we might get the
768
        // following error as the context is no longer active.
769
        case codes.Canceled:
×
770
                return nil
×
771

772
        case codes.Unknown:
×
773
                return fmt.Errorf("unknown error attempting to stop "+
×
774
                        "grpc client: %v", err)
×
775

776
        default:
×
777
                return fmt.Errorf("error attempting to stop "+
×
778
                        "grpc client: %v", err)
×
779
        }
780
}
781

782
// Shutdown stops the active lnd process and cleans up any temporary
783
// directories created along the way.
784
func (hn *HarnessNode) Shutdown() error {
×
785
        if err := hn.Stop(); err != nil {
×
786
                return err
×
787
        }
×
788
        if err := hn.cleanup(); err != nil {
×
789
                return err
×
790
        }
×
791
        return nil
×
792
}
793

794
// Kill kills the lnd process.
795
func (hn *HarnessNode) Kill() error {
×
796
        return hn.cmd.Process.Kill()
×
797
}
×
798

799
// printErrf prints an error to the console.
800
func (hn *HarnessNode) printErrf(format string, a ...interface{}) {
×
801
        fmt.Printf("itest error from [%s:%s]: %s\n", //nolint:forbidigo
×
802
                hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
×
803
                fmt.Sprintf(format, a...))
×
804
}
×
805

806
// BackupDB creates a backup of the current database.
807
func (hn *HarnessNode) BackupDB() error {
×
808
        if hn.Cfg.backupDBDir != "" {
×
809
                return fmt.Errorf("backup already created")
×
810
        }
×
811

812
        if hn.Cfg.postgresDBName != "" {
×
813
                // Backup database.
×
814
                backupDBName := hn.Cfg.postgresDBName + "_backup"
×
815
                err := executePgQuery(
×
816
                        "CREATE DATABASE " + backupDBName + " WITH TEMPLATE " +
×
817
                                hn.Cfg.postgresDBName,
×
818
                )
×
819
                if err != nil {
×
820
                        return err
×
821
                }
×
822
        } else {
×
823
                // Backup files.
×
824
                tempDir, err := os.MkdirTemp("", "past-state")
×
825
                if err != nil {
×
826
                        return fmt.Errorf("unable to create temp db folder: %w",
×
827
                                err)
×
828
                }
×
829

830
                if err := copyAll(tempDir, hn.Cfg.DBDir()); err != nil {
×
831
                        return fmt.Errorf("unable to copy database files: %w",
×
832
                                err)
×
833
                }
×
834

835
                hn.Cfg.backupDBDir = tempDir
×
836
        }
837

838
        return nil
×
839
}
840

841
// RestoreDB restores a database backup.
842
func (hn *HarnessNode) RestoreDB() error {
×
843
        if hn.Cfg.postgresDBName != "" {
×
844
                // Restore database.
×
845
                backupDBName := hn.Cfg.postgresDBName + "_backup"
×
846
                err := executePgQuery(
×
847
                        "DROP DATABASE " + hn.Cfg.postgresDBName,
×
848
                )
×
849
                if err != nil {
×
850
                        return err
×
851
                }
×
852
                err = executePgQuery(
×
853
                        "ALTER DATABASE " + backupDBName + " RENAME TO " +
×
854
                                hn.Cfg.postgresDBName,
×
855
                )
×
856
                if err != nil {
×
857
                        return err
×
858
                }
×
859
        } else {
×
860
                // Restore files.
×
861
                if hn.Cfg.backupDBDir == "" {
×
862
                        return fmt.Errorf("no database backup created")
×
863
                }
×
864

865
                err := copyAll(hn.Cfg.DBDir(), hn.Cfg.backupDBDir)
×
866
                if err != nil {
×
867
                        return fmt.Errorf("unable to copy database files: %w",
×
868
                                err)
×
869
                }
×
870

871
                if err := os.RemoveAll(hn.Cfg.backupDBDir); err != nil {
×
872
                        return fmt.Errorf("unable to remove backup dir: %w",
×
873
                                err)
×
874
                }
×
875
                hn.Cfg.backupDBDir = ""
×
876
        }
877

878
        return nil
×
879
}
880

881
// UpdateGlobalPolicy updates a node's global channel policy.
882
func (hn *HarnessNode) UpdateGlobalPolicy(policy *lnrpc.RoutingPolicy) {
×
883
        updateFeeReq := &lnrpc.PolicyUpdateRequest{
×
884
                BaseFeeMsat: policy.FeeBaseMsat,
×
885
                FeeRate: float64(policy.FeeRateMilliMsat) /
×
886
                        float64(1_000_000),
×
887
                TimeLockDelta: policy.TimeLockDelta,
×
888
                Scope:         &lnrpc.PolicyUpdateRequest_Global{Global: true},
×
889
                MaxHtlcMsat:   policy.MaxHtlcMsat,
×
890
        }
×
891
        hn.RPC.UpdateChannelPolicy(updateFeeReq)
×
892
}
×
893

894
func postgresDatabaseDsn(dbName string) string {
×
895
        return fmt.Sprintf(postgresDsn, dbName)
×
896
}
×
897

898
// createTempPgDB creates a temp postgres database.
899
func createTempPgDB() (string, error) {
×
900
        // Create random database name.
×
901
        randBytes := make([]byte, 8)
×
902
        _, err := rand.Read(randBytes)
×
903
        if err != nil {
×
904
                return "", err
×
905
        }
×
906
        dbName := "itest_" + hex.EncodeToString(randBytes)
×
907

×
908
        // Create database.
×
909
        err = executePgQuery("CREATE DATABASE " + dbName)
×
910
        if err != nil {
×
911
                return "", err
×
912
        }
×
913

914
        return dbName, nil
×
915
}
916

917
// executePgQuery executes a SQL statement in a postgres db.
918
func executePgQuery(query string) error {
×
919
        pool, err := pgxpool.Connect(
×
920
                context.Background(),
×
921
                postgresDatabaseDsn("postgres"),
×
922
        )
×
923
        if err != nil {
×
924
                return fmt.Errorf("unable to connect to database: %w", err)
×
925
        }
×
926
        defer pool.Close()
×
927

×
928
        _, err = pool.Exec(context.Background(), query)
×
929
        return err
×
930
}
931

932
// renameFile is a helper to rename (log) files created during integration
933
// tests.
934
func renameFile(fromFileName, toFileName string) {
×
935
        err := os.Rename(fromFileName, toFileName)
×
936
        if err != nil {
×
937
                fmt.Printf("could not rename %s to %s: %v\n", // nolint:forbidigo
×
938
                        fromFileName, toFileName, err)
×
939
        }
×
940
}
941

942
// getFinalizedLogFilePrefix returns the finalize log filename.
943
func getFinalizedLogFilePrefix(hn *HarnessNode) string {
×
944
        pubKeyHex := hex.EncodeToString(
×
945
                hn.PubKey[:logPubKeyBytes],
×
946
        )
×
947

×
948
        return fmt.Sprintf("%s/%d-%s-%s-%s", GetLogDir(), hn.Cfg.NodeID,
×
949
                hn.Cfg.LogFilenamePrefix, hn.Cfg.Name, pubKeyHex)
×
950
}
×
951

952
// finalizeLogfile makes sure the log file cleanup function is initialized,
953
// even if no log file is created.
954
func finalizeLogfile(hn *HarnessNode) {
×
955
        // Exit early if there's no log file.
×
956
        if hn.logFile == nil {
×
957
                return
×
958
        }
×
959

960
        hn.logFile.Close()
×
961

×
962
        // If logoutput flag is not set, return early.
×
963
        if !*logOutput {
×
964
                return
×
965
        }
×
966

967
        newFileName := fmt.Sprintf("%v.log",
×
968
                getFinalizedLogFilePrefix(hn),
×
969
        )
×
970
        renameFile(hn.filename, newFileName)
×
971
}
972

973
// finalizeEtcdLog saves the etcd log files when test ends.
974
func finalizeEtcdLog(hn *HarnessNode) {
×
975
        // Exit early if this is not etcd backend.
×
976
        if hn.Cfg.DBBackend != BackendEtcd {
×
977
                return
×
978
        }
×
979

980
        etcdLogFileName := fmt.Sprintf("%s/etcd.log", hn.Cfg.LogDir)
×
981
        newEtcdLogFileName := fmt.Sprintf("%v-etcd.log",
×
982
                getFinalizedLogFilePrefix(hn),
×
983
        )
×
984

×
985
        renameFile(etcdLogFileName, newEtcdLogFileName)
×
986
}
987

988
// addLogFile creates log files used by this node.
989
func addLogFile(hn *HarnessNode) error {
×
990
        var fileName string
×
991

×
992
        dir := GetLogDir()
×
993
        fileName = fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.Cfg.NodeID,
×
994
                hn.Cfg.LogFilenamePrefix, hn.Cfg.Name,
×
995
                hex.EncodeToString(hn.PubKey[:logPubKeyBytes]))
×
996

×
997
        // If the node's PubKey is not yet initialized, create a temporary file
×
998
        // name. Later, after the PubKey has been initialized, the file can be
×
999
        // moved to its final name with the PubKey included.
×
1000
        if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) {
×
1001
                fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir,
×
1002
                        hn.Cfg.NodeID, hn.Cfg.LogFilenamePrefix,
×
1003
                        hn.Cfg.Name)
×
1004
        }
×
1005

1006
        // Create file if not exists, otherwise append.
1007
        file, err := os.OpenFile(fileName,
×
1008
                os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
×
1009
        if err != nil {
×
1010
                return err
×
1011
        }
×
1012

1013
        // Pass node's stderr to both errb and the file.
1014
        w := io.MultiWriter(hn.cmd.Stderr, file)
×
1015
        hn.cmd.Stderr = w
×
1016

×
1017
        // Pass the node's stdout only to the file.
×
1018
        hn.cmd.Stdout = file
×
1019

×
1020
        // Let the node keep a reference to this file, such that we can add to
×
1021
        // it if necessary.
×
1022
        hn.logFile = file
×
1023

×
1024
        hn.filename = fileName
×
1025

×
1026
        return nil
×
1027
}
1028

1029
// copyAll copies all files and directories from srcDir to dstDir recursively.
1030
// Note that this function does not support links.
1031
func copyAll(dstDir, srcDir string) error {
×
1032
        entries, err := os.ReadDir(srcDir)
×
1033
        if err != nil {
×
1034
                return err
×
1035
        }
×
1036

1037
        for _, entry := range entries {
×
1038
                srcPath := filepath.Join(srcDir, entry.Name())
×
1039
                dstPath := filepath.Join(dstDir, entry.Name())
×
1040

×
1041
                info, err := os.Stat(srcPath)
×
1042
                if err != nil {
×
1043
                        return err
×
1044
                }
×
1045

1046
                if info.IsDir() {
×
1047
                        err := os.Mkdir(dstPath, info.Mode())
×
1048
                        if err != nil && !os.IsExist(err) {
×
1049
                                return err
×
1050
                        }
×
1051

1052
                        err = copyAll(dstPath, srcPath)
×
1053
                        if err != nil {
×
1054
                                return err
×
1055
                        }
×
1056
                } else if err := CopyFile(dstPath, srcPath); err != nil {
×
1057
                        return err
×
1058
                }
×
1059
        }
1060

1061
        return nil
×
1062
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc