
dgraph-io / dgraph / build 4763653586 (GitHub push)

21 Apr 2023 10:46AM UTC coverage: 66.83% (-0.09%) from 66.924%

fix(vscode): fixed Jaeger parameters (#8801)

58024 of 86823 relevant lines covered (66.83%)
2239173.62 hits per line
Source File

/conn/node.go: 83.89% covered
/*
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package conn

import (
        "bytes"
        "context"
        "crypto/tls"
        "encoding/binary"
        "fmt"
        "math/rand"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/golang/glog"
        "github.com/pkg/errors"
        "go.etcd.io/etcd/raft"
        "go.etcd.io/etcd/raft/raftpb"
        otrace "go.opencensus.io/trace"

        "github.com/dgraph-io/badger/v4/y"
        "github.com/dgraph-io/dgo/v210/protos/api"
        "github.com/dgraph-io/dgraph/protos/pb"
        "github.com/dgraph-io/dgraph/raftwal"
        "github.com/dgraph-io/dgraph/x"
        "github.com/dgraph-io/ristretto/z"
)

var (
        // ErrNoNode is returned when no node has been set up.
        ErrNoNode = errors.Errorf("No node has been set up yet")
)

// Node represents a node participating in the RAFT protocol.
type Node struct {
        x.SafeMutex

        // Applied is used to keep track of the applied RAFT proposals.
        // The stages are proposed -> committed (accepted by cluster) ->
        // applied (to PL) -> synced (to BadgerDB).
        // This needs to be 64 bit aligned for atomics to work on 32 bit machine.
        Applied y.WaterMark

        joinLock sync.Mutex

        // Used to keep track of lin read requests.
        requestCh chan linReadReq

        // SafeMutex is for fields which can be changed after init.
        _confState *raftpb.ConfState
        _raft      raft.Node

        // Fields which are never changed after init.
        StartTime       time.Time
        Cfg             *raft.Config
        MyAddr          string
        Id              uint64
        peers           map[uint64]string
        confChanges     map[uint64]chan error
        messages        chan sendmsg
        RaftContext     *pb.RaftContext
        Store           *raftwal.DiskStorage
        Rand            *rand.Rand
        tlsClientConfig *tls.Config

        Proposals proposals

        heartbeatsOut int64
        heartbeatsIn  int64
}

// NewNode returns a new Node instance.
func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage, tlsConfig *tls.Config) *Node {
        snap, err := store.Snapshot()
        x.Check(err)

        n := &Node{
                StartTime: time.Now(),
                Id:        rc.Id,
                MyAddr:    rc.Addr,
                Store:     store,
                Cfg: &raft.Config{
                        ID:                       rc.Id,
                        ElectionTick:             20, // 2s if we call Tick() every 100 ms.
                        HeartbeatTick:            1,  // 100ms if we call Tick() every 100 ms.
                        Storage:                  store,
                        MaxInflightMsgs:          256,
                        MaxSizePerMsg:            256 << 10, // 256 KB should allow more batching.
                        MaxCommittedSizePerReady: 64 << 20,  // Avoid loading entire Raft log into memory.
                        // We don't need lease based reads. They cause issues because they
                        // require CheckQuorum to be true, and that causes a lot of issues
                        // for us during cluster bootstrapping and later. A seemingly
                        // healthy cluster would just cause leader to step down due to
                        // "inactive" quorum, and then disallow anyone from becoming leader.
                        // So, let's stick to default options.  Let's achieve correctness,
                        // then we achieve performance. Plus, for the Dgraph alphas, we'll
                        // be soon relying only on Timestamps for blocking reads and
                        // achieving linearizability, than checking quorums (Zero would
                        // still check quorums).
                        ReadOnlyOption: raft.ReadOnlySafe,
                        // When a disconnected node joins back, it forces a leader change,
                        // as it starts with a higher term, as described in Raft thesis (not
                        // the paper) in section 9.6. This setting can avoid that by only
                        // increasing the term, if the node has a good chance of becoming
                        // the leader.
                        PreVote: true,

                        // We can explicitly set Applied to the first index in the Raft log,
                        // so it does not derive it separately, thus avoiding a crash when
                        // the Applied is set to below snapshot index by Raft.
                        // In case this is a new Raft log, first would be 1, and therefore
                        // Applied would be zero, hence meeting the condition by the library
                        // that Applied should only be set during a restart.
                        //
                        // Update: Set the Applied to the latest snapshot, because it seems
                        // like somehow the first index can be out of sync with the latest
                        // snapshot.
                        Applied: snap.Metadata.Index,

                        Logger: &x.ToGlog{},
                },
                // processConfChange etc are not throttled so some extra delta, so that we don't
                // block tick when applyCh is full
                Applied:     y.WaterMark{Name: "Applied watermark"},
                RaftContext: rc,
                //nolint:gosec // random node id generator does not require cryptographic precision
                Rand:            rand.New(&lockedSource{src: rand.NewSource(time.Now().UnixNano())}),
                confChanges:     make(map[uint64]chan error),
                messages:        make(chan sendmsg, 100),
                peers:           make(map[uint64]string),
                requestCh:       make(chan linReadReq, 100),
                tlsClientConfig: tlsConfig,
        }
        n.Applied.Init(nil)
        // This should match up to the Applied index set above.
        n.Applied.SetDoneUntil(n.Cfg.Applied)
        glog.Infof("Setting raft.Config to: %+v", n.Cfg)
        return n
}

// ReportRaftComms periodically prints the state of the node (heartbeats in and out).
func (n *Node) ReportRaftComms() {
        if !glog.V(3) {
                return
        }
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        for range ticker.C {
                out := atomic.SwapInt64(&n.heartbeatsOut, 0)
                in := atomic.SwapInt64(&n.heartbeatsIn, 0)
                glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in)
        }
}

// SetRaft would set the provided raft.Node to this node.
// It fails an assertion if a raft.Node has already been set.
func (n *Node) SetRaft(r raft.Node) {
        n.Lock()
        defer n.Unlock()
        x.AssertTrue(n._raft == nil)
        n._raft = r
}

// Raft would return back the raft.Node stored in the node.
func (n *Node) Raft() raft.Node {
        n.RLock()
        defer n.RUnlock()
        return n._raft
}

// SetConfState would store the latest ConfState generated by ApplyConfChange.
func (n *Node) SetConfState(cs *raftpb.ConfState) {
        glog.Infof("Setting conf state to %+v\n", cs)
        n.Lock()
        defer n.Unlock()
        n._confState = cs
}

// DoneConfChange marks a configuration change as done and sends the given error to the
// config channel.
func (n *Node) DoneConfChange(id uint64, err error) {
        n.Lock()
        defer n.Unlock()
        ch, has := n.confChanges[id]
        if !has {
                return
        }
        delete(n.confChanges, id)
        ch <- err
}

//nolint:gosec // random node id generator does not require cryptographic precision
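// storeConfChange registers che under a freshly generated random id and returns that id.
// DoneConfChange later looks up the id to deliver the proposal's result to the waiting caller.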
func (n *Node) storeConfChange(che chan error) uint64 {
        n.Lock()
        defer n.Unlock()
        id := rand.Uint64()
        _, has := n.confChanges[id]
        for has {
                id = rand.Uint64()
                _, has = n.confChanges[id]
        }
        n.confChanges[id] = che
        return id
}

// ConfState would return the latest ConfState stored in node.
func (n *Node) ConfState() *raftpb.ConfState {
        n.RLock()
        defer n.RUnlock()
        return n._confState
}

// Peer returns the address of the peer with the given id.
func (n *Node) Peer(pid uint64) (string, bool) {
        n.RLock()
        defer n.RUnlock()
        addr, ok := n.peers[pid]
        return addr, ok
}

// SetPeer sets the address of the peer with the given id. The address must not be empty.
func (n *Node) SetPeer(pid uint64, addr string) {
        x.AssertTruef(addr != "", "SetPeer for peer %d has empty addr.", pid)
        n.Lock()
        defer n.Unlock()
        n.peers[pid] = addr
}

247
func (n *Node) Send(msg *raftpb.Message) {
959,265✔
248
        x.AssertTruef(n.Id != msg.To, "Sending message to itself")
959,265✔
249
        data, err := msg.Marshal()
959,265✔
250
        x.Check(err)
959,265✔
251

959,265✔
252
        if glog.V(2) {
1,918,530✔
253
                switch msg.Type {
959,265✔
254
                case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
476,504✔
255
                        atomic.AddInt64(&n.heartbeatsOut, 1)
476,504✔
256
                case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
×
257
                case raftpb.MsgApp, raftpb.MsgAppResp:
482,214✔
258
                case raftpb.MsgProp:
501✔
259
                default:
46✔
260
                        glog.Infof("RaftComm: [%#x] Sending message of type %s to %#x", msg.From, msg.Type, msg.To)
46✔
261
                }
262
        }
263
        // As long as leadership is stable, any attempted Propose() calls should be reflected in the
264
        // next raft.Ready.Messages. Leaders will send MsgApps to the followers; followers will send
265
        // MsgProp to the leader. It is up to the transport layer to get those messages to their
266
        // destination. If a MsgApp gets dropped by the transport layer, it will get retried by raft
267
        // (i.e. it will appear in a future Ready.Messages), but MsgProp will only be sent once. During
268
        // leadership transitions, proposals may get dropped even if the network is reliable.
269
        //
270
        // We can't do a select default here. The messages must be sent to the channel, otherwise we
271
        // should block until the channel can accept these messages. BatchAndSendMessages would take
272
        // care of dropping messages which can't be sent due to network issues to the corresponding
273
        // node. But, we shouldn't take the liberty to do that here. It would take us more time to
274
        // repropose these dropped messages anyway, than to block here a bit waiting for the messages
275
        // channel to clear out.
276
        n.messages <- sendmsg{to: msg.To, data: data}
959,265✔
277
}
278

// Snapshot returns the current snapshot.
func (n *Node) Snapshot() (raftpb.Snapshot, error) {
        if n == nil || n.Store == nil {
                return raftpb.Snapshot{}, errors.New("Uninitialized node or raft store")
        }
        return n.Store.Snapshot()
}

// SaveToStorage saves the hard state, entries, and snapshot to persistent storage, in that order.
func (n *Node) SaveToStorage(h *raftpb.HardState, es []raftpb.Entry, s *raftpb.Snapshot) {
        for {
                if err := n.Store.Save(h, es, s); err != nil {
                        glog.Errorf("While trying to save Raft update: %v. Retrying...", err)
                } else {
                        return
                }
        }
}

// PastLife returns the index of the snapshot before the restart (if any) and whether there was
// a previous state that should be recovered after a restart.
func (n *Node) PastLife() (uint64, bool, error) {
        var (
                sp      raftpb.Snapshot
                idx     uint64
                restart bool
                rerr    error
        )
        sp, rerr = n.Store.Snapshot()
        if rerr != nil {
                return 0, false, rerr
        }
        if !raft.IsEmptySnap(sp) {
                glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
                restart = true
                idx = sp.Metadata.Index
        }

        var hd raftpb.HardState
        hd, rerr = n.Store.HardState()
        if rerr != nil {
                return 0, false, rerr
        }
        if !raft.IsEmptyHardState(hd) {
                glog.Infof("Found hardstate: %+v\n", hd)
                restart = true
        }

        num := n.Store.NumEntries()
        glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
        // We'll always have at least one entry.
        if num > 1 {
                restart = true
        }
        return idx, restart, nil
}

const (
        messageBatchSoftLimit = 10e6
)

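// stream tracks a single outbound message stream to a peer. alive is reset to zero by
// streamMessages when its goroutine exits, telling BatchAndSendMessages to spawn a new one.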
type stream struct {
        msgCh chan []byte
        alive int32
}

// BatchAndSendMessages sends messages in batches.
func (n *Node) BatchAndSendMessages() {
        batches := make(map[uint64]*bytes.Buffer)
        streams := make(map[uint64]*stream)

        for {
                totalSize := 0
                sm := <-n.messages
        slurp_loop:
                for {
                        var buf *bytes.Buffer
                        if b, ok := batches[sm.to]; !ok {
                                buf = new(bytes.Buffer)
                                batches[sm.to] = buf
                        } else {
                                buf = b
                        }
                        totalSize += 4 + len(sm.data)
                        x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
                        x.Check2(buf.Write(sm.data))

                        if totalSize > messageBatchSoftLimit {
                                // We limit the batch size, but we aren't pushing back on
                                // n.messages, because the loop below spawns a goroutine
                                // to do its dirty work.  This is good because right now
                                // (*node).send fails(!) if the channel is full.
                                break
                        }

                        select {
                        case sm = <-n.messages:
                        default:
                                break slurp_loop
                        }
                }

                for to, buf := range batches {
                        if buf.Len() == 0 {
                                continue
                        }
                        s, ok := streams[to]
                        if !ok || atomic.LoadInt32(&s.alive) <= 0 {
                                s = &stream{
                                        msgCh: make(chan []byte, 100),
                                        alive: 1,
                                }
                                go n.streamMessages(to, s)
                                streams[to] = s
                        }
                        data := make([]byte, buf.Len())
                        copy(data, buf.Bytes())
                        buf.Reset()

                        select {
                        case s.msgCh <- data:
                        default:
                        }
                }
        }
}

120✔
407
        defer atomic.StoreInt32(&s.alive, 0)
120✔
408

120✔
409
        // Exit after this deadline. Let BatchAndSendMessages create another goroutine, if needed.
120✔
410
        // Let's set the deadline to 10s because if we increase it, then it takes longer to recover from
120✔
411
        // a partition and get a new leader.
120✔
412
        deadline := time.Now().Add(10 * time.Second)
120✔
413
        ticker := time.NewTicker(time.Second)
120✔
414
        defer ticker.Stop()
120✔
415

120✔
416
        var logged int
120✔
417
        for range ticker.C { // Don't do this in an busy-wait loop, use a ticker.
506✔
418
                // doSendMessage would block doing a stream. So, time.Now().After is
386✔
419
                // only there to avoid a busy-wait.
386✔
420
                if err := n.doSendMessage(to, s.msgCh); err != nil {
724✔
421
                        // Update lastLog so we print error only a few times if we are not able to connect.
338✔
422
                        // Otherwise, the log is polluted with repeated errors.
338✔
423
                        if logged == 0 {
415✔
424
                                glog.Warningf("Unable to send message to peer: %#x. Error: %v", to, err)
77✔
425
                                logged++
77✔
426
                        }
77✔
427
                }
428
                if time.Now().After(deadline) {
382✔
429
                        return
44✔
430
                }
44✔
431
        }
432
}
433

434
func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
386✔
435
        addr, has := n.Peer(to)
386✔
436
        if !has {
386✔
437
                return errors.Errorf("Do not have address of peer %#x", to)
×
438
        }
×
439
        pool, err := GetPools().Get(addr)
386✔
440
        if err != nil {
600✔
441
                return err
214✔
442
        }
214✔
443

444
        c := pb.NewRaftClient(pool.Get())
172✔
445
        ctx, span := otrace.StartSpan(context.Background(),
172✔
446
                fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to))
172✔
447
        defer span.End()
172✔
448

172✔
449
        mc, err := c.RaftMessage(ctx)
172✔
450
        if err != nil {
264✔
451
                return err
92✔
452
        }
92✔
453

454
        var packets, lastPackets uint64
76✔
455
        slurp := func(batch *pb.RaftBatch) {
928,143✔
456
                for {
1,858,324✔
457
                        if len(batch.Payload.Data) > messageBatchSoftLimit {
930,257✔
458
                                return
×
459
                        }
×
460
                        select {
930,257✔
461
                        case data := <-msgCh:
2,190✔
462
                                batch.Payload.Data = append(batch.Payload.Data, data...)
2,190✔
463
                                packets++
2,190✔
464
                        default:
928,067✔
465
                                return
928,067✔
466
                        }
467
                }
468
        }
469

470
        ctx = mc.Context()
76✔
471

76✔
472
        fastTick := time.NewTicker(5 * time.Second)
76✔
473
        defer fastTick.Stop()
76✔
474

76✔
475
        ticker := time.NewTicker(3 * time.Minute)
76✔
476
        defer ticker.Stop()
76✔
477

76✔
478
        for {
936,041✔
479
                select {
935,965✔
480
                case data := <-msgCh:
928,067✔
481
                        batch := &pb.RaftBatch{
928,067✔
482
                                Context: n.RaftContext,
928,067✔
483
                                Payload: &api.Payload{Data: data},
928,067✔
484
                        }
928,067✔
485
                        slurp(batch) // Pick up more entries from msgCh, if present.
928,067✔
486
                        span.Annotatef(nil, "[to: %x] [Packets: %d] Sending data of length: %d.",
928,067✔
487
                                to, packets, len(batch.Payload.Data))
928,067✔
488
                        if packets%10000 == 0 {
928,143✔
489
                                glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.",
76✔
490
                                        to, packets, len(batch.Payload.Data))
76✔
491
                        }
76✔
492
                        packets++
928,067✔
493
                        if err := mc.Send(batch); err != nil {
928,099✔
494
                                span.Annotatef(nil, "Error while mc.Send: %v", err)
32✔
495
                                glog.Errorf("[to: %x] Error while mc.Send: %v", to, err)
32✔
496
                                switch {
32✔
497
                                case strings.Contains(err.Error(), "TransientFailure"):
×
498
                                        glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
×
499
                                        n.Raft().ReportUnreachable(to)
×
500
                                        pool.SetUnhealthy()
×
501
                                default:
32✔
502
                                }
503
                                // We don't need to do anything if we receive any error while sending message.
504
                                // RAFT would automatically retry.
505
                                return err
32✔
506
                        }
507
                case <-fastTick.C:
7,662✔
508
                        // We use this ticker, because during network partitions, mc.Send is
7,662✔
509
                        // unable to actually send packets, and also does not complain about
7,662✔
510
                        // them. We could have potentially used the separately tracked
7,662✔
511
                        // heartbeats to check this, but what we have observed is that
7,662✔
512
                        // incoming traffic might be OK, but outgoing might not be. So, this
7,662✔
513
                        // is a better way for us to verify whether this particular outbound
7,662✔
514
                        // connection is valid or not.
7,662✔
515
                        ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
7,662✔
516
                        _, err := c.IsPeer(ctx, n.RaftContext)
7,662✔
517
                        cancel()
7,662✔
518
                        if err != nil {
7,662✔
519
                                glog.Errorf("Error while calling IsPeer %v. Reporting %x as unreachable.", err, to)
×
520
                                n.Raft().ReportUnreachable(to)
×
521
                                pool.SetUnhealthy()
×
522
                                return errors.Wrapf(err, "while calling IsPeer %x", to)
×
523
                        }
×
524
                case <-ticker.C:
192✔
525
                        if lastPackets == packets {
192✔
526
                                span.Annotatef(nil,
×
527
                                        "No activity for a while [Packets == %d]. Closing connection.", packets)
×
528
                                return mc.CloseSend()
×
529
                        }
×
530
                        lastPackets = packets
192✔
531
                case <-ctx.Done():
×
532
                        return ctx.Err()
×
533
                }
534
        }
535
}
536

537
// Connect connects the node and makes its peerPool refer to the constructed pool and address
538
// (possibly updating ourselves from the old address.)  (Unless pid is ourselves, in which
539
// case this does nothing.)
540
func (n *Node) Connect(pid uint64, addr string) {
283✔
541
        if pid == n.Id {
371✔
542
                return
88✔
543
        }
88✔
544
        if paddr, ok := n.Peer(pid); ok && paddr == addr {
286✔
545
                // Already connected.
91✔
546
                return
91✔
547
        }
91✔
548
        // Here's what we do.  Right now peerPool maps peer node id's to addr values.  If
549
        // a *pool can be created, good, but if not, we still create a peerPoolEntry with
550
        // a nil *pool.
551
        if addr == n.MyAddr {
104✔
552
                // TODO: Note this fact in more general peer health info somehow.
×
553
                glog.Infof("Peer %d claims same host as me\n", pid)
×
554
                n.SetPeer(pid, addr)
×
555
                return
×
556
        }
×
557
        GetPools().Connect(addr, n.tlsClientConfig)
104✔
558
        n.SetPeer(pid, addr)
104✔
559
}
560

561
// DeletePeer deletes the record of the peer with the given id.
562
func (n *Node) DeletePeer(pid uint64) {
×
563
        if pid == n.Id {
×
564
                return
×
565
        }
×
566
        n.Lock()
×
567
        defer n.Unlock()
×
568
        delete(n.peers, pid)
×
569
}
570

571
var errInternalRetry = errors.New("Retry proposal again")
572

573
func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
39✔
574
        cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
39✔
575
        defer cancel()
39✔
576

39✔
577
        ch := make(chan error, 1)
39✔
578
        id := n.storeConfChange(ch)
39✔
579
        // TODO: Delete id from the map.
39✔
580
        conf.ID = id
39✔
581
        if err := n.Raft().ProposeConfChange(cctx, conf); err != nil {
40✔
582
                if cctx.Err() != nil {
2✔
583
                        return errInternalRetry
1✔
584
                }
1✔
585
                glog.Warningf("Error while proposing conf change: %v", err)
×
586
                return err
×
587
        }
588
        select {
38✔
589
        case err := <-ch:
36✔
590
                return err
36✔
591
        case <-ctx.Done():
×
592
                return ctx.Err()
×
593
        case <-cctx.Done():
2✔
594
                return errInternalRetry
2✔
595
        }
596
}
597

598
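// addToCluster proposes a ConfChangeAddNode (or ConfChangeAddLearnerNode for learners)
// carrying the new member's RaftContext, retrying while proposeConfChange asks for a retry.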
func (n *Node) addToCluster(ctx context.Context, rc *pb.RaftContext) error {
        pid := rc.Id
        rc.SnapshotTs = 0
        rcBytes, err := rc.Marshal()
        x.Check(err)

        cc := raftpb.ConfChange{
                Type:    raftpb.ConfChangeAddNode,
                NodeID:  pid,
                Context: rcBytes,
        }
        if rc.IsLearner {
                cc.Type = raftpb.ConfChangeAddLearnerNode
        }

        err = errInternalRetry
        for err == errInternalRetry {
                glog.Infof("Trying to add %#x to cluster. Addr: %v\n", pid, rc.Addr)
                glog.Infof("Current confstate at %#x: %+v\n", n.Id, n.ConfState())
                err = n.proposeConfChange(ctx, cc)
        }
        return err
}

// ProposePeerRemoval proposes a new configuration with the peer with the given id removed.
func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
        if n.Raft() == nil {
                return ErrNoNode
        }
        if _, ok := n.Peer(id); !ok && id != n.RaftContext.Id {
                return errors.Errorf("Node %#x not part of group", id)
        }
        cc := raftpb.ConfChange{
                Type:   raftpb.ConfChangeRemoveNode,
                NodeID: id,
        }
        err := errInternalRetry
        for err == errInternalRetry {
                err = n.proposeConfChange(ctx, cc)
        }
        return err
}

type linReadReq struct {
        // A one-shot chan which we send a raft index upon.
        indexCh chan<- uint64
}

var errReadIndex = errors.Errorf(
        "Cannot get linearized read (time expired or no configured leader)")

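// readIndexTotal counts ReadIndex attempts and readIndexOk counts the successful ones; both
// are logged at V(2) every 1000 requests.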
var readIndexOk, readIndexTotal uint64

// WaitLinearizableRead waits until a linearizable read can be performed.
func (n *Node) WaitLinearizableRead(ctx context.Context) error {
        span := otrace.FromContext(ctx)
        span.Annotate(nil, "WaitLinearizableRead")

        if num := atomic.AddUint64(&readIndexTotal, 1); num%1000 == 0 {
                glog.V(2).Infof("ReadIndex Total: %d\n", num)
        }
        indexCh := make(chan uint64, 1)
        select {
        case n.requestCh <- linReadReq{indexCh: indexCh}:
                span.Annotate(nil, "Pushed to requestCh")
        case <-ctx.Done():
                span.Annotate(nil, "Context expired")
                return ctx.Err()
        }

        select {
        case index := <-indexCh:
                span.Annotatef(nil, "Received index: %d", index)
                if index == 0 {
                        return errReadIndex
                } else if num := atomic.AddUint64(&readIndexOk, 1); num%1000 == 0 {
                        glog.V(2).Infof("ReadIndex OK: %d\n", num)
                }
                err := n.Applied.WaitForMark(ctx, index)
                span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err)
                return err
        case <-ctx.Done():
                span.Annotate(nil, "Context expired")
                return ctx.Err()
        }
}

// RunReadIndexLoop issues linearizable ReadIndex requests to Raft in a loop, serving the
// requests queued on requestCh.
func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
        defer closer.Done()
        readIndex := func(activeRctx []byte) (uint64, error) {
                // A read request can get rejected, in which case we would wait indefinitely
                // on the channel, so use a timeout.
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
                defer cancel()

                if err := n.Raft().ReadIndex(ctx, activeRctx); err != nil {
                        glog.Errorf("Error while trying to call ReadIndex: %v\n", err)
                        return 0, err
                }

        again:
                select {
                case <-closer.HasBeenClosed():
                        return 0, errors.New("Closer has been called")
                case rs := <-readStateCh:
                        if !bytes.Equal(activeRctx, rs.RequestCtx) {
                                glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx)
                                goto again
                        }
                        return rs.Index, nil
                case <-ctx.Done():
                        glog.Warningf("[%#x] Read index context timed out\n", n.Id)
                        return 0, errInternalRetry
                }
        } // end of readIndex func

        // We maintain one linearizable ReadIndex request at a time.  Others wait queued behind
        // requestCh.
        requests := []linReadReq{}
        for {
                select {
                case <-closer.HasBeenClosed():
                        return
                case <-readStateCh:
                        // Do nothing, discard ReadState as we don't have any pending ReadIndex requests.
                case req := <-n.requestCh:
                slurpLoop:
                        for {
                                requests = append(requests, req)
                                select {
                                case req = <-n.requestCh:
                                default:
                                        break slurpLoop
                                }
                        }
                        // Create one activeRctx slice for the read index, even if we have to call readIndex
                        // repeatedly. That way, we can process the requests as soon as we encounter the first
                        // activeRctx. This is better than flooding readIndex with a new activeRctx on each
                        // call, causing more unique traffic and further delays in request processing.
                        activeRctx := make([]byte, 8)
                        x.Check2(n.Rand.Read(activeRctx))
                        glog.V(4).Infof("Request readctx: %#x", activeRctx)
                        for {
                                index, err := readIndex(activeRctx)
                                if err == errInternalRetry {
                                        continue
                                }
                                if err != nil {
                                        index = 0
                                        glog.Errorf("[%#x] While trying to do lin read index: %v", n.Id, err)
                                }
                                for _, req := range requests {
                                        req.indexCh <- index
                                }
                                break
                        }
                        requests = requests[:0]
                }
        }
}

36✔
761
        // Only process one JoinCluster request at a time.
36✔
762
        n.joinLock.Lock()
36✔
763
        defer n.joinLock.Unlock()
36✔
764

36✔
765
        // Check that the new node is from the same group as me.
36✔
766
        if rc.Group != n.RaftContext.Group {
36✔
767
                return nil, errors.Errorf("Raft group mismatch")
×
768
        }
×
769
        // Also check that the new node is not me.
770
        if rc.Id == n.RaftContext.Id {
36✔
771
                return nil, errors.Errorf("REUSE_RAFTID: Raft ID duplicates mine: %+v", rc)
×
772
        }
×
773

774
        // Check that the new node is not already part of the group.
775
        if addr, ok := n.Peer(rc.Id); ok && rc.Addr != addr {
36✔
776
                // There exists a healthy connection to server with same id.
×
777
                if _, err := GetPools().Get(addr); err == nil {
×
778
                        return &api.Payload{}, errors.Errorf(
×
779
                                "REUSE_ADDR: IP Address same as existing peer: %s", addr)
×
780
                }
×
781
        }
782
        n.Connect(rc.Id, rc.Addr)
36✔
783

36✔
784
        err := n.addToCluster(context.Background(), rc)
36✔
785
        glog.Infof("[%#x] Done joining cluster with err: %v", rc.Id, err)
36✔
786
        return &api.Payload{}, err
36✔
787
}