• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5262646808

14 Jun 2023 03:10AM UTC coverage: 67.209% (+0.004%) from 67.205%
5262646808

push

web-flow
Merge 3e9114f07 into 2787cfc58

58347 of 86814 relevant lines covered (67.21%)

2247229.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.45
/dgraph/cmd/zero/zero.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "math"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/gogo/protobuf/proto"
29
        "github.com/golang/glog"
30
        "github.com/pkg/errors"
31
        otrace "go.opencensus.io/trace"
32

33
        "github.com/dgraph-io/dgo/v230/protos/api"
34
        "github.com/dgraph-io/dgraph/conn"
35
        "github.com/dgraph-io/dgraph/protos/pb"
36
        "github.com/dgraph-io/dgraph/telemetry"
37
        "github.com/dgraph-io/dgraph/x"
38
        "github.com/dgraph-io/ristretto/z"
39
)
40

41
var (
42
        emptyConnectionState pb.ConnectionState
43
        errServerShutDown    = errors.New("Server is being shut down")
44
)
45

46
// license describes a license record: who it was issued to, how many nodes
// it allows and when it expires. The struct tags give the JSON key for each
// field.
type license struct {
	User     string    `json:"user"`
	MaxNodes uint64    `json:"max_nodes"`
	Expiry   time.Time `json:"expiry"`
}
51

52
// Server implements the zero server.
53
type Server struct {
54
        x.SafeMutex
55
        Node *node
56
        orc  *Oracle
57

58
        NumReplicas int
59
        state       *pb.MembershipState
60
        nextRaftId  uint64
61

62
        nextLease   map[pb.NumLeaseType]uint64
63
        readOnlyTs  uint64
64
        leaseLock   sync.Mutex // protects nextUID, nextTxnTs, nextNsID and corresponding proposals.
65
        rateLimiter *x.RateLimiter
66

67
        // groupMap    map[uint32]*Group
68
        nextGroup      uint32
69
        leaderChangeCh chan struct{}
70
        closer         *z.Closer  // Used to tell stream to close.
71
        connectLock    sync.Mutex // Used to serialize connect requests from servers.
72

73
        // tls client config used to connect with zero internally
74
        tlsClientConfig *tls.Config
75

76
        moveOngoing    chan struct{}
77
        blockCommitsOn *sync.Map
78

79
        checkpointPerGroup map[uint32]uint64
80
}
81

82
// Init initializes the zero server.
83
func (s *Server) Init() {
64✔
84
        s.Lock()
64✔
85
        defer s.Unlock()
64✔
86

64✔
87
        s.orc = &Oracle{}
64✔
88
        s.orc.Init()
64✔
89
        s.state = &pb.MembershipState{
64✔
90
                Groups: make(map[uint32]*pb.Group),
64✔
91
                Zeros:  make(map[uint64]*pb.Member),
64✔
92
        }
64✔
93
        s.nextLease = make(map[pb.NumLeaseType]uint64)
64✔
94
        s.nextRaftId = 1
64✔
95
        s.nextLease[pb.Num_UID] = 1
64✔
96
        s.nextLease[pb.Num_TXN_TS] = 1
64✔
97
        s.nextLease[pb.Num_NS_ID] = 1
64✔
98
        s.nextGroup = 1
64✔
99
        s.leaderChangeCh = make(chan struct{}, 1)
64✔
100
        s.closer = z.NewCloser(2) // grpc and http
64✔
101
        s.blockCommitsOn = new(sync.Map)
64✔
102
        s.moveOngoing = make(chan struct{}, 1)
64✔
103
        s.checkpointPerGroup = make(map[uint32]uint64)
64✔
104
        if opts.limiterConfig.UidLeaseLimit > 0 {
65✔
105
                // rate limiting is not enabled when lease limit is set to zero.
1✔
106
                s.rateLimiter = x.NewRateLimiter(int64(opts.limiterConfig.UidLeaseLimit),
1✔
107
                        opts.limiterConfig.RefillAfter, s.closer)
1✔
108
        }
1✔
109

110
        go s.rebalanceTablets()
64✔
111
}
112

113
func (s *Server) periodicallyPostTelemetry() {
64✔
114
        glog.V(2).Infof("Starting telemetry data collection for zero...")
64✔
115
        start := time.Now()
64✔
116

64✔
117
        ticker := time.NewTicker(time.Minute * 10)
64✔
118
        defer ticker.Stop()
64✔
119

64✔
120
        var lastPostedAt time.Time
64✔
121
        for range ticker.C {
76✔
122
                if !s.Node.AmLeader() {
20✔
123
                        continue
8✔
124
                }
125
                if time.Since(lastPostedAt) < time.Hour {
4✔
126
                        continue
×
127
                }
128
                ms := s.membershipState()
4✔
129
                t := telemetry.NewZero(ms)
4✔
130
                if t == nil {
4✔
131
                        continue
×
132
                }
133
                t.SinceHours = int(time.Since(start).Hours())
4✔
134
                glog.V(2).Infof("Posting Telemetry data: %+v", t)
4✔
135

4✔
136
                err := t.Post()
4✔
137
                if err == nil {
4✔
138
                        lastPostedAt = time.Now()
×
139
                } else {
4✔
140
                        glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
4✔
141
                }
4✔
142
        }
143
}
144

145
func (s *Server) triggerLeaderChange() {
162✔
146
        s.Lock()
162✔
147
        defer s.Unlock()
162✔
148
        close(s.leaderChangeCh)
162✔
149
        s.leaderChangeCh = make(chan struct{}, 1)
162✔
150
}
162✔
151

152
func (s *Server) leaderChangeChannel() chan struct{} {
77✔
153
        s.RLock()
77✔
154
        defer s.RUnlock()
77✔
155
        return s.leaderChangeCh
77✔
156
}
77✔
157

158
func (s *Server) member(addr string) *pb.Member {
260✔
159
        s.AssertRLock()
260✔
160
        for _, m := range s.state.Zeros {
746✔
161
                if m.Addr == addr {
486✔
162
                        return m
×
163
                }
×
164
        }
165
        for _, g := range s.state.Groups {
545✔
166
                for _, m := range g.Members {
696✔
167
                        if m.Addr == addr {
515✔
168
                                return m
104✔
169
                        }
104✔
170
                }
171
        }
172
        return nil
156✔
173
}
174

175
// Leader returns a connection pool to the zero leader.
176
func (s *Server) Leader(gid uint32) *conn.Pool {
4✔
177
        s.RLock()
4✔
178
        defer s.RUnlock()
4✔
179
        if s.state == nil {
4✔
180
                return nil
×
181
        }
×
182
        var members map[uint64]*pb.Member
4✔
183
        if gid == 0 {
4✔
184
                members = s.state.Zeros
×
185
        } else {
4✔
186
                group := s.state.Groups[gid]
4✔
187
                if group == nil {
4✔
188
                        return nil
×
189
                }
×
190
                members = group.Members
4✔
191
        }
192
        var healthyPool *conn.Pool
4✔
193
        for _, m := range members {
8✔
194
                if pl, err := conn.GetPools().Get(m.Addr); err == nil {
8✔
195
                        healthyPool = pl
4✔
196
                        if m.Leader {
8✔
197
                                return pl
4✔
198
                        }
4✔
199
                }
200
        }
201
        return healthyPool
×
202
}
203

204
// KnownGroups returns a list of the known groups.
205
func (s *Server) KnownGroups() []uint32 {
1,009✔
206
        var groups []uint32
1,009✔
207
        s.RLock()
1,009✔
208
        defer s.RUnlock()
1,009✔
209
        for group := range s.state.Groups {
2,031✔
210
                groups = append(groups, group)
1,022✔
211
        }
1,022✔
212
        return groups
1,009✔
213
}
214

215
func (s *Server) hasLeader(gid uint32) bool {
5✔
216
        s.AssertRLock()
5✔
217
        if s.state == nil {
5✔
218
                return false
×
219
        }
×
220
        group := s.state.Groups[gid]
5✔
221
        if group == nil {
5✔
222
                return false
×
223
        }
×
224
        for _, m := range group.Members {
13✔
225
                if m.Leader {
13✔
226
                        return true
5✔
227
                }
5✔
228
        }
229
        return false
×
230
}
231

232
// SetMembershipState updates the membership state to the given one.
233
func (s *Server) SetMembershipState(state *pb.MembershipState) {
×
234
        s.Lock()
×
235
        defer s.Unlock()
×
236

×
237
        s.state = state
×
238
        s.nextRaftId = x.Max(s.nextRaftId, s.state.MaxRaftId+1)
×
239

×
240
        if state.Zeros == nil {
×
241
                state.Zeros = make(map[uint64]*pb.Member)
×
242
        }
×
243
        if state.Groups == nil {
×
244
                state.Groups = make(map[uint32]*pb.Group)
×
245
        }
×
246

247
        // Create connections to all members.
248
        for _, g := range state.Groups {
×
249
                for _, m := range g.Members {
×
250
                        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
251
                }
×
252

253
                if g.Tablets == nil {
×
254
                        g.Tablets = make(map[string]*pb.Tablet)
×
255
                }
×
256
        }
257

258
        s.nextGroup = uint32(len(state.Groups) + 1)
×
259
}
260

261
// MarshalMembershipState returns the marshaled membership state.
262
func (s *Server) MarshalMembershipState() ([]byte, error) {
×
263
        s.Lock()
×
264
        defer s.Unlock()
×
265
        return s.state.Marshal()
×
266
}
×
267

268
func (s *Server) membershipState() *pb.MembershipState {
28,441✔
269
        s.RLock()
28,441✔
270
        defer s.RUnlock()
28,441✔
271
        return proto.Clone(s.state).(*pb.MembershipState)
28,441✔
272
}
28,441✔
273

274
func (s *Server) groupChecksums() map[uint32]uint64 {
147,638✔
275
        s.RLock()
147,638✔
276
        defer s.RUnlock()
147,638✔
277
        m := make(map[uint32]uint64)
147,638✔
278
        for gid, g := range s.state.GetGroups() {
496,446✔
279
                m[gid] = g.Checksum
348,808✔
280
        }
348,808✔
281
        return m
147,638✔
282
}
283

284
func (s *Server) storeZero(m *pb.Member) {
98✔
285
        s.Lock()
98✔
286
        defer s.Unlock()
98✔
287

98✔
288
        s.state.Zeros[m.Id] = m
98✔
289
}
98✔
290

291
func (s *Server) updateZeroLeader() {
1,362✔
292
        s.Lock()
1,362✔
293
        defer s.Unlock()
1,362✔
294
        leader := s.Node.Raft().Status().Lead
1,362✔
295
        for _, m := range s.state.Zeros {
4,733✔
296
                m.Leader = m.Id == leader
3,371✔
297
        }
3,371✔
298
}
299

300
func (s *Server) removeZero(nodeId uint64) {
×
301
        s.Lock()
×
302
        defer s.Unlock()
×
303
        m, has := s.state.Zeros[nodeId]
×
304
        if !has {
×
305
                return
×
306
        }
×
307
        delete(s.state.Zeros, nodeId)
×
308
        s.state.Removed = append(s.state.Removed, m)
×
309
}
310

311
// ServingTablet returns the Tablet called tablet.
312
func (s *Server) ServingTablet(tablet string) *pb.Tablet {
36,255✔
313
        s.RLock()
36,255✔
314
        defer s.RUnlock()
36,255✔
315

36,255✔
316
        for _, group := range s.state.Groups {
81,531✔
317
                if tab, ok := group.Tablets[tablet]; ok {
75,118✔
318
                        return tab
29,842✔
319
                }
29,842✔
320
        }
321
        return nil
6,413✔
322
}
323

324
func (s *Server) blockTablet(pred string) func() {
4✔
325
        s.blockCommitsOn.Store(pred, struct{}{})
4✔
326
        return func() {
8✔
327
                s.blockCommitsOn.Delete(pred)
4✔
328
        }
4✔
329
}
330

331
func (s *Server) isBlocked(pred string) bool {
19,428✔
332
        _, blocked := s.blockCommitsOn.Load(pred)
19,428✔
333
        return blocked
19,428✔
334
}
19,428✔
335

336
func (s *Server) servingTablet(tablet string) *pb.Tablet {
7,976✔
337
        s.AssertRLock()
7,976✔
338

7,976✔
339
        for _, group := range s.state.Groups {
17,889✔
340
                if tab, ok := group.Tablets[tablet]; ok {
12,549✔
341
                        return tab
2,636✔
342
                }
2,636✔
343
        }
344
        return nil
5,340✔
345
}
346

347
func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
2,228✔
348
        var res []*pb.ZeroProposal
2,228✔
349
        if len(dst.Members) > 1 {
2,228✔
350
                return res, errors.Errorf("Create Proposal: Invalid group: %+v", dst)
×
351
        }
×
352

353
        s.RLock()
2,228✔
354
        defer s.RUnlock()
2,228✔
355
        // There is only one member. We use for loop because we don't know what the mid is.
2,228✔
356
        for mid, dstMember := range dst.Members {
4,456✔
357
                group, has := s.state.Groups[dstMember.GroupId]
2,228✔
358
                if !has {
2,228✔
359
                        return res, errors.Errorf("Unknown group for member: %+v", dstMember)
×
360
                }
×
361
                srcMember, has := group.Members[mid]
2,228✔
362
                if !has {
2,228✔
363
                        return res, errors.Errorf("Unknown member: %+v", dstMember)
×
364
                }
×
365
                if srcMember.Addr != dstMember.Addr ||
2,228✔
366
                        srcMember.Leader != dstMember.Leader {
2,307✔
367

79✔
368
                        proposal := &pb.ZeroProposal{
79✔
369
                                Member: dstMember,
79✔
370
                        }
79✔
371
                        res = append(res, proposal)
79✔
372
                }
79✔
373
                if !dstMember.Leader {
3,566✔
374
                        // Don't continue to tablets if request is not from the leader.
1,338✔
375
                        return res, nil
1,338✔
376
                }
1,338✔
377
                if dst.SnapshotTs > group.SnapshotTs {
943✔
378
                        res = append(res, &pb.ZeroProposal{
53✔
379
                                SnapshotTs: map[uint32]uint64{dstMember.GroupId: dst.SnapshotTs},
53✔
380
                        })
53✔
381
                }
53✔
382
        }
383

384
        var tablets []*pb.Tablet
890✔
385
        for key, dstTablet := range dst.Tablets {
890✔
386
                group, has := s.state.Groups[dstTablet.GroupId]
×
387
                if !has {
×
388
                        return res, errors.Errorf("Unknown group for tablet: %+v", dstTablet)
×
389
                }
×
390
                srcTablet, has := group.Tablets[key]
×
391
                if !has {
×
392
                        // Tablet moved to new group
×
393
                        continue
×
394
                }
395

396
                s := float64(srcTablet.OnDiskBytes)
×
397
                d := float64(dstTablet.OnDiskBytes)
×
398
                if dstTablet.Remove || (s == 0 && d > 0) || (s > 0 && math.Abs(d/s-1) > 0.1) {
×
399
                        dstTablet.Force = false
×
400
                        tablets = append(tablets, dstTablet)
×
401
                }
×
402
        }
403

404
        if len(tablets) > 0 {
890✔
405
                res = append(res, &pb.ZeroProposal{Tablets: tablets})
×
406
        }
×
407
        return res, nil
890✔
408
}
409

410
func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
1✔
411
        ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
1✔
412
        defer span.End()
1✔
413
        if req == nil || len(req.Tablets) == 0 {
1✔
414
                return nil, errors.Errorf("Tablets are empty in %+v", req)
×
415
        }
×
416

417
        if req.GroupId == 0 {
1✔
418
                return nil, errors.Errorf("Group ID is Zero in %+v", req)
×
419
        }
×
420

421
        tablets := make([]*pb.Tablet, 0)
1✔
422
        unknownTablets := make([]*pb.Tablet, 0)
1✔
423
        for _, t := range req.Tablets {
46✔
424
                tab := s.ServingTablet(t.Predicate)
45✔
425
                span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
45✔
426
                switch {
45✔
427
                case tab != nil && !t.Force:
×
428
                        tablets = append(tablets, t)
×
429
                case t.ReadOnly:
×
430
                        tablets = append(tablets, &pb.Tablet{})
×
431
                default:
45✔
432
                        unknownTablets = append(unknownTablets, t)
45✔
433
                }
434
        }
435

436
        if len(unknownTablets) == 0 {
1✔
437
                return &pb.TabletResponse{
×
438
                        Tablets: tablets,
×
439
                }, nil
×
440
        }
×
441

442
        // Set the tablet to be served by this server's group.
443
        var proposal pb.ZeroProposal
1✔
444
        proposal.Tablets = make([]*pb.Tablet, 0)
1✔
445
        for _, t := range unknownTablets {
46✔
446
                if x.IsReservedPredicate(t.Predicate) {
56✔
447
                        // Force all the reserved predicates to be allocated to group 1.
11✔
448
                        // This is to make it easier to stream ACL updates to all alpha servers
11✔
449
                        // since they only need to open one pipeline to receive updates for all
11✔
450
                        // ACL predicates.
11✔
451
                        // This will also make it easier to restore the reserved predicates after
11✔
452
                        // a DropAll operation.
11✔
453
                        t.GroupId = 1
11✔
454
                }
11✔
455
                proposal.Tablets = append(proposal.Tablets, t)
45✔
456
        }
457

458
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
1✔
459
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
460
                return nil, err
×
461
        }
×
462

463
        for _, t := range unknownTablets {
46✔
464
                tab := s.ServingTablet(t.Predicate)
45✔
465
                x.AssertTrue(tab != nil)
45✔
466
                span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
45✔
467
                tablets = append(tablets, tab)
45✔
468
        }
45✔
469

470
        return &pb.TabletResponse{
1✔
471
                Tablets: tablets,
1✔
472
        }, nil
1✔
473
}
474

475
// RemoveNode removes the given node from the given group.
476
// It's the user's responsibility to ensure that node doesn't come back again
477
// before calling the api.
478
func (s *Server) RemoveNode(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.Status, error) {
1,005✔
479
        if req.GroupId == 0 {
1,005✔
480
                return nil, s.Node.ProposePeerRemoval(ctx, req.NodeId)
×
481
        }
×
482
        zp := &pb.ZeroProposal{}
1,005✔
483
        zp.Member = &pb.Member{Id: req.NodeId, GroupId: req.GroupId, AmDead: true}
1,005✔
484
        if _, ok := s.state.Groups[req.GroupId]; !ok {
1,007✔
485
                return nil, errors.Errorf("No group with groupId %d found", req.GroupId)
2✔
486
        }
2✔
487
        if _, ok := s.state.Groups[req.GroupId].Members[req.NodeId]; !ok {
2,004✔
488
                return nil, errors.Errorf("No node with nodeId %d found in group %d", req.NodeId,
1,001✔
489
                        req.GroupId)
1,001✔
490
        }
1,001✔
491
        if len(s.state.Groups[req.GroupId].Members) == 1 && len(s.state.Groups[req.GroupId].
2✔
492
                Tablets) > 0 {
2✔
493
                return nil, errors.Errorf("Move all tablets from group %d before removing the last node",
×
494
                        req.GroupId)
×
495
        }
×
496
        if err := s.Node.proposeAndWait(ctx, zp); err != nil {
2✔
497
                return nil, err
×
498
        }
×
499

500
        return &pb.Status{}, nil
2✔
501
}
502

503
// Connect is used by Alpha nodes to connect the very first time with group zero.
504
func (s *Server) Connect(ctx context.Context,
505
        m *pb.Member) (resp *pb.ConnectionState, err error) {
635✔
506
        // Ensures that connect requests are always serialized
635✔
507
        s.connectLock.Lock()
635✔
508
        defer s.connectLock.Unlock()
635✔
509
        glog.Infof("Got connection request: %+v\n", m)
635✔
510
        defer glog.Infof("Connected: %+v\n", m)
635✔
511

635✔
512
        if ctx.Err() != nil {
635✔
513
                err := errors.Errorf("Context has error: %v\n", ctx.Err())
×
514
                return &emptyConnectionState, err
×
515
        }
×
516
        ms, err := s.latestMembershipState(ctx)
635✔
517
        if err != nil {
635✔
518
                return nil, err
×
519
        }
×
520

521
        if m.Learner && !ms.License.GetEnabled() {
635✔
522
                // Update the "ShouldCrash" function in x/x.go if you change the error message here.
×
523
                return nil, errors.New("ENTERPRISE_ONLY_LEARNER - Missing or expired Enterpise License. " +
×
524
                        "Cannot add Learner Node.")
×
525
        }
×
526

527
        if m.ClusterInfoOnly {
1,132✔
528
                // This request only wants to access the membership state, and nothing else. Most likely
497✔
529
                // from our clients.
497✔
530
                cs := &pb.ConnectionState{
497✔
531
                        State:      ms,
497✔
532
                        MaxPending: s.orc.MaxPending(),
497✔
533
                }
497✔
534
                return cs, err
497✔
535
        }
497✔
536
        if m.Addr == "" {
138✔
537
                return &emptyConnectionState, errors.Errorf("NO_ADDR: No address provided: %+v", m)
×
538
        }
×
539

540
        for _, member := range ms.Removed {
138✔
541
                // It is not recommended to reuse RAFT ids.
×
542
                if member.GroupId != 0 && m.Id == member.Id {
×
543
                        return &emptyConnectionState, errors.Errorf(
×
544
                                "REUSE_RAFTID: Duplicate Raft ID %d to removed member: %+v", m.Id, member)
×
545
                }
×
546
        }
547

548
        numberOfNodes := len(ms.Zeros)
138✔
549
        for _, group := range ms.Groups {
209✔
550
                for _, member := range group.Members {
185✔
551
                        switch {
114✔
552
                        case member.Addr == m.Addr && m.Id == 0:
×
553
                                glog.Infof("Found a member with the same address. Returning: %+v", member)
×
554
                                conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
555
                                return &pb.ConnectionState{
×
556
                                        State:  ms,
×
557
                                        Member: member,
×
558
                                }, nil
×
559

560
                        case member.Addr == m.Addr && member.Id != m.Id:
×
561
                                // Same address. Different Id. If Id is zero, then it might be trying to connect for
×
562
                                // the first time. We can just directly return the membership information.
×
563
                                return nil, errors.Errorf("REUSE_ADDR: Duplicate address to existing member: %+v."+
×
564
                                        " Self: +%v", member, m)
×
565

566
                        case member.Addr != m.Addr && member.Id == m.Id:
×
567
                                // Same Id. Different address.
×
568
                                if pl, err := conn.GetPools().Get(member.Addr); err == nil && pl.IsHealthy() {
×
569
                                        // Found a healthy connection.
×
570
                                        return nil, errors.Errorf("REUSE_RAFTID: Healthy connection to a member"+
×
571
                                                " with same ID: %+v", member)
×
572
                                }
×
573
                        }
574
                        numberOfNodes++
114✔
575
                }
576
        }
577

578
        // Create a connection and check validity of the address by doing an Echo.
579
        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
138✔
580

138✔
581
        createProposal := func() *pb.ZeroProposal {
276✔
582
                s.Lock()
138✔
583
                defer s.Unlock()
138✔
584

138✔
585
                proposal := new(pb.ZeroProposal)
138✔
586
                // Check if we already have this member.
138✔
587
                for _, group := range s.state.Groups {
208✔
588
                        if _, has := group.Members[m.Id]; has {
72✔
589
                                return nil
2✔
590
                        }
2✔
591
                }
592
                if m.Id == 0 {
232✔
593
                        // In certain situations, the proposal can be sent and return with an error.
96✔
594
                        // However,  Dgraph will keep retrying the proposal. To avoid assigning duplicating
96✔
595
                        // IDs, the counter is incremented every time a proposal is created.
96✔
596
                        m.Id = s.nextRaftId
96✔
597
                        s.nextRaftId += 1
96✔
598
                        proposal.MaxRaftId = m.Id
96✔
599
                } else if m.Id >= s.nextRaftId {
160✔
600
                        s.nextRaftId = m.Id + 1
24✔
601
                        proposal.MaxRaftId = m.Id
24✔
602
                }
24✔
603

604
                // We don't have this member. So, let's see if it has preference for a group.
605
                if m.GroupId > 0 {
164✔
606
                        group, has := s.state.Groups[m.GroupId]
28✔
607
                        if !has {
52✔
608
                                // We don't have this group. Add the server to this group.
24✔
609
                                proposal.Member = m
24✔
610
                                return proposal
24✔
611
                        }
24✔
612

613
                        if _, has := group.Members[m.Id]; has {
4✔
614
                                proposal.Member = m // Update in case some fields have changed, like address.
×
615
                                return proposal
×
616
                        }
×
617

618
                        if m.Learner {
4✔
619
                                // Give it the group it wants.
×
620
                                proposal.Member = m
×
621
                                return proposal
×
622
                        }
×
623

624
                        // We don't have this server in the list.
625
                        if len(group.Members) < s.NumReplicas {
8✔
626
                                // We need more servers here, so let's add it.
4✔
627
                                proposal.Member = m
4✔
628
                                return proposal
4✔
629
                        } else if m.ForceGroupId {
4✔
630
                                // If the group ID was taken from the group_id file, force the member
×
631
                                // to be in this group even if the group is at capacity. This should
×
632
                                // not happen if users properly initialize a cluster after a bulk load.
×
633
                                proposal.Member = m
×
634
                                return proposal
×
635
                        }
×
636
                        // Already have plenty of servers serving this group.
637
                }
638
                // Let's assign this server to a new group.
639
                for gid, group := range s.state.Groups {
160✔
640
                        if len(group.Members) < s.NumReplicas {
72✔
641
                                m.GroupId = gid
20✔
642
                                proposal.Member = m
20✔
643
                                return proposal
20✔
644
                        }
20✔
645
                }
646
                // We either don't have any groups, or don't have any groups which need another member.
647
                m.GroupId = s.nextGroup
88✔
648
                // We shouldn't increase nextGroup here as we don't know whether we have enough
88✔
649
                // replicas until proposal is committed and can cause issues due to race.
88✔
650
                proposal.Member = m
88✔
651
                return proposal
88✔
652
        }
653

654
        proposal := createProposal()
138✔
655
        if proposal == nil {
140✔
656
                return &pb.ConnectionState{
2✔
657
                        State: ms, Member: m,
2✔
658
                }, nil
2✔
659
        }
2✔
660

661
        maxNodes := s.state.GetLicense().GetMaxNodes()
136✔
662
        if s.state.GetLicense().GetEnabled() && uint64(numberOfNodes) >= maxNodes {
136✔
663
                return nil, errors.Errorf("ENTERPRISE_LIMIT_REACHED: You are already using the maximum "+
×
664
                        "number of nodes: [%v] permitted for your enterprise license.", maxNodes)
×
665
        }
×
666

667
        if err := s.Node.proposeAndWait(ctx, proposal); err != nil {
172✔
668
                return &emptyConnectionState, err
36✔
669
        }
36✔
670
        resp = &pb.ConnectionState{
100✔
671
                State:  s.membershipState(),
100✔
672
                Member: m,
100✔
673
        }
100✔
674
        return resp, nil
100✔
675
}
676

677
// DeleteNamespace removes the tablets for deleted namespace from the membership state.
678
func (s *Server) DeleteNamespace(ctx context.Context, in *pb.DeleteNsRequest) (*pb.Status, error) {
9✔
679
        err := s.Node.proposeAndWait(ctx, &pb.ZeroProposal{DeleteNs: in})
9✔
680
        return &pb.Status{}, err
9✔
681
}
9✔
682

683
// ShouldServe returns the tablet serving the predicate passed in the request.
684
func (s *Server) ShouldServe(
685
        ctx context.Context, tablet *pb.Tablet) (resp *pb.Tablet, err error) {
9,334✔
686
        ctx, span := otrace.StartSpan(ctx, "Zero.ShouldServe")
9,334✔
687
        defer span.End()
9,334✔
688

9,334✔
689
        if tablet.Predicate == "" {
9,334✔
690
                return resp, errors.Errorf("Tablet predicate is empty in %+v", tablet)
×
691
        }
×
692
        if tablet.GroupId == 0 && !tablet.ReadOnly {
9,334✔
693
                return resp, errors.Errorf("Group ID is Zero in %+v", tablet)
×
694
        }
×
695

696
        // Check who is serving this tablet.
697
        tab := s.ServingTablet(tablet.Predicate)
9,334✔
698
        span.Annotatef(nil, "Tablet for %s: %+v", tablet.Predicate, tab)
9,334✔
699
        if tab != nil && !tablet.Force {
9,730✔
700
                // Someone is serving this tablet. Could be the caller as well.
396✔
701
                // The caller should compare the returned group against the group it holds to check who's
396✔
702
                // serving.
396✔
703
                return tab, nil
396✔
704
        }
396✔
705

706
        // Read-only requests should return an empty tablet instead of asking zero
707
        // to serve the predicate.
708
        if tablet.ReadOnly {
10,485✔
709
                return &pb.Tablet{}, nil
1,547✔
710
        }
1,547✔
711

712
        // Set the tablet to be served by this server's group.
713
        var proposal pb.ZeroProposal
7,391✔
714

7,391✔
715
        if x.IsReservedPredicate(tablet.Predicate) {
13,129✔
716
                // Force all the reserved predicates to be allocated to group 1.
5,738✔
717
                // This is to make it easier to stream ACL updates to all alpha servers
5,738✔
718
                // since they only need to open one pipeline to receive updates for all
5,738✔
719
                // ACL predicates.
5,738✔
720
                // This will also make it easier to restore the reserved predicates after
5,738✔
721
                // a DropAll operation.
5,738✔
722
                tablet.GroupId = 1
5,738✔
723
        }
5,738✔
724
        proposal.Tablet = tablet
7,391✔
725
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
7,391✔
726
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
727
                return tablet, err
×
728
        }
×
729
        tab = s.ServingTablet(tablet.Predicate)
7,391✔
730
        x.AssertTrue(tab != nil)
7,391✔
731
        span.Annotatef(nil, "Now serving tablet for %s: %+v", tablet.Predicate, tab)
7,391✔
732
        return tab, nil
7,391✔
733
}
734

735
// UpdateMembership updates the membership of the given group.
736
func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
2,228✔
737
        // Only Zero leader would get these membership updates.
2,228✔
738
        if ts := group.GetCheckpointTs(); ts > 0 {
2,796✔
739
                for _, m := range group.GetMembers() {
1,136✔
740
                        s.Lock()
568✔
741
                        s.checkpointPerGroup[m.GetGroupId()] = ts
568✔
742
                        s.Unlock()
568✔
743
                }
568✔
744
        }
745
        proposals, err := s.createProposals(group)
2,228✔
746
        if err != nil {
2,228✔
747
                // Sleep here so the caller doesn't keep on retrying indefinitely, creating a busy
×
748
                // wait.
×
749
                time.Sleep(time.Second)
×
750
                glog.Errorf("Error while creating proposals in Update: %v\n", err)
×
751
                return nil, err
×
752
        }
×
753

754
        ctx, cancel := context.WithCancel(ctx)
2,228✔
755
        defer cancel()
2,228✔
756

2,228✔
757
        errCh := make(chan error, len(proposals))
2,228✔
758
        for _, pr := range proposals {
2,360✔
759
                go func(pr *pb.ZeroProposal) {
264✔
760
                        errCh <- s.Node.proposeAndWait(ctx, pr)
132✔
761
                }(pr)
132✔
762
        }
763

764
        for range proposals {
2,360✔
765
                // We Don't care about these errors
132✔
766
                // Ideally shouldn't error out.
132✔
767
                if err := <-errCh; err != nil {
132✔
768
                        glog.Errorf("Error while applying proposal in Update stream: %v\n", err)
×
769
                        return nil, err
×
770
                }
×
771
        }
772

773
        if len(group.Members) == 0 {
2,228✔
774
                return &api.Payload{Data: []byte("OK")}, nil
×
775
        }
×
776
        select {
2,228✔
777
        case s.moveOngoing <- struct{}{}:
2,227✔
778
        default:
1✔
779
                // If a move is going on, don't do the next steps of deleting predicates.
1✔
780
                return &api.Payload{Data: []byte("OK")}, nil
1✔
781
        }
782
        defer func() {
4,454✔
783
                <-s.moveOngoing
2,227✔
784
        }()
2,227✔
785

786
        if err := s.deletePredicates(ctx, group); err != nil {
2,227✔
787
                glog.Warningf("While deleting predicates: %v", err)
×
788
        }
×
789
        return &api.Payload{Data: []byte("OK")}, nil
2,227✔
790
}
791

792
func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error {
2,227✔
793
        if group == nil || group.Tablets == nil {
4,454✔
794
                return nil
2,227✔
795
        }
2,227✔
796
        var gid uint32
×
797
        for _, tablet := range group.Tablets {
×
798
                gid = tablet.GroupId
×
799
                break
×
800
        }
801
        if gid == 0 {
×
802
                return errors.Errorf("Unable to find group")
×
803
        }
×
804
        state, err := s.latestMembershipState(ctx)
×
805
        if err != nil {
×
806
                return err
×
807
        }
×
808
        sg, ok := state.Groups[gid]
×
809
        if !ok {
×
810
                return errors.Errorf("Unable to find group: %d", gid)
×
811
        }
×
812

813
        pl := s.Leader(gid)
×
814
        if pl == nil {
×
815
                return errors.Errorf("Unable to reach leader of group: %d", gid)
×
816
        }
×
817
        wc := pb.NewWorkerClient(pl.Get())
×
818

×
819
        for pred := range group.Tablets {
×
820
                if _, found := sg.Tablets[pred]; found {
×
821
                        continue
×
822
                }
823
                glog.Infof("Tablet: %v does not belong to group: %d. Sending delete instruction.",
×
824
                        pred, gid)
×
825
                in := &pb.MovePredicatePayload{
×
826
                        Predicate: pred,
×
827
                        SourceGid: gid,
×
828
                        DestGid:   0,
×
829
                }
×
830
                if _, err := wc.MovePredicate(ctx, in); err != nil {
×
831
                        return err
×
832
                }
×
833
        }
834
        return nil
×
835
}
836

837
// StreamMembership periodically streams the membership state to the given stream.
838
func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error {
106✔
839
        // Send MembershipState right away. So, the connection is correctly established.
106✔
840
        ctx := stream.Context()
106✔
841
        ms, err := s.latestMembershipState(ctx)
106✔
842
        if err != nil {
110✔
843
                return err
4✔
844
        }
4✔
845
        if err := stream.Send(ms); err != nil {
102✔
846
                return err
×
847
        }
×
848

849
        ticker := time.NewTicker(time.Second)
102✔
850
        defer ticker.Stop()
102✔
851
        for {
21,827✔
852
                select {
21,725✔
853
                case <-ticker.C:
21,625✔
854
                        // Send an update every second.
21,625✔
855
                        ms, err := s.latestMembershipState(ctx)
21,625✔
856
                        if err != nil {
21,627✔
857
                                return err
2✔
858
                        }
2✔
859
                        if err := stream.Send(ms); err != nil {
21,623✔
860
                                return err
×
861
                        }
×
862
                case <-ctx.Done():
55✔
863
                        return ctx.Err()
55✔
864
                case <-s.closer.HasBeenClosed():
45✔
865
                        return errServerShutDown
45✔
866
                }
867
        }
868
}
869

870
func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState, error) {
27,923✔
871
        if err := s.Node.WaitLinearizableRead(ctx); err != nil {
27,943✔
872
                return nil, err
20✔
873
        }
20✔
874
        ms := s.membershipState()
27,903✔
875
        if ms == nil {
27,903✔
876
                return &pb.MembershipState{}, nil
×
877
        }
×
878
        return ms, nil
27,903✔
879
}
880

881
func (s *Server) ApplyLicense(ctx context.Context, req *pb.ApplyLicenseRequest) (*pb.Status,
882
        error) {
7✔
883
        var l license
7✔
884
        signedData := bytes.NewReader(req.License)
7✔
885
        if err := verifySignature(signedData, strings.NewReader(publicKey), &l); err != nil {
12✔
886
                return nil, errors.Wrapf(err, "while extracting enterprise details from the license")
5✔
887
        }
5✔
888

889
        numNodes := len(s.state.GetZeros())
2✔
890
        for _, group := range s.state.GetGroups() {
4✔
891
                numNodes += len(group.GetMembers())
2✔
892
        }
2✔
893
        if uint64(numNodes) > l.MaxNodes {
2✔
894
                return nil, errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. "+
×
895
                        "You have: [%v].", l.MaxNodes, numNodes)
×
896
        }
×
897

898
        proposal := &pb.ZeroProposal{
2✔
899
                License: &pb.License{
2✔
900
                        User:     l.User,
2✔
901
                        MaxNodes: l.MaxNodes,
2✔
902
                        ExpiryTs: l.Expiry.Unix(),
2✔
903
                },
2✔
904
        }
2✔
905

2✔
906
        err := s.Node.proposeAndWait(ctx, proposal)
2✔
907
        if err != nil {
2✔
908
                return nil, errors.Wrapf(err, "while proposing enterprise license state to cluster")
×
909
        }
×
910
        glog.Infof("Enterprise license proposed to the cluster %+v", proposal)
2✔
911
        return &pb.Status{}, nil
2✔
912
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc