dgraph-io / dgraph / build 5078820494 (push, via GitHub)

25 May 2023 11:19AM UTC — coverage: 67.259% (-0.009%) from 67.268%

Commit: dgraphtest: print container logs if the test fails (#8829)

58396 of 86823 relevant lines covered (67.26%)
2270884.31 hits per line

Source File: /conn/node.go — 84.63% covered

1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package conn
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "encoding/binary"
24
        "fmt"
25
        "math/rand"
26
        "strings"
27
        "sync"
28
        "sync/atomic"
29
        "time"
30

31
        "github.com/golang/glog"
32
        "github.com/pkg/errors"
33
        "go.etcd.io/etcd/raft"
34
        "go.etcd.io/etcd/raft/raftpb"
35
        otrace "go.opencensus.io/trace"
36

37
        "github.com/dgraph-io/badger/v4/y"
38
        "github.com/dgraph-io/dgo/v230/protos/api"
39
        "github.com/dgraph-io/dgraph/protos/pb"
40
        "github.com/dgraph-io/dgraph/raftwal"
41
        "github.com/dgraph-io/dgraph/x"
42
        "github.com/dgraph-io/ristretto/z"
43
)
44

45
var (
46
        // ErrNoNode is returned when no node has been set up.
47
        ErrNoNode = errors.Errorf("No node has been set up yet")
48
)
49

50
// Node represents a node participating in the RAFT protocol.
51
type Node struct {
52
        x.SafeMutex
53

54
        // Applied is used to keep track of the applied RAFT proposals.
55
        // The stages are proposed -> committed (accepted by cluster) ->
56
        // applied (to PL) -> synced (to BadgerDB).
57
        // This needs to be 64-bit aligned for atomics to work on 32-bit machines.
58
        Applied y.WaterMark
59

60
        joinLock sync.Mutex
61

62
        // Used to keep track of lin read requests.
63
        requestCh chan linReadReq
64

65
        // SafeMutex is for fields which can be changed after init.
66
        _confState *raftpb.ConfState
67
        _raft      raft.Node
68

69
        // Fields which are never changed after init.
70
        StartTime       time.Time
71
        Cfg             *raft.Config
72
        MyAddr          string
73
        Id              uint64
74
        peers           map[uint64]string
75
        confChanges     map[uint64]chan error
76
        messages        chan sendmsg
77
        RaftContext     *pb.RaftContext
78
        Store           *raftwal.DiskStorage
79
        Rand            *rand.Rand
80
        tlsClientConfig *tls.Config
81

82
        Proposals proposals
83

84
        heartbeatsOut int64
85
        heartbeatsIn  int64
86
}
87

88
// NewNode returns a new Node instance.
89
func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage, tlsConfig *tls.Config) *Node {
160✔
90
        snap, err := store.Snapshot()
160✔
91
        x.Check(err)
160✔
92

160✔
93
        n := &Node{
160✔
94
                StartTime: time.Now(),
160✔
95
                Id:        rc.Id,
160✔
96
                MyAddr:    rc.Addr,
160✔
97
                Store:     store,
160✔
98
                Cfg: &raft.Config{
160✔
99
                        ID:                       rc.Id,
160✔
100
                        ElectionTick:             20, // 2s if we call Tick() every 100 ms.
160✔
101
                        HeartbeatTick:            1,  // 100ms if we call Tick() every 100 ms.
160✔
102
                        Storage:                  store,
160✔
103
                        MaxInflightMsgs:          256,
160✔
104
                        MaxSizePerMsg:            256 << 10, // 256 KB should allow more batching.
160✔
105
                        MaxCommittedSizePerReady: 64 << 20,  // Avoid loading entire Raft log into memory.
160✔
106
                        // We don't need lease based reads. They cause issues because they
160✔
107
                        // require CheckQuorum to be true, and that causes a lot of issues
160✔
108
                        // for us during cluster bootstrapping and later. A seemingly
160✔
109
                        // healthy cluster would just cause leader to step down due to
160✔
110
                        // "inactive" quorum, and then disallow anyone from becoming leader.
160✔
111
                        // So, let's stick to the default options. Let's achieve correctness first,
160✔
112
                        // then performance. Plus, for the Dgraph alphas, we'll
160✔
113
                        // soon be relying only on Timestamps for blocking reads and
160✔
114
                        // achieving linearizability, rather than checking quorums (Zero would
160✔
115
                        // still check quorums).
160✔
116
                        ReadOnlyOption: raft.ReadOnlySafe,
160✔
117
                        // When a disconnected node joins back, it forces a leader change,
160✔
118
                        // as it starts with a higher term, as described in Raft thesis (not
160✔
119
                        // the paper) in section 9.6. This setting can avoid that by only
160✔
120
                        // increasing the term, if the node has a good chance of becoming
160✔
121
                        // the leader.
160✔
122
                        PreVote: true,
160✔
123

160✔
124
                        // We can explicitly set Applied to the first index in the Raft log,
160✔
125
                        // so it does not derive it separately, thus avoiding a crash when
160✔
126
                        // the Applied is set to below snapshot index by Raft.
160✔
127
                        // In case this is a new Raft log, first would be 1, and therefore
160✔
128
                        // Applied would be zero, hence meeting the condition by the library
160✔
129
                        // that Applied should only be set during a restart.
160✔
130
                        //
160✔
131
                        // Update: Set the Applied to the latest snapshot, because it seems
160✔
132
                        // like somehow the first index can be out of sync with the latest
160✔
133
                        // snapshot.
160✔
134
                        Applied: snap.Metadata.Index,
160✔
135

160✔
136
                        Logger: &x.ToGlog{},
160✔
137
                },
160✔
138
                // processConfChange etc. are not throttled, so allow some extra delta so that we
160✔
139
                // don't block Tick() when applyCh is full.
160✔
140
                Applied:     y.WaterMark{Name: "Applied watermark"},
160✔
141
                RaftContext: rc,
160✔
142
                //nolint:gosec // random node id generator does not require cryptographic precision
160✔
143
                Rand:            rand.New(&lockedSource{src: rand.NewSource(time.Now().UnixNano())}),
160✔
144
                confChanges:     make(map[uint64]chan error),
160✔
145
                messages:        make(chan sendmsg, 100),
160✔
146
                peers:           make(map[uint64]string),
160✔
147
                requestCh:       make(chan linReadReq, 100),
160✔
148
                tlsClientConfig: tlsConfig,
160✔
149
        }
160✔
150
        n.Applied.Init(nil)
160✔
151
        // This should match up to the Applied index set above.
160✔
152
        n.Applied.SetDoneUntil(n.Cfg.Applied)
160✔
153
        glog.Infof("Setting raft.Config to: %+v", n.Cfg)
160✔
154
        return n
160✔
155
}
160✔
156

157
// ReportRaftComms periodically prints the state of the node (heartbeats in and out).
158
func (n *Node) ReportRaftComms() {
159✔
159
        if !glog.V(3) {
315✔
160
                return
156✔
161
        }
156✔
162
        ticker := time.NewTicker(time.Second)
3✔
163
        defer ticker.Stop()
3✔
164

3✔
165
        for range ticker.C {
33✔
166
                out := atomic.SwapInt64(&n.heartbeatsOut, 0)
30✔
167
                in := atomic.SwapInt64(&n.heartbeatsIn, 0)
30✔
168
                glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in)
30✔
169
        }
30✔
170
}
171

172
// SetRaft would set the provided raft.Node to this node.
173
// It fails an assertion if a raft.Node has already been set.
174
func (n *Node) SetRaft(r raft.Node) {
160✔
175
        n.Lock()
160✔
176
        defer n.Unlock()
160✔
177
        x.AssertTrue(n._raft == nil)
160✔
178
        n._raft = r
160✔
179
}
160✔
180

181
// Raft would return back the raft.Node stored in the node.
182
func (n *Node) Raft() raft.Node {
3,624,290✔
183
        n.RLock()
3,624,290✔
184
        defer n.RUnlock()
3,624,290✔
185
        return n._raft
3,624,290✔
186
}
3,624,290✔
187

188
// SetConfState would store the latest ConfState generated by ApplyConfChange.
189
func (n *Node) SetConfState(cs *raftpb.ConfState) {
266✔
190
        glog.Infof("Setting conf state to %+v\n", cs)
266✔
191
        n.Lock()
266✔
192
        defer n.Unlock()
266✔
193
        n._confState = cs
266✔
194
}
266✔
195

196
// DoneConfChange marks a configuration change as done and sends the given error to the
197
// config channel.
198
func (n *Node) DoneConfChange(id uint64, err error) {
265✔
199
        n.Lock()
265✔
200
        defer n.Unlock()
265✔
201
        ch, has := n.confChanges[id]
265✔
202
        if !has {
492✔
203
                return
227✔
204
        }
227✔
205
        delete(n.confChanges, id)
38✔
206
        ch <- err
38✔
207
}
208

209
//nolint:gosec // random node id generator does not require cryptographic precision
210
func (n *Node) storeConfChange(che chan error) uint64 {
41✔
211
        n.Lock()
41✔
212
        defer n.Unlock()
41✔
213
        id := rand.Uint64()
41✔
214
        _, has := n.confChanges[id]
41✔
215
        for has {
41✔
216
                id = rand.Uint64()
×
217
                _, has = n.confChanges[id]
×
218
        }
×
219
        n.confChanges[id] = che
41✔
220
        return id
41✔
221
}
222

223
// ConfState would return the latest ConfState stored in node.
224
func (n *Node) ConfState() *raftpb.ConfState {
7,996✔
225
        n.RLock()
7,996✔
226
        defer n.RUnlock()
7,996✔
227
        return n._confState
7,996✔
228
}
7,996✔
229

230
// Peer returns the address of the peer with the given id.
231
func (n *Node) Peer(pid uint64) (string, bool) {
641✔
232
        n.RLock()
641✔
233
        defer n.RUnlock()
641✔
234
        addr, ok := n.peers[pid]
641✔
235
        return addr, ok
641✔
236
}
641✔
237

238
// SetPeer sets the address of the peer with the given id. The address must not be empty.
239
func (n *Node) SetPeer(pid uint64, addr string) {
106✔
240
        x.AssertTruef(addr != "", "SetPeer for peer %d has empty addr.", pid)
106✔
241
        n.Lock()
106✔
242
        defer n.Unlock()
106✔
243
        n.peers[pid] = addr
106✔
244
}
106✔
245

246
// Send sends the given RAFT message from this node.
247
func (n *Node) Send(msg *raftpb.Message) {
963,555✔
248
        x.AssertTruef(n.Id != msg.To, "Sending message to itself")
963,555✔
249
        data, err := msg.Marshal()
963,555✔
250
        x.Check(err)
963,555✔
251

963,555✔
252
        if glog.V(2) {
1,927,110✔
253
                switch msg.Type {
963,555✔
254
                case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
473,572✔
255
                        atomic.AddInt64(&n.heartbeatsOut, 1)
473,572✔
256
                case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
×
257
                case raftpb.MsgApp, raftpb.MsgAppResp:
489,482✔
258
                case raftpb.MsgProp:
466✔
259
                default:
35✔
260
                        glog.Infof("RaftComm: [%#x] Sending message of type %s to %#x", msg.From, msg.Type, msg.To)
35✔
261
                }
262
        }
263
        // As long as leadership is stable, any attempted Propose() calls should be reflected in the
264
        // next raft.Ready.Messages. Leaders will send MsgApps to the followers; followers will send
265
        // MsgProp to the leader. It is up to the transport layer to get those messages to their
266
        // destination. If a MsgApp gets dropped by the transport layer, it will get retried by raft
267
        // (i.e. it will appear in a future Ready.Messages), but MsgProp will only be sent once. During
268
        // leadership transitions, proposals may get dropped even if the network is reliable.
269
        //
270
        // We can't do a select default here. The messages must be sent to the channel, otherwise we
271
        // should block until the channel can accept these messages. BatchAndSendMessages would take
272
        // care of dropping messages which can't be sent due to network issues to the corresponding
273
        // node. But, we shouldn't take the liberty to do that here. It would take us more time to
274
        // repropose these dropped messages anyway, than to block here a bit waiting for the messages
275
        // channel to clear out.
276
        n.messages <- sendmsg{to: msg.To, data: data}
963,555✔
277
}
278

279
// Snapshot returns the current snapshot.
280
func (n *Node) Snapshot() (raftpb.Snapshot, error) {
×
281
        if n == nil || n.Store == nil {
×
282
                return raftpb.Snapshot{}, errors.New("Uninitialized node or raft store")
×
283
        }
×
284
        return n.Store.Snapshot()
×
285
}
286

287
// SaveToStorage saves the hard state, entries, and snapshot to persistent storage, in that order.
288
func (n *Node) SaveToStorage(h *raftpb.HardState, es []raftpb.Entry, s *raftpb.Snapshot) {
848,763✔
289
        for {
1,697,526✔
290
                if err := n.Store.Save(h, es, s); err != nil {
848,763✔
291
                        glog.Errorf("While trying to save Raft update: %v. Retrying...", err)
×
292
                } else {
848,763✔
293
                        return
848,763✔
294
                }
848,763✔
295
        }
296
}
297

298
// PastLife returns the index of the snapshot before the restart (if any) and whether there was
299
// a previous state that should be recovered after a restart.
300
func (n *Node) PastLife() (uint64, bool, error) {
159✔
301
        var (
159✔
302
                sp      raftpb.Snapshot
159✔
303
                idx     uint64
159✔
304
                restart bool
159✔
305
                rerr    error
159✔
306
        )
159✔
307
        sp, rerr = n.Store.Snapshot()
159✔
308
        if rerr != nil {
159✔
309
                return 0, false, rerr
×
310
        }
×
311
        if !raft.IsEmptySnap(sp) {
160✔
312
                glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
1✔
313
                restart = true
1✔
314
                idx = sp.Metadata.Index
1✔
315
        }
1✔
316

317
        var hd raftpb.HardState
159✔
318
        hd, rerr = n.Store.HardState()
159✔
319
        if rerr != nil {
159✔
320
                return 0, false, rerr
×
321
        }
×
322
        if !raft.IsEmptyHardState(hd) {
160✔
323
                glog.Infof("Found hardstate: %+v\n", hd)
1✔
324
                restart = true
1✔
325
        }
1✔
326

327
        num := n.Store.NumEntries()
159✔
328
        glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
159✔
329
        // We'll always have at least one entry.
159✔
330
        if num > 1 {
160✔
331
                restart = true
1✔
332
        }
1✔
333
        return idx, restart, nil
159✔
334
}
335

336
const (
337
        messageBatchSoftLimit = 10e6
338
)
339

340
type stream struct {
341
        msgCh chan []byte
342
        alive int32
343
}
344

345
// BatchAndSendMessages sends messages in batches.
346
func (n *Node) BatchAndSendMessages() {
159✔
347
        batches := make(map[uint64]*bytes.Buffer)
159✔
348
        streams := make(map[uint64]*stream)
159✔
349

159✔
350
        for {
702,626✔
351
                totalSize := 0
702,467✔
352
                sm := <-n.messages
702,467✔
353
        slurp_loop:
702,467✔
354
                for {
1,666,022✔
355
                        var buf *bytes.Buffer
963,555✔
356
                        if b, ok := batches[sm.to]; !ok {
963,632✔
357
                                buf = new(bytes.Buffer)
77✔
358
                                batches[sm.to] = buf
77✔
359
                        } else {
963,555✔
360
                                buf = b
963,478✔
361
                        }
963,478✔
362
                        totalSize += 4 + len(sm.data)
963,555✔
363
                        x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
963,555✔
364
                        x.Check2(buf.Write(sm.data))
963,555✔
365

963,555✔
366
                        if totalSize > messageBatchSoftLimit {
963,555✔
367
                                // We limit the batch size, but we aren't pushing back on
×
368
                                // n.messages, because the loop below spawns a goroutine
×
369
                                // to do its dirty work.  This is good because right now
×
370
                                // (*node).send fails(!) if the channel is full.
×
371
                                break
×
372
                        }
373

374
                        select {
963,555✔
375
                        case sm = <-n.messages:
261,247✔
376
                        default:
702,308✔
377
                                break slurp_loop
702,308✔
378
                        }
379
                }
380

381
                for to, buf := range batches {
1,644,840✔
382
                        if buf.Len() == 0 {
946,503✔
383
                                continue
3,971✔
384
                        }
385
                        s, ok := streams[to]
938,561✔
386
                        if !ok || atomic.LoadInt32(&s.alive) <= 0 {
938,683✔
387
                                s = &stream{
122✔
388
                                        msgCh: make(chan []byte, 100),
122✔
389
                                        alive: 1,
122✔
390
                                }
122✔
391
                                go n.streamMessages(to, s)
122✔
392
                                streams[to] = s
122✔
393
                        }
122✔
394
                        data := make([]byte, buf.Len())
938,561✔
395
                        copy(data, buf.Bytes())
938,561✔
396
                        buf.Reset()
938,561✔
397

938,561✔
398
                        select {
938,561✔
399
                        case s.msgCh <- data:
937,545✔
400
                        default:
1,016✔
401
                        }
402
                }
403
        }
404
}
405

406
func (n *Node) streamMessages(to uint64, s *stream) {
122✔
407
        defer atomic.StoreInt32(&s.alive, 0)
122✔
408

122✔
409
        // Exit after this deadline. Let BatchAndSendMessages create another goroutine, if needed.
122✔
410
        // Let's set the deadline to 10s because if we increase it, then it takes longer to recover from
122✔
411
        // a partition and get a new leader.
122✔
412
        deadline := time.Now().Add(10 * time.Second)
122✔
413
        ticker := time.NewTicker(time.Second)
122✔
414
        defer ticker.Stop()
122✔
415

122✔
416
        var logged int
122✔
417
        for range ticker.C { // Don't do this in a busy-wait loop; use a ticker.
524✔
418
                // doSendMessage would block doing a stream. So, time.Now().After is
402✔
419
                // only there to avoid a busy-wait.
402✔
420
                if err := n.doSendMessage(to, s.msgCh); err != nil {
760✔
421
                        // Track `logged` so we print the error only once if we are not able to connect.
358✔
422
                        // Otherwise, the log is polluted with repeated errors.
358✔
423
                        if logged == 0 {
438✔
424
                                glog.Warningf("Unable to send message to peer: %#x. Error: %v", to, err)
80✔
425
                                logged++
80✔
426
                        }
80✔
427
                }
428
                if time.Now().After(deadline) {
409✔
429
                        return
51✔
430
                }
51✔
431
        }
432
}
433

434
func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
402✔
435
        addr, has := n.Peer(to)
402✔
436
        if !has {
402✔
437
                return errors.Errorf("Do not have address of peer %#x", to)
×
438
        }
×
439
        pool, err := GetPools().Get(addr)
402✔
440
        if err != nil {
628✔
441
                return err
226✔
442
        }
226✔
443

444
        c := pb.NewRaftClient(pool.Get())
176✔
445
        ctx, span := otrace.StartSpan(context.Background(),
176✔
446
                fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to))
176✔
447
        defer span.End()
176✔
448

176✔
449
        mc, err := c.RaftMessage(ctx)
176✔
450
        if err != nil {
274✔
451
                return err
98✔
452
        }
98✔
453

454
        var packets, lastPackets uint64
76✔
455
        slurp := func(batch *pb.RaftBatch) {
932,270✔
456
                for {
1,866,748✔
457
                        if len(batch.Payload.Data) > messageBatchSoftLimit {
934,554✔
458
                                return
×
459
                        }
×
460
                        select {
934,554✔
461
                        case data := <-msgCh:
2,360✔
462
                                batch.Payload.Data = append(batch.Payload.Data, data...)
2,360✔
463
                                packets++
2,360✔
464
                        default:
932,194✔
465
                                return
932,194✔
466
                        }
467
                }
468
        }
469

470
        ctx = mc.Context()
76✔
471

76✔
472
        fastTick := time.NewTicker(5 * time.Second)
76✔
473
        defer fastTick.Stop()
76✔
474

76✔
475
        ticker := time.NewTicker(3 * time.Minute)
76✔
476
        defer ticker.Stop()
76✔
477

76✔
478
        for {
940,125✔
479
                select {
940,049✔
480
                case data := <-msgCh:
932,194✔
481
                        batch := &pb.RaftBatch{
932,194✔
482
                                Context: n.RaftContext,
932,194✔
483
                                Payload: &api.Payload{Data: data},
932,194✔
484
                        }
932,194✔
485
                        slurp(batch) // Pick up more entries from msgCh, if present.
932,194✔
486
                        span.Annotatef(nil, "[to: %x] [Packets: %d] Sending data of length: %d.",
932,194✔
487
                                to, packets, len(batch.Payload.Data))
932,194✔
488
                        if packets%10000 == 0 {
932,270✔
489
                                glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.",
76✔
490
                                        to, packets, len(batch.Payload.Data))
76✔
491
                        }
76✔
492
                        packets++
932,194✔
493
                        if err := mc.Send(batch); err != nil {
932,225✔
494
                                span.Annotatef(nil, "Error while mc.Send: %v", err)
31✔
495
                                glog.Errorf("[to: %x] Error while mc.Send: %v", to, err)
31✔
496
                                switch {
31✔
497
                                case strings.Contains(err.Error(), "TransientFailure"):
×
498
                                        glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
×
499
                                        n.Raft().ReportUnreachable(to)
×
500
                                        pool.SetUnhealthy()
×
501
                                default:
31✔
502
                                }
503
                                // We don't need to do anything if we receive any error while sending message.
504
                                // RAFT would automatically retry.
505
                                return err
31✔
506
                        }
507
                case <-fastTick.C:
7,621✔
508
                        // We use this ticker, because during network partitions, mc.Send is
7,621✔
509
                        // unable to actually send packets, and also does not complain about
7,621✔
510
                        // them. We could have potentially used the separately tracked
7,621✔
511
                        // heartbeats to check this, but what we have observed is that
7,621✔
512
                        // incoming traffic might be OK, but outgoing might not be. So, this
7,621✔
513
                        // is a better way for us to verify whether this particular outbound
7,621✔
514
                        // connection is valid or not.
7,621✔
515
                        ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
7,621✔
516
                        _, err := c.IsPeer(ctx, n.RaftContext)
7,621✔
517
                        cancel()
7,621✔
518
                        if err != nil {
7,624✔
519
                                glog.Errorf("Error while calling IsPeer %v. Reporting %x as unreachable.", err, to)
3✔
520
                                n.Raft().ReportUnreachable(to)
3✔
521
                                pool.SetUnhealthy()
3✔
522
                                return errors.Wrapf(err, "while calling IsPeer %x", to)
3✔
523
                        }
3✔
524
                case <-ticker.C:
192✔
525
                        if lastPackets == packets {
192✔
526
                                span.Annotatef(nil,
×
527
                                        "No activity for a while [Packets == %d]. Closing connection.", packets)
×
528
                                return mc.CloseSend()
×
529
                        }
×
530
                        lastPackets = packets
192✔
531
                case <-ctx.Done():
×
532
                        return ctx.Err()
×
533
                }
534
        }
535
}
536

537
// Connect connects the node and makes its peerPool refer to the constructed pool and address
538
// (possibly updating ourselves from the old address). If pid refers to this node itself,
539
// this does nothing.
540
func (n *Node) Connect(pid uint64, addr string) {
290✔
541
        if pid == n.Id {
379✔
542
                return
89✔
543
        }
89✔
544
        if paddr, ok := n.Peer(pid); ok && paddr == addr {
296✔
545
                // Already connected.
95✔
546
                return
95✔
547
        }
95✔
548
        // Here's what we do.  Right now peerPool maps peer node id's to addr values.  If
549
        // a *pool can be created, good, but if not, we still create a peerPoolEntry with
550
        // a nil *pool.
551
        if addr == n.MyAddr {
106✔
552
                // TODO: Note this fact in more general peer health info somehow.
×
553
                glog.Infof("Peer %d claims same host as me\n", pid)
×
554
                n.SetPeer(pid, addr)
×
555
                return
×
556
        }
×
557
        GetPools().Connect(addr, n.tlsClientConfig)
106✔
558
        n.SetPeer(pid, addr)
106✔
559
}
560

561
// DeletePeer deletes the record of the peer with the given id.
562
func (n *Node) DeletePeer(pid uint64) {
×
563
        if pid == n.Id {
×
564
                return
×
565
        }
×
566
        n.Lock()
×
567
        defer n.Unlock()
×
568
        delete(n.peers, pid)
×
569
}
570

571
var errInternalRetry = errors.New("Retry proposal again")
572

573
func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
41✔
574
        cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
41✔
575
        defer cancel()
41✔
576

41✔
577
        ch := make(chan error, 1)
41✔
578
        id := n.storeConfChange(ch)
41✔
579
        // TODO: Delete id from the map.
41✔
580
        conf.ID = id
41✔
581
        if err := n.Raft().ProposeConfChange(cctx, conf); err != nil {
44✔
582
                if cctx.Err() != nil {
6✔
583
                        return errInternalRetry
3✔
584
                }
3✔
585
                glog.Warningf("Error while proposing conf change: %v", err)
×
586
                return err
×
587
        }
588
        select {
38✔
589
        case err := <-ch:
38✔
590
                return err
38✔
591
        case <-ctx.Done():
×
592
                return ctx.Err()
×
593
        case <-cctx.Done():
×
594
                return errInternalRetry
×
595
        }
596
}
597

598
func (n *Node) addToCluster(ctx context.Context, rc *pb.RaftContext) error {
38✔
599
        pid := rc.Id
38✔
600
        rc.SnapshotTs = 0
38✔
601
        rcBytes, err := rc.Marshal()
38✔
602
        x.Check(err)
38✔
603

38✔
604
        cc := raftpb.ConfChange{
38✔
605
                Type:    raftpb.ConfChangeAddNode,
38✔
606
                NodeID:  pid,
38✔
607
                Context: rcBytes,
38✔
608
        }
38✔
609
        if rc.IsLearner {
38✔
610
                cc.Type = raftpb.ConfChangeAddLearnerNode
×
611
        }
×
612

613
        err = errInternalRetry
38✔
614
        for err == errInternalRetry {
79✔
615
                glog.Infof("Trying to add %#x to cluster. Addr: %v\n", pid, rc.Addr)
41✔
616
                glog.Infof("Current confstate at %#x: %+v\n", n.Id, n.ConfState())
41✔
617
                err = n.proposeConfChange(ctx, cc)
41✔
618
        }
41✔
619
        return err
38✔
620
}
621

622
// ProposePeerRemoval proposes a new configuration with the peer with the given id removed.
623
func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
×
624
        if n.Raft() == nil {
×
625
                return ErrNoNode
×
626
        }
×
627
        if _, ok := n.Peer(id); !ok && id != n.RaftContext.Id {
×
628
                return errors.Errorf("Node %#x not part of group", id)
×
629
        }
×
630
        cc := raftpb.ConfChange{
×
631
                Type:   raftpb.ConfChangeRemoveNode,
×
632
                NodeID: id,
×
633
        }
×
634
        err := errInternalRetry
×
635
        for err == errInternalRetry {
×
636
                err = n.proposeConfChange(ctx, cc)
×
637
        }
×
638
        return err
×
639
}
640

641
type linReadReq struct {
642
        // A one-shot chan on which we send a raft index.
643
        indexCh chan<- uint64
644
}
645

646
var errReadIndex = errors.Errorf(
647
        "Cannot get linearized read (time expired or no configured leader)")
648

649
var readIndexOk, readIndexTotal uint64
650

651
// WaitLinearizableRead waits until a linearizable read can be performed.
652
func (n *Node) WaitLinearizableRead(ctx context.Context) error {
28,257✔
653
        span := otrace.FromContext(ctx)
28,257✔
654
        span.Annotate(nil, "WaitLinearizableRead")
28,257✔
655

28,257✔
656
        if num := atomic.AddUint64(&readIndexTotal, 1); num%1000 == 0 {
28,277✔
657
                glog.V(2).Infof("ReadIndex Total: %d\n", num)
20✔
658
        }
20✔
659
        indexCh := make(chan uint64, 1)
28,257✔
660
        select {
28,257✔
661
        case n.requestCh <- linReadReq{indexCh: indexCh}:
28,257✔
662
                span.Annotate(nil, "Pushed to requestCh")
28,257✔
663
        case <-ctx.Done():
×
664
                span.Annotate(nil, "Context expired")
×
665
                return ctx.Err()
×
666
        }
667

668
        select {
28,257✔
669
        case index := <-indexCh:
28,239✔
670
                span.Annotatef(nil, "Received index: %d", index)
28,239✔
671
                if index == 0 {
28,242✔
672
                        return errReadIndex
3✔
673
                } else if num := atomic.AddUint64(&readIndexOk, 1); num%1000 == 0 {
28,259✔
674
                        glog.V(2).Infof("ReadIndex OK: %d\n", num)
20✔
675
                }
20✔
676
                err := n.Applied.WaitForMark(ctx, index)
28,236✔
677
                span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err)
28,236✔
678
                return err
28,236✔
679
        case <-ctx.Done():
18✔
680
                span.Annotate(nil, "Context expired")
18✔
681
                return ctx.Err()
18✔
682
        }
683
}
684

685
// RunReadIndexLoop runs a loop that services linearizable read-index requests.
686
func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
66✔
687
        defer closer.Done()
66✔
688
        readIndex := func(activeRctx []byte) (uint64, error) {
28,378✔
689
                // A read request can get rejected, in which case we would wait indefinitely on the
28,312✔
690
                // channel, so use a timeout.
28,312✔
691
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
28,312✔
692
                defer cancel()
28,312✔
693

28,312✔
694
                if err := n.Raft().ReadIndex(ctx, activeRctx); err != nil {
28,314✔
695
                        glog.Errorf("Error while trying to call ReadIndex: %v\n", err)
2✔
696
                        return 0, err
2✔
697
                }
2✔
698

699
        again:
28,310✔
700
                select {
28,310✔
701
                case <-closer.HasBeenClosed():
6✔
702
                        return 0, errors.New("Closer has been called")
6✔
703
                case rs := <-readStateCh:
28,197✔
704
                        if !bytes.Equal(activeRctx, rs.RequestCtx) {
28,197✔
705
                                glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx)
×
706
                                goto again
×
707
                        }
708
                        return rs.Index, nil
28,197✔
709
                case <-ctx.Done():
107✔
710
                        glog.Warningf("[%#x] Read index context timed out\n", n.Id)
107✔
711
                        return 0, errInternalRetry
107✔
712
                }
713
        } // end of readIndex func
714

715
        // We maintain one linearizable ReadIndex request at a time.  Others wait queued behind
716
        // requestCh.
717
        requests := []linReadReq{}
66✔
718
        for {
28,338✔
719
                select {
28,272✔
720
                case <-closer.HasBeenClosed():
66✔
721
                        return
66✔
722
                case <-readStateCh:
1✔
723
                        // Do nothing, discard ReadState as we don't have any pending ReadIndex requests.
724
                case req := <-n.requestCh:
28,205✔
725
                slurpLoop:
28,205✔
726
                        for {
56,450✔
727
                                requests = append(requests, req)
28,245✔
728
                                select {
28,245✔
729
                                case req = <-n.requestCh:
40✔
730
                                default:
28,205✔
731
                                        break slurpLoop
28,205✔
732
                                }
733
                        }
734
                        // Create one activeRctx slice for the read index, even if we have to call readIndex
735
                        // repeatedly. That way, we can process the requests as soon as we encounter the first
736
                        // activeRctx. This is better than flooding readIndex with a new activeRctx on each
737
                        // call, causing more unique traffic and further delays in request processing.
738
                        activeRctx := make([]byte, 8)
28,205✔
739
                        x.Check2(n.Rand.Read(activeRctx))
28,205✔
740
                        glog.V(4).Infof("Request readctx: %#x", activeRctx)
28,205✔
741
                        for {
56,517✔
742
                                index, err := readIndex(activeRctx)
28,312✔
743
                                if err == errInternalRetry {
28,419✔
744
                                        continue
107✔
745
                                }
746
                                if err != nil {
28,213✔
747
                                        index = 0
8✔
748
                                        glog.Errorf("[%#x] While trying to do lin read index: %v", n.Id, err)
8✔
749
                                }
8✔
750
                                for _, req := range requests {
56,450✔
751
                                        req.indexCh <- index
28,245✔
752
                                }
28,245✔
753
                                break
28,205✔
754
                        }
755
                        requests = requests[:0]
28,205✔
756
                }
757
        }
758
}
759

760
func (n *Node) joinCluster(ctx context.Context, rc *pb.RaftContext) (*api.Payload, error) {
38✔
761
        // Only process one JoinCluster request at a time.
38✔
762
        n.joinLock.Lock()
38✔
763
        defer n.joinLock.Unlock()
38✔
764

38✔
765
        // Check that the new node is from the same group as me.
38✔
766
        if rc.Group != n.RaftContext.Group {
38✔
767
                return nil, errors.Errorf("Raft group mismatch")
×
768
        }
×
769
        // Also check that the new node is not me.
770
        if rc.Id == n.RaftContext.Id {
38✔
771
                return nil, errors.Errorf("REUSE_RAFTID: Raft ID duplicates mine: %+v", rc)
×
772
        }
×
773

774
        // Check that the new node is not already part of the group.
775
        if addr, ok := n.Peer(rc.Id); ok && rc.Addr != addr {
38✔
776
                // There exists a healthy connection to a server with the same id.
×
777
                if _, err := GetPools().Get(addr); err == nil {
×
778
                        return &api.Payload{}, errors.Errorf(
×
779
                                "REUSE_ADDR: IP Address same as existing peer: %s", addr)
×
780
                }
×
781
        }
782
        n.Connect(rc.Id, rc.Addr)
38✔
783

38✔
784
        err := n.addToCluster(context.Background(), rc)
38✔
785
        glog.Infof("[%#x] Done joining cluster with err: %v", rc.Id, err)
38✔
786
        return &api.Payload{}, err
38✔
787
}
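
A side note on the wire format used above: BatchAndSendMessages frames each raft message it batches as a little-endian uint32 length followed by the marshalled message bytes (see the binary.Write call at source line 363). Below is a minimal, self-contained sketch of that length-prefixed framing using only the Go standard library; the encodeBatch/decodeBatch helper names are illustrative and are not part of the dgraph codebase, which unpacks batches on the receiving side elsewhere.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// encodeBatch frames each payload as <uint32 little-endian length><payload>,
// mirroring what BatchAndSendMessages writes into its per-peer buffer.
func encodeBatch(msgs [][]byte) []byte {
	var buf bytes.Buffer
	for _, m := range msgs {
		// Writes to a bytes.Buffer never return an error, so it is ignored here.
		_ = binary.Write(&buf, binary.LittleEndian, uint32(len(m)))
		buf.Write(m)
	}
	return buf.Bytes()
}

// decodeBatch splits a framed batch back into the individual payloads.
func decodeBatch(data []byte) ([][]byte, error) {
	var out [][]byte
	r := bytes.NewReader(data)
	for r.Len() > 0 {
		var sz uint32
		if err := binary.Read(r, binary.LittleEndian, &sz); err != nil {
			return nil, err
		}
		msg := make([]byte, sz)
		if _, err := io.ReadFull(r, msg); err != nil {
			return nil, err
		}
		out = append(out, msg)
	}
	return out, nil
}

func main() {
	batch := encodeBatch([][]byte{[]byte("msg-1"), []byte("msg-2")})
	msgs, err := decodeBatch(batch)
	fmt.Println(len(msgs), err) // prints: 2 <nil>
}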