• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 6000148643

28 Aug 2023 12:58PM UTC coverage: 67.112% (+0.5%) from 66.655%
6000148643

push

web-flow
feat(dql): add @unique constraint support in schema for new predicates (#8827)

Partially Fixes #8827
Closes: DGRAPHCORE-206
Docs PR: https://github.com/dgraph-io/dgraph-docs/pull/638

This PR adds support for uniqueness constraint using @unique directive
in DQL schema. This unique directive ensures that all values of the
predicate are different in a Dgraph Cluster. This completes phase 1, and
enables adding a new predicate with unique directive. As part of the
phase 2, we will work on adding support for unique directive for
existing predicates.

## Performance
Live Loader on a 21 million dataset took 10m54s before this change and
11m02s after it. The change did not make any significant difference
for non-unique predicates.

## How to Use
You can now specify unique in schema as follows: `email: string @unique
@index(hash) @upsert .`. Now, Dgraph will ensure that no mutation adds a
duplicate for the predicate email.

## Phase 2 [TODO]
- [ ] check if @unique can be added to schema depending upon whether
existing data has any duplicates. If the existing data has any
duplicates, we do not allow adding the @unique directive and return a
query that allows user to identify these UIDs.
- [ ] If index computation is in progress, we should not allow mutations
with predicates for which @unique is set
- [ ] Fix ACL to ensure that we do not end up adding duplicate users
- [ ] Ensure that unique constraint is not violated during Bulk loader

---------

Co-authored-by: Aman Mangal <aman@dgraph.io>

347 of 347 new or added lines in 8 files covered. (100.0%)

58763 of 87560 relevant lines covered (67.11%)

2200726.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.39
/dgraph/cmd/zero/zero.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "math"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/gogo/protobuf/proto"
29
        "github.com/golang/glog"
30
        "github.com/pkg/errors"
31
        otrace "go.opencensus.io/trace"
32

33
        "github.com/dgraph-io/dgo/v230/protos/api"
34
        "github.com/dgraph-io/dgraph/conn"
35
        "github.com/dgraph-io/dgraph/protos/pb"
36
        "github.com/dgraph-io/dgraph/telemetry"
37
        "github.com/dgraph-io/dgraph/x"
38
        "github.com/dgraph-io/ristretto/z"
39
)
40

41
var (
42
        emptyConnectionState pb.ConnectionState
43
        errServerShutDown    = errors.New("Server is being shut down")
44
)
45

46
// license groups three license attributes: a user name, a maximum node count
// and an expiry time. The struct tags give the JSON key used for each field.
type license struct {
	User     string    `json:"user"`
	MaxNodes uint64    `json:"max_nodes"`
	Expiry   time.Time `json:"expiry"`
}
51

52
// Server implements the zero server.
53
type Server struct {
54
        x.SafeMutex
55
        Node *node
56
        orc  *Oracle
57

58
        NumReplicas int
59
        state       *pb.MembershipState
60
        nextRaftId  uint64
61

62
        // nextUint is the uint64 which we can hand out next. See maxLease for the
63
        // max ID leased via Zero quorum.
64
        nextUint    map[pb.NumLeaseType]uint64
65
        readOnlyTs  uint64
66
        leaseLock   sync.Mutex // protects nextUID, nextTxnTs, nextNsID and corresponding proposals.
67
        rateLimiter *x.RateLimiter
68

69
        // groupMap    map[uint32]*Group
70
        nextGroup      uint32
71
        leaderChangeCh chan struct{}
72
        closer         *z.Closer  // Used to tell stream to close.
73
        connectLock    sync.Mutex // Used to serialize connect requests from servers.
74

75
        // tls client config used to connect with zero internally
76
        tlsClientConfig *tls.Config
77

78
        moveOngoing    chan struct{}
79
        blockCommitsOn *sync.Map
80

81
        checkpointPerGroup map[uint32]uint64
82
}
83

84
// Init initializes the zero server.
85
func (s *Server) Init() {
64✔
86
        s.Lock()
64✔
87
        defer s.Unlock()
64✔
88

64✔
89
        s.orc = &Oracle{}
64✔
90
        s.orc.Init()
64✔
91
        s.state = &pb.MembershipState{
64✔
92
                Groups: make(map[uint32]*pb.Group),
64✔
93
                Zeros:  make(map[uint64]*pb.Member),
64✔
94
        }
64✔
95
        s.nextUint = make(map[pb.NumLeaseType]uint64)
64✔
96
        s.nextRaftId = 1
64✔
97
        s.nextUint[pb.Num_UID] = 1
64✔
98
        s.nextUint[pb.Num_TXN_TS] = 1
64✔
99
        s.nextUint[pb.Num_NS_ID] = 1
64✔
100
        s.nextGroup = 1
64✔
101
        s.leaderChangeCh = make(chan struct{}, 1)
64✔
102
        s.closer = z.NewCloser(2) // grpc and http
64✔
103
        s.blockCommitsOn = new(sync.Map)
64✔
104
        s.moveOngoing = make(chan struct{}, 1)
64✔
105
        s.checkpointPerGroup = make(map[uint32]uint64)
64✔
106
        if opts.limiterConfig.UidLeaseLimit > 0 {
65✔
107
                // rate limiting is not enabled when lease limit is set to zero.
1✔
108
                s.rateLimiter = x.NewRateLimiter(int64(opts.limiterConfig.UidLeaseLimit),
1✔
109
                        opts.limiterConfig.RefillAfter, s.closer)
1✔
110
        }
1✔
111

112
        go s.rebalanceTablets()
64✔
113
}
114

115
func (s *Server) periodicallyPostTelemetry() {
×
116
        glog.V(2).Infof("Starting telemetry data collection for zero...")
×
117
        start := time.Now()
×
118

×
119
        ticker := time.NewTicker(time.Minute * 10)
×
120
        defer ticker.Stop()
×
121

×
122
        var lastPostedAt time.Time
×
123
        for range ticker.C {
×
124
                if !s.Node.AmLeader() {
×
125
                        continue
×
126
                }
127
                if time.Since(lastPostedAt) < time.Hour {
×
128
                        continue
×
129
                }
130
                ms := s.membershipState()
×
131
                t := telemetry.NewZero(ms)
×
132
                if t == nil {
×
133
                        continue
×
134
                }
135
                t.SinceHours = int(time.Since(start).Hours())
×
136
                glog.V(2).Infof("Posting Telemetry data: %+v", t)
×
137

×
138
                err := t.Post()
×
139
                if err == nil {
×
140
                        lastPostedAt = time.Now()
×
141
                } else {
×
142
                        glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
×
143
                }
×
144
        }
145
}
146

147
func (s *Server) triggerLeaderChange() {
162✔
148
        s.Lock()
162✔
149
        defer s.Unlock()
162✔
150
        close(s.leaderChangeCh)
162✔
151
        s.leaderChangeCh = make(chan struct{}, 1)
162✔
152
}
162✔
153

154
func (s *Server) leaderChangeChannel() chan struct{} {
79✔
155
        s.RLock()
79✔
156
        defer s.RUnlock()
79✔
157
        return s.leaderChangeCh
79✔
158
}
79✔
159

160
func (s *Server) member(addr string) *pb.Member {
259✔
161
        s.AssertRLock()
259✔
162
        for _, m := range s.state.Zeros {
742✔
163
                if m.Addr == addr {
483✔
164
                        return m
×
165
                }
×
166
        }
167
        for _, g := range s.state.Groups {
533✔
168
                for _, m := range g.Members {
669✔
169
                        if m.Addr == addr {
501✔
170
                                return m
106✔
171
                        }
106✔
172
                }
173
        }
174
        return nil
153✔
175
}
176

177
// Leader returns a connection pool to the zero leader.
178
func (s *Server) Leader(gid uint32) *conn.Pool {
4✔
179
        s.RLock()
4✔
180
        defer s.RUnlock()
4✔
181
        if s.state == nil {
4✔
182
                return nil
×
183
        }
×
184
        var members map[uint64]*pb.Member
4✔
185
        if gid == 0 {
4✔
186
                members = s.state.Zeros
×
187
        } else {
4✔
188
                group := s.state.Groups[gid]
4✔
189
                if group == nil {
4✔
190
                        return nil
×
191
                }
×
192
                members = group.Members
4✔
193
        }
194
        var healthyPool *conn.Pool
4✔
195
        for _, m := range members {
8✔
196
                if pl, err := conn.GetPools().Get(m.Addr); err == nil {
8✔
197
                        healthyPool = pl
4✔
198
                        if m.Leader {
8✔
199
                                return pl
4✔
200
                        }
4✔
201
                }
202
        }
203
        return healthyPool
×
204
}
205

206
// KnownGroups returns a list of the known groups.
207
func (s *Server) KnownGroups() []uint32 {
1,009✔
208
        var groups []uint32
1,009✔
209
        s.RLock()
1,009✔
210
        defer s.RUnlock()
1,009✔
211
        for group := range s.state.Groups {
2,031✔
212
                groups = append(groups, group)
1,022✔
213
        }
1,022✔
214
        return groups
1,009✔
215
}
216

217
func (s *Server) hasLeader(gid uint32) bool {
5✔
218
        s.AssertRLock()
5✔
219
        if s.state == nil {
5✔
220
                return false
×
221
        }
×
222
        group := s.state.Groups[gid]
5✔
223
        if group == nil {
5✔
224
                return false
×
225
        }
×
226
        for _, m := range group.Members {
10✔
227
                if m.Leader {
10✔
228
                        return true
5✔
229
                }
5✔
230
        }
231
        return false
×
232
}
233

234
// SetMembershipState updates the membership state to the given one.
235
func (s *Server) SetMembershipState(state *pb.MembershipState) {
×
236
        s.Lock()
×
237
        defer s.Unlock()
×
238

×
239
        s.state = state
×
240
        s.nextRaftId = x.Max(s.nextRaftId, s.state.MaxRaftId+1)
×
241

×
242
        if state.Zeros == nil {
×
243
                state.Zeros = make(map[uint64]*pb.Member)
×
244
        }
×
245
        if state.Groups == nil {
×
246
                state.Groups = make(map[uint32]*pb.Group)
×
247
        }
×
248

249
        // Create connections to all members.
250
        for _, g := range state.Groups {
×
251
                for _, m := range g.Members {
×
252
                        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
253
                }
×
254

255
                if g.Tablets == nil {
×
256
                        g.Tablets = make(map[string]*pb.Tablet)
×
257
                }
×
258
        }
259

260
        s.nextGroup = uint32(len(state.Groups) + 1)
×
261
}
262

263
// MarshalMembershipState returns the marshaled membership state.
264
func (s *Server) MarshalMembershipState() ([]byte, error) {
×
265
        s.Lock()
×
266
        defer s.Unlock()
×
267
        return s.state.Marshal()
×
268
}
×
269

270
func (s *Server) membershipState() *pb.MembershipState {
28,007✔
271
        s.RLock()
28,007✔
272
        defer s.RUnlock()
28,007✔
273
        return proto.Clone(s.state).(*pb.MembershipState)
28,007✔
274
}
28,007✔
275

276
func (s *Server) groupChecksums() map[uint32]uint64 {
153,067✔
277
        s.RLock()
153,067✔
278
        defer s.RUnlock()
153,067✔
279
        m := make(map[uint32]uint64)
153,067✔
280
        for gid, g := range s.state.GetGroups() {
510,717✔
281
                m[gid] = g.Checksum
357,650✔
282
        }
357,650✔
283
        return m
153,067✔
284
}
285

286
func (s *Server) storeZero(m *pb.Member) {
98✔
287
        s.Lock()
98✔
288
        defer s.Unlock()
98✔
289

98✔
290
        s.state.Zeros[m.Id] = m
98✔
291
}
98✔
292

293
func (s *Server) updateZeroLeader() {
1,381✔
294
        s.Lock()
1,381✔
295
        defer s.Unlock()
1,381✔
296
        leader := s.Node.Raft().Status().Lead
1,381✔
297
        for _, m := range s.state.Zeros {
4,801✔
298
                m.Leader = m.Id == leader
3,420✔
299
        }
3,420✔
300
}
301

302
func (s *Server) removeZero(nodeId uint64) {
×
303
        s.Lock()
×
304
        defer s.Unlock()
×
305
        m, has := s.state.Zeros[nodeId]
×
306
        if !has {
×
307
                return
×
308
        }
×
309
        delete(s.state.Zeros, nodeId)
×
310
        s.state.Removed = append(s.state.Removed, m)
×
311
}
312

313
// ServingTablet returns the Tablet called tablet.
314
func (s *Server) ServingTablet(tablet string) *pb.Tablet {
37,514✔
315
        s.RLock()
37,514✔
316
        defer s.RUnlock()
37,514✔
317

37,514✔
318
        for _, group := range s.state.Groups {
83,233✔
319
                if tab, ok := group.Tablets[tablet]; ok {
76,797✔
320
                        return tab
31,078✔
321
                }
31,078✔
322
        }
323
        return nil
6,436✔
324
}
325

326
func (s *Server) blockTablet(pred string) func() {
4✔
327
        s.blockCommitsOn.Store(pred, struct{}{})
4✔
328
        return func() {
8✔
329
                s.blockCommitsOn.Delete(pred)
4✔
330
        }
4✔
331
}
332

333
func (s *Server) isBlocked(pred string) bool {
20,631✔
334
        _, blocked := s.blockCommitsOn.Load(pred)
20,631✔
335
        return blocked
20,631✔
336
}
20,631✔
337

338
func (s *Server) servingTablet(tablet string) *pb.Tablet {
8,023✔
339
        s.AssertRLock()
8,023✔
340

8,023✔
341
        for _, group := range s.state.Groups {
17,986✔
342
                if tab, ok := group.Tablets[tablet]; ok {
12,598✔
343
                        return tab
2,635✔
344
                }
2,635✔
345
        }
346
        return nil
5,388✔
347
}
348

349
func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
2,189✔
350
        var res []*pb.ZeroProposal
2,189✔
351
        if len(dst.Members) > 1 {
2,189✔
352
                return res, errors.Errorf("Create Proposal: Invalid group: %+v", dst)
×
353
        }
×
354

355
        s.RLock()
2,189✔
356
        defer s.RUnlock()
2,189✔
357
        // There is only one member. We use for loop because we don't know what the mid is.
2,189✔
358
        for mid, dstMember := range dst.Members {
4,378✔
359
                group, has := s.state.Groups[dstMember.GroupId]
2,189✔
360
                if !has {
2,189✔
361
                        return res, errors.Errorf("Unknown group for member: %+v", dstMember)
×
362
                }
×
363
                srcMember, has := group.Members[mid]
2,189✔
364
                if !has {
2,189✔
365
                        return res, errors.Errorf("Unknown member: %+v", dstMember)
×
366
                }
×
367
                if srcMember.Addr != dstMember.Addr ||
2,189✔
368
                        srcMember.Leader != dstMember.Leader {
2,269✔
369

80✔
370
                        proposal := &pb.ZeroProposal{
80✔
371
                                Member: dstMember,
80✔
372
                        }
80✔
373
                        res = append(res, proposal)
80✔
374
                }
80✔
375
                if !dstMember.Leader {
3,495✔
376
                        // Don't continue to tablets if request is not from the leader.
1,306✔
377
                        return res, nil
1,306✔
378
                }
1,306✔
379
                if dst.SnapshotTs > group.SnapshotTs {
932✔
380
                        res = append(res, &pb.ZeroProposal{
49✔
381
                                SnapshotTs: map[uint32]uint64{dstMember.GroupId: dst.SnapshotTs},
49✔
382
                        })
49✔
383
                }
49✔
384
        }
385

386
        var tablets []*pb.Tablet
883✔
387
        for key, dstTablet := range dst.Tablets {
883✔
388
                group, has := s.state.Groups[dstTablet.GroupId]
×
389
                if !has {
×
390
                        return res, errors.Errorf("Unknown group for tablet: %+v", dstTablet)
×
391
                }
×
392
                srcTablet, has := group.Tablets[key]
×
393
                if !has {
×
394
                        // Tablet moved to new group
×
395
                        continue
×
396
                }
397

398
                s := float64(srcTablet.OnDiskBytes)
×
399
                d := float64(dstTablet.OnDiskBytes)
×
400
                if dstTablet.Remove || (s == 0 && d > 0) || (s > 0 && math.Abs(d/s-1) > 0.1) {
×
401
                        dstTablet.Force = false
×
402
                        tablets = append(tablets, dstTablet)
×
403
                }
×
404
        }
405

406
        if len(tablets) > 0 {
883✔
407
                res = append(res, &pb.ZeroProposal{Tablets: tablets})
×
408
        }
×
409
        return res, nil
883✔
410
}
411

412
func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
1✔
413
        ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
1✔
414
        defer span.End()
1✔
415
        if req == nil || len(req.Tablets) == 0 {
1✔
416
                return nil, errors.Errorf("Tablets are empty in %+v", req)
×
417
        }
×
418

419
        if req.GroupId == 0 {
1✔
420
                return nil, errors.Errorf("Group ID is Zero in %+v", req)
×
421
        }
×
422

423
        tablets := make([]*pb.Tablet, 0)
1✔
424
        unknownTablets := make([]*pb.Tablet, 0)
1✔
425
        for _, t := range req.Tablets {
46✔
426
                tab := s.ServingTablet(t.Predicate)
45✔
427
                span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
45✔
428
                switch {
45✔
429
                case tab != nil && !t.Force:
×
430
                        tablets = append(tablets, t)
×
431
                case t.ReadOnly:
×
432
                        tablets = append(tablets, &pb.Tablet{})
×
433
                default:
45✔
434
                        unknownTablets = append(unknownTablets, t)
45✔
435
                }
436
        }
437

438
        if len(unknownTablets) == 0 {
1✔
439
                return &pb.TabletResponse{
×
440
                        Tablets: tablets,
×
441
                }, nil
×
442
        }
×
443

444
        // Set the tablet to be served by this server's group.
445
        var proposal pb.ZeroProposal
1✔
446
        proposal.Tablets = make([]*pb.Tablet, 0)
1✔
447
        for _, t := range unknownTablets {
46✔
448
                if x.IsReservedPredicate(t.Predicate) {
56✔
449
                        // Force all the reserved predicates to be allocated to group 1.
11✔
450
                        // This is to make it easier to stream ACL updates to all alpha servers
11✔
451
                        // since they only need to open one pipeline to receive updates for all
11✔
452
                        // ACL predicates.
11✔
453
                        // This will also make it easier to restore the reserved predicates after
11✔
454
                        // a DropAll operation.
11✔
455
                        t.GroupId = 1
11✔
456
                }
11✔
457
                proposal.Tablets = append(proposal.Tablets, t)
45✔
458
        }
459

460
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
1✔
461
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
462
                return nil, err
×
463
        }
×
464

465
        for _, t := range unknownTablets {
46✔
466
                tab := s.ServingTablet(t.Predicate)
45✔
467
                x.AssertTrue(tab != nil)
45✔
468
                span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
45✔
469
                tablets = append(tablets, tab)
45✔
470
        }
45✔
471

472
        return &pb.TabletResponse{
1✔
473
                Tablets: tablets,
1✔
474
        }, nil
1✔
475
}
476

477
// RemoveNode removes the given node from the given group.
478
// It's the user's responsibility to ensure that node doesn't come back again
479
// before calling the api.
480
func (s *Server) RemoveNode(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.Status, error) {
1,005✔
481
        if req.GroupId == 0 {
1,005✔
482
                return nil, s.Node.ProposePeerRemoval(ctx, req.NodeId)
×
483
        }
×
484
        zp := &pb.ZeroProposal{}
1,005✔
485
        zp.Member = &pb.Member{Id: req.NodeId, GroupId: req.GroupId, AmDead: true}
1,005✔
486
        if _, ok := s.state.Groups[req.GroupId]; !ok {
1,007✔
487
                return nil, errors.Errorf("No group with groupId %d found", req.GroupId)
2✔
488
        }
2✔
489
        if _, ok := s.state.Groups[req.GroupId].Members[req.NodeId]; !ok {
2,004✔
490
                return nil, errors.Errorf("No node with nodeId %d found in group %d", req.NodeId,
1,001✔
491
                        req.GroupId)
1,001✔
492
        }
1,001✔
493
        if len(s.state.Groups[req.GroupId].Members) == 1 && len(s.state.Groups[req.GroupId].
2✔
494
                Tablets) > 0 {
2✔
495
                return nil, errors.Errorf("Move all tablets from group %d before removing the last node",
×
496
                        req.GroupId)
×
497
        }
×
498
        if err := s.Node.proposeAndWait(ctx, zp); err != nil {
2✔
499
                return nil, err
×
500
        }
×
501

502
        return &pb.Status{}, nil
2✔
503
}
504

505
// Connect is used by Alpha nodes to connect the very first time with group zero.
506
func (s *Server) Connect(ctx context.Context,
507
        m *pb.Member) (resp *pb.ConnectionState, err error) {
662✔
508
        // Ensures that connect requests are always serialized
662✔
509
        s.connectLock.Lock()
662✔
510
        defer s.connectLock.Unlock()
662✔
511
        glog.Infof("Got connection request: %+v\n", m)
662✔
512
        defer glog.Infof("Connected: %+v\n", m)
662✔
513

662✔
514
        if ctx.Err() != nil {
662✔
515
                err := errors.Errorf("Context has error: %v\n", ctx.Err())
×
516
                return &emptyConnectionState, err
×
517
        }
×
518
        ms, err := s.latestMembershipState(ctx)
662✔
519
        if err != nil {
663✔
520
                return nil, err
1✔
521
        }
1✔
522

523
        if m.Learner && !ms.License.GetEnabled() {
661✔
524
                // Update the "ShouldCrash" function in x/x.go if you change the error message here.
×
525
                return nil, errors.New("ENTERPRISE_ONLY_LEARNER - Missing or expired Enterpise License. " +
×
526
                        "Cannot add Learner Node.")
×
527
        }
×
528

529
        if m.ClusterInfoOnly {
1,174✔
530
                // This request only wants to access the membership state, and nothing else. Most likely
513✔
531
                // from our clients.
513✔
532
                cs := &pb.ConnectionState{
513✔
533
                        State:      ms,
513✔
534
                        MaxPending: s.orc.MaxPending(),
513✔
535
                }
513✔
536
                return cs, err
513✔
537
        }
513✔
538
        if m.Addr == "" {
148✔
539
                return &emptyConnectionState, errors.Errorf("NO_ADDR: No address provided: %+v", m)
×
540
        }
×
541

542
        for _, member := range ms.Removed {
148✔
543
                // It is not recommended to reuse RAFT ids.
×
544
                if member.GroupId != 0 && m.Id == member.Id {
×
545
                        return &emptyConnectionState, errors.Errorf(
×
546
                                "REUSE_RAFTID: Duplicate Raft ID %d to removed member: %+v", m.Id, member)
×
547
                }
×
548
        }
549

550
        numberOfNodes := len(ms.Zeros)
148✔
551
        for _, group := range ms.Groups {
210✔
552
                for _, member := range group.Members {
152✔
553
                        switch {
90✔
554
                        case member.Addr == m.Addr && m.Id == 0:
×
555
                                glog.Infof("Found a member with the same address. Returning: %+v", member)
×
556
                                conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
557
                                return &pb.ConnectionState{
×
558
                                        State:  ms,
×
559
                                        Member: member,
×
560
                                }, nil
×
561

562
                        case member.Addr == m.Addr && member.Id != m.Id:
×
563
                                // Same address. Different Id. If Id is zero, then it might be trying to connect for
×
564
                                // the first time. We can just directly return the membership information.
×
565
                                return nil, errors.Errorf("REUSE_ADDR: Duplicate address to existing member: %+v."+
×
566
                                        " Self: +%v", member, m)
×
567

568
                        case member.Addr != m.Addr && member.Id == m.Id:
×
569
                                // Same Id. Different address.
×
570
                                if pl, err := conn.GetPools().Get(member.Addr); err == nil && pl.IsHealthy() {
×
571
                                        // Found a healthy connection.
×
572
                                        return nil, errors.Errorf("REUSE_RAFTID: Healthy connection to a member"+
×
573
                                                " with same ID: %+v", member)
×
574
                                }
×
575
                        }
576
                        numberOfNodes++
90✔
577
                }
578
        }
579

580
        // Create a connection and check validity of the address by doing an Echo.
581
        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
148✔
582

148✔
583
        createProposal := func() *pb.ZeroProposal {
296✔
584
                s.Lock()
148✔
585
                defer s.Unlock()
148✔
586

148✔
587
                proposal := new(pb.ZeroProposal)
148✔
588
                // Check if we already have this member.
148✔
589
                for _, group := range s.state.Groups {
210✔
590
                        if _, has := group.Members[m.Id]; has {
62✔
591
                                return nil
×
592
                        }
×
593
                }
594
                if m.Id == 0 {
261✔
595
                        // In certain situations, the proposal can be sent and return with an error.
113✔
596
                        // However,  Dgraph will keep retrying the proposal. To avoid assigning duplicating
113✔
597
                        // IDs, the counter is incremented every time a proposal is created.
113✔
598
                        m.Id = s.nextRaftId
113✔
599
                        s.nextRaftId += 1
113✔
600
                        proposal.MaxRaftId = m.Id
113✔
601
                } else if m.Id >= s.nextRaftId {
170✔
602
                        s.nextRaftId = m.Id + 1
22✔
603
                        proposal.MaxRaftId = m.Id
22✔
604
                }
22✔
605

606
                // We don't have this member. So, let's see if it has preference for a group.
607
                if m.GroupId > 0 {
171✔
608
                        group, has := s.state.Groups[m.GroupId]
23✔
609
                        if !has {
46✔
610
                                // We don't have this group. Add the server to this group.
23✔
611
                                proposal.Member = m
23✔
612
                                return proposal
23✔
613
                        }
23✔
614

615
                        if _, has := group.Members[m.Id]; has {
×
616
                                proposal.Member = m // Update in case some fields have changed, like address.
×
617
                                return proposal
×
618
                        }
×
619

620
                        if m.Learner {
×
621
                                // Give it the group it wants.
×
622
                                proposal.Member = m
×
623
                                return proposal
×
624
                        }
×
625

626
                        // We don't have this server in the list.
627
                        if len(group.Members) < s.NumReplicas {
×
628
                                // We need more servers here, so let's add it.
×
629
                                proposal.Member = m
×
630
                                return proposal
×
631
                        } else if m.ForceGroupId {
×
632
                                // If the group ID was taken from the group_id file, force the member
×
633
                                // to be in this group even if the group is at capacity. This should
×
634
                                // not happen if users properly initialize a cluster after a bulk load.
×
635
                                proposal.Member = m
×
636
                                return proposal
×
637
                        }
×
638
                        // Already have plenty of servers serving this group.
639
                }
640
                // Let's assign this server to a new group.
641
                for gid, group := range s.state.Groups {
181✔
642
                        if len(group.Members) < s.NumReplicas {
76✔
643
                                m.GroupId = gid
20✔
644
                                proposal.Member = m
20✔
645
                                return proposal
20✔
646
                        }
20✔
647
                }
648
                // We either don't have any groups, or don't have any groups which need another member.
649
                m.GroupId = s.nextGroup
105✔
650
                // We shouldn't increase nextGroup here as we don't know whether we have enough
105✔
651
                // replicas until proposal is committed and can cause issues due to race.
105✔
652
                proposal.Member = m
105✔
653
                return proposal
105✔
654
        }
655

656
        proposal := createProposal()
148✔
657
        if proposal == nil {
148✔
658
                return &pb.ConnectionState{
×
659
                        State: ms, Member: m,
×
660
                }, nil
×
661
        }
×
662

663
        maxNodes := s.state.GetLicense().GetMaxNodes()
148✔
664
        if s.state.GetLicense().GetEnabled() && uint64(numberOfNodes) >= maxNodes {
148✔
665
                return nil, errors.Errorf("ENTERPRISE_LIMIT_REACHED: You are already using the maximum "+
×
666
                        "number of nodes: [%v] permitted for your enterprise license.", maxNodes)
×
667
        }
×
668

669
        if err := s.Node.proposeAndWait(ctx, proposal); err != nil {
199✔
670
                return &emptyConnectionState, err
51✔
671
        }
51✔
672
        resp = &pb.ConnectionState{
97✔
673
                State:  s.membershipState(),
97✔
674
                Member: m,
97✔
675
        }
97✔
676
        return resp, nil
97✔
677
}
678

679
// DeleteNamespace removes the tablets for deleted namespace from the membership state.
680
func (s *Server) DeleteNamespace(ctx context.Context, in *pb.DeleteNsRequest) (*pb.Status, error) {
9✔
681
        err := s.Node.proposeAndWait(ctx, &pb.ZeroProposal{DeleteNs: in})
9✔
682
        return &pb.Status{}, err
9✔
683
}
9✔
684

685
// ShouldServe returns the tablet serving the predicate passed in the request.
686
func (s *Server) ShouldServe(
687
        ctx context.Context, tablet *pb.Tablet) (resp *pb.Tablet, err error) {
9,347✔
688
        ctx, span := otrace.StartSpan(ctx, "Zero.ShouldServe")
9,347✔
689
        defer span.End()
9,347✔
690

9,347✔
691
        if tablet.Predicate == "" {
9,347✔
692
                return resp, errors.Errorf("Tablet predicate is empty in %+v", tablet)
×
693
        }
×
694
        if tablet.GroupId == 0 && !tablet.ReadOnly {
9,347✔
695
                return resp, errors.Errorf("Group ID is Zero in %+v", tablet)
×
696
        }
×
697

698
        // Check who is serving this tablet.
699
        tab := s.ServingTablet(tablet.Predicate)
9,347✔
700
        span.Annotatef(nil, "Tablet for %s: %+v", tablet.Predicate, tab)
9,347✔
701
        if tab != nil && !tablet.Force {
9,731✔
702
                // Someone is serving this tablet. Could be the caller as well.
384✔
703
                // The caller should compare the returned group against the group it holds to check who's
384✔
704
                // serving.
384✔
705
                return tab, nil
384✔
706
        }
384✔
707

708
        // Read-only requests should return an empty tablet instead of asking zero
709
        // to serve the predicate.
710
        if tablet.ReadOnly {
10,492✔
711
                return &pb.Tablet{}, nil
1,529✔
712
        }
1,529✔
713

714
        // Set the tablet to be served by this server's group.
715
        var proposal pb.ZeroProposal
7,434✔
716

7,434✔
717
        if x.IsReservedPredicate(tablet.Predicate) {
13,171✔
718
                // Force all the reserved predicates to be allocated to group 1.
5,737✔
719
                // This is to make it easier to stream ACL updates to all alpha servers
5,737✔
720
                // since they only need to open one pipeline to receive updates for all
5,737✔
721
                // ACL predicates.
5,737✔
722
                // This will also make it easier to restore the reserved predicates after
5,737✔
723
                // a DropAll operation.
5,737✔
724
                tablet.GroupId = 1
5,737✔
725
        }
5,737✔
726
        proposal.Tablet = tablet
7,434✔
727
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
7,434✔
728
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
729
                return tablet, err
×
730
        }
×
731
        tab = s.ServingTablet(tablet.Predicate)
7,434✔
732
        x.AssertTrue(tab != nil)
7,434✔
733
        span.Annotatef(nil, "Now serving tablet for %s: %+v", tablet.Predicate, tab)
7,434✔
734
        return tab, nil
7,434✔
735
}
736

737
// UpdateMembership updates the membership of the given group.
738
func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
2,189✔
739
        // Only Zero leader would get these membership updates.
2,189✔
740
        if ts := group.GetCheckpointTs(); ts > 0 {
2,754✔
741
                for _, m := range group.GetMembers() {
1,130✔
742
                        s.Lock()
565✔
743
                        s.checkpointPerGroup[m.GetGroupId()] = ts
565✔
744
                        s.Unlock()
565✔
745
                }
565✔
746
        }
747
        proposals, err := s.createProposals(group)
2,189✔
748
        if err != nil {
2,189✔
749
                // Sleep here so the caller doesn't keep on retrying indefinitely, creating a busy
×
750
                // wait.
×
751
                time.Sleep(time.Second)
×
752
                glog.Errorf("Error while creating proposals in Update: %v\n", err)
×
753
                return nil, err
×
754
        }
×
755

756
        ctx, cancel := context.WithCancel(ctx)
2,189✔
757
        defer cancel()
2,189✔
758

2,189✔
759
        errCh := make(chan error, len(proposals))
2,189✔
760
        for _, pr := range proposals {
2,318✔
761
                go func(pr *pb.ZeroProposal) {
258✔
762
                        errCh <- s.Node.proposeAndWait(ctx, pr)
129✔
763
                }(pr)
129✔
764
        }
765

766
        for range proposals {
2,318✔
767
                // We Don't care about these errors
129✔
768
                // Ideally shouldn't error out.
129✔
769
                if err := <-errCh; err != nil {
129✔
770
                        glog.Errorf("Error while applying proposal in Update stream: %v\n", err)
×
771
                        return nil, err
×
772
                }
×
773
        }
774

775
        if len(group.Members) == 0 {
2,189✔
776
                return &api.Payload{Data: []byte("OK")}, nil
×
777
        }
×
778
        select {
2,189✔
779
        case s.moveOngoing <- struct{}{}:
2,187✔
780
        default:
2✔
781
                // If a move is going on, don't do the next steps of deleting predicates.
2✔
782
                return &api.Payload{Data: []byte("OK")}, nil
2✔
783
        }
784
        defer func() {
4,374✔
785
                <-s.moveOngoing
2,187✔
786
        }()
2,187✔
787

788
        if err := s.deletePredicates(ctx, group); err != nil {
2,187✔
789
                glog.Warningf("While deleting predicates: %v", err)
×
790
        }
×
791
        return &api.Payload{Data: []byte("OK")}, nil
2,187✔
792
}
793

794
func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error {
2,187✔
795
        if group == nil || group.Tablets == nil {
4,374✔
796
                return nil
2,187✔
797
        }
2,187✔
798
        var gid uint32
×
799
        for _, tablet := range group.Tablets {
×
800
                gid = tablet.GroupId
×
801
                break
×
802
        }
803
        if gid == 0 {
×
804
                return errors.Errorf("Unable to find group")
×
805
        }
×
806
        state, err := s.latestMembershipState(ctx)
×
807
        if err != nil {
×
808
                return err
×
809
        }
×
810
        sg, ok := state.Groups[gid]
×
811
        if !ok {
×
812
                return errors.Errorf("Unable to find group: %d", gid)
×
813
        }
×
814

815
        pl := s.Leader(gid)
×
816
        if pl == nil {
×
817
                return errors.Errorf("Unable to reach leader of group: %d", gid)
×
818
        }
×
819
        wc := pb.NewWorkerClient(pl.Get())
×
820

×
821
        for pred := range group.Tablets {
×
822
                if _, found := sg.Tablets[pred]; found {
×
823
                        continue
×
824
                }
825
                glog.Infof("Tablet: %v does not belong to group: %d. Sending delete instruction.",
×
826
                        pred, gid)
×
827
                in := &pb.MovePredicatePayload{
×
828
                        Predicate: pred,
×
829
                        SourceGid: gid,
×
830
                        DestGid:   0,
×
831
                }
×
832
                if _, err := wc.MovePredicate(ctx, in); err != nil {
×
833
                        return err
×
834
                }
×
835
        }
836
        return nil
×
837
}
838

839
// StreamMembership periodically streams the membership state to the given stream.
840
func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error {
99✔
841
        // Send MembershipState right away. So, the connection is correctly established.
99✔
842
        ctx := stream.Context()
99✔
843
        ms, err := s.latestMembershipState(ctx)
99✔
844
        if err != nil {
101✔
845
                return err
2✔
846
        }
2✔
847
        if err := stream.Send(ms); err != nil {
97✔
848
                return err
×
849
        }
×
850

851
        ticker := time.NewTicker(time.Second)
97✔
852
        defer ticker.Stop()
97✔
853
        for {
21,528✔
854
                select {
21,431✔
855
                case <-ticker.C:
21,336✔
856
                        // Send an update every second.
21,336✔
857
                        ms, err := s.latestMembershipState(ctx)
21,336✔
858
                        if err != nil {
21,338✔
859
                                return err
2✔
860
                        }
2✔
861
                        if err := stream.Send(ms); err != nil {
21,334✔
862
                                return err
×
863
                        }
×
864
                case <-ctx.Done():
45✔
865
                        return ctx.Err()
45✔
866
                case <-s.closer.HasBeenClosed():
50✔
867
                        return errServerShutDown
50✔
868
                }
869
        }
870
}
871

872
func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState, error) {
27,639✔
873
        if err := s.Node.WaitLinearizableRead(ctx); err != nil {
27,657✔
874
                return nil, err
18✔
875
        }
18✔
876
        ms := s.membershipState()
27,621✔
877
        if ms == nil {
27,621✔
878
                return &pb.MembershipState{}, nil
×
879
        }
×
880
        return ms, nil
27,621✔
881
}
882

883
func (s *Server) ApplyLicense(ctx context.Context, req *pb.ApplyLicenseRequest) (*pb.Status,
884
        error) {
7✔
885
        var l license
7✔
886
        signedData := bytes.NewReader(req.License)
7✔
887
        if err := verifySignature(signedData, strings.NewReader(publicKey), &l); err != nil {
12✔
888
                return nil, errors.Wrapf(err, "while extracting enterprise details from the license")
5✔
889
        }
5✔
890

891
        numNodes := len(s.state.GetZeros())
2✔
892
        for _, group := range s.state.GetGroups() {
4✔
893
                numNodes += len(group.GetMembers())
2✔
894
        }
2✔
895
        if uint64(numNodes) > l.MaxNodes {
2✔
896
                return nil, errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. "+
×
897
                        "You have: [%v].", l.MaxNodes, numNodes)
×
898
        }
×
899

900
        proposal := &pb.ZeroProposal{
2✔
901
                License: &pb.License{
2✔
902
                        User:     l.User,
2✔
903
                        MaxNodes: l.MaxNodes,
2✔
904
                        ExpiryTs: l.Expiry.Unix(),
2✔
905
                },
2✔
906
        }
2✔
907

2✔
908
        err := s.Node.proposeAndWait(ctx, proposal)
2✔
909
        if err != nil {
2✔
910
                return nil, errors.Wrapf(err, "while proposing enterprise license state to cluster")
×
911
        }
×
912
        glog.Infof("Enterprise license proposed to the cluster %+v", proposal)
2✔
913
        return &pb.Status{}, nil
2✔
914
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc