dgraph-io / dgraph — build 5133271154 (push, web-flow)

31 May 2023 02:03PM UTC — coverage: 67.271% (+0.4%) from 66.918%

chore(ci): add workflow for OSS build + unit tests (#8834)

## Problem

Previous commits broke the OSS build. Since we didn't have a test for
this, the breakage went undetected by CI.

## Solution

Add a CI job that verifies we can make the dgraph OSS build, and runs
all dgraph OSS unit tests. We do this in a separate workflow because
this job does not require a self-hosted runner (we can use GitHub-hosted
runners for it). An illustrative sketch of such a workflow is shown below.

See also: https://github.com/dgraph-io/dgraph/pull/8832
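
For illustration only, a minimal sketch of what such a workflow could look like follows.
The file name, job name, Go version, and build/test commands are assumptions made for
this example; the actual workflow added by this PR may differ.

```yaml
# .github/workflows/ci-dgraph-oss-tests.yml  (illustrative name)
name: ci-dgraph-oss-tests
on:
  push:
  pull_request:
jobs:
  dgraph-oss-build-and-tests:
    runs-on: ubuntu-latest            # GitHub-hosted runner; no self-hosted runner required
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-go@v4
        with:
          go-version: "1.19"          # assumed Go version
      - name: Make dgraph OSS build
        run: make oss                 # assumed Makefile target for the OSS build
      - name: Run dgraph OSS unit tests
        run: go test -tags=oss ./...  # assumed test invocation
```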

58407 of 86823 relevant lines covered (67.27%)

2245249.42 hits per line

Source File

/conn/node.go — 84.44% covered
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package conn
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "encoding/binary"
24
        "fmt"
25
        "math/rand"
26
        "strings"
27
        "sync"
28
        "sync/atomic"
29
        "time"
30

31
        "github.com/golang/glog"
32
        "github.com/pkg/errors"
33
        "go.etcd.io/etcd/raft"
34
        "go.etcd.io/etcd/raft/raftpb"
35
        otrace "go.opencensus.io/trace"
36

37
        "github.com/dgraph-io/badger/v4/y"
38
        "github.com/dgraph-io/dgo/v230/protos/api"
39
        "github.com/dgraph-io/dgraph/protos/pb"
40
        "github.com/dgraph-io/dgraph/raftwal"
41
        "github.com/dgraph-io/dgraph/x"
42
        "github.com/dgraph-io/ristretto/z"
43
)
44

45
var (
46
        // ErrNoNode is returned when no node has been set up.
47
        ErrNoNode = errors.Errorf("No node has been set up yet")
48
)
49

50
// Node represents a node participating in the RAFT protocol.
51
type Node struct {
52
        x.SafeMutex
53

54
        // Applied is used to keep track of the applied RAFT proposals.
55
        // The stages are proposed -> committed (accepted by cluster) ->
56
        // applied (to PL) -> synced (to BadgerDB).
57
        // This needs to be 64 bit aligned for atomics to work on 32 bit machine.
58
        Applied y.WaterMark
59

60
        joinLock sync.Mutex
61

62
        // Used to keep track of lin read requests.
63
        requestCh chan linReadReq
64

65
        // SafeMutex is for fields which can be changed after init.
66
        _confState *raftpb.ConfState
67
        _raft      raft.Node
68

69
        // Fields which are never changed after init.
70
        StartTime       time.Time
71
        Cfg             *raft.Config
72
        MyAddr          string
73
        Id              uint64
74
        peers           map[uint64]string
75
        confChanges     map[uint64]chan error
76
        messages        chan sendmsg
77
        RaftContext     *pb.RaftContext
78
        Store           *raftwal.DiskStorage
79
        Rand            *rand.Rand
80
        tlsClientConfig *tls.Config
81

82
        Proposals proposals
83

84
        heartbeatsOut int64
85
        heartbeatsIn  int64
86
}
87

88
// NewNode returns a new Node instance.
89
func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage, tlsConfig *tls.Config) *Node {
159✔
90
        snap, err := store.Snapshot()
159✔
91
        x.Check(err)
159✔
92

159✔
93
        n := &Node{
159✔
94
                StartTime: time.Now(),
159✔
95
                Id:        rc.Id,
159✔
96
                MyAddr:    rc.Addr,
159✔
97
                Store:     store,
159✔
98
                Cfg: &raft.Config{
159✔
99
                        ID:                       rc.Id,
159✔
100
                        ElectionTick:             20, // 2s if we call Tick() every 100 ms.
159✔
101
                        HeartbeatTick:            1,  // 100ms if we call Tick() every 100 ms.
159✔
102
                        Storage:                  store,
159✔
103
                        MaxInflightMsgs:          256,
159✔
104
                        MaxSizePerMsg:            256 << 10, // 256 KB should allow more batching.
159✔
105
                        MaxCommittedSizePerReady: 64 << 20,  // Avoid loading entire Raft log into memory.
159✔
106
                        // We don't need lease based reads. They cause issues because they
159✔
107
                        // require CheckQuorum to be true, and that causes a lot of issues
159✔
108
                        // for us during cluster bootstrapping and later. A seemingly
159✔
109
                        // healthy cluster would just cause leader to step down due to
159✔
110
                        // "inactive" quorum, and then disallow anyone from becoming leader.
159✔
111
                        // So, let's stick to default options.  Let's achieve correctness,
159✔
112
                        // then we achieve performance. Plus, for the Dgraph alphas, we'll
159✔
113
                        // be soon relying only on Timestamps for blocking reads and
159✔
114
                        // achieving linearizability, than checking quorums (Zero would
159✔
115
                        // still check quorums).
159✔
116
                        ReadOnlyOption: raft.ReadOnlySafe,
159✔
117
                        // When a disconnected node joins back, it forces a leader change,
159✔
118
                        // as it starts with a higher term, as described in Raft thesis (not
159✔
119
                        // the paper) in section 9.6. This setting can avoid that by only
159✔
120
                        // increasing the term, if the node has a good chance of becoming
159✔
121
                        // the leader.
159✔
122
                        PreVote: true,
159✔
123

159✔
124
                        // We can explicitly set Applied to the first index in the Raft log,
159✔
125
                        // so it does not derive it separately, thus avoiding a crash when
159✔
126
                        // the Applied is set to below snapshot index by Raft.
159✔
127
                        // In case this is a new Raft log, first would be 1, and therefore
159✔
128
                        // Applied would be zero, hence meeting the condition by the library
159✔
129
                        // that Applied should only be set during a restart.
159✔
130
                        //
159✔
131
                        // Update: Set the Applied to the latest snapshot, because it seems
159✔
132
                        // like somehow the first index can be out of sync with the latest
159✔
133
                        // snapshot.
159✔
134
                        Applied: snap.Metadata.Index,
159✔
135

159✔
136
                        Logger: &x.ToGlog{},
159✔
137
                },
159✔
138
                // processConfChange etc. are not throttled, so add some extra delta so that we don't
159✔
139
                // block tick when applyCh is full
159✔
140
                Applied:     y.WaterMark{Name: "Applied watermark"},
159✔
141
                RaftContext: rc,
159✔
142
                //nolint:gosec // random node id generator does not require cryptographic precision
159✔
143
                Rand:            rand.New(&lockedSource{src: rand.NewSource(time.Now().UnixNano())}),
159✔
144
                confChanges:     make(map[uint64]chan error),
159✔
145
                messages:        make(chan sendmsg, 100),
159✔
146
                peers:           make(map[uint64]string),
159✔
147
                requestCh:       make(chan linReadReq, 100),
159✔
148
                tlsClientConfig: tlsConfig,
159✔
149
        }
159✔
150
        n.Applied.Init(nil)
159✔
151
        // This should match up to the Applied index set above.
159✔
152
        n.Applied.SetDoneUntil(n.Cfg.Applied)
159✔
153
        glog.Infof("Setting raft.Config to: %+v", n.Cfg)
159✔
154
        return n
159✔
155
}
159✔
156

157
// ReportRaftComms periodically prints the state of the node (heartbeats in and out).
158
func (n *Node) ReportRaftComms() {
158✔
159
        if !glog.V(3) {
313✔
160
                return
155✔
161
        }
155✔
162
        ticker := time.NewTicker(time.Second)
3✔
163
        defer ticker.Stop()
3✔
164

3✔
165
        for range ticker.C {
33✔
166
                out := atomic.SwapInt64(&n.heartbeatsOut, 0)
30✔
167
                in := atomic.SwapInt64(&n.heartbeatsIn, 0)
30✔
168
                glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in)
30✔
169
        }
30✔
170
}
171

172
// SetRaft would set the provided raft.Node to this node.
173
// It would fail an assertion if the raft.Node is already set.
174
func (n *Node) SetRaft(r raft.Node) {
159✔
175
        n.Lock()
159✔
176
        defer n.Unlock()
159✔
177
        x.AssertTrue(n._raft == nil)
159✔
178
        n._raft = r
159✔
179
}
159✔
180

181
// Raft would return the raft.Node stored in the node.
182
func (n *Node) Raft() raft.Node {
3,578,502✔
183
        n.RLock()
3,578,502✔
184
        defer n.RUnlock()
3,578,502✔
185
        return n._raft
3,578,502✔
186
}
3,578,502✔
187

188
// SetConfState would store the latest ConfState generated by ApplyConfChange.
189
func (n *Node) SetConfState(cs *raftpb.ConfState) {
264✔
190
        glog.Infof("Setting conf state to %+v\n", cs)
264✔
191
        n.Lock()
264✔
192
        defer n.Unlock()
264✔
193
        n._confState = cs
264✔
194
}
264✔
195

196
// DoneConfChange marks a configuration change as done and sends the given error to the
197
// config channel.
198
func (n *Node) DoneConfChange(id uint64, err error) {
263✔
199
        n.Lock()
263✔
200
        defer n.Unlock()
263✔
201
        ch, has := n.confChanges[id]
263✔
202
        if !has {
490✔
203
                return
227✔
204
        }
227✔
205
        delete(n.confChanges, id)
36✔
206
        ch <- err
36✔
207
}
208

209
//nolint:gosec // random node id generator does not require cryptographic precision
210
func (n *Node) storeConfChange(che chan error) uint64 {
37✔
211
        n.Lock()
37✔
212
        defer n.Unlock()
37✔
213
        id := rand.Uint64()
37✔
214
        _, has := n.confChanges[id]
37✔
215
        for has {
37✔
216
                id = rand.Uint64()
×
217
                _, has = n.confChanges[id]
×
218
        }
×
219
        n.confChanges[id] = che
37✔
220
        return id
37✔
221
}
222

223
// ConfState would return the latest ConfState stored in node.
224
func (n *Node) ConfState() *raftpb.ConfState {
8,086✔
225
        n.RLock()
8,086✔
226
        defer n.RUnlock()
8,086✔
227
        return n._confState
8,086✔
228
}
8,086✔
229

230
// Peer returns the address of the peer with the given id.
231
func (n *Node) Peer(pid uint64) (string, bool) {
653✔
232
        n.RLock()
653✔
233
        defer n.RUnlock()
653✔
234
        addr, ok := n.peers[pid]
653✔
235
        return addr, ok
653✔
236
}
653✔
237

238
// SetPeer sets the address of the peer with the given id. The address must not be empty.
239
func (n *Node) SetPeer(pid uint64, addr string) {
108✔
240
        x.AssertTruef(addr != "", "SetPeer for peer %d has empty addr.", pid)
108✔
241
        n.Lock()
108✔
242
        defer n.Unlock()
108✔
243
        n.peers[pid] = addr
108✔
244
}
108✔
245

246
// Send sends the given RAFT message from this node.
247
func (n *Node) Send(msg *raftpb.Message) {
967,513✔
248
        x.AssertTruef(n.Id != msg.To, "Sending message to itself")
967,513✔
249
        data, err := msg.Marshal()
967,513✔
250
        x.Check(err)
967,513✔
251

967,513✔
252
        if glog.V(2) {
1,935,026✔
253
                switch msg.Type {
967,513✔
254
                case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
478,471✔
255
                        atomic.AddInt64(&n.heartbeatsOut, 1)
478,471✔
256
                case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
×
257
                case raftpb.MsgApp, raftpb.MsgAppResp:
488,951✔
258
                case raftpb.MsgProp:
57✔
259
                default:
34✔
260
                        glog.Infof("RaftComm: [%#x] Sending message of type %s to %#x", msg.From, msg.Type, msg.To)
34✔
261
                }
262
        }
263
        // As long as leadership is stable, any attempted Propose() calls should be reflected in the
264
        // next raft.Ready.Messages. Leaders will send MsgApps to the followers; followers will send
265
        // MsgProp to the leader. It is up to the transport layer to get those messages to their
266
        // destination. If a MsgApp gets dropped by the transport layer, it will get retried by raft
267
        // (i.e. it will appear in a future Ready.Messages), but MsgProp will only be sent once. During
268
        // leadership transitions, proposals may get dropped even if the network is reliable.
269
        //
270
        // We can't do a select default here. The messages must be sent to the channel, otherwise we
271
        // should block until the channel can accept these messages. BatchAndSendMessages would take
272
        // care of dropping messages which can't be sent due to network issues to the corresponding
273
        // node. But, we shouldn't take the liberty to do that here. It would take us more time to
274
        // repropose these dropped messages anyway, than to block here a bit waiting for the messages
275
        // channel to clear out.
276
        n.messages <- sendmsg{to: msg.To, data: data}
967,513✔
277
}
278

279
// Snapshot returns the current snapshot.
280
func (n *Node) Snapshot() (raftpb.Snapshot, error) {
×
281
        if n == nil || n.Store == nil {
×
282
                return raftpb.Snapshot{}, errors.New("Uninitialized node or raft store")
×
283
        }
×
284
        return n.Store.Snapshot()
×
285
}
286

287
// SaveToStorage saves the hard state, entries, and snapshot to persistent storage, in that order.
288
func (n *Node) SaveToStorage(h *raftpb.HardState, es []raftpb.Entry, s *raftpb.Snapshot) {
850,711✔
289
        for {
1,701,422✔
290
                if err := n.Store.Save(h, es, s); err != nil {
850,711✔
291
                        glog.Errorf("While trying to save Raft update: %v. Retrying...", err)
×
292
                } else {
850,711✔
293
                        return
850,711✔
294
                }
850,711✔
295
        }
296
}
297

298
// PastLife returns the index of the snapshot before the restart (if any) and whether there was
299
// a previous state that should be recovered after a restart.
300
func (n *Node) PastLife() (uint64, bool, error) {
158✔
301
        var (
158✔
302
                sp      raftpb.Snapshot
158✔
303
                idx     uint64
158✔
304
                restart bool
158✔
305
                rerr    error
158✔
306
        )
158✔
307
        sp, rerr = n.Store.Snapshot()
158✔
308
        if rerr != nil {
158✔
309
                return 0, false, rerr
×
310
        }
×
311
        if !raft.IsEmptySnap(sp) {
159✔
312
                glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
1✔
313
                restart = true
1✔
314
                idx = sp.Metadata.Index
1✔
315
        }
1✔
316

317
        var hd raftpb.HardState
158✔
318
        hd, rerr = n.Store.HardState()
158✔
319
        if rerr != nil {
158✔
320
                return 0, false, rerr
×
321
        }
×
322
        if !raft.IsEmptyHardState(hd) {
159✔
323
                glog.Infof("Found hardstate: %+v\n", hd)
1✔
324
                restart = true
1✔
325
        }
1✔
326

327
        num := n.Store.NumEntries()
158✔
328
        glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
158✔
329
        // We'll always have at least one entry.
158✔
330
        if num > 1 {
159✔
331
                restart = true
1✔
332
        }
1✔
333
        return idx, restart, nil
158✔
334
}
335

336
const (
337
        messageBatchSoftLimit = 10e6
338
)
339

340
type stream struct {
341
        msgCh chan []byte
342
        alive int32
343
}
344

345
// BatchAndSendMessages sends messages in batches.
346
func (n *Node) BatchAndSendMessages() {
158✔
347
        batches := make(map[uint64]*bytes.Buffer)
158✔
348
        streams := make(map[uint64]*stream)
158✔
349

158✔
350
        for {
705,668✔
351
                totalSize := 0
705,510✔
352
                sm := <-n.messages
705,510✔
353
        slurp_loop:
705,510✔
354
                for {
1,673,023✔
355
                        var buf *bytes.Buffer
967,513✔
356
                        if b, ok := batches[sm.to]; !ok {
967,590✔
357
                                buf = new(bytes.Buffer)
77✔
358
                                batches[sm.to] = buf
77✔
359
                        } else {
967,513✔
360
                                buf = b
967,436✔
361
                        }
967,436✔
362
                        totalSize += 4 + len(sm.data)
967,513✔
363
                        x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
967,513✔
364
                        x.Check2(buf.Write(sm.data))
967,513✔
365

967,513✔
366
                        if totalSize > messageBatchSoftLimit {
967,513✔
367
                                // We limit the batch size, but we aren't pushing back on
×
368
                                // n.messages, because the loop below spawns a goroutine
×
369
                                // to do its dirty work.  This is good because right now
×
370
                                // (*node).send fails(!) if the channel is full.
×
371
                                break
×
372
                        }
373

374
                        select {
967,513✔
375
                        case sm = <-n.messages:
262,161✔
376
                        default:
705,352✔
377
                                break slurp_loop
705,352✔
378
                        }
379
                }
380

381
                for to, buf := range batches {
1,650,257✔
382
                        if buf.Len() == 0 {
948,898✔
383
                                continue
3,993✔
384
                        }
385
                        s, ok := streams[to]
940,912✔
386
                        if !ok || atomic.LoadInt32(&s.alive) <= 0 {
941,038✔
387
                                s = &stream{
126✔
388
                                        msgCh: make(chan []byte, 100),
126✔
389
                                        alive: 1,
126✔
390
                                }
126✔
391
                                go n.streamMessages(to, s)
126✔
392
                                streams[to] = s
126✔
393
                        }
126✔
394
                        data := make([]byte, buf.Len())
940,912✔
395
                        copy(data, buf.Bytes())
940,912✔
396
                        buf.Reset()
940,912✔
397

940,912✔
398
                        select {
940,912✔
399
                        case s.msgCh <- data:
939,800✔
400
                        default:
1,112✔
401
                        }
402
                }
403
        }
404
}
405

406
func (n *Node) streamMessages(to uint64, s *stream) {
126✔
407
        defer atomic.StoreInt32(&s.alive, 0)
126✔
408

126✔
409
        // Exit after this deadline. Let BatchAndSendMessages create another goroutine, if needed.
126✔
410
        // Let's set the deadline to 10s because if we increase it, then it takes longer to recover from
126✔
411
        // a partition and get a new leader.
126✔
412
        deadline := time.Now().Add(10 * time.Second)
126✔
413
        ticker := time.NewTicker(time.Second)
126✔
414
        defer ticker.Stop()
126✔
415

126✔
416
        var logged int
126✔
417
        for range ticker.C { // Don't do this in a busy-wait loop, use a ticker.
542✔
418
                // doSendMessage would block doing a stream. So, time.Now().After is
416✔
419
                // only there to avoid a busy-wait.
416✔
420
                if err := n.doSendMessage(to, s.msgCh); err != nil {
790✔
421
                        // Update lastLog so we print error only a few times if we are not able to connect.
374✔
422
                        // Otherwise, the log is polluted with repeated errors.
374✔
423
                        if logged == 0 {
456✔
424
                                glog.Warningf("Unable to send message to peer: %#x. Error: %v", to, err)
82✔
425
                                logged++
82✔
426
                        }
82✔
427
                }
428
                if time.Now().After(deadline) {
428✔
429
                        return
54✔
430
                }
54✔
431
        }
432
}
433

434
func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
416✔
435
        addr, has := n.Peer(to)
416✔
436
        if !has {
416✔
437
                return errors.Errorf("Do not have address of peer %#x", to)
×
438
        }
×
439
        pool, err := GetPools().Get(addr)
416✔
440
        if err != nil {
662✔
441
                return err
246✔
442
        }
246✔
443

444
        c := pb.NewRaftClient(pool.Get())
170✔
445
        ctx, span := otrace.StartSpan(context.Background(),
170✔
446
                fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to))
170✔
447
        defer span.End()
170✔
448

170✔
449
        mc, err := c.RaftMessage(ctx)
170✔
450
        if err != nil {
259✔
451
                return err
89✔
452
        }
89✔
453

454
        var packets, lastPackets uint64
78✔
455
        slurp := func(batch *pb.RaftBatch) {
934,060✔
456
                for {
1,870,770✔
457
                        if len(batch.Payload.Data) > messageBatchSoftLimit {
936,788✔
458
                                return
×
459
                        }
×
460
                        select {
936,788✔
461
                        case data := <-msgCh:
2,806✔
462
                                batch.Payload.Data = append(batch.Payload.Data, data...)
2,806✔
463
                                packets++
2,806✔
464
                        default:
933,982✔
465
                                return
933,982✔
466
                        }
467
                }
468
        }
469

470
        ctx = mc.Context()
78✔
471

78✔
472
        fastTick := time.NewTicker(5 * time.Second)
78✔
473
        defer fastTick.Stop()
78✔
474

78✔
475
        ticker := time.NewTicker(3 * time.Minute)
78✔
476
        defer ticker.Stop()
78✔
477

78✔
478
        for {
941,974✔
479
                select {
941,896✔
480
                case data := <-msgCh:
933,982✔
481
                        batch := &pb.RaftBatch{
933,982✔
482
                                Context: n.RaftContext,
933,982✔
483
                                Payload: &api.Payload{Data: data},
933,982✔
484
                        }
933,982✔
485
                        slurp(batch) // Pick up more entries from msgCh, if present.
933,982✔
486
                        span.Annotatef(nil, "[to: %x] [Packets: %d] Sending data of length: %d.",
933,982✔
487
                                to, packets, len(batch.Payload.Data))
933,982✔
488
                        if packets%10000 == 0 {
934,058✔
489
                                glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.",
76✔
490
                                        to, packets, len(batch.Payload.Data))
76✔
491
                        }
76✔
492
                        packets++
933,982✔
493
                        if err := mc.Send(batch); err != nil {
934,017✔
494
                                span.Annotatef(nil, "Error while mc.Send: %v", err)
35✔
495
                                glog.Errorf("[to: %x] Error while mc.Send: %v", to, err)
35✔
496
                                switch {
35✔
497
                                case strings.Contains(err.Error(), "TransientFailure"):
×
498
                                        glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
×
499
                                        n.Raft().ReportUnreachable(to)
×
500
                                        pool.SetUnhealthy()
×
501
                                default:
35✔
502
                                }
503
                                // We don't need to do anything if we receive any error while sending message.
504
                                // RAFT would automatically retry.
505
                                return err
35✔
506
                        }
507
                case <-fastTick.C:
7,684✔
508
                        // We use this ticker, because during network partitions, mc.Send is
7,684✔
509
                        // unable to actually send packets, and also does not complain about
7,684✔
510
                        // them. We could have potentially used the separately tracked
7,684✔
511
                        // heartbeats to check this, but what we have observed is that
7,684✔
512
                        // incoming traffic might be OK, but outgoing might not be. So, this
7,684✔
513
                        // is a better way for us to verify whether this particular outbound
7,684✔
514
                        // connection is valid or not.
7,684✔
515
                        ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
7,684✔
516
                        _, err := c.IsPeer(ctx, n.RaftContext)
7,684✔
517
                        cancel()
7,684✔
518
                        if err != nil {
7,688✔
519
                                glog.Errorf("Error while calling IsPeer %v. Reporting %x as unreachable.", err, to)
4✔
520
                                n.Raft().ReportUnreachable(to)
4✔
521
                                pool.SetUnhealthy()
4✔
522
                                return errors.Wrapf(err, "while calling IsPeer %x", to)
4✔
523
                        }
4✔
524
                case <-ticker.C:
191✔
525
                        if lastPackets == packets {
191✔
526
                                span.Annotatef(nil,
×
527
                                        "No activity for a while [Packets == %d]. Closing connection.", packets)
×
528
                                return mc.CloseSend()
×
529
                        }
×
530
                        lastPackets = packets
191✔
531
                case <-ctx.Done():
×
532
                        return ctx.Err()
×
533
                }
534
        }
535
}
536

537
// Connect connects the node and makes its peerPool refer to the constructed pool and address
538
// (possibly updating ourselves from the old address.)  (Unless pid is ourselves, in which
539
// case this does nothing.)
540
func (n *Node) Connect(pid uint64, addr string) {
289✔
541
        if pid == n.Id {
377✔
542
                return
88✔
543
        }
88✔
544
        if paddr, ok := n.Peer(pid); ok && paddr == addr {
294✔
545
                // Already connected.
93✔
546
                return
93✔
547
        }
93✔
548
        // Here's what we do.  Right now peerPool maps peer node id's to addr values.  If
549
        // a *pool can be created, good, but if not, we still create a peerPoolEntry with
550
        // a nil *pool.
551
        if addr == n.MyAddr {
108✔
552
                // TODO: Note this fact in more general peer health info somehow.
×
553
                glog.Infof("Peer %d claims same host as me\n", pid)
×
554
                n.SetPeer(pid, addr)
×
555
                return
×
556
        }
×
557
        GetPools().Connect(addr, n.tlsClientConfig)
108✔
558
        n.SetPeer(pid, addr)
108✔
559
}
560

561
// DeletePeer deletes the record of the peer with the given id.
562
func (n *Node) DeletePeer(pid uint64) {
×
563
        if pid == n.Id {
×
564
                return
×
565
        }
×
566
        n.Lock()
×
567
        defer n.Unlock()
×
568
        delete(n.peers, pid)
×
569
}
570

571
var errInternalRetry = errors.New("Retry proposal again")
572

573
func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
37✔
574
        cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
37✔
575
        defer cancel()
37✔
576

37✔
577
        ch := make(chan error, 1)
37✔
578
        id := n.storeConfChange(ch)
37✔
579
        // TODO: Delete id from the map.
37✔
580
        conf.ID = id
37✔
581
        if err := n.Raft().ProposeConfChange(cctx, conf); err != nil {
38✔
582
                if cctx.Err() != nil {
2✔
583
                        return errInternalRetry
1✔
584
                }
1✔
585
                glog.Warningf("Error while proposing conf change: %v", err)
×
586
                return err
×
587
        }
588
        select {
36✔
589
        case err := <-ch:
36✔
590
                return err
36✔
591
        case <-ctx.Done():
×
592
                return ctx.Err()
×
593
        case <-cctx.Done():
×
594
                return errInternalRetry
×
595
        }
596
}
597

598
func (n *Node) addToCluster(ctx context.Context, rc *pb.RaftContext) error {
36✔
599
        pid := rc.Id
36✔
600
        rc.SnapshotTs = 0
36✔
601
        rcBytes, err := rc.Marshal()
36✔
602
        x.Check(err)
36✔
603

36✔
604
        cc := raftpb.ConfChange{
36✔
605
                Type:    raftpb.ConfChangeAddNode,
36✔
606
                NodeID:  pid,
36✔
607
                Context: rcBytes,
36✔
608
        }
36✔
609
        if rc.IsLearner {
36✔
610
                cc.Type = raftpb.ConfChangeAddLearnerNode
×
611
        }
×
612

613
        err = errInternalRetry
36✔
614
        for err == errInternalRetry {
73✔
615
                glog.Infof("Trying to add %#x to cluster. Addr: %v\n", pid, rc.Addr)
37✔
616
                glog.Infof("Current confstate at %#x: %+v\n", n.Id, n.ConfState())
37✔
617
                err = n.proposeConfChange(ctx, cc)
37✔
618
        }
37✔
619
        return err
36✔
620
}
621

622
// ProposePeerRemoval proposes a new configuration with the peer with the given id removed.
623
func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
×
624
        if n.Raft() == nil {
×
625
                return ErrNoNode
×
626
        }
×
627
        if _, ok := n.Peer(id); !ok && id != n.RaftContext.Id {
×
628
                return errors.Errorf("Node %#x not part of group", id)
×
629
        }
×
630
        cc := raftpb.ConfChange{
×
631
                Type:   raftpb.ConfChangeRemoveNode,
×
632
                NodeID: id,
×
633
        }
×
634
        err := errInternalRetry
×
635
        for err == errInternalRetry {
×
636
                err = n.proposeConfChange(ctx, cc)
×
637
        }
×
638
        return err
×
639
}
640

641
type linReadReq struct {
642
        // A one-shot chan which we send a raft index upon.
643
        indexCh chan<- uint64
644
}
645

646
var errReadIndex = errors.Errorf(
647
        "Cannot get linearized read (time expired or no configured leader)")
648

649
var readIndexOk, readIndexTotal uint64
650

651
// WaitLinearizableRead waits until a linearizable read can be performed.
652
func (n *Node) WaitLinearizableRead(ctx context.Context) error {
28,606✔
653
        span := otrace.FromContext(ctx)
28,606✔
654
        span.Annotate(nil, "WaitLinearizableRead")
28,606✔
655

28,606✔
656
        if num := atomic.AddUint64(&readIndexTotal, 1); num%1000 == 0 {
28,626✔
657
                glog.V(2).Infof("ReadIndex Total: %d\n", num)
20✔
658
        }
20✔
659
        indexCh := make(chan uint64, 1)
28,606✔
660
        select {
28,606✔
661
        case n.requestCh <- linReadReq{indexCh: indexCh}:
28,606✔
662
                span.Annotate(nil, "Pushed to requestCh")
28,606✔
663
        case <-ctx.Done():
×
664
                span.Annotate(nil, "Context expired")
×
665
                return ctx.Err()
×
666
        }
667

668
        select {
28,606✔
669
        case index := <-indexCh:
28,589✔
670
                span.Annotatef(nil, "Received index: %d", index)
28,589✔
671
                if index == 0 {
28,594✔
672
                        return errReadIndex
5✔
673
                } else if num := atomic.AddUint64(&readIndexOk, 1); num%1000 == 0 {
28,609✔
674
                        glog.V(2).Infof("ReadIndex OK: %d\n", num)
20✔
675
                }
20✔
676
                err := n.Applied.WaitForMark(ctx, index)
28,584✔
677
                span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err)
28,584✔
678
                return err
28,584✔
679
        case <-ctx.Done():
17✔
680
                span.Annotate(nil, "Context expired")
17✔
681
                return ctx.Err()
17✔
682
        }
683
}
684

685
// RunReadIndexLoop services RAFT ReadIndex requests in a loop.
686
func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
66✔
687
        defer closer.Done()
66✔
688
        readIndex := func(activeRctx []byte) (uint64, error) {
28,686✔
689
                // Read Request can get rejected then we would wait indefinitely on the channel
28,620✔
690
                // so have a timeout.
28,620✔
691
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
28,620✔
692
                defer cancel()
28,620✔
693

28,620✔
694
                if err := n.Raft().ReadIndex(ctx, activeRctx); err != nil {
28,624✔
695
                        glog.Errorf("Error while trying to call ReadIndex: %v\n", err)
4✔
696
                        return 0, err
4✔
697
                }
4✔
698

699
        again:
28,616✔
700
                select {
28,616✔
701
                case <-closer.HasBeenClosed():
6✔
702
                        return 0, errors.New("Closer has been called")
6✔
703
                case rs := <-readStateCh:
28,501✔
704
                        if !bytes.Equal(activeRctx, rs.RequestCtx) {
28,501✔
705
                                glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx)
×
706
                                goto again
×
707
                        }
708
                        return rs.Index, nil
28,501✔
709
                case <-ctx.Done():
109✔
710
                        glog.Warningf("[%#x] Read index context timed out\n", n.Id)
109✔
711
                        return 0, errInternalRetry
109✔
712
                }
713
        } // end of readIndex func
714

715
        // We maintain one linearizable ReadIndex request at a time.  Others wait queued behind
716
        // requestCh.
717
        requests := []linReadReq{}
66✔
718
        for {
28,643✔
719
                select {
28,577✔
720
                case <-closer.HasBeenClosed():
66✔
721
                        return
66✔
722
                case <-readStateCh:
×
723
                        // Do nothing, discard ReadState as we don't have any pending ReadIndex requests.
724
                case req := <-n.requestCh:
28,511✔
725
                slurpLoop:
28,511✔
726
                        for {
57,107✔
727
                                requests = append(requests, req)
28,596✔
728
                                select {
28,596✔
729
                                case req = <-n.requestCh:
85✔
730
                                default:
28,511✔
731
                                        break slurpLoop
28,511✔
732
                                }
733
                        }
734
                        // Create one activeRctx slice for the read index, even if we have to call readIndex
735
                        // repeatedly. That way, we can process the requests as soon as we encounter the first
736
                        // activeRctx. This is better than flooding readIndex with a new activeRctx on each
737
                        // call, causing more unique traffic and further delays in request processing.
738
                        activeRctx := make([]byte, 8)
28,511✔
739
                        x.Check2(n.Rand.Read(activeRctx))
28,511✔
740
                        glog.V(4).Infof("Request readctx: %#x", activeRctx)
28,511✔
741
                        for {
57,131✔
742
                                index, err := readIndex(activeRctx)
28,620✔
743
                                if err == errInternalRetry {
28,729✔
744
                                        continue
109✔
745
                                }
746
                                if err != nil {
28,521✔
747
                                        index = 0
10✔
748
                                        glog.Errorf("[%#x] While trying to do lin read index: %v", n.Id, err)
10✔
749
                                }
10✔
750
                                for _, req := range requests {
57,107✔
751
                                        req.indexCh <- index
28,596✔
752
                                }
28,596✔
753
                                break
28,511✔
754
                        }
755
                        requests = requests[:0]
28,511✔
756
                }
757
        }
758
}
759

760
func (n *Node) joinCluster(ctx context.Context, rc *pb.RaftContext) (*api.Payload, error) {
36✔
761
        // Only process one JoinCluster request at a time.
36✔
762
        n.joinLock.Lock()
36✔
763
        defer n.joinLock.Unlock()
36✔
764

36✔
765
        // Check that the new node is from the same group as me.
36✔
766
        if rc.Group != n.RaftContext.Group {
36✔
767
                return nil, errors.Errorf("Raft group mismatch")
×
768
        }
×
769
        // Also check that the new node is not me.
770
        if rc.Id == n.RaftContext.Id {
36✔
771
                return nil, errors.Errorf("REUSE_RAFTID: Raft ID duplicates mine: %+v", rc)
×
772
        }
×
773

774
        // Check that the new node is not already part of the group.
775
        if addr, ok := n.Peer(rc.Id); ok && rc.Addr != addr {
36✔
776
                // There exists a healthy connection to server with same id.
×
777
                if _, err := GetPools().Get(addr); err == nil {
×
778
                        return &api.Payload{}, errors.Errorf(
×
779
                                "REUSE_ADDR: IP Address same as existing peer: %s", addr)
×
780
                }
×
781
        }
782
        n.Connect(rc.Id, rc.Addr)
36✔
783

36✔
784
        err := n.addToCluster(context.Background(), rc)
36✔
785
        glog.Infof("[%#x] Done joining cluster with err: %v", rc.Id, err)
36✔
786
        return &api.Payload{}, err
36✔
787
}
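
The listing above documents the conn.Node API (NewNode, SetRaft, Connect,
BatchAndSendMessages, ReportRaftComms). As a rough illustration of how these pieces fit
together, here is a hypothetical wiring sketch. It is not code from dgraph: the package
name, StartSingleNode, the address, and the id/group values are made up, and it assumes a
freshly initialized *raftwal.DiskStorage for a brand-new single-member group (dgraph's
real bootstrap in the worker and zero packages is more involved).

```go
package example

import (
	"crypto/tls"

	"go.etcd.io/etcd/raft"

	"github.com/dgraph-io/dgraph/conn"
	"github.com/dgraph-io/dgraph/protos/pb"
	"github.com/dgraph-io/dgraph/raftwal"
)

// StartSingleNode builds a conn.Node for a new single-member Raft group and starts
// the background loops defined in conn/node.go.
func StartSingleNode(store *raftwal.DiskStorage, tlsCfg *tls.Config) *conn.Node {
	rc := &pb.RaftContext{Id: 1, Group: 1, Addr: "localhost:7080"}
	n := conn.NewNode(rc, store, tlsCfg)

	// Start a brand-new single-member group and hand the raft.Node to conn.Node.
	n.SetRaft(raft.StartNode(n.Cfg, []raft.Peer{{ID: rc.Id}}))

	// Batch outgoing Raft messages and periodically report heartbeat counts.
	go n.BatchAndSendMessages()
	go n.ReportRaftComms()
	return n
}
```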