• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

koinos / koinos-p2p / 885

29 Aug 2023 05:22PM UTC coverage: 52.697% (-0.3%) from 53.0%
885

cron

travis-pro

web-flow
Merge pull request #285 from koinos/happy-fun-time

Bump libp2p and golang

9 of 9 new or added lines in 1 file covered. (100.0%)

1065 of 2021 relevant lines covered (52.7%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.84
/internal/node/node.go
1
package node
2

3
import (
4
        "context"
5
        "crypto/sha256"
6
        "encoding/base64"
7
        "encoding/binary"
8
        "encoding/hex"
9
        "errors"
10
        "io"
11
        "math/rand"
12
        "sync/atomic"
13
        "time"
14

15
        "filippo.io/keygen"
16

17
        log "github.com/koinos/koinos-log-golang"
18
        koinosmq "github.com/koinos/koinos-mq-golang"
19
        "github.com/koinos/koinos-p2p/internal/options"
20
        "github.com/koinos/koinos-p2p/internal/p2p"
21
        "github.com/koinos/koinos-p2p/internal/rpc"
22
        "github.com/koinos/koinos-proto-golang/koinos"
23
        "github.com/koinos/koinos-proto-golang/koinos/broadcast"
24
        prpc "github.com/koinos/koinos-proto-golang/koinos/rpc"
25
        rpcp2p "github.com/koinos/koinos-proto-golang/koinos/rpc/p2p"
26
        util "github.com/koinos/koinos-util-golang"
27

28
        libp2p "github.com/libp2p/go-libp2p"
29
        dht "github.com/libp2p/go-libp2p-kad-dht"
30
        pubsub "github.com/libp2p/go-libp2p-pubsub"
31
        pb "github.com/libp2p/go-libp2p-pubsub/pb"
32
        "github.com/libp2p/go-libp2p/core/crypto"
33
        "github.com/libp2p/go-libp2p/core/host"
34
        "github.com/libp2p/go-libp2p/core/network"
35
        "github.com/libp2p/go-libp2p/core/peer"
36
        "github.com/libp2p/go-libp2p/core/routing"
37
        multiaddr "github.com/multiformats/go-multiaddr"
38

39
        "google.golang.org/protobuf/proto"
40
)
41

42
// KoinosP2PNode is the core object representing
43
type KoinosP2PNode struct {
44
        Host              host.Host
45
        localRPC          rpc.LocalRPC
46
        Applicator        *p2p.Applicator
47
        Gossip            *p2p.KoinosGossip
48
        ConnectionManager *p2p.ConnectionManager
49
        PeerErrorHandler  *p2p.PeerErrorHandler
50
        GossipToggle      *p2p.GossipToggle
51
        TransactionCache  *p2p.TransactionCache
52
        libValue          atomic.Value
53

54
        PeerErrorChan      chan p2p.PeerError
55
        DisconnectPeerChan chan peer.ID
56

57
        Options options.NodeOptions
58
}
59

60
const (
61
        transactionCacheDuration = 5 * time.Minute
62
        pubsubTimeCacheDuration  = 30 * time.Second
63
        gossipHeartbeatInterval  = 1 * time.Second
64
        gossipIWantFollowupTime  = 3 * time.Second
65
        gossipHistoryLength      = 5
66
        gossipHistoryGossip      = 3
67
)
68

69
// NewKoinosP2PNode creates a libp2p node object listening on the given multiaddress
70
// uses secio encryption on the wire
71
// listenAddr is a multiaddress string on which to listen
72
// seed is the random seed to use for key generation. Use 0 for a random seed.
73
func NewKoinosP2PNode(ctx context.Context, listenAddr string, localRPC rpc.LocalRPC, requestHandler *koinosmq.RequestHandler, seed string, config *options.Config) (*KoinosP2PNode, error) {
2✔
74
        privateKey, err := generatePrivateKey(seed)
2✔
75
        if err != nil {
2✔
76
                return nil, err
×
77
        }
×
78

79
        node := new(KoinosP2PNode)
2✔
80

2✔
81
        node.Options = config.NodeOptions
2✔
82
        node.PeerErrorChan = make(chan p2p.PeerError)
2✔
83
        node.DisconnectPeerChan = make(chan peer.ID)
2✔
84

2✔
85
        node.PeerErrorHandler = p2p.NewPeerErrorHandler(
2✔
86
                node.DisconnectPeerChan,
2✔
87
                node.PeerErrorChan,
2✔
88
                config.PeerErrorHandlerOptions)
2✔
89

2✔
90
        var idht *dht.IpfsDHT
2✔
91

2✔
92
        options := []libp2p.Option{
2✔
93
                libp2p.ListenAddrStrings(listenAddr),
2✔
94
                libp2p.Identity(privateKey),
2✔
95
                // Attempt to open ports using uPNP for NATed hosts.
2✔
96
                libp2p.NATPortMap(),
2✔
97
                // Let this host use the DHT to find other hosts
2✔
98
                libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
4✔
99
                        idht, err = dht.New(ctx, h)
2✔
100
                        return idht, err
2✔
101
                }),
2✔
102
                // Let this host use relays and advertise itself on relays if
103
                // it finds it is behind NAT. Use libp2p.Relay(options...) to
104
                // enable active relays and more.
105
                libp2p.EnableAutoRelayWithStaticRelays(config.NodeOptions.InitialPeers),
106
                // Enable NAT hole punching
107
                libp2p.EnableHolePunching(),
108
                // If you want to help other peers to figure out if they are behind
109
                // NATs, you can launch the server-side of AutoNAT too (AutoRelay
110
                // already runs the client)
111
                //
112
                // This service is highly rate-limited and should not cause any
113
                // performance issues.
114
                libp2p.EnableNATService(),
115
                libp2p.ConnectionGater(node.PeerErrorHandler),
116
        }
117

118
        host, err := libp2p.New(options...)
2✔
119
        if err != nil {
3✔
120
                return nil, err
1✔
121
        }
1✔
122

123
        node.Host = host
2✔
124
        node.localRPC = localRPC
2✔
125

2✔
126
        if requestHandler != nil {
2✔
127
                requestHandler.SetBroadcastHandler("koinos.block.accept", node.handleBlockBroadcast)
×
128
                requestHandler.SetBroadcastHandler("koinos.mempool.accept", node.handleTransactionBroadcast)
×
129
                requestHandler.SetBroadcastHandler("koinos.block.forks", node.handleForkUpdate)
×
130
                requestHandler.SetRPCHandler("p2p", node.handleRPC)
×
131
        } else {
2✔
132
                log.Info("Starting P2P node without broadcast listeners")
2✔
133
        }
2✔
134

135
        pubsub.TimeCacheDuration = pubsubTimeCacheDuration
2✔
136
        gossipOpts := pubsub.DefaultGossipSubParams()
2✔
137
        gossipOpts.HeartbeatInterval = gossipHeartbeatInterval
2✔
138
        gossipOpts.IWantFollowupTime = gossipIWantFollowupTime
2✔
139
        gossipOpts.HistoryLength = gossipHistoryLength
2✔
140
        gossipOpts.HistoryGossip = gossipHistoryGossip
2✔
141
        ps, err := pubsub.NewGossipSub(
2✔
142
                ctx, node.Host,
2✔
143
                pubsub.WithMessageIdFn(generateMessageID),
2✔
144
                pubsub.WithPeerExchange(true),
2✔
145
                pubsub.WithPeerScore(
2✔
146
                        &pubsub.PeerScoreParams{
2✔
147
                                AppSpecificScore: func(p peer.ID) float64 {
3✔
148
                                        rawScore := float64(node.PeerErrorHandler.GetPeerErrorScore(ctx, p))
1✔
149
                                        return -rawScore + float64(node.PeerErrorHandler.GetOptions().ErrorScoreReconnectThreshold)
1✔
150
                                },
1✔
151
                                AppSpecificWeight: 1,
152
                                DecayInterval:     1 * time.Minute,
153
                                DecayToZero:       0.01,
154
                        },
155
                        &pubsub.PeerScoreThresholds{
156
                                GossipThreshold:             -1,
157
                                PublishThreshold:            -1,
158
                                GraylistThreshold:           -1,
159
                                AcceptPXThreshold:           5000,
160
                                OpportunisticGraftThreshold: .1,
161
                        },
162
                ),
163
                pubsub.WithMessageSignaturePolicy(pubsub.StrictSign),
164
        )
165
        if err != nil {
2✔
166
                return nil, err
×
167
        }
×
168

169
        node.TransactionCache = p2p.NewTransactionCache(transactionCacheDuration)
2✔
170

2✔
171
        node.Applicator, err = p2p.NewApplicator(
2✔
172
                ctx,
2✔
173
                node.localRPC,
2✔
174
                node.TransactionCache,
2✔
175
                config.ApplicatorOptions,
2✔
176
        )
2✔
177

2✔
178
        if err != nil {
2✔
179
                return nil, err
×
180
        }
×
181

182
        node.Gossip = p2p.NewKoinosGossip(
2✔
183
                ctx,
2✔
184
                node.localRPC,
2✔
185
                ps,
2✔
186
                node.PeerErrorChan,
2✔
187
                node.Host.ID(),
2✔
188
                node,
2✔
189
                node.Applicator)
2✔
190

2✔
191
        node.ConnectionManager = p2p.NewConnectionManager(
2✔
192
                node.Host,
2✔
193
                node.localRPC,
2✔
194
                &config.PeerConnectionOptions,
2✔
195
                node,
2✔
196
                node.Options.InitialPeers,
2✔
197
                node.PeerErrorChan,
2✔
198
                node.Applicator)
2✔
199

2✔
200
        node.GossipToggle = p2p.NewGossipToggle(
2✔
201
                node.Gossip,
2✔
202
                node.ConnectionManager,
2✔
203
                config.GossipToggleOptions)
2✔
204

2✔
205
        node.PeerErrorHandler.SetPeerAddressProvider(node.ConnectionManager)
2✔
206

2✔
207
        return node, nil
2✔
208
}
209

210
func (n *KoinosP2PNode) handleBlockBroadcast(topic string, data []byte) {
×
211
        log.Debug("Received koinos.block.accept broadcast")
×
212
        blockBroadcast := &broadcast.BlockAccepted{}
×
213
        err := proto.Unmarshal(data, blockBroadcast)
×
214
        if err != nil {
×
215
                log.Warnf("Unable to parse koinos.block.accept broadcast: %v", err.Error())
×
216
                return
×
217
        }
×
218

219
        go func() {
×
220
                if blockBroadcast.Head {
×
221
                        n.GossipToggle.UpdateHeadTime(blockBroadcast.Block.Header.Timestamp)
×
222
                }
×
223
        }()
224

225
        go func() {
×
226
                n.Applicator.HandleBlockBroadcast(blockBroadcast)
×
227
        }()
×
228

229
        // If gossip is enabled publish the block
230
        err = n.Gossip.PublishBlock(context.Background(), blockBroadcast.Block)
×
231
        if err != nil {
×
232
                log.Warnf("Unable to publish block from broadcast: %v", err.Error())
×
233
                return
×
234
        }
×
235
}
236

237
func (n *KoinosP2PNode) handleTransactionBroadcast(topic string, data []byte) {
×
238
        log.Debug("Received koinos.mempool.accept broadcast")
×
239
        trxBroadcast := &broadcast.MempoolAccepted{}
×
240
        err := proto.Unmarshal(data, trxBroadcast)
×
241
        if err != nil {
×
242
                log.Warnf("Unable to parse koinos.transaction.accept broadcast: %v", err.Error())
×
243
                return
×
244
        }
×
245

246
        // If gossip is enabled publish the transaction
247
        err = n.Gossip.PublishTransaction(context.Background(), trxBroadcast.Transaction)
×
248
        if err != nil {
×
249
                log.Warnf("Unable to publish transaction from broadcast: %v", err.Error())
×
250
                return
×
251
        }
×
252
}
253

254
func (n *KoinosP2PNode) handleForkUpdate(topic string, data []byte) {
×
255
        log.Debug("Received koinos.block.forks broadcast")
×
256
        forkHeads := &broadcast.ForkHeads{}
×
257
        err := proto.Unmarshal(data, forkHeads)
×
258
        if err != nil {
×
259
                log.Warnf("Unable to parse koinos.block.forks broadcast: %s", err.Error())
×
260
                return
×
261
        }
×
262

263
        go func() {
×
264
                n.Applicator.HandleForkHeads(forkHeads)
×
265
        }()
×
266

267
        n.libValue.Store(forkHeads.LastIrreversibleBlock)
×
268
}
269

270
func (n *KoinosP2PNode) handleRPC(rpcType string, data []byte) ([]byte, error) {
×
271
        req := &rpcp2p.P2PRequest{}
×
272
        resp := &rpcp2p.P2PResponse{}
×
273

×
274
        err := proto.Unmarshal(data, req)
×
275
        if err != nil {
×
276
                log.Warnf("Received malformed request: 0x%v", hex.EncodeToString(data))
×
277
                eResp := prpc.ErrorResponse{Message: err.Error()}
×
278
                rErr := rpcp2p.P2PResponse_Error{Error: &eResp}
×
279
                resp.Response = &rErr
×
280
        } else {
×
281
                log.Debugf("Received RPC request: 0x%v", hex.EncodeToString(data))
×
282
                resp = n.handleRequest(req)
×
283
        }
×
284

285
        var outputBytes []byte
×
286
        outputBytes, err = proto.Marshal(resp)
×
287

×
288
        return outputBytes, err
×
289
}
290

291
func (n *KoinosP2PNode) handleRequest(req *rpcp2p.P2PRequest) *rpcp2p.P2PResponse {
×
292
        response := rpcp2p.P2PResponse{}
×
293
        var err error
×
294

×
295
        if req.Request != nil {
×
296
                switch req.Request.(type) {
×
297
                case *rpcp2p.P2PRequest_GetGossipStatus:
×
298
                        result := rpcp2p.GetGossipStatusResponse{Enabled: n.GossipToggle.IsEnabled()}
×
299
                        respVal := rpcp2p.P2PResponse_GetGossipStatus{GetGossipStatus: &result}
×
300
                        response.Response = &respVal
×
301
                default:
×
302
                        err = errors.New("unknown request")
×
303
                }
304
        } else {
×
305
                err = errors.New("expected request was nil")
×
306
        }
×
307

308
        if err != nil {
×
309
                result := prpc.ErrorResponse{Message: err.Error()}
×
310
                respVal := rpcp2p.P2PResponse_Error{Error: &result}
×
311
                response.Response = &respVal
×
312
        }
×
313

314
        return &response
×
315
}
316

317
// PeerStringToAddress Creates a peer.AddrInfo object based on the given connection string
318
func (n *KoinosP2PNode) PeerStringToAddress(peerAddr string) (*peer.AddrInfo, error) {
×
319
        addr, err := multiaddr.NewMultiaddr(peerAddr)
×
320
        if err != nil {
×
321
                return nil, err
×
322
        }
×
323
        peer, err := peer.AddrInfoFromP2pAddr(addr)
×
324
        if err != nil {
×
325
                return nil, err
×
326
        }
×
327

328
        return peer, nil
×
329
}
330

331
// ConnectToPeerAddress connects to the given peer address
332
func (n *KoinosP2PNode) ConnectToPeerAddress(ctx context.Context, peer *peer.AddrInfo) error {
1✔
333
        return n.Host.Connect(ctx, *peer)
1✔
334
}
1✔
335

336
// GetConnections returns the host's current peer connections
337
func (n *KoinosP2PNode) GetConnections() []network.Conn {
×
338
        return n.Host.Network().Conns()
×
339
}
×
340

341
// GetAddressInfo returns the node's address info
342
func (n *KoinosP2PNode) GetAddressInfo() *peer.AddrInfo {
2✔
343
        return &peer.AddrInfo{
2✔
344
                ID:    n.Host.ID(),
2✔
345
                Addrs: n.Host.Addrs(),
2✔
346
        }
2✔
347
}
2✔
348

349
// GetAddress returns the peer multiaddress
350
func (n *KoinosP2PNode) GetAddress() multiaddr.Multiaddr {
2✔
351
        addrs, _ := peer.AddrInfoToP2pAddrs(n.GetAddressInfo())
2✔
352
        return addrs[0]
2✔
353
}
2✔
354

355
// GetLastIrreversibleBlock returns last irreversible block height and block id of connected node
356
func (n *KoinosP2PNode) GetLastIrreversibleBlock() *koinos.BlockTopology {
1✔
357
        return n.libValue.Load().(*koinos.BlockTopology)
1✔
358
}
1✔
359

360
// Close closes the node
361
func (n *KoinosP2PNode) Close() error {
2✔
362
        n.Gossip.EnableGossip(context.Background(), false)
2✔
363

2✔
364
        if err := n.Host.Close(); err != nil {
2✔
365
                return err
×
366
        }
×
367

368
        return nil
2✔
369
}
370

371
func (n *KoinosP2PNode) logConnectionsLoop(ctx context.Context) {
1✔
372
        for {
2✔
373
                select {
1✔
374
                case <-time.After(time.Minute * 1):
×
375
                        log.Info("My address:")
×
376
                        log.Infof(" - %s", n.GetAddress())
×
377
                        log.Info("Connected peers:")
×
378
                        for i, conn := range n.GetConnections() {
×
379
                                log.Infof(" - %s/p2p/%s", conn.RemoteMultiaddr(), conn.RemotePeer())
×
380
                                if i > 10 {
×
381
                                        log.Infof("   and %v more...", len(n.GetConnections())-i)
×
382
                                        break
×
383
                                }
384
                        }
385
                case <-ctx.Done():
×
386
                        return
×
387
                }
388
        }
389
}
390

391
// Start starts background goroutines
392
func (n *KoinosP2PNode) Start(ctx context.Context) {
1✔
393
        n.Host.Network().Notify(n.ConnectionManager)
1✔
394

1✔
395
        forkHeads, err := n.localRPC.GetForkHeads(ctx)
1✔
396

1✔
397
        for err != nil {
1✔
398
                forkHeads, err = n.localRPC.GetForkHeads(ctx)
×
399
        }
×
400

401
        n.libValue.Store(forkHeads.LastIrreversibleBlock)
1✔
402

1✔
403
        // Start peer gossip
1✔
404
        go n.logConnectionsLoop(ctx)
1✔
405
        n.PeerErrorHandler.Start(ctx)
1✔
406
        n.GossipToggle.Start(ctx)
1✔
407
        n.ConnectionManager.Start(ctx)
1✔
408
        n.Applicator.Start(ctx)
1✔
409

1✔
410
        go func() {
2✔
411
                for {
2✔
412
                        select {
1✔
413
                        case id := <-n.DisconnectPeerChan:
1✔
414
                                _ = n.Host.Network().ClosePeer(id)
1✔
415
                        case <-ctx.Done():
×
416
                                return
×
417
                        }
418
                }
419
        }()
420
}
421

422
// ----------------------------------------------------------------------------
423
// Utility Functions
424
// ----------------------------------------------------------------------------
425

426
func seedStringToInt64(seed string) int64 {
2✔
427
        // Hash the seed string
2✔
428
        h := sha256.New()
2✔
429
        h.Write([]byte(seed))
2✔
430
        sum := h.Sum(nil)
2✔
431

2✔
432
        return int64(binary.BigEndian.Uint64(sum[:8]))
2✔
433
}
2✔
434

435
func generateECDSAKeyPair(src io.Reader) (crypto.PrivKey, crypto.PubKey, error) {
2✔
436
        priv, err := keygen.ECDSALegacy(crypto.ECDSACurve, src)
2✔
437
        if err != nil {
2✔
438
                return nil, nil, err
×
439
        }
×
440

441
        return crypto.ECDSAKeyPairFromKey(priv)
2✔
442
}
443

444
func generatePrivateKey(seed string) (crypto.PrivKey, error) {
2✔
445
        // If blank seed, generate a new randomized seed
2✔
446
        if seed == "" {
3✔
447
                seed = util.GenerateBase58ID(8)
1✔
448
                log.Infof("Using random seed: %s", seed)
1✔
449
        }
1✔
450

451
        // Convert the seed to int64 and construct the random source
452
        iseed := seedStringToInt64(seed)
2✔
453
        r := rand.New(rand.NewSource(iseed))
2✔
454

2✔
455
        privateKey, _, err := generateECDSAKeyPair(r)
2✔
456
        if err != nil {
2✔
457
                return nil, err
×
458
        }
×
459

460
        return privateKey, nil
2✔
461
}
462

463
func generateMessageID(msg *pb.Message) string {
×
464
        // Use the default unique ID function for peer exchange
×
465
        switch *msg.Topic {
×
466
        case p2p.BlockTopicName, p2p.TransactionTopicName:
×
467
                // Hash the data
×
468
                h := sha256.New()
×
469
                h.Write(msg.Data)
×
470
                sum := h.Sum(nil)
×
471

×
472
                // Base-64 encode it for compactness
×
473
                return base64.RawStdEncoding.EncodeToString(sum)
×
474
        default:
×
475
                return pubsub.DefaultMsgIdFn(msg)
×
476
        }
477
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc