• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5228265546

10 Jun 2023 05:08AM UTC coverage: 67.303% (+0.07%) from 67.23%
5228265546

push

web-flow
Merge dba1461bb into ab3769797

16 of 16 new or added lines in 2 files covered. (100.0%)

58428 of 86814 relevant lines covered (67.3%)

2257715.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.45
/dgraph/cmd/zero/zero.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "math"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/gogo/protobuf/proto"
29
        "github.com/golang/glog"
30
        "github.com/pkg/errors"
31
        otrace "go.opencensus.io/trace"
32

33
        "github.com/dgraph-io/dgo/v230/protos/api"
34
        "github.com/dgraph-io/dgraph/conn"
35
        "github.com/dgraph-io/dgraph/protos/pb"
36
        "github.com/dgraph-io/dgraph/telemetry"
37
        "github.com/dgraph-io/dgraph/x"
38
        "github.com/dgraph-io/ristretto/z"
39
)
40

41
var (
	// emptyConnectionState is returned by address from several Connect error
	// paths, so those callers receive a non-nil (but empty) state.
	emptyConnectionState pb.ConnectionState
	// errServerShutDown is the error used when the server is being shut down
	// (its users are not visible in this part of the file).
	errServerShutDown    = errors.New("Server is being shut down")
)
45

46
// license is a JSON-serializable license record. The field order and the JSON
// tags below define its encoding. NOTE(review): presumably the decoded form of
// an enterprise license; it is not referenced in the visible part of this file.
type license struct {
	// User the license was issued to (assumed from the field name).
	User     string    `json:"user"`
	// MaxNodes is presumably the node limit granted by the license — TODO confirm.
	MaxNodes uint64    `json:"max_nodes"`
	// Expiry is the license expiry instant.
	Expiry   time.Time `json:"expiry"`
}
51

52
// Server implements the zero server.
//
// The embedded x.SafeMutex guards the membership state and related counters.
// Methods in this file take Lock to mutate them, and take RLock (or check it
// via AssertRLock) to read them.
type Server struct {
	x.SafeMutex
	// Node is this Zero's Raft node; proposals are submitted via Node.proposeAndWait.
	Node *node
	// orc is the oracle created in Init.
	orc  *Oracle

	// NumReplicas is the number of members a group is filled up to before
	// Connect places a new Alpha in another group.
	NumReplicas int
	// state is the current membership state (Zeros, Groups, Removed, License, ...).
	state       *pb.MembershipState
	// nextRaftId is the Raft ID Connect assigns to a member that arrives with ID 0.
	nextRaftId  uint64

	// nextLease holds the next value for each lease type; Init seeds the
	// UID, TXN_TS and NS_ID entries with 1.
	nextLease   map[pb.NumLeaseType]uint64
	readOnlyTs  uint64
	leaseLock   sync.Mutex // protects nextLease (UID, TXN_TS and NS_ID leases) and the corresponding proposals.
	// rateLimiter is the UID lease rate limiter built in Init; nil when the
	// configured UID lease limit is zero.
	rateLimiter *x.RateLimiter

	// groupMap    map[uint32]*Group
	// nextGroup is the group ID Connect assigns when no existing group has room.
	nextGroup      uint32
	// leaderChangeCh is closed and replaced by triggerLeaderChange.
	leaderChangeCh chan struct{}
	closer         *z.Closer  // Used to tell stream to close.
	connectLock    sync.Mutex // Used to serialize connect requests from servers.

	// tls client config used to connect with zero internally
	tlsClientConfig *tls.Config

	// moveOngoing is a capacity-1 channel. NOTE(review): presumably a token
	// that allows only one tablet move at a time — confirm against its users.
	moveOngoing    chan struct{}
	// blockCommitsOn is the set of predicates marked by blockTablet and
	// queried by isBlocked.
	blockCommitsOn *sync.Map

	// checkpointPerGroup maps group ID to the latest checkpoint timestamp
	// reported through UpdateMembership.
	checkpointPerGroup map[uint32]uint64
}
81

82
// Init initializes the zero server.
83
func (s *Server) Init() {
65✔
84
        s.Lock()
65✔
85
        defer s.Unlock()
65✔
86

65✔
87
        s.orc = &Oracle{}
65✔
88
        s.orc.Init()
65✔
89
        s.state = &pb.MembershipState{
65✔
90
                Groups: make(map[uint32]*pb.Group),
65✔
91
                Zeros:  make(map[uint64]*pb.Member),
65✔
92
        }
65✔
93
        s.nextLease = make(map[pb.NumLeaseType]uint64)
65✔
94
        s.nextRaftId = 1
65✔
95
        s.nextLease[pb.Num_UID] = 1
65✔
96
        s.nextLease[pb.Num_TXN_TS] = 1
65✔
97
        s.nextLease[pb.Num_NS_ID] = 1
65✔
98
        s.nextGroup = 1
65✔
99
        s.leaderChangeCh = make(chan struct{}, 1)
65✔
100
        s.closer = z.NewCloser(2) // grpc and http
65✔
101
        s.blockCommitsOn = new(sync.Map)
65✔
102
        s.moveOngoing = make(chan struct{}, 1)
65✔
103
        s.checkpointPerGroup = make(map[uint32]uint64)
65✔
104
        if opts.limiterConfig.UidLeaseLimit > 0 {
66✔
105
                // rate limiting is not enabled when lease limit is set to zero.
1✔
106
                s.rateLimiter = x.NewRateLimiter(int64(opts.limiterConfig.UidLeaseLimit),
1✔
107
                        opts.limiterConfig.RefillAfter, s.closer)
1✔
108
        }
1✔
109

110
        go s.rebalanceTablets()
65✔
111
}
112

113
func (s *Server) periodicallyPostTelemetry() {
65✔
114
        glog.V(2).Infof("Starting telemetry data collection for zero...")
65✔
115
        start := time.Now()
65✔
116

65✔
117
        ticker := time.NewTicker(time.Minute * 10)
65✔
118
        defer ticker.Stop()
65✔
119

65✔
120
        var lastPostedAt time.Time
65✔
121
        for range ticker.C {
77✔
122
                if !s.Node.AmLeader() {
20✔
123
                        continue
8✔
124
                }
125
                if time.Since(lastPostedAt) < time.Hour {
4✔
126
                        continue
×
127
                }
128
                ms := s.membershipState()
4✔
129
                t := telemetry.NewZero(ms)
4✔
130
                if t == nil {
4✔
131
                        continue
×
132
                }
133
                t.SinceHours = int(time.Since(start).Hours())
4✔
134
                glog.V(2).Infof("Posting Telemetry data: %+v", t)
4✔
135

4✔
136
                err := t.Post()
4✔
137
                if err == nil {
4✔
138
                        lastPostedAt = time.Now()
×
139
                } else {
4✔
140
                        glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
4✔
141
                }
4✔
142
        }
143
}
144

145
func (s *Server) triggerLeaderChange() {
166✔
146
        s.Lock()
166✔
147
        defer s.Unlock()
166✔
148
        close(s.leaderChangeCh)
166✔
149
        s.leaderChangeCh = make(chan struct{}, 1)
166✔
150
}
166✔
151

152
func (s *Server) leaderChangeChannel() chan struct{} {
77✔
153
        s.RLock()
77✔
154
        defer s.RUnlock()
77✔
155
        return s.leaderChangeCh
77✔
156
}
77✔
157

158
func (s *Server) member(addr string) *pb.Member {
261✔
159
        s.AssertRLock()
261✔
160
        for _, m := range s.state.Zeros {
752✔
161
                if m.Addr == addr {
491✔
162
                        return m
×
163
                }
×
164
        }
165
        for _, g := range s.state.Groups {
533✔
166
                for _, m := range g.Members {
653✔
167
                        if m.Addr == addr {
483✔
168
                                return m
102✔
169
                        }
102✔
170
                }
171
        }
172
        return nil
159✔
173
}
174

175
// Leader returns a connection pool to the zero leader.
176
func (s *Server) Leader(gid uint32) *conn.Pool {
5✔
177
        s.RLock()
5✔
178
        defer s.RUnlock()
5✔
179
        if s.state == nil {
5✔
180
                return nil
×
181
        }
×
182
        var members map[uint64]*pb.Member
5✔
183
        if gid == 0 {
5✔
184
                members = s.state.Zeros
×
185
        } else {
5✔
186
                group := s.state.Groups[gid]
5✔
187
                if group == nil {
5✔
188
                        return nil
×
189
                }
×
190
                members = group.Members
5✔
191
        }
192
        var healthyPool *conn.Pool
5✔
193
        for _, m := range members {
10✔
194
                if pl, err := conn.GetPools().Get(m.Addr); err == nil {
10✔
195
                        healthyPool = pl
5✔
196
                        if m.Leader {
10✔
197
                                return pl
5✔
198
                        }
5✔
199
                }
200
        }
201
        return healthyPool
×
202
}
203

204
// KnownGroups returns a list of the known groups.
205
func (s *Server) KnownGroups() []uint32 {
1,009✔
206
        var groups []uint32
1,009✔
207
        s.RLock()
1,009✔
208
        defer s.RUnlock()
1,009✔
209
        for group := range s.state.Groups {
2,031✔
210
                groups = append(groups, group)
1,022✔
211
        }
1,022✔
212
        return groups
1,009✔
213
}
214

215
func (s *Server) hasLeader(gid uint32) bool {
5✔
216
        s.AssertRLock()
5✔
217
        if s.state == nil {
5✔
218
                return false
×
219
        }
×
220
        group := s.state.Groups[gid]
5✔
221
        if group == nil {
5✔
222
                return false
×
223
        }
×
224
        for _, m := range group.Members {
12✔
225
                if m.Leader {
12✔
226
                        return true
5✔
227
                }
5✔
228
        }
229
        return false
×
230
}
231

232
// SetMembershipState updates the membership state to the given one.
233
func (s *Server) SetMembershipState(state *pb.MembershipState) {
×
234
        s.Lock()
×
235
        defer s.Unlock()
×
236

×
237
        s.state = state
×
238
        s.nextRaftId = x.Max(s.nextRaftId, s.state.MaxRaftId+1)
×
239

×
240
        if state.Zeros == nil {
×
241
                state.Zeros = make(map[uint64]*pb.Member)
×
242
        }
×
243
        if state.Groups == nil {
×
244
                state.Groups = make(map[uint32]*pb.Group)
×
245
        }
×
246

247
        // Create connections to all members.
248
        for _, g := range state.Groups {
×
249
                for _, m := range g.Members {
×
250
                        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
251
                }
×
252

253
                if g.Tablets == nil {
×
254
                        g.Tablets = make(map[string]*pb.Tablet)
×
255
                }
×
256
        }
257

258
        s.nextGroup = uint32(len(state.Groups) + 1)
×
259
}
260

261
// MarshalMembershipState returns the marshaled membership state.
262
func (s *Server) MarshalMembershipState() ([]byte, error) {
×
263
        s.Lock()
×
264
        defer s.Unlock()
×
265
        return s.state.Marshal()
×
266
}
×
267

268
func (s *Server) membershipState() *pb.MembershipState {
28,865✔
269
        s.RLock()
28,865✔
270
        defer s.RUnlock()
28,865✔
271
        return proto.Clone(s.state).(*pb.MembershipState)
28,865✔
272
}
28,865✔
273

274
func (s *Server) groupChecksums() map[uint32]uint64 {
149,255✔
275
        s.RLock()
149,255✔
276
        defer s.RUnlock()
149,255✔
277
        m := make(map[uint32]uint64)
149,255✔
278
        for gid, g := range s.state.GetGroups() {
499,313✔
279
                m[gid] = g.Checksum
350,058✔
280
        }
350,058✔
281
        return m
149,255✔
282
}
283

284
func (s *Server) storeZero(m *pb.Member) {
101✔
285
        s.Lock()
101✔
286
        defer s.Unlock()
101✔
287

101✔
288
        s.state.Zeros[m.Id] = m
101✔
289
}
101✔
290

291
func (s *Server) updateZeroLeader() {
1,388✔
292
        s.Lock()
1,388✔
293
        defer s.Unlock()
1,388✔
294
        leader := s.Node.Raft().Status().Lead
1,388✔
295
        for _, m := range s.state.Zeros {
4,821✔
296
                m.Leader = m.Id == leader
3,433✔
297
        }
3,433✔
298
}
299

300
func (s *Server) removeZero(nodeId uint64) {
×
301
        s.Lock()
×
302
        defer s.Unlock()
×
303
        m, has := s.state.Zeros[nodeId]
×
304
        if !has {
×
305
                return
×
306
        }
×
307
        delete(s.state.Zeros, nodeId)
×
308
        s.state.Removed = append(s.state.Removed, m)
×
309
}
310

311
// ServingTablet returns the Tablet called tablet.
312
func (s *Server) ServingTablet(tablet string) *pb.Tablet {
36,374✔
313
        s.RLock()
36,374✔
314
        defer s.RUnlock()
36,374✔
315

36,374✔
316
        for _, group := range s.state.Groups {
79,718✔
317
                if tab, ok := group.Tablets[tablet]; ok {
73,313✔
318
                        return tab
29,969✔
319
                }
29,969✔
320
        }
321
        return nil
6,405✔
322
}
323

324
func (s *Server) blockTablet(pred string) func() {
5✔
325
        s.blockCommitsOn.Store(pred, struct{}{})
5✔
326
        return func() {
10✔
327
                s.blockCommitsOn.Delete(pred)
5✔
328
        }
5✔
329
}
330

331
func (s *Server) isBlocked(pred string) bool {
19,552✔
332
        _, blocked := s.blockCommitsOn.Load(pred)
19,552✔
333
        return blocked
19,552✔
334
}
19,552✔
335

336
func (s *Server) servingTablet(tablet string) *pb.Tablet {
7,984✔
337
        s.AssertRLock()
7,984✔
338

7,984✔
339
        for _, group := range s.state.Groups {
17,837✔
340
                if tab, ok := group.Tablets[tablet]; ok {
12,491✔
341
                        return tab
2,638✔
342
                }
2,638✔
343
        }
344
        return nil
5,346✔
345
}
346

347
func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
2,250✔
348
        var res []*pb.ZeroProposal
2,250✔
349
        if len(dst.Members) > 1 {
2,250✔
350
                return res, errors.Errorf("Create Proposal: Invalid group: %+v", dst)
×
351
        }
×
352

353
        s.RLock()
2,250✔
354
        defer s.RUnlock()
2,250✔
355
        // There is only one member. We use for loop because we don't know what the mid is.
2,250✔
356
        for mid, dstMember := range dst.Members {
4,500✔
357
                group, has := s.state.Groups[dstMember.GroupId]
2,250✔
358
                if !has {
2,250✔
359
                        return res, errors.Errorf("Unknown group for member: %+v", dstMember)
×
360
                }
×
361
                srcMember, has := group.Members[mid]
2,250✔
362
                if !has {
2,250✔
363
                        return res, errors.Errorf("Unknown member: %+v", dstMember)
×
364
                }
×
365
                if srcMember.Addr != dstMember.Addr ||
2,250✔
366
                        srcMember.Leader != dstMember.Leader {
2,327✔
367

77✔
368
                        proposal := &pb.ZeroProposal{
77✔
369
                                Member: dstMember,
77✔
370
                        }
77✔
371
                        res = append(res, proposal)
77✔
372
                }
77✔
373
                if !dstMember.Leader {
3,600✔
374
                        // Don't continue to tablets if request is not from the leader.
1,350✔
375
                        return res, nil
1,350✔
376
                }
1,350✔
377
                if dst.SnapshotTs > group.SnapshotTs {
953✔
378
                        res = append(res, &pb.ZeroProposal{
53✔
379
                                SnapshotTs: map[uint32]uint64{dstMember.GroupId: dst.SnapshotTs},
53✔
380
                        })
53✔
381
                }
53✔
382
        }
383

384
        var tablets []*pb.Tablet
900✔
385
        for key, dstTablet := range dst.Tablets {
900✔
386
                group, has := s.state.Groups[dstTablet.GroupId]
×
387
                if !has {
×
388
                        return res, errors.Errorf("Unknown group for tablet: %+v", dstTablet)
×
389
                }
×
390
                srcTablet, has := group.Tablets[key]
×
391
                if !has {
×
392
                        // Tablet moved to new group
×
393
                        continue
×
394
                }
395

396
                s := float64(srcTablet.OnDiskBytes)
×
397
                d := float64(dstTablet.OnDiskBytes)
×
398
                if dstTablet.Remove || (s == 0 && d > 0) || (s > 0 && math.Abs(d/s-1) > 0.1) {
×
399
                        dstTablet.Force = false
×
400
                        tablets = append(tablets, dstTablet)
×
401
                }
×
402
        }
403

404
        if len(tablets) > 0 {
900✔
405
                res = append(res, &pb.ZeroProposal{Tablets: tablets})
×
406
        }
×
407
        return res, nil
900✔
408
}
409

410
func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
1✔
411
        ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
1✔
412
        defer span.End()
1✔
413
        if req == nil || len(req.Tablets) == 0 {
1✔
414
                return nil, errors.Errorf("Tablets are empty in %+v", req)
×
415
        }
×
416

417
        if req.GroupId == 0 {
1✔
418
                return nil, errors.Errorf("Group ID is Zero in %+v", req)
×
419
        }
×
420

421
        tablets := make([]*pb.Tablet, 0)
1✔
422
        unknownTablets := make([]*pb.Tablet, 0)
1✔
423
        for _, t := range req.Tablets {
46✔
424
                tab := s.ServingTablet(t.Predicate)
45✔
425
                span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
45✔
426
                switch {
45✔
427
                case tab != nil && !t.Force:
×
428
                        tablets = append(tablets, t)
×
429
                case t.ReadOnly:
×
430
                        tablets = append(tablets, &pb.Tablet{})
×
431
                default:
45✔
432
                        unknownTablets = append(unknownTablets, t)
45✔
433
                }
434
        }
435

436
        if len(unknownTablets) == 0 {
1✔
437
                return &pb.TabletResponse{
×
438
                        Tablets: tablets,
×
439
                }, nil
×
440
        }
×
441

442
        // Set the tablet to be served by this server's group.
443
        var proposal pb.ZeroProposal
1✔
444
        proposal.Tablets = make([]*pb.Tablet, 0)
1✔
445
        for _, t := range unknownTablets {
46✔
446
                if x.IsReservedPredicate(t.Predicate) {
56✔
447
                        // Force all the reserved predicates to be allocated to group 1.
11✔
448
                        // This is to make it easier to stream ACL updates to all alpha servers
11✔
449
                        // since they only need to open one pipeline to receive updates for all
11✔
450
                        // ACL predicates.
11✔
451
                        // This will also make it easier to restore the reserved predicates after
11✔
452
                        // a DropAll operation.
11✔
453
                        t.GroupId = 1
11✔
454
                }
11✔
455
                proposal.Tablets = append(proposal.Tablets, t)
45✔
456
        }
457

458
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
1✔
459
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
460
                return nil, err
×
461
        }
×
462

463
        for _, t := range unknownTablets {
46✔
464
                tab := s.ServingTablet(t.Predicate)
45✔
465
                x.AssertTrue(tab != nil)
45✔
466
                span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
45✔
467
                tablets = append(tablets, tab)
45✔
468
        }
45✔
469

470
        return &pb.TabletResponse{
1✔
471
                Tablets: tablets,
1✔
472
        }, nil
1✔
473
}
474

475
// RemoveNode removes the given node from the given group.
476
// It's the user's responsibility to ensure that node doesn't come back again
477
// before calling the api.
478
func (s *Server) RemoveNode(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.Status, error) {
1,005✔
479
        if req.GroupId == 0 {
1,005✔
480
                return nil, s.Node.ProposePeerRemoval(ctx, req.NodeId)
×
481
        }
×
482
        zp := &pb.ZeroProposal{}
1,005✔
483
        zp.Member = &pb.Member{Id: req.NodeId, GroupId: req.GroupId, AmDead: true}
1,005✔
484
        if _, ok := s.state.Groups[req.GroupId]; !ok {
1,007✔
485
                return nil, errors.Errorf("No group with groupId %d found", req.GroupId)
2✔
486
        }
2✔
487
        if _, ok := s.state.Groups[req.GroupId].Members[req.NodeId]; !ok {
2,004✔
488
                return nil, errors.Errorf("No node with nodeId %d found in group %d", req.NodeId,
1,001✔
489
                        req.GroupId)
1,001✔
490
        }
1,001✔
491
        if len(s.state.Groups[req.GroupId].Members) == 1 && len(s.state.Groups[req.GroupId].
2✔
492
                Tablets) > 0 {
2✔
493
                return nil, errors.Errorf("Move all tablets from group %d before removing the last node",
×
494
                        req.GroupId)
×
495
        }
×
496
        if err := s.Node.proposeAndWait(ctx, zp); err != nil {
2✔
497
                return nil, err
×
498
        }
×
499

500
        return &pb.Status{}, nil
2✔
501
}
502

503
// Connect is used by Alpha nodes to connect the very first time with group zero.
504
func (s *Server) Connect(ctx context.Context,
505
        m *pb.Member) (resp *pb.ConnectionState, err error) {
614✔
506
        // Ensures that connect requests are always serialized
614✔
507
        s.connectLock.Lock()
614✔
508
        defer s.connectLock.Unlock()
614✔
509
        glog.Infof("Got connection request: %+v\n", m)
614✔
510
        defer glog.Infof("Connected: %+v\n", m)
614✔
511

614✔
512
        if ctx.Err() != nil {
614✔
513
                err := errors.Errorf("Context has error: %v\n", ctx.Err())
×
514
                return &emptyConnectionState, err
×
515
        }
×
516
        ms, err := s.latestMembershipState(ctx)
614✔
517
        if err != nil {
614✔
518
                return nil, err
×
519
        }
×
520

521
        if m.Learner && !ms.License.GetEnabled() {
614✔
522
                // Update the "ShouldCrash" function in x/x.go if you change the error message here.
×
523
                return nil, errors.New("ENTERPRISE_ONLY_LEARNER - Missing or expired Enterpise License. " +
×
524
                        "Cannot add Learner Node.")
×
525
        }
×
526

527
        if m.ClusterInfoOnly {
1,103✔
528
                // This request only wants to access the membership state, and nothing else. Most likely
489✔
529
                // from our clients.
489✔
530
                cs := &pb.ConnectionState{
489✔
531
                        State:      ms,
489✔
532
                        MaxPending: s.orc.MaxPending(),
489✔
533
                }
489✔
534
                return cs, err
489✔
535
        }
489✔
536
        if m.Addr == "" {
125✔
537
                return &emptyConnectionState, errors.Errorf("NO_ADDR: No address provided: %+v", m)
×
538
        }
×
539

540
        for _, member := range ms.Removed {
125✔
541
                // It is not recommended to reuse RAFT ids.
×
542
                if member.GroupId != 0 && m.Id == member.Id {
×
543
                        return &emptyConnectionState, errors.Errorf(
×
544
                                "REUSE_RAFTID: Duplicate Raft ID %d to removed member: %+v", m.Id, member)
×
545
                }
×
546
        }
547

548
        numberOfNodes := len(ms.Zeros)
125✔
549
        for _, group := range ms.Groups {
196✔
550
                for _, member := range group.Members {
185✔
551
                        switch {
114✔
552
                        case member.Addr == m.Addr && m.Id == 0:
×
553
                                glog.Infof("Found a member with the same address. Returning: %+v", member)
×
554
                                conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
555
                                return &pb.ConnectionState{
×
556
                                        State:  ms,
×
557
                                        Member: member,
×
558
                                }, nil
×
559

560
                        case member.Addr == m.Addr && member.Id != m.Id:
×
561
                                // Same address. Different Id. If Id is zero, then it might be trying to connect for
×
562
                                // the first time. We can just directly return the membership information.
×
563
                                return nil, errors.Errorf("REUSE_ADDR: Duplicate address to existing member: %+v."+
×
564
                                        " Self: +%v", member, m)
×
565

566
                        case member.Addr != m.Addr && member.Id == m.Id:
×
567
                                // Same Id. Different address.
×
568
                                if pl, err := conn.GetPools().Get(member.Addr); err == nil && pl.IsHealthy() {
×
569
                                        // Found a healthy connection.
×
570
                                        return nil, errors.Errorf("REUSE_RAFTID: Healthy connection to a member"+
×
571
                                                " with same ID: %+v", member)
×
572
                                }
×
573
                        }
574
                        numberOfNodes++
114✔
575
                }
576
        }
577

578
        // Create a connection and check validity of the address by doing an Echo.
579
        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
125✔
580

125✔
581
        createProposal := func() *pb.ZeroProposal {
250✔
582
                s.Lock()
125✔
583
                defer s.Unlock()
125✔
584

125✔
585
                proposal := new(pb.ZeroProposal)
125✔
586
                // Check if we already have this member.
125✔
587
                for _, group := range s.state.Groups {
195✔
588
                        if _, has := group.Members[m.Id]; has {
72✔
589
                                return nil
2✔
590
                        }
2✔
591
                }
592
                if m.Id == 0 {
200✔
593
                        // In certain situations, the proposal can be sent and return with an error.
77✔
594
                        // However,  Dgraph will keep retrying the proposal. To avoid assigning duplicating
77✔
595
                        // IDs, the counter is incremented every time a proposal is created.
77✔
596
                        m.Id = s.nextRaftId
77✔
597
                        s.nextRaftId += 1
77✔
598
                        proposal.MaxRaftId = m.Id
77✔
599
                } else if m.Id >= s.nextRaftId {
145✔
600
                        s.nextRaftId = m.Id + 1
22✔
601
                        proposal.MaxRaftId = m.Id
22✔
602
                }
22✔
603

604
                // We don't have this member. So, let's see if it has preference for a group.
605
                if m.GroupId > 0 {
157✔
606
                        group, has := s.state.Groups[m.GroupId]
34✔
607
                        if !has {
64✔
608
                                // We don't have this group. Add the server to this group.
30✔
609
                                proposal.Member = m
30✔
610
                                return proposal
30✔
611
                        }
30✔
612

613
                        if _, has := group.Members[m.Id]; has {
4✔
614
                                proposal.Member = m // Update in case some fields have changed, like address.
×
615
                                return proposal
×
616
                        }
×
617

618
                        if m.Learner {
4✔
619
                                // Give it the group it wants.
×
620
                                proposal.Member = m
×
621
                                return proposal
×
622
                        }
×
623

624
                        // We don't have this server in the list.
625
                        if len(group.Members) < s.NumReplicas {
8✔
626
                                // We need more servers here, so let's add it.
4✔
627
                                proposal.Member = m
4✔
628
                                return proposal
4✔
629
                        } else if m.ForceGroupId {
4✔
630
                                // If the group ID was taken from the group_id file, force the member
×
631
                                // to be in this group even if the group is at capacity. This should
×
632
                                // not happen if users properly initialize a cluster after a bulk load.
×
633
                                proposal.Member = m
×
634
                                return proposal
×
635
                        }
×
636
                        // Already have plenty of servers serving this group.
637
                }
638
                // Let's assign this server to a new group.
639
                for gid, group := range s.state.Groups {
142✔
640
                        if len(group.Members) < s.NumReplicas {
73✔
641
                                m.GroupId = gid
20✔
642
                                proposal.Member = m
20✔
643
                                return proposal
20✔
644
                        }
20✔
645
                }
646
                // We either don't have any groups, or don't have any groups which need another member.
647
                m.GroupId = s.nextGroup
69✔
648
                // We shouldn't increase nextGroup here as we don't know whether we have enough
69✔
649
                // replicas until proposal is committed and can cause issues due to race.
69✔
650
                proposal.Member = m
69✔
651
                return proposal
69✔
652
        }
653

654
        proposal := createProposal()
125✔
655
        if proposal == nil {
127✔
656
                return &pb.ConnectionState{
2✔
657
                        State: ms, Member: m,
2✔
658
                }, nil
2✔
659
        }
2✔
660

661
        maxNodes := s.state.GetLicense().GetMaxNodes()
123✔
662
        if s.state.GetLicense().GetEnabled() && uint64(numberOfNodes) >= maxNodes {
123✔
663
                return nil, errors.Errorf("ENTERPRISE_LIMIT_REACHED: You are already using the maximum "+
×
664
                        "number of nodes: [%v] permitted for your enterprise license.", maxNodes)
×
665
        }
×
666

667
        if err := s.Node.proposeAndWait(ctx, proposal); err != nil {
146✔
668
                return &emptyConnectionState, err
23✔
669
        }
23✔
670
        resp = &pb.ConnectionState{
100✔
671
                State:  s.membershipState(),
100✔
672
                Member: m,
100✔
673
        }
100✔
674
        return resp, nil
100✔
675
}
676

677
// DeleteNamespace removes the tablets for deleted namespace from the membership state.
678
func (s *Server) DeleteNamespace(ctx context.Context, in *pb.DeleteNsRequest) (*pb.Status, error) {
9✔
679
        err := s.Node.proposeAndWait(ctx, &pb.ZeroProposal{DeleteNs: in})
9✔
680
        return &pb.Status{}, err
9✔
681
}
9✔
682

683
// ShouldServe returns the tablet serving the predicate passed in the request.
684
func (s *Server) ShouldServe(
685
        ctx context.Context, tablet *pb.Tablet) (resp *pb.Tablet, err error) {
9,329✔
686
        ctx, span := otrace.StartSpan(ctx, "Zero.ShouldServe")
9,329✔
687
        defer span.End()
9,329✔
688

9,329✔
689
        if tablet.Predicate == "" {
9,329✔
690
                return resp, errors.Errorf("Tablet predicate is empty in %+v", tablet)
×
691
        }
×
692
        if tablet.GroupId == 0 && !tablet.ReadOnly {
9,329✔
693
                return resp, errors.Errorf("Group ID is Zero in %+v", tablet)
×
694
        }
×
695

696
        // Check who is serving this tablet.
697
        tab := s.ServingTablet(tablet.Predicate)
9,329✔
698
        span.Annotatef(nil, "Tablet for %s: %+v", tablet.Predicate, tab)
9,329✔
699
        if tab != nil && !tablet.Force {
9,726✔
700
                // Someone is serving this tablet. Could be the caller as well.
397✔
701
                // The caller should compare the returned group against the group it holds to check who's
397✔
702
                // serving.
397✔
703
                return tab, nil
397✔
704
        }
397✔
705

706
        // Read-only requests should return an empty tablet instead of asking zero
707
        // to serve the predicate.
708
        if tablet.ReadOnly {
10,474✔
709
                return &pb.Tablet{}, nil
1,542✔
710
        }
1,542✔
711

712
        // Set the tablet to be served by this server's group.
713
        var proposal pb.ZeroProposal
7,390✔
714

7,390✔
715
        if x.IsReservedPredicate(tablet.Predicate) {
13,127✔
716
                // Force all the reserved predicates to be allocated to group 1.
5,737✔
717
                // This is to make it easier to stream ACL updates to all alpha servers
5,737✔
718
                // since they only need to open one pipeline to receive updates for all
5,737✔
719
                // ACL predicates.
5,737✔
720
                // This will also make it easier to restore the reserved predicates after
5,737✔
721
                // a DropAll operation.
5,737✔
722
                tablet.GroupId = 1
5,737✔
723
        }
5,737✔
724
        proposal.Tablet = tablet
7,390✔
725
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
7,390✔
726
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
727
                return tablet, err
×
728
        }
×
729
        tab = s.ServingTablet(tablet.Predicate)
7,390✔
730
        x.AssertTrue(tab != nil)
7,390✔
731
        span.Annotatef(nil, "Now serving tablet for %s: %+v", tablet.Predicate, tab)
7,390✔
732
        return tab, nil
7,390✔
733
}
734

735
// UpdateMembership updates the membership of the given group.
736
func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
2,250✔
737
        // Only Zero leader would get these membership updates.
2,250✔
738
        if ts := group.GetCheckpointTs(); ts > 0 {
2,825✔
739
                for _, m := range group.GetMembers() {
1,150✔
740
                        s.Lock()
575✔
741
                        s.checkpointPerGroup[m.GetGroupId()] = ts
575✔
742
                        s.Unlock()
575✔
743
                }
575✔
744
        }
745
        proposals, err := s.createProposals(group)
2,250✔
746
        if err != nil {
2,250✔
747
                // Sleep here so the caller doesn't keep on retrying indefinitely, creating a busy
×
748
                // wait.
×
749
                time.Sleep(time.Second)
×
750
                glog.Errorf("Error while creating proposals in Update: %v\n", err)
×
751
                return nil, err
×
752
        }
×
753

754
        ctx, cancel := context.WithCancel(ctx)
2,250✔
755
        defer cancel()
2,250✔
756

2,250✔
757
        errCh := make(chan error, len(proposals))
2,250✔
758
        for _, pr := range proposals {
2,380✔
759
                go func(pr *pb.ZeroProposal) {
260✔
760
                        errCh <- s.Node.proposeAndWait(ctx, pr)
130✔
761
                }(pr)
130✔
762
        }
763

764
        for range proposals {
2,380✔
765
                // We Don't care about these errors
130✔
766
                // Ideally shouldn't error out.
130✔
767
                if err := <-errCh; err != nil {
130✔
768
                        glog.Errorf("Error while applying proposal in Update stream: %v\n", err)
×
769
                        return nil, err
×
770
                }
×
771
        }
772

773
        if len(group.Members) == 0 {
2,250✔
774
                return &api.Payload{Data: []byte("OK")}, nil
×
775
        }
×
776
        select {
2,250✔
777
        case s.moveOngoing <- struct{}{}:
2,249✔
778
        default:
1✔
779
                // If a move is going on, don't do the next steps of deleting predicates.
1✔
780
                return &api.Payload{Data: []byte("OK")}, nil
1✔
781
        }
782
        defer func() {
4,498✔
783
                <-s.moveOngoing
2,249✔
784
        }()
2,249✔
785

786
        if err := s.deletePredicates(ctx, group); err != nil {
2,249✔
787
                glog.Warningf("While deleting predicates: %v", err)
×
788
        }
×
789
        return &api.Payload{Data: []byte("OK")}, nil
2,249✔
790
}
791

792
func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error {
2,249✔
793
        if group == nil || group.Tablets == nil {
4,498✔
794
                return nil
2,249✔
795
        }
2,249✔
796
        var gid uint32
×
797
        for _, tablet := range group.Tablets {
×
798
                gid = tablet.GroupId
×
799
                break
×
800
        }
801
        if gid == 0 {
×
802
                return errors.Errorf("Unable to find group")
×
803
        }
×
804
        state, err := s.latestMembershipState(ctx)
×
805
        if err != nil {
×
806
                return err
×
807
        }
×
808
        sg, ok := state.Groups[gid]
×
809
        if !ok {
×
810
                return errors.Errorf("Unable to find group: %d", gid)
×
811
        }
×
812

813
        pl := s.Leader(gid)
×
814
        if pl == nil {
×
815
                return errors.Errorf("Unable to reach leader of group: %d", gid)
×
816
        }
×
817
        wc := pb.NewWorkerClient(pl.Get())
×
818

×
819
        for pred := range group.Tablets {
×
820
                if _, found := sg.Tablets[pred]; found {
×
821
                        continue
×
822
                }
823
                glog.Infof("Tablet: %v does not belong to group: %d. Sending delete instruction.",
×
824
                        pred, gid)
×
825
                in := &pb.MovePredicatePayload{
×
826
                        Predicate: pred,
×
827
                        SourceGid: gid,
×
828
                        DestGid:   0,
×
829
                }
×
830
                if _, err := wc.MovePredicate(ctx, in); err != nil {
×
831
                        return err
×
832
                }
×
833
        }
834
        return nil
×
835
}
836

837
// StreamMembership periodically streams the membership state to the given stream.
838
func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error {
107✔
839
        // Send MembershipState right away. So, the connection is correctly established.
107✔
840
        ctx := stream.Context()
107✔
841
        ms, err := s.latestMembershipState(ctx)
107✔
842
        if err != nil {
112✔
843
                return err
5✔
844
        }
5✔
845
        if err := stream.Send(ms); err != nil {
102✔
846
                return err
×
847
        }
×
848

849
        ticker := time.NewTicker(time.Second)
102✔
850
        defer ticker.Stop()
102✔
851
        for {
22,162✔
852
                select {
22,060✔
853
                case <-ticker.C:
21,960✔
854
                        // Send an update every second.
21,960✔
855
                        ms, err := s.latestMembershipState(ctx)
21,960✔
856
                        if err != nil {
21,962✔
857
                                return err
2✔
858
                        }
2✔
859
                        if err := stream.Send(ms); err != nil {
21,958✔
860
                                return err
×
861
                        }
×
862
                case <-ctx.Done():
51✔
863
                        return ctx.Err()
51✔
864
                case <-s.closer.HasBeenClosed():
49✔
865
                        return errServerShutDown
49✔
866
                }
867
        }
868
}
869

870
func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState, error) {
28,342✔
871
        if err := s.Node.WaitLinearizableRead(ctx); err != nil {
28,363✔
872
                return nil, err
21✔
873
        }
21✔
874
        ms := s.membershipState()
28,321✔
875
        if ms == nil {
28,321✔
876
                return &pb.MembershipState{}, nil
×
877
        }
×
878
        return ms, nil
28,321✔
879
}
880

881
func (s *Server) ApplyLicense(ctx context.Context, req *pb.ApplyLicenseRequest) (*pb.Status,
882
        error) {
7✔
883
        var l license
7✔
884
        signedData := bytes.NewReader(req.License)
7✔
885
        if err := verifySignature(signedData, strings.NewReader(publicKey), &l); err != nil {
12✔
886
                return nil, errors.Wrapf(err, "while extracting enterprise details from the license")
5✔
887
        }
5✔
888

889
        numNodes := len(s.state.GetZeros())
2✔
890
        for _, group := range s.state.GetGroups() {
4✔
891
                numNodes += len(group.GetMembers())
2✔
892
        }
2✔
893
        if uint64(numNodes) > l.MaxNodes {
2✔
894
                return nil, errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. "+
×
895
                        "You have: [%v].", l.MaxNodes, numNodes)
×
896
        }
×
897

898
        proposal := &pb.ZeroProposal{
2✔
899
                License: &pb.License{
2✔
900
                        User:     l.User,
2✔
901
                        MaxNodes: l.MaxNodes,
2✔
902
                        ExpiryTs: l.Expiry.Unix(),
2✔
903
                },
2✔
904
        }
2✔
905

2✔
906
        err := s.Node.proposeAndWait(ctx, proposal)
2✔
907
        if err != nil {
2✔
908
                return nil, errors.Wrapf(err, "while proposing enterprise license state to cluster")
×
909
        }
×
910
        glog.Infof("Enterprise license proposed to the cluster %+v", proposal)
2✔
911
        return &pb.Status{}, nil
2✔
912
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc