• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 6000148643

28 Aug 2023 12:58PM UTC coverage: 67.112% (+0.5%) from 66.655%
6000148643

push

web-flow
feat(dql): add @unique constraint support in schema for new predicates (#8827)

Partially Fixes #8827
Closes: DGRAPHCORE-206
Docs PR: https://github.com/dgraph-io/dgraph-docs/pull/638

This PR adds support for uniqueness constraint using @unique directive
in DQL schema. This unique directive ensures that all values of the
predicate are different in a Dgraph Cluster. This completes phase 1, and
enables adding a new predicate with unique directive. As part of the
phase 2, we will work on adding support for unique directive for
existing predicates.

## Performance
Live Loader before this change on 21 million dataset took 10m54s whereas
after this change took 11m02s. It did not make any significant different
to non-unique predicates.

## How to Use
You can now specify unique in schema as follows: `email: string @unique
@index(hash) @upsert .`. Now, Dgraph will ensure that no mutation adds a
duplicate for the predicate email.

## Phase 2 [TODO]
- [ ] check if @unique can be added to schema depending upon whether
existing data has any duplicates. If the existing data has any
duplicates, we do not allow adding the @unique directive and return a
query that allows user to identify these UIDs.
- [ ] If index computation is in progress, we should not allow mutations
with predicates for which @unique is set
- [ ] Fix ACL to ensure that we do not end up adding duplicate users
- [ ] Ensure that unique constraint is not violated during Bulk loader

---------

Co-authored-by: Aman Mangal <aman@dgraph.io>

347 of 347 new or added lines in 8 files covered. (100.0%)

58763 of 87560 relevant lines covered (67.11%)

2200726.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.07
/conn/node.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package conn
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "encoding/binary"
24
        "fmt"
25
        "math/rand"
26
        "strings"
27
        "sync"
28
        "sync/atomic"
29
        "time"
30

31
        "github.com/golang/glog"
32
        "github.com/pkg/errors"
33
        "go.etcd.io/etcd/raft/v3"
34
        "go.etcd.io/etcd/raft/v3/raftpb"
35
        otrace "go.opencensus.io/trace"
36

37
        "github.com/dgraph-io/badger/v4/y"
38
        "github.com/dgraph-io/dgo/v230/protos/api"
39
        "github.com/dgraph-io/dgraph/protos/pb"
40
        "github.com/dgraph-io/dgraph/raftwal"
41
        "github.com/dgraph-io/dgraph/x"
42
        "github.com/dgraph-io/ristretto/z"
43
)
44

45
var (
46
        // ErrNoNode is returned when no node has been set up.
47
        ErrNoNode = errors.Errorf("No node has been set up yet")
48
)
49

50
// Node represents a node participating in the RAFT protocol.
51
type Node struct {
52
        x.SafeMutex
53

54
        // Applied is used to keep track of the applied RAFT proposals.
55
        // The stages are proposed -> committed (accepted by cluster) ->
56
        // applied (to PL) -> synced (to BadgerDB).
57
        // This needs to be 64 bit aligned for atomics to work on 32 bit machine.
58
        Applied y.WaterMark
59

60
        joinLock sync.Mutex
61

62
        // Used to keep track of lin read requests.
63
        requestCh chan linReadReq
64

65
        // SafeMutex is for fields which can be changed after init.
66
        _confState *raftpb.ConfState
67
        _raft      raft.Node
68

69
        // Fields which are never changed after init.
70
        StartTime       time.Time
71
        Cfg             *raft.Config
72
        MyAddr          string
73
        Id              uint64
74
        peers           map[uint64]string
75
        confChanges     map[uint64]chan error
76
        messages        chan sendmsg
77
        RaftContext     *pb.RaftContext
78
        Store           *raftwal.DiskStorage
79
        Rand            *rand.Rand
80
        tlsClientConfig *tls.Config
81

82
        Proposals proposals
83

84
        heartbeatsOut int64
85
        heartbeatsIn  int64
86
}
87

88
// NewNode returns a new Node instance.
89
func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage, tlsConfig *tls.Config) *Node {
156✔
90
        snap, err := store.Snapshot()
156✔
91
        x.Check(err)
156✔
92

156✔
93
        n := &Node{
156✔
94
                StartTime: time.Now(),
156✔
95
                Id:        rc.Id,
156✔
96
                MyAddr:    rc.Addr,
156✔
97
                Store:     store,
156✔
98
                Cfg: &raft.Config{
156✔
99
                        ID:                       rc.Id,
156✔
100
                        ElectionTick:             20, // 2s if we call Tick() every 100 ms.
156✔
101
                        HeartbeatTick:            1,  // 100ms if we call Tick() every 100 ms.
156✔
102
                        Storage:                  store,
156✔
103
                        MaxInflightMsgs:          256,
156✔
104
                        MaxSizePerMsg:            256 << 10, // 256 KB should allow more batching.
156✔
105
                        MaxCommittedSizePerReady: 64 << 20,  // Avoid loading entire Raft log into memory.
156✔
106
                        // We don't need lease based reads. They cause issues because they
156✔
107
                        // require CheckQuorum to be true, and that causes a lot of issues
156✔
108
                        // for us during cluster bootstrapping and later. A seemingly
156✔
109
                        // healthy cluster would just cause leader to step down due to
156✔
110
                        // "inactive" quorum, and then disallow anyone from becoming leader.
156✔
111
                        // So, let's stick to default options.  Let's achieve correctness,
156✔
112
                        // then we achieve performance. Plus, for the Dgraph alphas, we'll
156✔
113
                        // be soon relying only on Timestamps for blocking reads and
156✔
114
                        // achieving linearizability, than checking quorums (Zero would
156✔
115
                        // still check quorums).
156✔
116
                        ReadOnlyOption: raft.ReadOnlySafe,
156✔
117
                        // When a disconnected node joins back, it forces a leader change,
156✔
118
                        // as it starts with a higher term, as described in Raft thesis (not
156✔
119
                        // the paper) in section 9.6. This setting can avoid that by only
156✔
120
                        // increasing the term, if the node has a good chance of becoming
156✔
121
                        // the leader.
156✔
122
                        PreVote: true,
156✔
123

156✔
124
                        // We can explicitly set Applied to the first index in the Raft log,
156✔
125
                        // so it does not derive it separately, thus avoiding a crash when
156✔
126
                        // the Applied is set to below snapshot index by Raft.
156✔
127
                        // In case this is a new Raft log, first would be 1, and therefore
156✔
128
                        // Applied would be zero, hence meeting the condition by the library
156✔
129
                        // that Applied should only be set during a restart.
156✔
130
                        //
156✔
131
                        // Update: Set the Applied to the latest snapshot, because it seems
156✔
132
                        // like somehow the first index can be out of sync with the latest
156✔
133
                        // snapshot.
156✔
134
                        Applied: snap.Metadata.Index,
156✔
135

156✔
136
                        Logger: &x.ToGlog{},
156✔
137
                },
156✔
138
                // processConfChange etc are not throttled so some extra delta, so that we don't
156✔
139
                // block tick when applyCh is full
156✔
140
                Applied:     y.WaterMark{Name: "Applied watermark"},
156✔
141
                RaftContext: rc,
156✔
142
                //nolint:gosec // random node id generator does not require cryptographic precision
156✔
143
                Rand:            rand.New(&lockedSource{src: rand.NewSource(time.Now().UnixNano())}),
156✔
144
                confChanges:     make(map[uint64]chan error),
156✔
145
                messages:        make(chan sendmsg, 100),
156✔
146
                peers:           make(map[uint64]string),
156✔
147
                requestCh:       make(chan linReadReq, 100),
156✔
148
                tlsClientConfig: tlsConfig,
156✔
149
        }
156✔
150
        n.Applied.Init(nil)
156✔
151
        // This should match up to the Applied index set above.
156✔
152
        n.Applied.SetDoneUntil(n.Cfg.Applied)
156✔
153
        glog.Infof("Setting raft.Config to: %+v", n.Cfg)
156✔
154
        return n
156✔
155
}
156✔
156

157
// ReportRaftComms periodically prints the state of the node (heartbeats in and out).
158
func (n *Node) ReportRaftComms() {
155✔
159
        if !glog.V(3) {
307✔
160
                return
152✔
161
        }
152✔
162
        ticker := time.NewTicker(time.Second)
3✔
163
        defer ticker.Stop()
3✔
164

3✔
165
        for range ticker.C {
33✔
166
                out := atomic.SwapInt64(&n.heartbeatsOut, 0)
30✔
167
                in := atomic.SwapInt64(&n.heartbeatsIn, 0)
30✔
168
                glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in)
30✔
169
        }
30✔
170
}
171

172
// SetRaft would set the provided raft.Node to this node.
173
// It would check fail if the node is already set.
174
func (n *Node) SetRaft(r raft.Node) {
156✔
175
        n.Lock()
156✔
176
        defer n.Unlock()
156✔
177
        x.AssertTrue(n._raft == nil)
156✔
178
        n._raft = r
156✔
179
}
156✔
180

181
// Raft would return back the raft.Node stored in the node.
182
func (n *Node) Raft() raft.Node {
3,510,378✔
183
        n.RLock()
3,510,378✔
184
        defer n.RUnlock()
3,510,378✔
185
        return n._raft
3,510,378✔
186
}
3,510,378✔
187

188
// SetConfState would store the latest ConfState generated by ApplyConfChange.
189
func (n *Node) SetConfState(cs *raftpb.ConfState) {
258✔
190
        glog.Infof("Setting conf state to %+v\n", cs)
258✔
191
        n.Lock()
258✔
192
        defer n.Unlock()
258✔
193
        n._confState = cs
258✔
194
}
258✔
195

196
// DoneConfChange marks a configuration change as done and sends the given error to the
197
// config channel.
198
func (n *Node) DoneConfChange(id uint64, err error) {
257✔
199
        n.Lock()
257✔
200
        defer n.Unlock()
257✔
201
        ch, has := n.confChanges[id]
257✔
202
        if !has {
481✔
203
                return
224✔
204
        }
224✔
205
        delete(n.confChanges, id)
33✔
206
        ch <- err
33✔
207
}
208

209
//nolint:gosec // random node id generator does not require cryptographic precision
210
func (n *Node) storeConfChange(che chan error) uint64 {
36✔
211
        n.Lock()
36✔
212
        defer n.Unlock()
36✔
213
        id := rand.Uint64()
36✔
214
        _, has := n.confChanges[id]
36✔
215
        for has {
36✔
216
                id = rand.Uint64()
×
217
                _, has = n.confChanges[id]
×
218
        }
×
219
        n.confChanges[id] = che
36✔
220
        return id
36✔
221
}
222

223
// ConfState would return the latest ConfState stored in node.
224
func (n *Node) ConfState() *raftpb.ConfState {
8,096✔
225
        n.RLock()
8,096✔
226
        defer n.RUnlock()
8,096✔
227
        return n._confState
8,096✔
228
}
8,096✔
229

230
// Peer returns the address of the peer with the given id.
231
func (n *Node) Peer(pid uint64) (string, bool) {
604✔
232
        n.RLock()
604✔
233
        defer n.RUnlock()
604✔
234
        addr, ok := n.peers[pid]
604✔
235
        return addr, ok
604✔
236
}
604✔
237

238
// SetPeer sets the address of the peer with the given id. The address must not be empty.
239
func (n *Node) SetPeer(pid uint64, addr string) {
102✔
240
        x.AssertTruef(addr != "", "SetPeer for peer %d has empty addr.", pid)
102✔
241
        n.Lock()
102✔
242
        defer n.Unlock()
102✔
243
        n.peers[pid] = addr
102✔
244
}
102✔
245

246
// Send sends the given RAFT message from this node.
247
func (n *Node) Send(msg *raftpb.Message) {
1,025,268✔
248
        x.AssertTruef(n.Id != msg.To, "Sending message to itself")
1,025,268✔
249
        data, err := msg.Marshal()
1,025,268✔
250
        x.Check(err)
1,025,268✔
251

1,025,268✔
252
        if glog.V(2) {
2,050,536✔
253
                switch msg.Type {
1,025,268✔
254
                case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
476,998✔
255
                        atomic.AddInt64(&n.heartbeatsOut, 1)
476,998✔
256
                case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
8✔
257
                case raftpb.MsgApp, raftpb.MsgAppResp:
547,765✔
258
                case raftpb.MsgProp:
464✔
259
                default:
33✔
260
                        glog.Infof("RaftComm: [%#x] Sending message of type %s to %#x", msg.From, msg.Type, msg.To)
33✔
261
                }
262
        }
263
        // As long as leadership is stable, any attempted Propose() calls should be reflected in the
264
        // next raft.Ready.Messages. Leaders will send MsgApps to the followers; followers will send
265
        // MsgProp to the leader. It is up to the transport layer to get those messages to their
266
        // destination. If a MsgApp gets dropped by the transport layer, it will get retried by raft
267
        // (i.e. it will appear in a future Ready.Messages), but MsgProp will only be sent once. During
268
        // leadership transitions, proposals may get dropped even if the network is reliable.
269
        //
270
        // We can't do a select default here. The messages must be sent to the channel, otherwise we
271
        // should block until the channel can accept these messages. BatchAndSendMessages would take
272
        // care of dropping messages which can't be sent due to network issues to the corresponding
273
        // node. But, we shouldn't take the liberty to do that here. It would take us more time to
274
        // repropose these dropped messages anyway, than to block here a bit waiting for the messages
275
        // channel to clear out.
276
        n.messages <- sendmsg{to: msg.To, data: data}
1,025,268✔
277
}
278

279
// Snapshot returns the current snapshot.
280
func (n *Node) Snapshot() (raftpb.Snapshot, error) {
×
281
        if n == nil || n.Store == nil {
×
282
                return raftpb.Snapshot{}, errors.New("Uninitialized node or raft store")
×
283
        }
×
284
        return n.Store.Snapshot()
×
285
}
286

287
// SaveToStorage saves the hard state, entries, and snapshot to persistent storage, in that order.
288
func (n *Node) SaveToStorage(h *raftpb.HardState, es []raftpb.Entry, s *raftpb.Snapshot) {
846,680✔
289
        for {
1,693,360✔
290
                if err := n.Store.Save(h, es, s); err != nil {
846,680✔
291
                        glog.Errorf("While trying to save Raft update: %v. Retrying...", err)
×
292
                } else {
846,680✔
293
                        return
846,680✔
294
                }
846,680✔
295
        }
296
}
297

298
// PastLife returns the index of the snapshot before the restart (if any) and whether there was
299
// a previous state that should be recovered after a restart.
300
func (n *Node) PastLife() (uint64, bool, error) {
155✔
301
        var (
155✔
302
                sp      raftpb.Snapshot
155✔
303
                idx     uint64
155✔
304
                restart bool
155✔
305
                rerr    error
155✔
306
        )
155✔
307
        sp, rerr = n.Store.Snapshot()
155✔
308
        if rerr != nil {
155✔
309
                return 0, false, rerr
×
310
        }
×
311
        if !raft.IsEmptySnap(sp) {
156✔
312
                glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
1✔
313
                restart = true
1✔
314
                idx = sp.Metadata.Index
1✔
315
        }
1✔
316

317
        var hd raftpb.HardState
155✔
318
        hd, rerr = n.Store.HardState()
155✔
319
        if rerr != nil {
155✔
320
                return 0, false, rerr
×
321
        }
×
322
        if !raft.IsEmptyHardState(hd) {
156✔
323
                glog.Infof("Found hardstate: %+v\n", hd)
1✔
324
                restart = true
1✔
325
        }
1✔
326

327
        num := n.Store.NumEntries()
155✔
328
        glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
155✔
329
        // We'll always have at least one entry.
155✔
330
        if num > 1 {
156✔
331
                restart = true
1✔
332
        }
1✔
333
        return idx, restart, nil
155✔
334
}
335

336
const (
337
        messageBatchSoftLimit = 10e6
338
)
339

340
type stream struct {
341
        msgCh chan []byte
342
        alive int32
343
}
344

345
// BatchAndSendMessages sends messages in batches.
346
func (n *Node) BatchAndSendMessages() {
155✔
347
        batches := make(map[uint64]*bytes.Buffer)
155✔
348
        streams := make(map[uint64]*stream)
155✔
349

155✔
350
        for {
742,346✔
351
                totalSize := 0
742,191✔
352
                sm := <-n.messages
742,191✔
353
        slurp_loop:
742,191✔
354
                for {
1,767,459✔
355
                        var buf *bytes.Buffer
1,025,268✔
356
                        if b, ok := batches[sm.to]; !ok {
1,025,341✔
357
                                buf = new(bytes.Buffer)
73✔
358
                                batches[sm.to] = buf
73✔
359
                        } else {
1,025,268✔
360
                                buf = b
1,025,195✔
361
                        }
1,025,195✔
362
                        totalSize += 4 + len(sm.data)
1,025,268✔
363
                        x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
1,025,268✔
364
                        x.Check2(buf.Write(sm.data))
1,025,268✔
365

1,025,268✔
366
                        if totalSize > messageBatchSoftLimit {
1,025,268✔
367
                                // We limit the batch size, but we aren't pushing back on
×
368
                                // n.messages, because the loop below spawns a goroutine
×
369
                                // to do its dirty work.  This is good because right now
×
370
                                // (*node).send fails(!) if the channel is full.
×
371
                                break
×
372
                        }
373

374
                        select {
1,025,268✔
375
                        case sm = <-n.messages:
283,232✔
376
                        default:
742,036✔
377
                                break slurp_loop
742,036✔
378
                        }
379
                }
380

381
                for to, buf := range batches {
1,732,849✔
382
                        if buf.Len() == 0 {
995,032✔
383
                                continue
4,219✔
384
                        }
385
                        s, ok := streams[to]
986,594✔
386
                        if !ok || atomic.LoadInt32(&s.alive) <= 0 {
986,709✔
387
                                s = &stream{
115✔
388
                                        msgCh: make(chan []byte, 100),
115✔
389
                                        alive: 1,
115✔
390
                                }
115✔
391
                                go n.streamMessages(to, s)
115✔
392
                                streams[to] = s
115✔
393
                        }
115✔
394
                        data := make([]byte, buf.Len())
986,594✔
395
                        copy(data, buf.Bytes())
986,594✔
396
                        buf.Reset()
986,594✔
397

986,594✔
398
                        select {
986,594✔
399
                        case s.msgCh <- data:
985,552✔
400
                        default:
1,042✔
401
                        }
402
                }
403
        }
404
}
405

406
func (n *Node) streamMessages(to uint64, s *stream) {
115✔
407
        defer atomic.StoreInt32(&s.alive, 0)
115✔
408

115✔
409
        // Exit after this deadline. Let BatchAndSendMessages create another goroutine, if needed.
115✔
410
        // Let's set the deadline to 10s because if we increase it, then it takes longer to recover from
115✔
411
        // a partition and get a new leader.
115✔
412
        deadline := time.Now().Add(10 * time.Second)
115✔
413
        ticker := time.NewTicker(time.Second)
115✔
414
        defer ticker.Stop()
115✔
415

115✔
416
        var logged int
115✔
417
        for range ticker.C { // Don't do this in an busy-wait loop, use a ticker.
501✔
418
                // doSendMessage would block doing a stream. So, time.Now().After is
386✔
419
                // only there to avoid a busy-wait.
386✔
420
                if err := n.doSendMessage(to, s.msgCh); err != nil {
727✔
421
                        // Update lastLog so we print error only a few times if we are not able to connect.
341✔
422
                        // Otherwise, the log is polluted with repeated errors.
341✔
423
                        if logged == 0 {
407✔
424
                                glog.Warningf("Unable to send message to peer: %#x. Error: %v", to, err)
66✔
425
                                logged++
66✔
426
                        }
66✔
427
                }
428
                if time.Now().After(deadline) {
385✔
429
                        return
44✔
430
                }
44✔
431
        }
432
}
433

434
func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
386✔
435
        addr, has := n.Peer(to)
386✔
436
        if !has {
386✔
437
                return errors.Errorf("Do not have address of peer %#x", to)
×
438
        }
×
439
        pool, err := GetPools().Get(addr)
386✔
440
        if err != nil {
609✔
441
                return err
223✔
442
        }
223✔
443

444
        c := pb.NewRaftClient(pool.Get())
163✔
445
        ctx, span := otrace.StartSpan(context.Background(),
163✔
446
                fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to))
163✔
447
        defer span.End()
163✔
448

163✔
449
        mc, err := c.RaftMessage(ctx)
163✔
450
        if err != nil {
251✔
451
                return err
88✔
452
        }
88✔
453

454
        var packets, lastPackets uint64
74✔
455
        slurp := func(batch *pb.RaftBatch) {
980,245✔
456
                for {
1,962,864✔
457
                        if len(batch.Payload.Data) > messageBatchSoftLimit {
982,693✔
458
                                return
×
459
                        }
×
460
                        select {
982,693✔
461
                        case data := <-msgCh:
2,522✔
462
                                batch.Payload.Data = append(batch.Payload.Data, data...)
2,522✔
463
                                packets++
2,522✔
464
                        default:
980,171✔
465
                                return
980,171✔
466
                        }
467
                }
468
        }
469

470
        ctx = mc.Context()
74✔
471

74✔
472
        fastTick := time.NewTicker(5 * time.Second)
74✔
473
        defer fastTick.Stop()
74✔
474

74✔
475
        ticker := time.NewTicker(3 * time.Minute)
74✔
476
        defer ticker.Stop()
74✔
477

74✔
478
        for {
988,188✔
479
                select {
988,114✔
480
                case data := <-msgCh:
980,171✔
481
                        batch := &pb.RaftBatch{
980,171✔
482
                                Context: n.RaftContext,
980,171✔
483
                                Payload: &api.Payload{Data: data},
980,171✔
484
                        }
980,171✔
485
                        slurp(batch) // Pick up more entries from msgCh, if present.
980,171✔
486
                        span.Annotatef(nil, "[to: %x] [Packets: %d] Sending data of length: %d.",
980,171✔
487
                                to, packets, len(batch.Payload.Data))
980,171✔
488
                        if packets%10000 == 0 {
980,251✔
489
                                glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.",
80✔
490
                                        to, packets, len(batch.Payload.Data))
80✔
491
                        }
80✔
492
                        packets++
980,171✔
493
                        if err := mc.Send(batch); err != nil {
980,201✔
494
                                span.Annotatef(nil, "Error while mc.Send: %v", err)
30✔
495
                                glog.Errorf("[to: %x] Error while mc.Send: %v", to, err)
30✔
496
                                switch {
30✔
497
                                case strings.Contains(err.Error(), "TransientFailure"):
×
498
                                        glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
×
499
                                        n.Raft().ReportUnreachable(to)
×
500
                                        pool.SetUnhealthy()
×
501
                                default:
30✔
502
                                }
503
                                // We don't need to do anything if we receive any error while sending message.
504
                                // RAFT would automatically retry.
505
                                return err
30✔
506
                        }
507
                case <-fastTick.C:
7,699✔
508
                        // We use this ticker, because during network partitions, mc.Send is
7,699✔
509
                        // unable to actually send packets, and also does not complain about
7,699✔
510
                        // them. We could have potentially used the separately tracked
7,699✔
511
                        // heartbeats to check this, but what we have observed is that
7,699✔
512
                        // incoming traffic might be OK, but outgoing might not be. So, this
7,699✔
513
                        // is a better way for us to verify whether this particular outbound
7,699✔
514
                        // connection is valid or not.
7,699✔
515
                        ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
7,699✔
516
                        _, err := c.IsPeer(ctx, n.RaftContext)
7,699✔
517
                        cancel()
7,699✔
518
                        if err != nil {
7,699✔
519
                                glog.Errorf("Error while calling IsPeer %v. Reporting %x as unreachable.", err, to)
×
520
                                n.Raft().ReportUnreachable(to)
×
521
                                pool.SetUnhealthy()
×
522
                                return errors.Wrapf(err, "while calling IsPeer %x", to)
×
523
                        }
×
524
                case <-ticker.C:
200✔
525
                        if lastPackets == packets {
200✔
526
                                span.Annotatef(nil,
×
527
                                        "No activity for a while [Packets == %d]. Closing connection.", packets)
×
528
                                return mc.CloseSend()
×
529
                        }
×
530
                        lastPackets = packets
200✔
531
                case <-ctx.Done():
×
532
                        return ctx.Err()
×
533
                }
534
        }
535
}
536

537
// Connect connects the node and makes its peerPool refer to the constructed pool and address
538
// (possibly updating ourselves from the old address.)  (Unless pid is ourselves, in which
539
// case this does nothing.)
540
func (n *Node) Connect(pid uint64, addr string) {
274✔
541
        if pid == n.Id {
362✔
542
                return
88✔
543
        }
88✔
544
        if paddr, ok := n.Peer(pid); ok && paddr == addr {
270✔
545
                // Already connected.
84✔
546
                return
84✔
547
        }
84✔
548
        // Here's what we do.  Right now peerPool maps peer node id's to addr values.  If
549
        // a *pool can be created, good, but if not, we still create a peerPoolEntry with
550
        // a nil *pool.
551
        if addr == n.MyAddr {
102✔
552
                // TODO: Note this fact in more general peer health info somehow.
×
553
                glog.Infof("Peer %d claims same host as me\n", pid)
×
554
                n.SetPeer(pid, addr)
×
555
                return
×
556
        }
×
557
        GetPools().Connect(addr, n.tlsClientConfig)
102✔
558
        n.SetPeer(pid, addr)
102✔
559
}
560

561
// DeletePeer deletes the record of the peer with the given id.
562
func (n *Node) DeletePeer(pid uint64) {
×
563
        if pid == n.Id {
×
564
                return
×
565
        }
×
566
        n.Lock()
×
567
        defer n.Unlock()
×
568
        delete(n.peers, pid)
×
569
}
570

571
var errInternalRetry = errors.New("Retry proposal again")
572

573
func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
36✔
574
        cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
36✔
575
        defer cancel()
36✔
576

36✔
577
        ch := make(chan error, 1)
36✔
578
        id := n.storeConfChange(ch)
36✔
579
        // TODO: Delete id from the map.
36✔
580
        conf.ID = id
36✔
581
        if err := n.Raft().ProposeConfChange(cctx, conf); err != nil {
38✔
582
                if cctx.Err() != nil {
4✔
583
                        return errInternalRetry
2✔
584
                }
2✔
585
                glog.Warningf("Error while proposing conf change: %v", err)
×
586
                return err
×
587
        }
588
        select {
34✔
589
        case err := <-ch:
32✔
590
                return err
32✔
591
        case <-ctx.Done():
×
592
                return ctx.Err()
×
593
        case <-cctx.Done():
2✔
594
                return errInternalRetry
2✔
595
        }
596
}
597

598
func (n *Node) addToCluster(ctx context.Context, rc *pb.RaftContext) error {
32✔
599
        pid := rc.Id
32✔
600
        rc.SnapshotTs = 0
32✔
601
        rcBytes, err := rc.Marshal()
32✔
602
        x.Check(err)
32✔
603

32✔
604
        cc := raftpb.ConfChange{
32✔
605
                Type:    raftpb.ConfChangeAddNode,
32✔
606
                NodeID:  pid,
32✔
607
                Context: rcBytes,
32✔
608
        }
32✔
609
        if rc.IsLearner {
32✔
610
                cc.Type = raftpb.ConfChangeAddLearnerNode
×
611
        }
×
612

613
        err = errInternalRetry
32✔
614
        for err == errInternalRetry {
68✔
615
                glog.Infof("Trying to add %#x to cluster. Addr: %v\n", pid, rc.Addr)
36✔
616
                glog.Infof("Current confstate at %#x: %+v\n", n.Id, n.ConfState())
36✔
617
                err = n.proposeConfChange(ctx, cc)
36✔
618
        }
36✔
619
        return err
32✔
620
}
621

622
// ProposePeerRemoval proposes a new configuration with the peer with the given id removed.
623
func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
×
624
        if n.Raft() == nil {
×
625
                return ErrNoNode
×
626
        }
×
627
        if _, ok := n.Peer(id); !ok && id != n.RaftContext.Id {
×
628
                return errors.Errorf("Node %#x not part of group", id)
×
629
        }
×
630
        cc := raftpb.ConfChange{
×
631
                Type:   raftpb.ConfChangeRemoveNode,
×
632
                NodeID: id,
×
633
        }
×
634
        err := errInternalRetry
×
635
        for err == errInternalRetry {
×
636
                err = n.proposeConfChange(ctx, cc)
×
637
        }
×
638
        return err
×
639
}
640

641
type linReadReq struct {
642
        // A one-shot chan which we send a raft index upon.
643
        indexCh chan<- uint64
644
}
645

646
var errReadIndex = errors.Errorf(
647
        "Cannot get linearized read (time expired or no configured leader)")
648

649
var readIndexOk, readIndexTotal uint64
650

651
// WaitLinearizableRead waits until a linearizable read can be performed.
652
func (n *Node) WaitLinearizableRead(ctx context.Context) error {
27,662✔
653
        span := otrace.FromContext(ctx)
27,662✔
654
        span.Annotate(nil, "WaitLinearizableRead")
27,662✔
655

27,662✔
656
        if num := atomic.AddUint64(&readIndexTotal, 1); num%1000 == 0 {
27,682✔
657
                glog.V(2).Infof("ReadIndex Total: %d\n", num)
20✔
658
        }
20✔
659
        indexCh := make(chan uint64, 1)
27,662✔
660
        select {
27,662✔
661
        case n.requestCh <- linReadReq{indexCh: indexCh}:
27,662✔
662
                span.Annotate(nil, "Pushed to requestCh")
27,662✔
663
        case <-ctx.Done():
×
664
                span.Annotate(nil, "Context expired")
×
665
                return ctx.Err()
×
666
        }
667

668
        select {
27,662✔
669
        case index := <-indexCh:
27,646✔
670
                span.Annotatef(nil, "Received index: %d", index)
27,646✔
671
                if index == 0 {
27,648✔
672
                        return errReadIndex
2✔
673
                } else if num := atomic.AddUint64(&readIndexOk, 1); num%1000 == 0 {
27,665✔
674
                        glog.V(2).Infof("ReadIndex OK: %d\n", num)
19✔
675
                }
19✔
676
                err := n.Applied.WaitForMark(ctx, index)
27,644✔
677
                span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err)
27,644✔
678
                return err
27,644✔
679
        case <-ctx.Done():
16✔
680
                span.Annotate(nil, "Context expired")
16✔
681
                return ctx.Err()
16✔
682
        }
683
}
684

685
// RunReadIndexLoop runs the RAFT index in a loop.
686
func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
64✔
687
        defer closer.Done()
64✔
688
        readIndex := func(activeRctx []byte) (uint64, error) {
27,753✔
689
                // Read Request can get rejected then we would wait indefinitely on the channel
27,689✔
690
                // so have a timeout.
27,689✔
691
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
27,689✔
692
                defer cancel()
27,689✔
693

27,689✔
694
                if err := n.Raft().ReadIndex(ctx, activeRctx); err != nil {
27,691✔
695
                        glog.Errorf("Error while trying to call ReadIndex: %v\n", err)
2✔
696
                        return 0, err
2✔
697
                }
2✔
698

699
        again:
27,687✔
700
                select {
27,687✔
701
                case <-closer.HasBeenClosed():
5✔
702
                        return 0, errors.New("Closer has been called")
5✔
703
                case rs := <-readStateCh:
27,578✔
704
                        if !bytes.Equal(activeRctx, rs.RequestCtx) {
27,578✔
705
                                glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx)
×
706
                                goto again
×
707
                        }
708
                        return rs.Index, nil
27,578✔
709
                case <-ctx.Done():
104✔
710
                        glog.Warningf("[%#x] Read index context timed out\n", n.Id)
104✔
711
                        return 0, errInternalRetry
104✔
712
                }
713
        } // end of readIndex func
714

715
        // We maintain one linearizable ReadIndex request at a time.  Others wait queued behind
716
        // requestCh.
717
        requests := []linReadReq{}
64✔
718
        for {
27,713✔
719
                select {
27,649✔
720
                case <-closer.HasBeenClosed():
64✔
721
                        return
64✔
722
                case <-readStateCh:
×
723
                        // Do nothing, discard ReadState as we don't have any pending ReadIndex requests.
724
                case req := <-n.requestCh:
27,585✔
725
                slurpLoop:
27,585✔
726
                        for {
55,238✔
727
                                requests = append(requests, req)
27,653✔
728
                                select {
27,653✔
729
                                case req = <-n.requestCh:
68✔
730
                                default:
27,585✔
731
                                        break slurpLoop
27,585✔
732
                                }
733
                        }
734
                        // Create one activeRctx slice for the read index, even if we have to call readIndex
735
                        // repeatedly. That way, we can process the requests as soon as we encounter the first
736
                        // activeRctx. This is better than flooding readIndex with a new activeRctx on each
737
                        // call, causing more unique traffic and further delays in request processing.
738
                        activeRctx := make([]byte, 8)
27,585✔
739
                        x.Check2(n.Rand.Read(activeRctx))
27,585✔
740
                        glog.V(4).Infof("Request readctx: %#x", activeRctx)
27,585✔
741
                        for {
55,274✔
742
                                index, err := readIndex(activeRctx)
27,689✔
743
                                if err == errInternalRetry {
27,793✔
744
                                        continue
104✔
745
                                }
746
                                if err != nil {
27,592✔
747
                                        index = 0
7✔
748
                                        glog.Errorf("[%#x] While trying to do lin read index: %v", n.Id, err)
7✔
749
                                }
7✔
750
                                for _, req := range requests {
55,238✔
751
                                        req.indexCh <- index
27,653✔
752
                                }
27,653✔
753
                                break
27,585✔
754
                        }
755
                        requests = requests[:0]
27,585✔
756
                }
757
        }
758
}
759

760
func (n *Node) joinCluster(ctx context.Context, rc *pb.RaftContext) (*api.Payload, error) {
32✔
761
        // Only process one JoinCluster request at a time.
32✔
762
        n.joinLock.Lock()
32✔
763
        defer n.joinLock.Unlock()
32✔
764

32✔
765
        // Check that the new node is from the same group as me.
32✔
766
        if rc.Group != n.RaftContext.Group {
32✔
767
                return nil, errors.Errorf("Raft group mismatch")
×
768
        }
×
769
        // Also check that the new node is not me.
770
        if rc.Id == n.RaftContext.Id {
32✔
771
                return nil, errors.Errorf("REUSE_RAFTID: Raft ID duplicates mine: %+v", rc)
×
772
        }
×
773

774
        // Check that the new node is not already part of the group.
775
        if addr, ok := n.Peer(rc.Id); ok && rc.Addr != addr {
32✔
776
                // There exists a healthy connection to server with same id.
×
777
                if _, err := GetPools().Get(addr); err == nil {
×
778
                        return &api.Payload{}, errors.Errorf(
×
779
                                "REUSE_ADDR: IP Address same as existing peer: %s", addr)
×
780
                }
×
781
        }
782
        n.Connect(rc.Id, rc.Addr)
32✔
783

32✔
784
        err := n.addToCluster(context.Background(), rc)
32✔
785
        glog.Infof("[%#x] Done joining cluster with err: %v", rc.Id, err)
32✔
786
        return &api.Payload{}, err
32✔
787
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc