• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5880098024

16 Aug 2023 02:26PM UTC coverage: 67.058% (+0.2%) from 66.883%
5880098024

push

web-flow
fix(alpha): convert numbers correctly in superflags (#7712) (#8943)

7 of 7 new or added lines in 1 file covered. (100.0%)

58416 of 87113 relevant lines covered (67.06%)

2252113.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.01
/dgraph/cmd/zero/zero.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "math"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/gogo/protobuf/proto"
29
        "github.com/golang/glog"
30
        "github.com/pkg/errors"
31
        otrace "go.opencensus.io/trace"
32

33
        "github.com/dgraph-io/dgo/v230/protos/api"
34
        "github.com/dgraph-io/dgraph/conn"
35
        "github.com/dgraph-io/dgraph/protos/pb"
36
        "github.com/dgraph-io/dgraph/telemetry"
37
        "github.com/dgraph-io/dgraph/x"
38
        "github.com/dgraph-io/ristretto/z"
39
)
40

41
var (
42
        emptyConnectionState pb.ConnectionState
43
        errServerShutDown    = errors.New("Server is being shut down")
44
)
45

46
// license is the JSON shape of a license record: the user it names ("user"),
// a node-count limit ("max_nodes") and an expiry timestamp ("expiry").
type license struct {
	User     string    `json:"user"`
	MaxNodes uint64    `json:"max_nodes"`
	Expiry   time.Time `json:"expiry"`
}
51

52
// Server implements the zero server.
53
type Server struct {
54
        x.SafeMutex
55
        Node *node
56
        orc  *Oracle
57

58
        NumReplicas int
59
        state       *pb.MembershipState
60
        nextRaftId  uint64
61

62
        // nextUint is the uint64 which we can hand out next. See maxLease for the
63
        // max ID leased via Zero quorum.
64
        nextUint    map[pb.NumLeaseType]uint64
65
        readOnlyTs  uint64
66
        leaseLock   sync.Mutex // protects nextUID, nextTxnTs, nextNsID and corresponding proposals.
67
        rateLimiter *x.RateLimiter
68

69
        // groupMap    map[uint32]*Group
70
        nextGroup      uint32
71
        leaderChangeCh chan struct{}
72
        closer         *z.Closer  // Used to tell stream to close.
73
        connectLock    sync.Mutex // Used to serialize connect requests from servers.
74

75
        // tls client config used to connect with zero internally
76
        tlsClientConfig *tls.Config
77

78
        moveOngoing    chan struct{}
79
        blockCommitsOn *sync.Map
80

81
        checkpointPerGroup map[uint32]uint64
82
}
83

84
// Init initializes the zero server.
85
func (s *Server) Init() {
65✔
86
        s.Lock()
65✔
87
        defer s.Unlock()
65✔
88

65✔
89
        s.orc = &Oracle{}
65✔
90
        s.orc.Init()
65✔
91
        s.state = &pb.MembershipState{
65✔
92
                Groups: make(map[uint32]*pb.Group),
65✔
93
                Zeros:  make(map[uint64]*pb.Member),
65✔
94
        }
65✔
95
        s.nextUint = make(map[pb.NumLeaseType]uint64)
65✔
96
        s.nextRaftId = 1
65✔
97
        s.nextUint[pb.Num_UID] = 1
65✔
98
        s.nextUint[pb.Num_TXN_TS] = 1
65✔
99
        s.nextUint[pb.Num_NS_ID] = 1
65✔
100
        s.nextGroup = 1
65✔
101
        s.leaderChangeCh = make(chan struct{}, 1)
65✔
102
        s.closer = z.NewCloser(2) // grpc and http
65✔
103
        s.blockCommitsOn = new(sync.Map)
65✔
104
        s.moveOngoing = make(chan struct{}, 1)
65✔
105
        s.checkpointPerGroup = make(map[uint32]uint64)
65✔
106
        if opts.limiterConfig.UidLeaseLimit > 0 {
66✔
107
                // rate limiting is not enabled when lease limit is set to zero.
1✔
108
                s.rateLimiter = x.NewRateLimiter(int64(opts.limiterConfig.UidLeaseLimit),
1✔
109
                        opts.limiterConfig.RefillAfter, s.closer)
1✔
110
        }
1✔
111

112
        go s.rebalanceTablets()
65✔
113
}
114

115
func (s *Server) periodicallyPostTelemetry() {
65✔
116
        glog.V(2).Infof("Starting telemetry data collection for zero...")
65✔
117
        start := time.Now()
65✔
118

65✔
119
        ticker := time.NewTicker(time.Minute * 10)
65✔
120
        defer ticker.Stop()
65✔
121

65✔
122
        var lastPostedAt time.Time
65✔
123
        for range ticker.C {
77✔
124
                if !s.Node.AmLeader() {
20✔
125
                        continue
8✔
126
                }
127
                if time.Since(lastPostedAt) < time.Hour {
4✔
128
                        continue
×
129
                }
130
                ms := s.membershipState()
4✔
131
                t := telemetry.NewZero(ms)
4✔
132
                if t == nil {
4✔
133
                        continue
×
134
                }
135
                t.SinceHours = int(time.Since(start).Hours())
4✔
136
                glog.V(2).Infof("Posting Telemetry data: %+v", t)
4✔
137

4✔
138
                err := t.Post()
4✔
139
                if err == nil {
4✔
140
                        lastPostedAt = time.Now()
×
141
                } else {
4✔
142
                        glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
4✔
143
                }
4✔
144
        }
145
}
146

147
func (s *Server) triggerLeaderChange() {
166✔
148
        s.Lock()
166✔
149
        defer s.Unlock()
166✔
150
        close(s.leaderChangeCh)
166✔
151
        s.leaderChangeCh = make(chan struct{}, 1)
166✔
152
}
166✔
153

154
func (s *Server) leaderChangeChannel() chan struct{} {
83✔
155
        s.RLock()
83✔
156
        defer s.RUnlock()
83✔
157
        return s.leaderChangeCh
83✔
158
}
83✔
159

160
func (s *Server) member(addr string) *pb.Member {
272✔
161
        s.AssertRLock()
272✔
162
        for _, m := range s.state.Zeros {
794✔
163
                if m.Addr == addr {
522✔
164
                        return m
×
165
                }
×
166
        }
167
        for _, g := range s.state.Groups {
571✔
168
                for _, m := range g.Members {
746✔
169
                        if m.Addr == addr {
560✔
170
                                return m
113✔
171
                        }
113✔
172
                }
173
        }
174
        return nil
159✔
175
}
176

177
// Leader returns a connection pool to the zero leader.
178
func (s *Server) Leader(gid uint32) *conn.Pool {
4✔
179
        s.RLock()
4✔
180
        defer s.RUnlock()
4✔
181
        if s.state == nil {
4✔
182
                return nil
×
183
        }
×
184
        var members map[uint64]*pb.Member
4✔
185
        if gid == 0 {
4✔
186
                members = s.state.Zeros
×
187
        } else {
4✔
188
                group := s.state.Groups[gid]
4✔
189
                if group == nil {
4✔
190
                        return nil
×
191
                }
×
192
                members = group.Members
4✔
193
        }
194
        var healthyPool *conn.Pool
4✔
195
        for _, m := range members {
8✔
196
                if pl, err := conn.GetPools().Get(m.Addr); err == nil {
8✔
197
                        healthyPool = pl
4✔
198
                        if m.Leader {
8✔
199
                                return pl
4✔
200
                        }
4✔
201
                }
202
        }
203
        return healthyPool
×
204
}
205

206
// KnownGroups returns a list of the known groups.
207
func (s *Server) KnownGroups() []uint32 {
1,009✔
208
        var groups []uint32
1,009✔
209
        s.RLock()
1,009✔
210
        defer s.RUnlock()
1,009✔
211
        for group := range s.state.Groups {
2,031✔
212
                groups = append(groups, group)
1,022✔
213
        }
1,022✔
214
        return groups
1,009✔
215
}
216

217
func (s *Server) hasLeader(gid uint32) bool {
5✔
218
        s.AssertRLock()
5✔
219
        if s.state == nil {
5✔
220
                return false
×
221
        }
×
222
        group := s.state.Groups[gid]
5✔
223
        if group == nil {
5✔
224
                return false
×
225
        }
×
226
        for _, m := range group.Members {
11✔
227
                if m.Leader {
11✔
228
                        return true
5✔
229
                }
5✔
230
        }
231
        return false
×
232
}
233

234
// SetMembershipState updates the membership state to the given one.
235
func (s *Server) SetMembershipState(state *pb.MembershipState) {
×
236
        s.Lock()
×
237
        defer s.Unlock()
×
238

×
239
        s.state = state
×
240
        s.nextRaftId = x.Max(s.nextRaftId, s.state.MaxRaftId+1)
×
241

×
242
        if state.Zeros == nil {
×
243
                state.Zeros = make(map[uint64]*pb.Member)
×
244
        }
×
245
        if state.Groups == nil {
×
246
                state.Groups = make(map[uint32]*pb.Group)
×
247
        }
×
248

249
        // Create connections to all members.
250
        for _, g := range state.Groups {
×
251
                for _, m := range g.Members {
×
252
                        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
253
                }
×
254

255
                if g.Tablets == nil {
×
256
                        g.Tablets = make(map[string]*pb.Tablet)
×
257
                }
×
258
        }
259

260
        s.nextGroup = uint32(len(state.Groups) + 1)
×
261
}
262

263
// MarshalMembershipState returns the marshaled membership state.
264
func (s *Server) MarshalMembershipState() ([]byte, error) {
×
265
        s.Lock()
×
266
        defer s.Unlock()
×
267
        return s.state.Marshal()
×
268
}
×
269

270
func (s *Server) membershipState() *pb.MembershipState {
29,853✔
271
        s.RLock()
29,853✔
272
        defer s.RUnlock()
29,853✔
273
        return proto.Clone(s.state).(*pb.MembershipState)
29,853✔
274
}
29,853✔
275

276
func (s *Server) groupChecksums() map[uint32]uint64 {
153,404✔
277
        s.RLock()
153,404✔
278
        defer s.RUnlock()
153,404✔
279
        m := make(map[uint32]uint64)
153,404✔
280
        for gid, g := range s.state.GetGroups() {
515,252✔
281
                m[gid] = g.Checksum
361,848✔
282
        }
361,848✔
283
        return m
153,404✔
284
}
285

286
func (s *Server) storeZero(m *pb.Member) {
101✔
287
        s.Lock()
101✔
288
        defer s.Unlock()
101✔
289

101✔
290
        s.state.Zeros[m.Id] = m
101✔
291
}
101✔
292

293
func (s *Server) updateZeroLeader() {
1,424✔
294
        s.Lock()
1,424✔
295
        defer s.Unlock()
1,424✔
296
        leader := s.Node.Raft().Status().Lead
1,424✔
297
        for _, m := range s.state.Zeros {
4,954✔
298
                m.Leader = m.Id == leader
3,530✔
299
        }
3,530✔
300
}
301

302
func (s *Server) removeZero(nodeId uint64) {
×
303
        s.Lock()
×
304
        defer s.Unlock()
×
305
        m, has := s.state.Zeros[nodeId]
×
306
        if !has {
×
307
                return
×
308
        }
×
309
        delete(s.state.Zeros, nodeId)
×
310
        s.state.Removed = append(s.state.Removed, m)
×
311
}
312

313
// ServingTablet returns the Tablet called tablet.
314
func (s *Server) ServingTablet(tablet string) *pb.Tablet {
36,719✔
315
        s.RLock()
36,719✔
316
        defer s.RUnlock()
36,719✔
317

36,719✔
318
        for _, group := range s.state.Groups {
81,019✔
319
                if tab, ok := group.Tablets[tablet]; ok {
74,530✔
320
                        return tab
30,230✔
321
                }
30,230✔
322
        }
323
        return nil
6,489✔
324
}
325

326
func (s *Server) blockTablet(pred string) func() {
4✔
327
        s.blockCommitsOn.Store(pred, struct{}{})
4✔
328
        return func() {
8✔
329
                s.blockCommitsOn.Delete(pred)
4✔
330
        }
4✔
331
}
332

333
func (s *Server) isBlocked(pred string) bool {
19,753✔
334
        _, blocked := s.blockCommitsOn.Load(pred)
19,753✔
335
        return blocked
19,753✔
336
}
19,753✔
337

338
func (s *Server) servingTablet(tablet string) *pb.Tablet {
8,034✔
339
        s.AssertRLock()
8,034✔
340

8,034✔
341
        for _, group := range s.state.Groups {
18,109✔
342
                if tab, ok := group.Tablets[tablet]; ok {
12,710✔
343
                        return tab
2,635✔
344
                }
2,635✔
345
        }
346
        return nil
5,399✔
347
}
348

349
func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
2,332✔
350
        var res []*pb.ZeroProposal
2,332✔
351
        if len(dst.Members) > 1 {
2,332✔
352
                return res, errors.Errorf("Create Proposal: Invalid group: %+v", dst)
×
353
        }
×
354

355
        s.RLock()
2,332✔
356
        defer s.RUnlock()
2,332✔
357
        // There is only one member. We use for loop because we don't know what the mid is.
2,332✔
358
        for mid, dstMember := range dst.Members {
4,664✔
359
                group, has := s.state.Groups[dstMember.GroupId]
2,332✔
360
                if !has {
2,332✔
361
                        return res, errors.Errorf("Unknown group for member: %+v", dstMember)
×
362
                }
×
363
                srcMember, has := group.Members[mid]
2,332✔
364
                if !has {
2,332✔
365
                        return res, errors.Errorf("Unknown member: %+v", dstMember)
×
366
                }
×
367
                if srcMember.Addr != dstMember.Addr ||
2,332✔
368
                        srcMember.Leader != dstMember.Leader {
2,417✔
369

85✔
370
                        proposal := &pb.ZeroProposal{
85✔
371
                                Member: dstMember,
85✔
372
                        }
85✔
373
                        res = append(res, proposal)
85✔
374
                }
85✔
375
                if !dstMember.Leader {
3,734✔
376
                        // Don't continue to tablets if request is not from the leader.
1,402✔
377
                        return res, nil
1,402✔
378
                }
1,402✔
379
                if dst.SnapshotTs > group.SnapshotTs {
985✔
380
                        res = append(res, &pb.ZeroProposal{
55✔
381
                                SnapshotTs: map[uint32]uint64{dstMember.GroupId: dst.SnapshotTs},
55✔
382
                        })
55✔
383
                }
55✔
384
        }
385

386
        var tablets []*pb.Tablet
930✔
387
        for key, dstTablet := range dst.Tablets {
930✔
388
                group, has := s.state.Groups[dstTablet.GroupId]
×
389
                if !has {
×
390
                        return res, errors.Errorf("Unknown group for tablet: %+v", dstTablet)
×
391
                }
×
392
                srcTablet, has := group.Tablets[key]
×
393
                if !has {
×
394
                        // Tablet moved to new group
×
395
                        continue
×
396
                }
397

398
                s := float64(srcTablet.OnDiskBytes)
×
399
                d := float64(dstTablet.OnDiskBytes)
×
400
                if dstTablet.Remove || (s == 0 && d > 0) || (s > 0 && math.Abs(d/s-1) > 0.1) {
×
401
                        dstTablet.Force = false
×
402
                        tablets = append(tablets, dstTablet)
×
403
                }
×
404
        }
405

406
        if len(tablets) > 0 {
930✔
407
                res = append(res, &pb.ZeroProposal{Tablets: tablets})
×
408
        }
×
409
        return res, nil
930✔
410
}
411

412
func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
1✔
413
        ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
1✔
414
        defer span.End()
1✔
415
        if req == nil || len(req.Tablets) == 0 {
1✔
416
                return nil, errors.Errorf("Tablets are empty in %+v", req)
×
417
        }
×
418

419
        if req.GroupId == 0 {
1✔
420
                return nil, errors.Errorf("Group ID is Zero in %+v", req)
×
421
        }
×
422

423
        tablets := make([]*pb.Tablet, 0)
1✔
424
        unknownTablets := make([]*pb.Tablet, 0)
1✔
425
        for _, t := range req.Tablets {
46✔
426
                tab := s.ServingTablet(t.Predicate)
45✔
427
                span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
45✔
428
                switch {
45✔
429
                case tab != nil && !t.Force:
×
430
                        tablets = append(tablets, t)
×
431
                case t.ReadOnly:
×
432
                        tablets = append(tablets, &pb.Tablet{})
×
433
                default:
45✔
434
                        unknownTablets = append(unknownTablets, t)
45✔
435
                }
436
        }
437

438
        if len(unknownTablets) == 0 {
1✔
439
                return &pb.TabletResponse{
×
440
                        Tablets: tablets,
×
441
                }, nil
×
442
        }
×
443

444
        // Set the tablet to be served by this server's group.
445
        var proposal pb.ZeroProposal
1✔
446
        proposal.Tablets = make([]*pb.Tablet, 0)
1✔
447
        for _, t := range unknownTablets {
46✔
448
                if x.IsReservedPredicate(t.Predicate) {
56✔
449
                        // Force all the reserved predicates to be allocated to group 1.
11✔
450
                        // This is to make it easier to stream ACL updates to all alpha servers
11✔
451
                        // since they only need to open one pipeline to receive updates for all
11✔
452
                        // ACL predicates.
11✔
453
                        // This will also make it easier to restore the reserved predicates after
11✔
454
                        // a DropAll operation.
11✔
455
                        t.GroupId = 1
11✔
456
                }
11✔
457
                proposal.Tablets = append(proposal.Tablets, t)
45✔
458
        }
459

460
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
1✔
461
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
462
                return nil, err
×
463
        }
×
464

465
        for _, t := range unknownTablets {
46✔
466
                tab := s.ServingTablet(t.Predicate)
45✔
467
                x.AssertTrue(tab != nil)
45✔
468
                span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
45✔
469
                tablets = append(tablets, tab)
45✔
470
        }
45✔
471

472
        return &pb.TabletResponse{
1✔
473
                Tablets: tablets,
1✔
474
        }, nil
1✔
475
}
476

477
// RemoveNode removes the given node from the given group.
478
// It's the user's responsibility to ensure that node doesn't come back again
479
// before calling the api.
480
func (s *Server) RemoveNode(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.Status, error) {
1,005✔
481
        if req.GroupId == 0 {
1,005✔
482
                return nil, s.Node.ProposePeerRemoval(ctx, req.NodeId)
×
483
        }
×
484
        zp := &pb.ZeroProposal{}
1,005✔
485
        zp.Member = &pb.Member{Id: req.NodeId, GroupId: req.GroupId, AmDead: true}
1,005✔
486
        if _, ok := s.state.Groups[req.GroupId]; !ok {
1,007✔
487
                return nil, errors.Errorf("No group with groupId %d found", req.GroupId)
2✔
488
        }
2✔
489
        if _, ok := s.state.Groups[req.GroupId].Members[req.NodeId]; !ok {
2,004✔
490
                return nil, errors.Errorf("No node with nodeId %d found in group %d", req.NodeId,
1,001✔
491
                        req.GroupId)
1,001✔
492
        }
1,001✔
493
        if len(s.state.Groups[req.GroupId].Members) == 1 && len(s.state.Groups[req.GroupId].
2✔
494
                Tablets) > 0 {
2✔
495
                return nil, errors.Errorf("Move all tablets from group %d before removing the last node",
×
496
                        req.GroupId)
×
497
        }
×
498
        if err := s.Node.proposeAndWait(ctx, zp); err != nil {
2✔
499
                return nil, err
×
500
        }
×
501

502
        return &pb.Status{}, nil
2✔
503
}
504

505
// Connect is used by Alpha nodes to connect the very first time with group zero.
506
func (s *Server) Connect(ctx context.Context,
507
        m *pb.Member) (resp *pb.ConnectionState, err error) {
718✔
508
        // Ensures that connect requests are always serialized
718✔
509
        s.connectLock.Lock()
718✔
510
        defer s.connectLock.Unlock()
718✔
511
        glog.Infof("Got connection request: %+v\n", m)
718✔
512
        defer glog.Infof("Connected: %+v\n", m)
718✔
513

718✔
514
        if ctx.Err() != nil {
718✔
515
                err := errors.Errorf("Context has error: %v\n", ctx.Err())
×
516
                return &emptyConnectionState, err
×
517
        }
×
518
        ms, err := s.latestMembershipState(ctx)
718✔
519
        if err != nil {
718✔
520
                return nil, err
×
521
        }
×
522

523
        if m.Learner && !ms.License.GetEnabled() {
718✔
524
                // Update the "ShouldCrash" function in x/x.go if you change the error message here.
×
525
                return nil, errors.New("ENTERPRISE_ONLY_LEARNER - Missing or expired Enterpise License. " +
×
526
                        "Cannot add Learner Node.")
×
527
        }
×
528

529
        if m.ClusterInfoOnly {
1,253✔
530
                // This request only wants to access the membership state, and nothing else. Most likely
535✔
531
                // from our clients.
535✔
532
                cs := &pb.ConnectionState{
535✔
533
                        State:      ms,
535✔
534
                        MaxPending: s.orc.MaxPending(),
535✔
535
                }
535✔
536
                return cs, err
535✔
537
        }
535✔
538
        if m.Addr == "" {
183✔
539
                return &emptyConnectionState, errors.Errorf("NO_ADDR: No address provided: %+v", m)
×
540
        }
×
541

542
        for _, member := range ms.Removed {
183✔
543
                // It is not recommended to reuse RAFT ids.
×
544
                if member.GroupId != 0 && m.Id == member.Id {
×
545
                        return &emptyConnectionState, errors.Errorf(
×
546
                                "REUSE_RAFTID: Duplicate Raft ID %d to removed member: %+v", m.Id, member)
×
547
                }
×
548
        }
549

550
        numberOfNodes := len(ms.Zeros)
183✔
551
        for _, group := range ms.Groups {
256✔
552
                for _, member := range group.Members {
190✔
553
                        switch {
117✔
554
                        case member.Addr == m.Addr && m.Id == 0:
×
555
                                glog.Infof("Found a member with the same address. Returning: %+v", member)
×
556
                                conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
557
                                return &pb.ConnectionState{
×
558
                                        State:  ms,
×
559
                                        Member: member,
×
560
                                }, nil
×
561

562
                        case member.Addr == m.Addr && member.Id != m.Id:
×
563
                                // Same address. Different Id. If Id is zero, then it might be trying to connect for
×
564
                                // the first time. We can just directly return the membership information.
×
565
                                return nil, errors.Errorf("REUSE_ADDR: Duplicate address to existing member: %+v."+
×
566
                                        " Self: +%v", member, m)
×
567

568
                        case member.Addr != m.Addr && member.Id == m.Id:
×
569
                                // Same Id. Different address.
×
570
                                if pl, err := conn.GetPools().Get(member.Addr); err == nil && pl.IsHealthy() {
×
571
                                        // Found a healthy connection.
×
572
                                        return nil, errors.Errorf("REUSE_RAFTID: Healthy connection to a member"+
×
573
                                                " with same ID: %+v", member)
×
574
                                }
×
575
                        }
576
                        numberOfNodes++
117✔
577
                }
578
        }
579

580
        // Create a connection and check validity of the address by doing an Echo.
581
        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
183✔
582

183✔
583
        createProposal := func() *pb.ZeroProposal {
366✔
584
                s.Lock()
183✔
585
                defer s.Unlock()
183✔
586

183✔
587
                proposal := new(pb.ZeroProposal)
183✔
588
                // Check if we already have this member.
183✔
589
                for _, group := range s.state.Groups {
256✔
590
                        if _, has := group.Members[m.Id]; has {
75✔
591
                                return nil
2✔
592
                        }
2✔
593
                }
594
                if m.Id == 0 {
311✔
595
                        // In certain situations, the proposal can be sent and return with an error.
130✔
596
                        // However,  Dgraph will keep retrying the proposal. To avoid assigning duplicating
130✔
597
                        // IDs, the counter is incremented every time a proposal is created.
130✔
598
                        m.Id = s.nextRaftId
130✔
599
                        s.nextRaftId += 1
130✔
600
                        proposal.MaxRaftId = m.Id
130✔
601
                } else if m.Id >= s.nextRaftId {
201✔
602
                        s.nextRaftId = m.Id + 1
20✔
603
                        proposal.MaxRaftId = m.Id
20✔
604
                }
20✔
605

606
                // We don't have this member. So, let's see if it has preference for a group.
607
                if m.GroupId > 0 {
211✔
608
                        group, has := s.state.Groups[m.GroupId]
30✔
609
                        if !has {
56✔
610
                                // We don't have this group. Add the server to this group.
26✔
611
                                proposal.Member = m
26✔
612
                                return proposal
26✔
613
                        }
26✔
614

615
                        if _, has := group.Members[m.Id]; has {
4✔
616
                                proposal.Member = m // Update in case some fields have changed, like address.
×
617
                                return proposal
×
618
                        }
×
619

620
                        if m.Learner {
4✔
621
                                // Give it the group it wants.
×
622
                                proposal.Member = m
×
623
                                return proposal
×
624
                        }
×
625

626
                        // We don't have this server in the list.
627
                        if len(group.Members) < s.NumReplicas {
8✔
628
                                // We need more servers here, so let's add it.
4✔
629
                                proposal.Member = m
4✔
630
                                return proposal
4✔
631
                        } else if m.ForceGroupId {
4✔
632
                                // If the group ID was taken from the group_id file, force the member
×
633
                                // to be in this group even if the group is at capacity. This should
×
634
                                // not happen if users properly initialize a cluster after a bulk load.
×
635
                                proposal.Member = m
×
636
                                return proposal
×
637
                        }
×
638
                        // Already have plenty of servers serving this group.
639
                }
640
                // Let's assign this server to a new group.
641
                for gid, group := range s.state.Groups {
206✔
642
                        if len(group.Members) < s.NumReplicas {
75✔
643
                                m.GroupId = gid
20✔
644
                                proposal.Member = m
20✔
645
                                return proposal
20✔
646
                        }
20✔
647
                }
648
                // We either don't have any groups, or don't have any groups which need another member.
649
                m.GroupId = s.nextGroup
131✔
650
                // We shouldn't increase nextGroup here as we don't know whether we have enough
131✔
651
                // replicas until proposal is committed and can cause issues due to race.
131✔
652
                proposal.Member = m
131✔
653
                return proposal
131✔
654
        }
655

656
        proposal := createProposal()
183✔
657
        if proposal == nil {
185✔
658
                return &pb.ConnectionState{
2✔
659
                        State: ms, Member: m,
2✔
660
                }, nil
2✔
661
        }
2✔
662

663
        maxNodes := s.state.GetLicense().GetMaxNodes()
181✔
664
        if s.state.GetLicense().GetEnabled() && uint64(numberOfNodes) >= maxNodes {
181✔
665
                return nil, errors.Errorf("ENTERPRISE_LIMIT_REACHED: You are already using the maximum "+
×
666
                        "number of nodes: [%v] permitted for your enterprise license.", maxNodes)
×
667
        }
×
668

669
        if err := s.Node.proposeAndWait(ctx, proposal); err != nil {
259✔
670
                return &emptyConnectionState, err
78✔
671
        }
78✔
672
        resp = &pb.ConnectionState{
103✔
673
                State:  s.membershipState(),
103✔
674
                Member: m,
103✔
675
        }
103✔
676
        return resp, nil
103✔
677
}
678

679
// DeleteNamespace removes the tablets for deleted namespace from the membership state.
680
func (s *Server) DeleteNamespace(ctx context.Context, in *pb.DeleteNsRequest) (*pb.Status, error) {
9✔
681
        err := s.Node.proposeAndWait(ctx, &pb.ZeroProposal{DeleteNs: in})
9✔
682
        return &pb.Status{}, err
9✔
683
}
9✔
684

685
// ShouldServe returns the tablet serving the predicate passed in the request.
686
func (s *Server) ShouldServe(
687
        ctx context.Context, tablet *pb.Tablet) (resp *pb.Tablet, err error) {
9,419✔
688
        ctx, span := otrace.StartSpan(ctx, "Zero.ShouldServe")
9,419✔
689
        defer span.End()
9,419✔
690

9,419✔
691
        if tablet.Predicate == "" {
9,419✔
692
                return resp, errors.Errorf("Tablet predicate is empty in %+v", tablet)
×
693
        }
×
694
        if tablet.GroupId == 0 && !tablet.ReadOnly {
9,419✔
695
                return resp, errors.Errorf("Group ID is Zero in %+v", tablet)
×
696
        }
×
697

698
        // Check who is serving this tablet.
699
        tab := s.ServingTablet(tablet.Predicate)
9,419✔
700
        span.Annotatef(nil, "Tablet for %s: %+v", tablet.Predicate, tab)
9,419✔
701
        if tab != nil && !tablet.Force {
9,823✔
702
                // Someone is serving this tablet. Could be the caller as well.
404✔
703
                // The caller should compare the returned group against the group it holds to check who's
404✔
704
                // serving.
404✔
705
                return tab, nil
404✔
706
        }
404✔
707

708
        // Read-only requests should return an empty tablet instead of asking zero
709
        // to serve the predicate.
710
        if tablet.ReadOnly {
10,585✔
711
                return &pb.Tablet{}, nil
1,570✔
712
        }
1,570✔
713

714
        // Set the tablet to be served by this server's group.
715
        var proposal pb.ZeroProposal
7,445✔
716

7,445✔
717
        if x.IsReservedPredicate(tablet.Predicate) {
13,187✔
718
                // Force all the reserved predicates to be allocated to group 1.
5,742✔
719
                // This is to make it easier to stream ACL updates to all alpha servers
5,742✔
720
                // since they only need to open one pipeline to receive updates for all
5,742✔
721
                // ACL predicates.
5,742✔
722
                // This will also make it easier to restore the reserved predicates after
5,742✔
723
                // a DropAll operation.
5,742✔
724
                tablet.GroupId = 1
5,742✔
725
        }
5,742✔
726
        proposal.Tablet = tablet
7,445✔
727
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
7,445✔
728
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
729
                return tablet, err
×
730
        }
×
731
        tab = s.ServingTablet(tablet.Predicate)
7,445✔
732
        x.AssertTrue(tab != nil)
7,445✔
733
        span.Annotatef(nil, "Now serving tablet for %s: %+v", tablet.Predicate, tab)
7,445✔
734
        return tab, nil
7,445✔
735
}
736

737
// UpdateMembership updates the membership of the given group.
738
func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
2,332✔
739
        // Only Zero leader would get these membership updates.
2,332✔
740
        if ts := group.GetCheckpointTs(); ts > 0 {
2,932✔
741
                for _, m := range group.GetMembers() {
1,200✔
742
                        s.Lock()
600✔
743
                        s.checkpointPerGroup[m.GetGroupId()] = ts
600✔
744
                        s.Unlock()
600✔
745
                }
600✔
746
        }
747
        proposals, err := s.createProposals(group)
2,332✔
748
        if err != nil {
2,332✔
749
                // Sleep here so the caller doesn't keep on retrying indefinitely, creating a busy
×
750
                // wait.
×
751
                time.Sleep(time.Second)
×
752
                glog.Errorf("Error while creating proposals in Update: %v\n", err)
×
753
                return nil, err
×
754
        }
×
755

756
        ctx, cancel := context.WithCancel(ctx)
2,332✔
757
        defer cancel()
2,332✔
758

2,332✔
759
        errCh := make(chan error, len(proposals))
2,332✔
760
        for _, pr := range proposals {
2,472✔
761
                go func(pr *pb.ZeroProposal) {
280✔
762
                        errCh <- s.Node.proposeAndWait(ctx, pr)
140✔
763
                }(pr)
140✔
764
        }
765

766
        for range proposals {
2,472✔
767
                // We Don't care about these errors
140✔
768
                // Ideally shouldn't error out.
140✔
769
                if err := <-errCh; err != nil {
140✔
770
                        glog.Errorf("Error while applying proposal in Update stream: %v\n", err)
×
771
                        return nil, err
×
772
                }
×
773
        }
774

775
        if len(group.Members) == 0 {
2,332✔
776
                return &api.Payload{Data: []byte("OK")}, nil
×
777
        }
×
778
        select {
2,332✔
779
        case s.moveOngoing <- struct{}{}:
2,332✔
780
        default:
×
781
                // If a move is going on, don't do the next steps of deleting predicates.
×
782
                return &api.Payload{Data: []byte("OK")}, nil
×
783
        }
784
        defer func() {
4,664✔
785
                <-s.moveOngoing
2,332✔
786
        }()
2,332✔
787

788
        if err := s.deletePredicates(ctx, group); err != nil {
2,332✔
789
                glog.Warningf("While deleting predicates: %v", err)
×
790
        }
×
791
        return &api.Payload{Data: []byte("OK")}, nil
2,332✔
792
}
793

794
func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error {
2,332✔
795
        if group == nil || group.Tablets == nil {
4,664✔
796
                return nil
2,332✔
797
        }
2,332✔
798
        var gid uint32
×
799
        for _, tablet := range group.Tablets {
×
800
                gid = tablet.GroupId
×
801
                break
×
802
        }
803
        if gid == 0 {
×
804
                return errors.Errorf("Unable to find group")
×
805
        }
×
806
        state, err := s.latestMembershipState(ctx)
×
807
        if err != nil {
×
808
                return err
×
809
        }
×
810
        sg, ok := state.Groups[gid]
×
811
        if !ok {
×
812
                return errors.Errorf("Unable to find group: %d", gid)
×
813
        }
×
814

815
        pl := s.Leader(gid)
×
816
        if pl == nil {
×
817
                return errors.Errorf("Unable to reach leader of group: %d", gid)
×
818
        }
×
819
        wc := pb.NewWorkerClient(pl.Get())
×
820

×
821
        for pred := range group.Tablets {
×
822
                if _, found := sg.Tablets[pred]; found {
×
823
                        continue
×
824
                }
825
                glog.Infof("Tablet: %v does not belong to group: %d. Sending delete instruction.",
×
826
                        pred, gid)
×
827
                in := &pb.MovePredicatePayload{
×
828
                        Predicate: pred,
×
829
                        SourceGid: gid,
×
830
                        DestGid:   0,
×
831
                }
×
832
                if _, err := wc.MovePredicate(ctx, in); err != nil {
×
833
                        return err
×
834
                }
×
835
        }
836
        return nil
×
837
}
838

839
// StreamMembership periodically streams the membership state to the given stream.
840
func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error {
109✔
841
        // Send MembershipState right away. So, the connection is correctly established.
109✔
842
        ctx := stream.Context()
109✔
843
        ms, err := s.latestMembershipState(ctx)
109✔
844
        if err != nil {
113✔
845
                return err
4✔
846
        }
4✔
847
        if err := stream.Send(ms); err != nil {
105✔
848
                return err
×
849
        }
×
850

851
        ticker := time.NewTicker(time.Second)
105✔
852
        defer ticker.Stop()
105✔
853
        for {
22,877✔
854
                select {
22,772✔
855
                case <-ticker.C:
22,671✔
856
                        // Send an update every second.
22,671✔
857
                        ms, err := s.latestMembershipState(ctx)
22,671✔
858
                        if err != nil {
22,675✔
859
                                return err
4✔
860
                        }
4✔
861
                        if err := stream.Send(ms); err != nil {
22,667✔
862
                                return err
×
863
                        }
×
864
                case <-ctx.Done():
57✔
865
                        return ctx.Err()
57✔
866
                case <-s.closer.HasBeenClosed():
44✔
867
                        return errServerShutDown
44✔
868
                }
869
        }
870
}
871

872
func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState, error) {
29,335✔
873
        if err := s.Node.WaitLinearizableRead(ctx); err != nil {
29,358✔
874
                return nil, err
23✔
875
        }
23✔
876
        ms := s.membershipState()
29,312✔
877
        if ms == nil {
29,312✔
878
                return &pb.MembershipState{}, nil
×
879
        }
×
880
        return ms, nil
29,312✔
881
}
882

883
func (s *Server) ApplyLicense(ctx context.Context, req *pb.ApplyLicenseRequest) (*pb.Status,
884
        error) {
7✔
885
        var l license
7✔
886
        signedData := bytes.NewReader(req.License)
7✔
887
        if err := verifySignature(signedData, strings.NewReader(publicKey), &l); err != nil {
12✔
888
                return nil, errors.Wrapf(err, "while extracting enterprise details from the license")
5✔
889
        }
5✔
890

891
        numNodes := len(s.state.GetZeros())
2✔
892
        for _, group := range s.state.GetGroups() {
4✔
893
                numNodes += len(group.GetMembers())
2✔
894
        }
2✔
895
        if uint64(numNodes) > l.MaxNodes {
2✔
896
                return nil, errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. "+
×
897
                        "You have: [%v].", l.MaxNodes, numNodes)
×
898
        }
×
899

900
        proposal := &pb.ZeroProposal{
2✔
901
                License: &pb.License{
2✔
902
                        User:     l.User,
2✔
903
                        MaxNodes: l.MaxNodes,
2✔
904
                        ExpiryTs: l.Expiry.Unix(),
2✔
905
                },
2✔
906
        }
2✔
907

2✔
908
        err := s.Node.proposeAndWait(ctx, proposal)
2✔
909
        if err != nil {
2✔
910
                return nil, errors.Wrapf(err, "while proposing enterprise license state to cluster")
×
911
        }
×
912
        glog.Infof("Enterprise license proposed to the cluster %+v", proposal)
2✔
913
        return &pb.Status{}, nil
2✔
914
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc