• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5918491212

20 Aug 2023 04:34PM UTC coverage: 67.141% (-0.1%) from 67.252%
5918491212

push

web-flow
ci(sentry): disable sentry in compose files and CI jobs (#8964)

58499 of 87129 relevant lines covered (67.14%)

2221737.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.33
/dgraph/cmd/zero/zero.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "math"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/gogo/protobuf/proto"
29
        "github.com/golang/glog"
30
        "github.com/pkg/errors"
31
        otrace "go.opencensus.io/trace"
32

33
        "github.com/dgraph-io/dgo/v230/protos/api"
34
        "github.com/dgraph-io/dgraph/conn"
35
        "github.com/dgraph-io/dgraph/protos/pb"
36
        "github.com/dgraph-io/dgraph/telemetry"
37
        "github.com/dgraph-io/dgraph/x"
38
        "github.com/dgraph-io/ristretto/z"
39
)
40

41
// Package-level values shared by the Zero server methods in this file.
var (
42
        // emptyConnectionState is a zero-valued ConnectionState. Connect returns
        // a pointer to it alongside an error on several of its failure paths.
        emptyConnectionState pb.ConnectionState
43
        // errServerShutDown is the sentinel error carrying the message
        // "Server is being shut down".
        errServerShutDown    = errors.New("Server is being shut down")
44
)
45

46
// license is the JSON-serialisable description of a license: the user it was
// issued to, a node limit and an expiry time.
// NOTE(review): presumably this mirrors the enterprise license document that
// is decoded elsewhere in the package; confirm against the decoder.
type license struct {
47
        // User is stored under the JSON key "user".
        User     string    `json:"user"`
48
        // MaxNodes is stored under the JSON key "max_nodes".
        MaxNodes uint64    `json:"max_nodes"`
49
        // Expiry is stored under the JSON key "expiry".
        Expiry   time.Time `json:"expiry"`
50
}
51

52
// Server implements the zero server.
//
// The embedded x.SafeMutex guards the mutable membership data (state,
// nextRaftId, nextGroup, leaderChangeCh, checkpointPerGroup); the methods in
// this file take it via Lock/RLock before touching those fields.
53
type Server struct {
54
        x.SafeMutex
55
        // Node is the Raft node used to propose changes (see proposeAndWait callers).
        Node *node
56
        // orc is the Oracle created and initialised by Init.
        orc  *Oracle
57

58
        // NumReplicas is the number of members a group may hold before Connect
        // starts assigning new members elsewhere.
        NumReplicas int
59
        // state is the current membership state; replaced by SetMembershipState.
        state       *pb.MembershipState
60
        // nextRaftId is the next Raft ID Connect hands out to a member without one.
        nextRaftId  uint64
61

62
        // nextUint is the uint64 which we can hand out next. See maxLease for the
63
        // max ID leased via Zero quorum.
64
        nextUint    map[pb.NumLeaseType]uint64
65
        readOnlyTs  uint64
66
        leaseLock   sync.Mutex // protects nextUID, nextTxnTs, nextNsID and corresponding proposals.
67
        // rateLimiter is only set by Init when the configured UID lease limit is > 0.
        rateLimiter *x.RateLimiter
68

69
        // groupMap    map[uint32]*Group
70
        // nextGroup is the group ID Connect assigns when no existing group needs a member.
        nextGroup      uint32
71
        // leaderChangeCh is closed and replaced by triggerLeaderChange.
        leaderChangeCh chan struct{}
72
        closer         *z.Closer  // Used to tell stream to close.
73
        connectLock    sync.Mutex // Used to serialize connect requests from servers.
74

75
        // tls client config used to connect with zero internally
76
        tlsClientConfig *tls.Config
77

78
        // moveOngoing is a channel with capacity 1 (created in Init).
        // NOTE(review): presumably used as a single-slot semaphore for tablet moves; confirm.
        moveOngoing    chan struct{}
79
        // blockCommitsOn holds the predicates registered via blockTablet; isBlocked reads it.
        blockCommitsOn *sync.Map
80

81
        // checkpointPerGroup maps a group ID to the checkpoint timestamp last
        // reported through UpdateMembership.
        checkpointPerGroup map[uint32]uint64
82
}
83

84
// Init initializes the zero server.
85
func (s *Server) Init() {
67✔
86
        s.Lock()
67✔
87
        defer s.Unlock()
67✔
88

67✔
89
        s.orc = &Oracle{}
67✔
90
        s.orc.Init()
67✔
91
        s.state = &pb.MembershipState{
67✔
92
                Groups: make(map[uint32]*pb.Group),
67✔
93
                Zeros:  make(map[uint64]*pb.Member),
67✔
94
        }
67✔
95
        s.nextUint = make(map[pb.NumLeaseType]uint64)
67✔
96
        s.nextRaftId = 1
67✔
97
        s.nextUint[pb.Num_UID] = 1
67✔
98
        s.nextUint[pb.Num_TXN_TS] = 1
67✔
99
        s.nextUint[pb.Num_NS_ID] = 1
67✔
100
        s.nextGroup = 1
67✔
101
        s.leaderChangeCh = make(chan struct{}, 1)
67✔
102
        s.closer = z.NewCloser(2) // grpc and http
67✔
103
        s.blockCommitsOn = new(sync.Map)
67✔
104
        s.moveOngoing = make(chan struct{}, 1)
67✔
105
        s.checkpointPerGroup = make(map[uint32]uint64)
67✔
106
        if opts.limiterConfig.UidLeaseLimit > 0 {
68✔
107
                // rate limiting is not enabled when lease limit is set to zero.
1✔
108
                s.rateLimiter = x.NewRateLimiter(int64(opts.limiterConfig.UidLeaseLimit),
1✔
109
                        opts.limiterConfig.RefillAfter, s.closer)
1✔
110
        }
1✔
111

112
        go s.rebalanceTablets()
67✔
113
}
114

115
func (s *Server) periodicallyPostTelemetry() {
×
116
        glog.V(2).Infof("Starting telemetry data collection for zero...")
×
117
        start := time.Now()
×
118

×
119
        ticker := time.NewTicker(time.Minute * 10)
×
120
        defer ticker.Stop()
×
121

×
122
        var lastPostedAt time.Time
×
123
        for range ticker.C {
×
124
                if !s.Node.AmLeader() {
×
125
                        continue
×
126
                }
127
                if time.Since(lastPostedAt) < time.Hour {
×
128
                        continue
×
129
                }
130
                ms := s.membershipState()
×
131
                t := telemetry.NewZero(ms)
×
132
                if t == nil {
×
133
                        continue
×
134
                }
135
                t.SinceHours = int(time.Since(start).Hours())
×
136
                glog.V(2).Infof("Posting Telemetry data: %+v", t)
×
137

×
138
                err := t.Post()
×
139
                if err == nil {
×
140
                        lastPostedAt = time.Now()
×
141
                } else {
×
142
                        glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
×
143
                }
×
144
        }
145
}
146

147
func (s *Server) triggerLeaderChange() {
176✔
148
        s.Lock()
176✔
149
        defer s.Unlock()
176✔
150
        close(s.leaderChangeCh)
176✔
151
        s.leaderChangeCh = make(chan struct{}, 1)
176✔
152
}
176✔
153

154
func (s *Server) leaderChangeChannel() chan struct{} {
82✔
155
        s.RLock()
82✔
156
        defer s.RUnlock()
82✔
157
        return s.leaderChangeCh
82✔
158
}
82✔
159

160
func (s *Server) member(addr string) *pb.Member {
276✔
161
        s.AssertRLock()
276✔
162
        for _, m := range s.state.Zeros {
820✔
163
                if m.Addr == addr {
544✔
164
                        return m
×
165
                }
×
166
        }
167
        for _, g := range s.state.Groups {
571✔
168
                for _, m := range g.Members {
722✔
169
                        if m.Addr == addr {
538✔
170
                                return m
111✔
171
                        }
111✔
172
                }
173
        }
174
        return nil
165✔
175
}
176

177
// Leader returns a connection pool to the zero leader.
178
func (s *Server) Leader(gid uint32) *conn.Pool {
5✔
179
        s.RLock()
5✔
180
        defer s.RUnlock()
5✔
181
        if s.state == nil {
5✔
182
                return nil
×
183
        }
×
184
        var members map[uint64]*pb.Member
5✔
185
        if gid == 0 {
5✔
186
                members = s.state.Zeros
×
187
        } else {
5✔
188
                group := s.state.Groups[gid]
5✔
189
                if group == nil {
5✔
190
                        return nil
×
191
                }
×
192
                members = group.Members
5✔
193
        }
194
        var healthyPool *conn.Pool
5✔
195
        for _, m := range members {
10✔
196
                if pl, err := conn.GetPools().Get(m.Addr); err == nil {
10✔
197
                        healthyPool = pl
5✔
198
                        if m.Leader {
10✔
199
                                return pl
5✔
200
                        }
5✔
201
                }
202
        }
203
        return healthyPool
×
204
}
205

206
// KnownGroups returns a list of the known groups.
207
func (s *Server) KnownGroups() []uint32 {
1,009✔
208
        var groups []uint32
1,009✔
209
        s.RLock()
1,009✔
210
        defer s.RUnlock()
1,009✔
211
        for group := range s.state.Groups {
2,031✔
212
                groups = append(groups, group)
1,022✔
213
        }
1,022✔
214
        return groups
1,009✔
215
}
216

217
func (s *Server) hasLeader(gid uint32) bool {
5✔
218
        s.AssertRLock()
5✔
219
        if s.state == nil {
5✔
220
                return false
×
221
        }
×
222
        group := s.state.Groups[gid]
5✔
223
        if group == nil {
5✔
224
                return false
×
225
        }
×
226
        for _, m := range group.Members {
10✔
227
                if m.Leader {
10✔
228
                        return true
5✔
229
                }
5✔
230
        }
231
        return false
×
232
}
233

234
// SetMembershipState updates the membership state to the given one.
235
func (s *Server) SetMembershipState(state *pb.MembershipState) {
×
236
        s.Lock()
×
237
        defer s.Unlock()
×
238

×
239
        s.state = state
×
240
        s.nextRaftId = x.Max(s.nextRaftId, s.state.MaxRaftId+1)
×
241

×
242
        if state.Zeros == nil {
×
243
                state.Zeros = make(map[uint64]*pb.Member)
×
244
        }
×
245
        if state.Groups == nil {
×
246
                state.Groups = make(map[uint32]*pb.Group)
×
247
        }
×
248

249
        // Create connections to all members.
250
        for _, g := range state.Groups {
×
251
                for _, m := range g.Members {
×
252
                        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
253
                }
×
254

255
                if g.Tablets == nil {
×
256
                        g.Tablets = make(map[string]*pb.Tablet)
×
257
                }
×
258
        }
259

260
        s.nextGroup = uint32(len(state.Groups) + 1)
×
261
}
262

263
// MarshalMembershipState returns the marshaled membership state.
264
func (s *Server) MarshalMembershipState() ([]byte, error) {
×
265
        s.Lock()
×
266
        defer s.Unlock()
×
267
        return s.state.Marshal()
×
268
}
×
269

270
func (s *Server) membershipState() *pb.MembershipState {
29,261✔
271
        s.RLock()
29,261✔
272
        defer s.RUnlock()
29,261✔
273
        return proto.Clone(s.state).(*pb.MembershipState)
29,261✔
274
}
29,261✔
275

276
func (s *Server) groupChecksums() map[uint32]uint64 {
154,270✔
277
        s.RLock()
154,270✔
278
        defer s.RUnlock()
154,270✔
279
        m := make(map[uint32]uint64)
154,270✔
280
        for gid, g := range s.state.GetGroups() {
518,737✔
281
                m[gid] = g.Checksum
364,467✔
282
        }
364,467✔
283
        return m
154,270✔
284
}
285

286
func (s *Server) storeZero(m *pb.Member) {
109✔
287
        s.Lock()
109✔
288
        defer s.Unlock()
109✔
289

109✔
290
        s.state.Zeros[m.Id] = m
109✔
291
}
109✔
292

293
func (s *Server) updateZeroLeader() {
1,410✔
294
        s.Lock()
1,410✔
295
        defer s.Unlock()
1,410✔
296
        leader := s.Node.Raft().Status().Lead
1,410✔
297
        for _, m := range s.state.Zeros {
4,899✔
298
                m.Leader = m.Id == leader
3,489✔
299
        }
3,489✔
300
}
301

302
func (s *Server) removeZero(nodeId uint64) {
×
303
        s.Lock()
×
304
        defer s.Unlock()
×
305
        m, has := s.state.Zeros[nodeId]
×
306
        if !has {
×
307
                return
×
308
        }
×
309
        delete(s.state.Zeros, nodeId)
×
310
        s.state.Removed = append(s.state.Removed, m)
×
311
}
312

313
// ServingTablet returns the Tablet called tablet.
314
func (s *Server) ServingTablet(tablet string) *pb.Tablet {
36,695✔
315
        s.RLock()
36,695✔
316
        defer s.RUnlock()
36,695✔
317

36,695✔
318
        for _, group := range s.state.Groups {
79,373✔
319
                if tab, ok := group.Tablets[tablet]; ok {
72,889✔
320
                        return tab
30,211✔
321
                }
30,211✔
322
        }
323
        return nil
6,484✔
324
}
325

326
func (s *Server) blockTablet(pred string) func() {
5✔
327
        s.blockCommitsOn.Store(pred, struct{}{})
5✔
328
        return func() {
10✔
329
                s.blockCommitsOn.Delete(pred)
5✔
330
        }
5✔
331
}
332

333
func (s *Server) isBlocked(pred string) bool {
19,725✔
334
        _, blocked := s.blockCommitsOn.Load(pred)
19,725✔
335
        return blocked
19,725✔
336
}
19,725✔
337

338
func (s *Server) servingTablet(tablet string) *pb.Tablet {
8,049✔
339
        s.AssertRLock()
8,049✔
340

8,049✔
341
        for _, group := range s.state.Groups {
18,072✔
342
                if tab, ok := group.Tablets[tablet]; ok {
12,658✔
343
                        return tab
2,635✔
344
                }
2,635✔
345
        }
346
        return nil
5,414✔
347
}
348

349
func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
2,285✔
350
        var res []*pb.ZeroProposal
2,285✔
351
        if len(dst.Members) > 1 {
2,285✔
352
                return res, errors.Errorf("Create Proposal: Invalid group: %+v", dst)
×
353
        }
×
354

355
        s.RLock()
2,285✔
356
        defer s.RUnlock()
2,285✔
357
        // There is only one member. We use for loop because we don't know what the mid is.
2,285✔
358
        for mid, dstMember := range dst.Members {
4,570✔
359
                group, has := s.state.Groups[dstMember.GroupId]
2,285✔
360
                if !has {
2,285✔
361
                        return res, errors.Errorf("Unknown group for member: %+v", dstMember)
×
362
                }
×
363
                srcMember, has := group.Members[mid]
2,285✔
364
                if !has {
2,285✔
365
                        return res, errors.Errorf("Unknown member: %+v", dstMember)
×
366
                }
×
367
                if srcMember.Addr != dstMember.Addr ||
2,285✔
368
                        srcMember.Leader != dstMember.Leader {
2,368✔
369

83✔
370
                        proposal := &pb.ZeroProposal{
83✔
371
                                Member: dstMember,
83✔
372
                        }
83✔
373
                        res = append(res, proposal)
83✔
374
                }
83✔
375
                if !dstMember.Leader {
3,652✔
376
                        // Don't continue to tablets if request is not from the leader.
1,367✔
377
                        return res, nil
1,367✔
378
                }
1,367✔
379
                if dst.SnapshotTs > group.SnapshotTs {
972✔
380
                        res = append(res, &pb.ZeroProposal{
54✔
381
                                SnapshotTs: map[uint32]uint64{dstMember.GroupId: dst.SnapshotTs},
54✔
382
                        })
54✔
383
                }
54✔
384
        }
385

386
        var tablets []*pb.Tablet
918✔
387
        for key, dstTablet := range dst.Tablets {
918✔
388
                group, has := s.state.Groups[dstTablet.GroupId]
×
389
                if !has {
×
390
                        return res, errors.Errorf("Unknown group for tablet: %+v", dstTablet)
×
391
                }
×
392
                srcTablet, has := group.Tablets[key]
×
393
                if !has {
×
394
                        // Tablet moved to new group
×
395
                        continue
×
396
                }
397

398
                s := float64(srcTablet.OnDiskBytes)
×
399
                d := float64(dstTablet.OnDiskBytes)
×
400
                if dstTablet.Remove || (s == 0 && d > 0) || (s > 0 && math.Abs(d/s-1) > 0.1) {
×
401
                        dstTablet.Force = false
×
402
                        tablets = append(tablets, dstTablet)
×
403
                }
×
404
        }
405

406
        if len(tablets) > 0 {
918✔
407
                res = append(res, &pb.ZeroProposal{Tablets: tablets})
×
408
        }
×
409
        return res, nil
918✔
410
}
411

412
func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
1✔
413
        ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
1✔
414
        defer span.End()
1✔
415
        if req == nil || len(req.Tablets) == 0 {
1✔
416
                return nil, errors.Errorf("Tablets are empty in %+v", req)
×
417
        }
×
418

419
        if req.GroupId == 0 {
1✔
420
                return nil, errors.Errorf("Group ID is Zero in %+v", req)
×
421
        }
×
422

423
        tablets := make([]*pb.Tablet, 0)
1✔
424
        unknownTablets := make([]*pb.Tablet, 0)
1✔
425
        for _, t := range req.Tablets {
46✔
426
                tab := s.ServingTablet(t.Predicate)
45✔
427
                span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
45✔
428
                switch {
45✔
429
                case tab != nil && !t.Force:
×
430
                        tablets = append(tablets, t)
×
431
                case t.ReadOnly:
×
432
                        tablets = append(tablets, &pb.Tablet{})
×
433
                default:
45✔
434
                        unknownTablets = append(unknownTablets, t)
45✔
435
                }
436
        }
437

438
        if len(unknownTablets) == 0 {
1✔
439
                return &pb.TabletResponse{
×
440
                        Tablets: tablets,
×
441
                }, nil
×
442
        }
×
443

444
        // Set the tablet to be served by this server's group.
445
        var proposal pb.ZeroProposal
1✔
446
        proposal.Tablets = make([]*pb.Tablet, 0)
1✔
447
        for _, t := range unknownTablets {
46✔
448
                if x.IsReservedPredicate(t.Predicate) {
56✔
449
                        // Force all the reserved predicates to be allocated to group 1.
11✔
450
                        // This is to make it easier to stream ACL updates to all alpha servers
11✔
451
                        // since they only need to open one pipeline to receive updates for all
11✔
452
                        // ACL predicates.
11✔
453
                        // This will also make it easier to restore the reserved predicates after
11✔
454
                        // a DropAll operation.
11✔
455
                        t.GroupId = 1
11✔
456
                }
11✔
457
                proposal.Tablets = append(proposal.Tablets, t)
45✔
458
        }
459

460
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
1✔
461
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
462
                return nil, err
×
463
        }
×
464

465
        for _, t := range unknownTablets {
46✔
466
                tab := s.ServingTablet(t.Predicate)
45✔
467
                x.AssertTrue(tab != nil)
45✔
468
                span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
45✔
469
                tablets = append(tablets, tab)
45✔
470
        }
45✔
471

472
        return &pb.TabletResponse{
1✔
473
                Tablets: tablets,
1✔
474
        }, nil
1✔
475
}
476

477
// RemoveNode removes the given node from the given group.
478
// It's the user's responsibility to ensure that node doesn't come back again
479
// before calling the api.
480
func (s *Server) RemoveNode(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.Status, error) {
1,005✔
481
        if req.GroupId == 0 {
1,005✔
482
                return nil, s.Node.ProposePeerRemoval(ctx, req.NodeId)
×
483
        }
×
484
        zp := &pb.ZeroProposal{}
1,005✔
485
        zp.Member = &pb.Member{Id: req.NodeId, GroupId: req.GroupId, AmDead: true}
1,005✔
486
        if _, ok := s.state.Groups[req.GroupId]; !ok {
1,007✔
487
                return nil, errors.Errorf("No group with groupId %d found", req.GroupId)
2✔
488
        }
2✔
489
        if _, ok := s.state.Groups[req.GroupId].Members[req.NodeId]; !ok {
2,004✔
490
                return nil, errors.Errorf("No node with nodeId %d found in group %d", req.NodeId,
1,001✔
491
                        req.GroupId)
1,001✔
492
        }
1,001✔
493
        if len(s.state.Groups[req.GroupId].Members) == 1 && len(s.state.Groups[req.GroupId].
2✔
494
                Tablets) > 0 {
2✔
495
                return nil, errors.Errorf("Move all tablets from group %d before removing the last node",
×
496
                        req.GroupId)
×
497
        }
×
498
        if err := s.Node.proposeAndWait(ctx, zp); err != nil {
2✔
499
                return nil, err
×
500
        }
×
501

502
        return &pb.Status{}, nil
2✔
503
}
504

505
// Connect is used by Alpha nodes to connect the very first time with group zero.
506
func (s *Server) Connect(ctx context.Context,
507
        m *pb.Member) (resp *pb.ConnectionState, err error) {
692✔
508
        // Ensures that connect requests are always serialized
692✔
509
        s.connectLock.Lock()
692✔
510
        defer s.connectLock.Unlock()
692✔
511
        glog.Infof("Got connection request: %+v\n", m)
692✔
512
        defer glog.Infof("Connected: %+v\n", m)
692✔
513

692✔
514
        if ctx.Err() != nil {
692✔
515
                err := errors.Errorf("Context has error: %v\n", ctx.Err())
×
516
                return &emptyConnectionState, err
×
517
        }
×
518
        ms, err := s.latestMembershipState(ctx)
692✔
519
        if err != nil {
695✔
520
                return nil, err
3✔
521
        }
3✔
522

523
        if m.Learner && !ms.License.GetEnabled() {
689✔
524
                // Update the "ShouldCrash" function in x/x.go if you change the error message here.
×
525
                return nil, errors.New("ENTERPRISE_ONLY_LEARNER - Missing or expired Enterpise License. " +
×
526
                        "Cannot add Learner Node.")
×
527
        }
×
528

529
        if m.ClusterInfoOnly {
1,214✔
530
                // This request only wants to access the membership state, and nothing else. Most likely
525✔
531
                // from our clients.
525✔
532
                cs := &pb.ConnectionState{
525✔
533
                        State:      ms,
525✔
534
                        MaxPending: s.orc.MaxPending(),
525✔
535
                }
525✔
536
                return cs, err
525✔
537
        }
525✔
538
        if m.Addr == "" {
164✔
539
                return &emptyConnectionState, errors.Errorf("NO_ADDR: No address provided: %+v", m)
×
540
        }
×
541

542
        for _, member := range ms.Removed {
164✔
543
                // It is not recommended to reuse RAFT ids.
×
544
                if member.GroupId != 0 && m.Id == member.Id {
×
545
                        return &emptyConnectionState, errors.Errorf(
×
546
                                "REUSE_RAFTID: Duplicate Raft ID %d to removed member: %+v", m.Id, member)
×
547
                }
×
548
        }
549

550
        numberOfNodes := len(ms.Zeros)
164✔
551
        for _, group := range ms.Groups {
239✔
552
                for _, member := range group.Members {
192✔
553
                        switch {
117✔
554
                        case member.Addr == m.Addr && m.Id == 0:
×
555
                                glog.Infof("Found a member with the same address. Returning: %+v", member)
×
556
                                conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
557
                                return &pb.ConnectionState{
×
558
                                        State:  ms,
×
559
                                        Member: member,
×
560
                                }, nil
×
561

562
                        case member.Addr == m.Addr && member.Id != m.Id:
×
563
                                // Same address. Different Id. If Id is zero, then it might be trying to connect for
×
564
                                // the first time. We can just directly return the membership information.
×
565
                                return nil, errors.Errorf("REUSE_ADDR: Duplicate address to existing member: %+v."+
×
566
                                        " Self: +%v", member, m)
×
567

568
                        case member.Addr != m.Addr && member.Id == m.Id:
×
569
                                // Same Id. Different address.
×
570
                                if pl, err := conn.GetPools().Get(member.Addr); err == nil && pl.IsHealthy() {
×
571
                                        // Found a healthy connection.
×
572
                                        return nil, errors.Errorf("REUSE_RAFTID: Healthy connection to a member"+
×
573
                                                " with same ID: %+v", member)
×
574
                                }
×
575
                        }
576
                        numberOfNodes++
117✔
577
                }
578
        }
579

580
        // Create a connection and check validity of the address by doing an Echo.
581
        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
164✔
582

164✔
583
        createProposal := func() *pb.ZeroProposal {
328✔
584
                s.Lock()
164✔
585
                defer s.Unlock()
164✔
586

164✔
587
                proposal := new(pb.ZeroProposal)
164✔
588
                // Check if we already have this member.
164✔
589
                for _, group := range s.state.Groups {
239✔
590
                        if _, has := group.Members[m.Id]; has {
77✔
591
                                return nil
2✔
592
                        }
2✔
593
                }
594
                if m.Id == 0 {
259✔
595
                        // In certain situations, the proposal can be sent and return with an error.
97✔
596
                        // However,  Dgraph will keep retrying the proposal. To avoid assigning duplicating
97✔
597
                        // IDs, the counter is incremented every time a proposal is created.
97✔
598
                        m.Id = s.nextRaftId
97✔
599
                        s.nextRaftId += 1
97✔
600
                        proposal.MaxRaftId = m.Id
97✔
601
                } else if m.Id >= s.nextRaftId {
186✔
602
                        s.nextRaftId = m.Id + 1
24✔
603
                        proposal.MaxRaftId = m.Id
24✔
604
                }
24✔
605

606
                // We don't have this member. So, let's see if it has preference for a group.
607
                if m.GroupId > 0 {
188✔
608
                        group, has := s.state.Groups[m.GroupId]
26✔
609
                        if !has {
48✔
610
                                // We don't have this group. Add the server to this group.
22✔
611
                                proposal.Member = m
22✔
612
                                return proposal
22✔
613
                        }
22✔
614

615
                        if _, has := group.Members[m.Id]; has {
4✔
616
                                proposal.Member = m // Update in case some fields have changed, like address.
×
617
                                return proposal
×
618
                        }
×
619

620
                        if m.Learner {
4✔
621
                                // Give it the group it wants.
×
622
                                proposal.Member = m
×
623
                                return proposal
×
624
                        }
×
625

626
                        // We don't have this server in the list.
627
                        if len(group.Members) < s.NumReplicas {
8✔
628
                                // We need more servers here, so let's add it.
4✔
629
                                proposal.Member = m
4✔
630
                                return proposal
4✔
631
                        } else if m.ForceGroupId {
4✔
632
                                // If the group ID was taken from the group_id file, force the member
×
633
                                // to be in this group even if the group is at capacity. This should
×
634
                                // not happen if users properly initialize a cluster after a bulk load.
×
635
                                proposal.Member = m
×
636
                                return proposal
×
637
                        }
×
638
                        // Already have plenty of servers serving this group.
639
                }
640
                // Let's assign this server to a new group.
641
                for gid, group := range s.state.Groups {
192✔
642
                        if len(group.Members) < s.NumReplicas {
76✔
643
                                m.GroupId = gid
20✔
644
                                proposal.Member = m
20✔
645
                                return proposal
20✔
646
                        }
20✔
647
                }
648
                // We either don't have any groups, or don't have any groups which need another member.
649
                m.GroupId = s.nextGroup
116✔
650
                // We shouldn't increase nextGroup here as we don't know whether we have enough
116✔
651
                // replicas until proposal is committed and can cause issues due to race.
116✔
652
                proposal.Member = m
116✔
653
                return proposal
116✔
654
        }
655

656
        proposal := createProposal()
164✔
657
        if proposal == nil {
166✔
658
                return &pb.ConnectionState{
2✔
659
                        State: ms, Member: m,
2✔
660
                }, nil
2✔
661
        }
2✔
662

663
        maxNodes := s.state.GetLicense().GetMaxNodes()
162✔
664
        if s.state.GetLicense().GetEnabled() && uint64(numberOfNodes) >= maxNodes {
162✔
665
                return nil, errors.Errorf("ENTERPRISE_LIMIT_REACHED: You are already using the maximum "+
×
666
                        "number of nodes: [%v] permitted for your enterprise license.", maxNodes)
×
667
        }
×
668

669
        if err := s.Node.proposeAndWait(ctx, proposal); err != nil {
221✔
670
                return &emptyConnectionState, err
59✔
671
        }
59✔
672
        resp = &pb.ConnectionState{
103✔
673
                State:  s.membershipState(),
103✔
674
                Member: m,
103✔
675
        }
103✔
676
        return resp, nil
103✔
677
}
678

679
// DeleteNamespace removes the tablets for deleted namespace from the membership state.
680
func (s *Server) DeleteNamespace(ctx context.Context, in *pb.DeleteNsRequest) (*pb.Status, error) {
9✔
681
        err := s.Node.proposeAndWait(ctx, &pb.ZeroProposal{DeleteNs: in})
9✔
682
        return &pb.Status{}, err
9✔
683
}
9✔
684

685
// ShouldServe returns the tablet serving the predicate passed in the request.
686
func (s *Server) ShouldServe(
687
        ctx context.Context, tablet *pb.Tablet) (resp *pb.Tablet, err error) {
9,422✔
688
        ctx, span := otrace.StartSpan(ctx, "Zero.ShouldServe")
9,422✔
689
        defer span.End()
9,422✔
690

9,422✔
691
        if tablet.Predicate == "" {
9,422✔
692
                return resp, errors.Errorf("Tablet predicate is empty in %+v", tablet)
×
693
        }
×
694
        if tablet.GroupId == 0 && !tablet.ReadOnly {
9,422✔
695
                return resp, errors.Errorf("Group ID is Zero in %+v", tablet)
×
696
        }
×
697

698
        // Check who is serving this tablet.
699
        tab := s.ServingTablet(tablet.Predicate)
9,422✔
700
        span.Annotatef(nil, "Tablet for %s: %+v", tablet.Predicate, tab)
9,422✔
701
        if tab != nil && !tablet.Force {
9,834✔
702
                // Someone is serving this tablet. Could be the caller as well.
412✔
703
                // The caller should compare the returned group against the group it holds to check who's
412✔
704
                // serving.
412✔
705
                return tab, nil
412✔
706
        }
412✔
707

708
        // Read-only requests should return an empty tablet instead of asking zero
709
        // to serve the predicate.
710
        if tablet.ReadOnly {
10,575✔
711
                return &pb.Tablet{}, nil
1,565✔
712
        }
1,565✔
713

714
        // Set the tablet to be served by this server's group.
715
        var proposal pb.ZeroProposal
7,445✔
716

7,445✔
717
        if x.IsReservedPredicate(tablet.Predicate) {
13,186✔
718
                // Force all the reserved predicates to be allocated to group 1.
5,741✔
719
                // This is to make it easier to stream ACL updates to all alpha servers
5,741✔
720
                // since they only need to open one pipeline to receive updates for all
5,741✔
721
                // ACL predicates.
5,741✔
722
                // This will also make it easier to restore the reserved predicates after
5,741✔
723
                // a DropAll operation.
5,741✔
724
                tablet.GroupId = 1
5,741✔
725
        }
5,741✔
726
        proposal.Tablet = tablet
7,445✔
727
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
7,445✔
728
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
729
                return tablet, err
×
730
        }
×
731
        tab = s.ServingTablet(tablet.Predicate)
7,445✔
732
        x.AssertTrue(tab != nil)
7,445✔
733
        span.Annotatef(nil, "Now serving tablet for %s: %+v", tablet.Predicate, tab)
7,445✔
734
        return tab, nil
7,445✔
735
}
736

737
// UpdateMembership updates the membership of the given group.
738
func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
2,285✔
739
        // Only Zero leader would get these membership updates.
2,285✔
740
        if ts := group.GetCheckpointTs(); ts > 0 {
2,869✔
741
                for _, m := range group.GetMembers() {
1,168✔
742
                        s.Lock()
584✔
743
                        s.checkpointPerGroup[m.GetGroupId()] = ts
584✔
744
                        s.Unlock()
584✔
745
                }
584✔
746
        }
747
        proposals, err := s.createProposals(group)
2,285✔
748
        if err != nil {
2,285✔
749
                // Sleep here so the caller doesn't keep on retrying indefinitely, creating a busy
×
750
                // wait.
×
751
                time.Sleep(time.Second)
×
752
                glog.Errorf("Error while creating proposals in Update: %v\n", err)
×
753
                return nil, err
×
754
        }
×
755

756
        ctx, cancel := context.WithCancel(ctx)
2,285✔
757
        defer cancel()
2,285✔
758

2,285✔
759
        errCh := make(chan error, len(proposals))
2,285✔
760
        for _, pr := range proposals {
2,422✔
761
                go func(pr *pb.ZeroProposal) {
274✔
762
                        errCh <- s.Node.proposeAndWait(ctx, pr)
137✔
763
                }(pr)
137✔
764
        }
765

766
        for range proposals {
2,422✔
767
                // We Don't care about these errors
137✔
768
                // Ideally shouldn't error out.
137✔
769
                if err := <-errCh; err != nil {
137✔
770
                        glog.Errorf("Error while applying proposal in Update stream: %v\n", err)
×
771
                        return nil, err
×
772
                }
×
773
        }
774

775
        if len(group.Members) == 0 {
2,285✔
776
                return &api.Payload{Data: []byte("OK")}, nil
×
777
        }
×
778
        select {
2,285✔
779
        case s.moveOngoing <- struct{}{}:
2,284✔
780
        default:
1✔
781
                // If a move is going on, don't do the next steps of deleting predicates.
1✔
782
                return &api.Payload{Data: []byte("OK")}, nil
1✔
783
        }
784
        defer func() {
4,568✔
785
                <-s.moveOngoing
2,284✔
786
        }()
2,284✔
787

788
        if err := s.deletePredicates(ctx, group); err != nil {
2,284✔
789
                glog.Warningf("While deleting predicates: %v", err)
×
790
        }
×
791
        return &api.Payload{Data: []byte("OK")}, nil
2,284✔
792
}
793

794
func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error {
2,284✔
795
        if group == nil || group.Tablets == nil {
4,568✔
796
                return nil
2,284✔
797
        }
2,284✔
798
        var gid uint32
×
799
        for _, tablet := range group.Tablets {
×
800
                gid = tablet.GroupId
×
801
                break
×
802
        }
803
        if gid == 0 {
×
804
                return errors.Errorf("Unable to find group")
×
805
        }
×
806
        state, err := s.latestMembershipState(ctx)
×
807
        if err != nil {
×
808
                return err
×
809
        }
×
810
        sg, ok := state.Groups[gid]
×
811
        if !ok {
×
812
                return errors.Errorf("Unable to find group: %d", gid)
×
813
        }
×
814

815
        pl := s.Leader(gid)
×
816
        if pl == nil {
×
817
                return errors.Errorf("Unable to reach leader of group: %d", gid)
×
818
        }
×
819
        wc := pb.NewWorkerClient(pl.Get())
×
820

×
821
        for pred := range group.Tablets {
×
822
                if _, found := sg.Tablets[pred]; found {
×
823
                        continue
×
824
                }
825
                glog.Infof("Tablet: %v does not belong to group: %d. Sending delete instruction.",
×
826
                        pred, gid)
×
827
                in := &pb.MovePredicatePayload{
×
828
                        Predicate: pred,
×
829
                        SourceGid: gid,
×
830
                        DestGid:   0,
×
831
                }
×
832
                if _, err := wc.MovePredicate(ctx, in); err != nil {
×
833
                        return err
×
834
                }
×
835
        }
836
        return nil
×
837
}
838

839
// StreamMembership periodically streams the membership state to the given stream.
840
func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error {
108✔
841
        // Send MembershipState right away. So, the connection is correctly established.
108✔
842
        ctx := stream.Context()
108✔
843
        ms, err := s.latestMembershipState(ctx)
108✔
844
        if err != nil {
111✔
845
                return err
3✔
846
        }
3✔
847
        if err := stream.Send(ms); err != nil {
105✔
848
                return err
×
849
        }
×
850

851
        ticker := time.NewTicker(time.Second)
105✔
852
        defer ticker.Stop()
105✔
853
        for {
22,408✔
854
                select {
22,303✔
855
                case <-ticker.C:
22,200✔
856
                        // Send an update every second.
22,200✔
857
                        ms, err := s.latestMembershipState(ctx)
22,200✔
858
                        if err != nil {
22,202✔
859
                                return err
2✔
860
                        }
2✔
861
                        if err := stream.Send(ms); err != nil {
22,198✔
862
                                return err
×
863
                        }
×
864
                case <-ctx.Done():
55✔
865
                        return ctx.Err()
55✔
866
                case <-s.closer.HasBeenClosed():
48✔
867
                        return errServerShutDown
48✔
868
                }
869
        }
870
}
871

872
func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState, error) {
28,729✔
873
        if err := s.Node.WaitLinearizableRead(ctx); err != nil {
28,751✔
874
                return nil, err
22✔
875
        }
22✔
876
        ms := s.membershipState()
28,707✔
877
        if ms == nil {
28,707✔
878
                return &pb.MembershipState{}, nil
×
879
        }
×
880
        return ms, nil
28,707✔
881
}
882

883
func (s *Server) ApplyLicense(ctx context.Context, req *pb.ApplyLicenseRequest) (*pb.Status,
884
        error) {
7✔
885
        var l license
7✔
886
        signedData := bytes.NewReader(req.License)
7✔
887
        if err := verifySignature(signedData, strings.NewReader(publicKey), &l); err != nil {
12✔
888
                return nil, errors.Wrapf(err, "while extracting enterprise details from the license")
5✔
889
        }
5✔
890

891
        numNodes := len(s.state.GetZeros())
2✔
892
        for _, group := range s.state.GetGroups() {
4✔
893
                numNodes += len(group.GetMembers())
2✔
894
        }
2✔
895
        if uint64(numNodes) > l.MaxNodes {
2✔
896
                return nil, errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. "+
×
897
                        "You have: [%v].", l.MaxNodes, numNodes)
×
898
        }
×
899

900
        proposal := &pb.ZeroProposal{
2✔
901
                License: &pb.License{
2✔
902
                        User:     l.User,
2✔
903
                        MaxNodes: l.MaxNodes,
2✔
904
                        ExpiryTs: l.Expiry.Unix(),
2✔
905
                },
2✔
906
        }
2✔
907

2✔
908
        err := s.Node.proposeAndWait(ctx, proposal)
2✔
909
        if err != nil {
2✔
910
                return nil, errors.Wrapf(err, "while proposing enterprise license state to cluster")
×
911
        }
×
912
        glog.Infof("Enterprise license proposed to the cluster %+v", proposal)
2✔
913
        return &pb.Status{}, nil
2✔
914
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc