dgraph-io / dgraph / build 6041535666 (push, committed by web-flow)

31 Aug 2023 07:17PM UTC coverage: 67.057% (-0.2%) from 67.221%

test(upgrade): add upgrade tests for v20.11 for ee/acl (#8984)

Closes: https://dgraph.atlassian.net/browse/DGRAPHCORE-335

Co-authored-by: Aman Mangal <aman@dgraph.io>

58715 of 87560 relevant lines covered (67.06%)

2203166.87 hits per line

Source file: /conn/node.go (84.07% covered)
/*
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package conn

import (
        "bytes"
        "context"
        "crypto/tls"
        "encoding/binary"
        "fmt"
        "math/rand"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/golang/glog"
        "github.com/pkg/errors"
        "go.etcd.io/etcd/raft/v3"
        "go.etcd.io/etcd/raft/v3/raftpb"
        otrace "go.opencensus.io/trace"

        "github.com/dgraph-io/badger/v4/y"
        "github.com/dgraph-io/dgo/v230/protos/api"
        "github.com/dgraph-io/dgraph/protos/pb"
        "github.com/dgraph-io/dgraph/raftwal"
        "github.com/dgraph-io/dgraph/x"
        "github.com/dgraph-io/ristretto/z"
)

var (
        // ErrNoNode is returned when no node has been set up.
        ErrNoNode = errors.Errorf("No node has been set up yet")
)

// Node represents a node participating in the RAFT protocol.
type Node struct {
        x.SafeMutex

        // Applied is used to keep track of the applied RAFT proposals.
        // The stages are proposed -> committed (accepted by cluster) ->
        // applied (to PL) -> synced (to BadgerDB).
        // This needs to be 64-bit aligned for atomics to work on 32-bit machines.
        Applied y.WaterMark

        joinLock sync.Mutex

        // Used to keep track of lin read requests.
        requestCh chan linReadReq

        // SafeMutex is for fields which can be changed after init.
        _confState *raftpb.ConfState
        _raft      raft.Node

        // Fields which are never changed after init.
        StartTime       time.Time
        Cfg             *raft.Config
        MyAddr          string
        Id              uint64
        peers           map[uint64]string
        confChanges     map[uint64]chan error
        messages        chan sendmsg
        RaftContext     *pb.RaftContext
        Store           *raftwal.DiskStorage
        Rand            *rand.Rand
        tlsClientConfig *tls.Config

        Proposals proposals

        heartbeatsOut int64
        heartbeatsIn  int64
}

// NewNode returns a new Node instance.
func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage, tlsConfig *tls.Config) *Node {
        snap, err := store.Snapshot()
        x.Check(err)

        n := &Node{
                StartTime: time.Now(),
                Id:        rc.Id,
                MyAddr:    rc.Addr,
                Store:     store,
                Cfg: &raft.Config{
                        ID:                       rc.Id,
                        ElectionTick:             20, // 2s if we call Tick() every 100 ms.
                        HeartbeatTick:            1,  // 100ms if we call Tick() every 100 ms.
                        Storage:                  store,
                        MaxInflightMsgs:          256,
                        MaxSizePerMsg:            256 << 10, // 256 KB should allow more batching.
                        MaxCommittedSizePerReady: 64 << 20,  // Avoid loading entire Raft log into memory.
                        // We don't need lease based reads. They cause issues because they
                        // require CheckQuorum to be true, and that causes a lot of issues
                        // for us during cluster bootstrapping and later. A seemingly
                        // healthy cluster would just cause leader to step down due to
                        // "inactive" quorum, and then disallow anyone from becoming leader.
                        // So, let's stick to default options. Let's achieve correctness,
                        // then we achieve performance. Plus, for the Dgraph alphas, we'll
                        // soon be relying only on Timestamps for blocking reads and
                        // achieving linearizability, rather than checking quorums (Zero
                        // would still check quorums).
                        ReadOnlyOption: raft.ReadOnlySafe,
                        // When a disconnected node joins back, it forces a leader change,
                        // as it starts with a higher term, as described in Raft thesis (not
                        // the paper) in section 9.6. This setting can avoid that by only
                        // increasing the term, if the node has a good chance of becoming
                        // the leader.
                        PreVote: true,

                        // We can explicitly set Applied to the first index in the Raft log,
                        // so it does not derive it separately, thus avoiding a crash when
                        // the Applied is set to below snapshot index by Raft.
                        // In case this is a new Raft log, first would be 1, and therefore
                        // Applied would be zero, hence meeting the condition by the library
                        // that Applied should only be set during a restart.
                        //
                        // Update: Set the Applied to the latest snapshot, because it seems
                        // like somehow the first index can be out of sync with the latest
                        // snapshot.
                        Applied: snap.Metadata.Index,

                        Logger: &x.ToGlog{},
                },
                // processConfChange etc. are not throttled, so add some extra delta so that
                // we don't block tick when applyCh is full.
                Applied:     y.WaterMark{Name: "Applied watermark"},
                RaftContext: rc,
                //nolint:gosec // random node id generator does not require cryptographic precision
                Rand:            rand.New(&lockedSource{src: rand.NewSource(time.Now().UnixNano())}),
                confChanges:     make(map[uint64]chan error),
                messages:        make(chan sendmsg, 100),
                peers:           make(map[uint64]string),
                requestCh:       make(chan linReadReq, 100),
                tlsClientConfig: tlsConfig,
        }
        n.Applied.Init(nil)
        // This should match up to the Applied index set above.
        n.Applied.SetDoneUntil(n.Cfg.Applied)
        glog.Infof("Setting raft.Config to: %+v", n.Cfg)
        return n
}

// ReportRaftComms periodically prints the state of the node (heartbeats in and out).
func (n *Node) ReportRaftComms() {
        if !glog.V(3) {
                return
        }
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        for range ticker.C {
                out := atomic.SwapInt64(&n.heartbeatsOut, 0)
                in := atomic.SwapInt64(&n.heartbeatsIn, 0)
                glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in)
        }
}

// SetRaft would set the provided raft.Node to this node.
// It fails the assertion if a raft.Node has already been set.
func (n *Node) SetRaft(r raft.Node) {
        n.Lock()
        defer n.Unlock()
        x.AssertTrue(n._raft == nil)
        n._raft = r
}

// Raft would return the raft.Node stored in the node.
func (n *Node) Raft() raft.Node {
        n.RLock()
        defer n.RUnlock()
        return n._raft
}

// SetConfState would store the latest ConfState generated by ApplyConfChange.
func (n *Node) SetConfState(cs *raftpb.ConfState) {
        glog.Infof("Setting conf state to %+v\n", cs)
        n.Lock()
        defer n.Unlock()
        n._confState = cs
}

// DoneConfChange marks a configuration change as done and sends the given error to the
// config channel.
func (n *Node) DoneConfChange(id uint64, err error) {
        n.Lock()
        defer n.Unlock()
        ch, has := n.confChanges[id]
        if !has {
                return
        }
        delete(n.confChanges, id)
        ch <- err
}

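// storeConfChange registers the given channel under a fresh random id in
// confChanges and returns that id; DoneConfChange later uses the id to
// deliver the result of the proposal to the channel.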
//nolint:gosec // random node id generator does not require cryptographic precision
func (n *Node) storeConfChange(che chan error) uint64 {
        n.Lock()
        defer n.Unlock()
        id := rand.Uint64()
        _, has := n.confChanges[id]
        for has {
                id = rand.Uint64()
                _, has = n.confChanges[id]
        }
        n.confChanges[id] = che
        return id
}

// ConfState would return the latest ConfState stored in node.
func (n *Node) ConfState() *raftpb.ConfState {
        n.RLock()
        defer n.RUnlock()
        return n._confState
}

// Peer returns the address of the peer with the given id.
func (n *Node) Peer(pid uint64) (string, bool) {
        n.RLock()
        defer n.RUnlock()
        addr, ok := n.peers[pid]
        return addr, ok
}

// SetPeer sets the address of the peer with the given id. The address must not be empty.
func (n *Node) SetPeer(pid uint64, addr string) {
        x.AssertTruef(addr != "", "SetPeer for peer %d has empty addr.", pid)
        n.Lock()
        defer n.Unlock()
        n.peers[pid] = addr
}

// Send sends the given RAFT message from this node.
func (n *Node) Send(msg *raftpb.Message) {
        x.AssertTruef(n.Id != msg.To, "Sending message to itself")
        data, err := msg.Marshal()
        x.Check(err)

        if glog.V(2) {
                switch msg.Type {
                case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
                        atomic.AddInt64(&n.heartbeatsOut, 1)
                case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
                case raftpb.MsgApp, raftpb.MsgAppResp:
                case raftpb.MsgProp:
                default:
                        glog.Infof("RaftComm: [%#x] Sending message of type %s to %#x", msg.From, msg.Type, msg.To)
                }
        }
        // As long as leadership is stable, any attempted Propose() calls should be reflected in the
        // next raft.Ready.Messages. Leaders will send MsgApps to the followers; followers will send
        // MsgProp to the leader. It is up to the transport layer to get those messages to their
        // destination. If a MsgApp gets dropped by the transport layer, it will get retried by raft
        // (i.e. it will appear in a future Ready.Messages), but MsgProp will only be sent once. During
        // leadership transitions, proposals may get dropped even if the network is reliable.
        //
        // We can't do a select default here. The messages must be sent to the channel, otherwise we
        // should block until the channel can accept these messages. BatchAndSendMessages would take
        // care of dropping messages which can't be sent to the corresponding node due to network
        // issues. But, we shouldn't take the liberty to do that here. It would take us more time to
        // repropose these dropped messages anyway, than to block here a bit waiting for the messages
        // channel to clear out.
        n.messages <- sendmsg{to: msg.To, data: data}
}

// Snapshot returns the current snapshot.
func (n *Node) Snapshot() (raftpb.Snapshot, error) {
        if n == nil || n.Store == nil {
                return raftpb.Snapshot{}, errors.New("Uninitialized node or raft store")
        }
        return n.Store.Snapshot()
}

// SaveToStorage saves the hard state, entries, and snapshot to persistent storage, in that order.
func (n *Node) SaveToStorage(h *raftpb.HardState, es []raftpb.Entry, s *raftpb.Snapshot) {
        for {
                if err := n.Store.Save(h, es, s); err != nil {
                        glog.Errorf("While trying to save Raft update: %v. Retrying...", err)
                } else {
                        return
                }
        }
}

// PastLife returns the index of the snapshot before the restart (if any) and whether there was
// a previous state that should be recovered after a restart.
func (n *Node) PastLife() (uint64, bool, error) {
        var (
                sp      raftpb.Snapshot
                idx     uint64
                restart bool
                rerr    error
        )
        sp, rerr = n.Store.Snapshot()
        if rerr != nil {
                return 0, false, rerr
        }
        if !raft.IsEmptySnap(sp) {
                glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
                restart = true
                idx = sp.Metadata.Index
        }

        var hd raftpb.HardState
        hd, rerr = n.Store.HardState()
        if rerr != nil {
                return 0, false, rerr
        }
        if !raft.IsEmptyHardState(hd) {
                glog.Infof("Found hardstate: %+v\n", hd)
                restart = true
        }

        num := n.Store.NumEntries()
        glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
        // We'll always have at least one entry.
        if num > 1 {
                restart = true
        }
        return idx, restart, nil
}

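// messageBatchSoftLimit is the soft cap, in bytes, on how much message data is
// coalesced into one outgoing batch before it is flushed to the stream.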
const (
        messageBatchSoftLimit = 10e6
)

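// stream couples the outgoing message channel for a single peer with an
// atomically updated flag recording whether the goroutine draining it is alive.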
type stream struct {
        msgCh chan []byte
        alive int32
}

// BatchAndSendMessages sends messages in batches.
func (n *Node) BatchAndSendMessages() {
        batches := make(map[uint64]*bytes.Buffer)
        streams := make(map[uint64]*stream)

        for {
                totalSize := 0
                sm := <-n.messages
        slurp_loop:
                for {
                        var buf *bytes.Buffer
                        if b, ok := batches[sm.to]; !ok {
                                buf = new(bytes.Buffer)
                                batches[sm.to] = buf
                        } else {
                                buf = b
                        }
                        totalSize += 4 + len(sm.data)
                        x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
                        x.Check2(buf.Write(sm.data))

                        if totalSize > messageBatchSoftLimit {
                                // We limit the batch size, but we aren't pushing back on
                                // n.messages, because the loop below spawns a goroutine
                                // to do its dirty work.  This is good because right now
                                // (*node).send fails(!) if the channel is full.
                                break
                        }

                        select {
                        case sm = <-n.messages:
                        default:
                                break slurp_loop
                        }
                }

                for to, buf := range batches {
                        if buf.Len() == 0 {
                                continue
                        }
                        s, ok := streams[to]
                        if !ok || atomic.LoadInt32(&s.alive) <= 0 {
                                s = &stream{
                                        msgCh: make(chan []byte, 100),
                                        alive: 1,
                                }
                                go n.streamMessages(to, s)
                                streams[to] = s
                        }
                        data := make([]byte, buf.Len())
                        copy(data, buf.Bytes())
                        buf.Reset()

                        select {
                        case s.msgCh <- data:
                        default:
                        }
                }
        }
}

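// streamMessages drains s.msgCh, shipping each batch to peer `to` via
// doSendMessage. It retries every second and exits after a 10s deadline,
// letting BatchAndSendMessages spawn a replacement goroutine if needed.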
func (n *Node) streamMessages(to uint64, s *stream) {
        defer atomic.StoreInt32(&s.alive, 0)

        // Exit after this deadline. Let BatchAndSendMessages create another goroutine, if needed.
        // Let's set the deadline to 10s because if we increase it, then it takes longer to recover from
        // a partition and get a new leader.
        deadline := time.Now().Add(10 * time.Second)
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        var logged int
        for range ticker.C { // Don't do this in a busy-wait loop, use a ticker.
                // doSendMessage would block doing a stream. So, time.Now().After is
                // only there to avoid a busy-wait.
                if err := n.doSendMessage(to, s.msgCh); err != nil {
                        // Log the error only once if we are not able to connect.
                        // Otherwise, the log is polluted with repeated errors.
                        if logged == 0 {
                                glog.Warningf("Unable to send message to peer: %#x. Error: %v", to, err)
                                logged++
                        }
                }
                if time.Now().After(deadline) {
                        return
                }
        }
}

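// doSendMessage opens a RaftMessage stream to the peer's address and pushes
// batches from msgCh over it. It probes the connection with IsPeer every few
// seconds and closes the stream if no packets have moved for a while.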
func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
        addr, has := n.Peer(to)
        if !has {
                return errors.Errorf("Do not have address of peer %#x", to)
        }
        pool, err := GetPools().Get(addr)
        if err != nil {
                return err
        }

        c := pb.NewRaftClient(pool.Get())
        ctx, span := otrace.StartSpan(context.Background(),
                fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to))
        defer span.End()

        mc, err := c.RaftMessage(ctx)
        if err != nil {
                return err
        }

        var packets, lastPackets uint64
        slurp := func(batch *pb.RaftBatch) {
                for {
                        if len(batch.Payload.Data) > messageBatchSoftLimit {
                                return
                        }
                        select {
                        case data := <-msgCh:
                                batch.Payload.Data = append(batch.Payload.Data, data...)
                                packets++
                        default:
                                return
                        }
                }
        }

        ctx = mc.Context()

        fastTick := time.NewTicker(5 * time.Second)
        defer fastTick.Stop()

        ticker := time.NewTicker(3 * time.Minute)
        defer ticker.Stop()

        for {
                select {
                case data := <-msgCh:
                        batch := &pb.RaftBatch{
                                Context: n.RaftContext,
                                Payload: &api.Payload{Data: data},
                        }
                        slurp(batch) // Pick up more entries from msgCh, if present.
                        span.Annotatef(nil, "[to: %x] [Packets: %d] Sending data of length: %d.",
                                to, packets, len(batch.Payload.Data))
                        if packets%10000 == 0 {
                                glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.",
                                        to, packets, len(batch.Payload.Data))
                        }
                        packets++
                        if err := mc.Send(batch); err != nil {
                                span.Annotatef(nil, "Error while mc.Send: %v", err)
                                glog.Errorf("[to: %x] Error while mc.Send: %v", to, err)
                                switch {
                                case strings.Contains(err.Error(), "TransientFailure"):
                                        glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
                                        n.Raft().ReportUnreachable(to)
                                        pool.SetUnhealthy()
                                default:
                                }
                                // We don't need to do anything if we receive any error while sending message.
                                // RAFT would automatically retry.
                                return err
                        }
                case <-fastTick.C:
                        // We use this ticker, because during network partitions, mc.Send is
                        // unable to actually send packets, and also does not complain about
                        // them. We could have potentially used the separately tracked
                        // heartbeats to check this, but what we have observed is that
                        // incoming traffic might be OK, but outgoing might not be. So, this
                        // is a better way for us to verify whether this particular outbound
                        // connection is valid or not.
                        ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
                        _, err := c.IsPeer(ctx, n.RaftContext)
                        cancel()
                        if err != nil {
                                glog.Errorf("Error while calling IsPeer %v. Reporting %x as unreachable.", err, to)
                                n.Raft().ReportUnreachable(to)
                                pool.SetUnhealthy()
                                return errors.Wrapf(err, "while calling IsPeer %x", to)
                        }
                case <-ticker.C:
                        if lastPackets == packets {
                                span.Annotatef(nil,
                                        "No activity for a while [Packets == %d]. Closing connection.", packets)
                                return mc.CloseSend()
                        }
                        lastPackets = packets
                case <-ctx.Done():
                        return ctx.Err()
                }
        }
}

// Connect connects the node and makes its peerPool refer to the constructed pool and address
// (possibly updating ourselves from the old address). (Unless pid is ourselves, in which
// case this does nothing.)
func (n *Node) Connect(pid uint64, addr string) {
        if pid == n.Id {
                return
        }
        if paddr, ok := n.Peer(pid); ok && paddr == addr {
                // Already connected.
                return
        }
        // Here's what we do. Right now peerPool maps peer node ids to addr values. If
        // a *pool can be created, good, but if not, we still create a peerPoolEntry with
        // a nil *pool.
        if addr == n.MyAddr {
                // TODO: Note this fact in more general peer health info somehow.
                glog.Infof("Peer %d claims same host as me\n", pid)
                n.SetPeer(pid, addr)
                return
        }
        GetPools().Connect(addr, n.tlsClientConfig)
        n.SetPeer(pid, addr)
}

// DeletePeer deletes the record of the peer with the given id.
func (n *Node) DeletePeer(pid uint64) {
        if pid == n.Id {
                return
        }
        n.Lock()
        defer n.Unlock()
        delete(n.peers, pid)
}

var errInternalRetry = errors.New("Retry proposal again")

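// proposeConfChange proposes the given configuration change to Raft with a 3s
// timeout. It returns errInternalRetry if the proposal timed out and should be
// retried by the caller.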
func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
        cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
        defer cancel()

        ch := make(chan error, 1)
        id := n.storeConfChange(ch)
        // TODO: Delete id from the map.
        conf.ID = id
        if err := n.Raft().ProposeConfChange(cctx, conf); err != nil {
                if cctx.Err() != nil {
                        return errInternalRetry
                }
                glog.Warningf("Error while proposing conf change: %v", err)
                return err
        }
        select {
        case err := <-ch:
                return err
        case <-ctx.Done():
                return ctx.Err()
        case <-cctx.Done():
                return errInternalRetry
        }
}

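// addToCluster proposes a ConfChange that adds the node described by rc to
// this Raft group, as a voter or (if rc.IsLearner is set) as a learner,
// retrying for as long as the proposal keeps timing out.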
func (n *Node) addToCluster(ctx context.Context, rc *pb.RaftContext) error {
        pid := rc.Id
        rc.SnapshotTs = 0
        rcBytes, err := rc.Marshal()
        x.Check(err)

        cc := raftpb.ConfChange{
                Type:    raftpb.ConfChangeAddNode,
                NodeID:  pid,
                Context: rcBytes,
        }
        if rc.IsLearner {
                cc.Type = raftpb.ConfChangeAddLearnerNode
        }

        err = errInternalRetry
        for err == errInternalRetry {
                glog.Infof("Trying to add %#x to cluster. Addr: %v\n", pid, rc.Addr)
                glog.Infof("Current confstate at %#x: %+v\n", n.Id, n.ConfState())
                err = n.proposeConfChange(ctx, cc)
        }
        return err
}

// ProposePeerRemoval proposes a new configuration with the peer with the given id removed.
func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
        if n.Raft() == nil {
                return ErrNoNode
        }
        if _, ok := n.Peer(id); !ok && id != n.RaftContext.Id {
                return errors.Errorf("Node %#x not part of group", id)
        }
        cc := raftpb.ConfChange{
                Type:   raftpb.ConfChangeRemoveNode,
                NodeID: id,
        }
        err := errInternalRetry
        for err == errInternalRetry {
                err = n.proposeConfChange(ctx, cc)
        }
        return err
}

type linReadReq struct {
        // A one-shot chan which we send a raft index upon.
        indexCh chan<- uint64
}

var errReadIndex = errors.Errorf(
        "Cannot get linearized read (time expired or no configured leader)")

var readIndexOk, readIndexTotal uint64

// WaitLinearizableRead waits until a linearizable read can be performed.
func (n *Node) WaitLinearizableRead(ctx context.Context) error {
        span := otrace.FromContext(ctx)
        span.Annotate(nil, "WaitLinearizableRead")

        if num := atomic.AddUint64(&readIndexTotal, 1); num%1000 == 0 {
                glog.V(2).Infof("ReadIndex Total: %d\n", num)
        }
        indexCh := make(chan uint64, 1)
        select {
        case n.requestCh <- linReadReq{indexCh: indexCh}:
                span.Annotate(nil, "Pushed to requestCh")
        case <-ctx.Done():
                span.Annotate(nil, "Context expired")
                return ctx.Err()
        }

        select {
        case index := <-indexCh:
                span.Annotatef(nil, "Received index: %d", index)
                if index == 0 {
                        return errReadIndex
                } else if num := atomic.AddUint64(&readIndexOk, 1); num%1000 == 0 {
                        glog.V(2).Infof("ReadIndex OK: %d\n", num)
                }
                err := n.Applied.WaitForMark(ctx, index)
                span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err)
                return err
        case <-ctx.Done():
                span.Annotate(nil, "Context expired")
                return ctx.Err()
        }
}

// RunReadIndexLoop processes linearizable read-index requests in a loop.
func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
        defer closer.Done()
        readIndex := func(activeRctx []byte) (uint64, error) {
                // A read request can get rejected, and then we would wait indefinitely
                // on the channel, so use a timeout.
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
                defer cancel()

                if err := n.Raft().ReadIndex(ctx, activeRctx); err != nil {
                        glog.Errorf("Error while trying to call ReadIndex: %v\n", err)
                        return 0, err
                }

        again:
                select {
                case <-closer.HasBeenClosed():
                        return 0, errors.New("Closer has been called")
                case rs := <-readStateCh:
                        if !bytes.Equal(activeRctx, rs.RequestCtx) {
                                glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx)
                                goto again
                        }
                        return rs.Index, nil
                case <-ctx.Done():
                        glog.Warningf("[%#x] Read index context timed out\n", n.Id)
                        return 0, errInternalRetry
                }
        } // end of readIndex func

        // We maintain one linearizable ReadIndex request at a time.  Others wait queued behind
        // requestCh.
        requests := []linReadReq{}
        for {
                select {
                case <-closer.HasBeenClosed():
                        return
                case <-readStateCh:
                        // Do nothing, discard ReadState as we don't have any pending ReadIndex requests.
                case req := <-n.requestCh:
                slurpLoop:
                        for {
                                requests = append(requests, req)
                                select {
                                case req = <-n.requestCh:
                                default:
                                        break slurpLoop
                                }
                        }
                        // Create one activeRctx slice for the read index, even if we have to call readIndex
                        // repeatedly. That way, we can process the requests as soon as we encounter the first
                        // activeRctx. This is better than flooding readIndex with a new activeRctx on each
                        // call, causing more unique traffic and further delays in request processing.
                        activeRctx := make([]byte, 8)
                        x.Check2(n.Rand.Read(activeRctx))
                        glog.V(4).Infof("Request readctx: %#x", activeRctx)
                        for {
                                index, err := readIndex(activeRctx)
                                if err == errInternalRetry {
                                        continue
                                }
                                if err != nil {
                                        index = 0
                                        glog.Errorf("[%#x] While trying to do lin read index: %v", n.Id, err)
                                }
                                for _, req := range requests {
                                        req.indexCh <- index
                                }
                                break
                        }
                        requests = requests[:0]
                }
        }
}

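// joinCluster handles a JoinCluster request: it verifies that the incoming
// node belongs to the same group, is not this node, and does not reuse the
// Raft id or address of a healthy peer, then connects to it and adds it to
// the group via addToCluster.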
func (n *Node) joinCluster(ctx context.Context, rc *pb.RaftContext) (*api.Payload, error) {
        // Only process one JoinCluster request at a time.
        n.joinLock.Lock()
        defer n.joinLock.Unlock()

        // Check that the new node is from the same group as me.
        if rc.Group != n.RaftContext.Group {
                return nil, errors.Errorf("Raft group mismatch")
        }
        // Also check that the new node is not me.
        if rc.Id == n.RaftContext.Id {
                return nil, errors.Errorf("REUSE_RAFTID: Raft ID duplicates mine: %+v", rc)
        }

        // Check that the new node is not already part of the group.
        if addr, ok := n.Peer(rc.Id); ok && rc.Addr != addr {
                // There exists a healthy connection to server with same id.
                if _, err := GetPools().Get(addr); err == nil {
                        return &api.Payload{}, errors.Errorf(
                                "REUSE_ADDR: IP Address same as existing peer: %s", addr)
                }
        }
        n.Connect(rc.Id, rc.Addr)

        err := n.addToCluster(context.Background(), rc)
        glog.Infof("[%#x] Done joining cluster with err: %v", rc.Id, err)
        return &api.Payload{}, err
}