• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5892261038

17 Aug 2023 02:30PM UTC coverage: 67.256% (-0.02%) from 67.279%
5892261038

push

web-flow
chore: update cron job frequency to reset github notifications (#8956)

In our [scheduled
runs](https://github.com/dgraph-io/dgraph/actions?query=event%3Aschedule)
we see a number of "ghost" jobs. E.g.
[1](https://github.com/dgraph-io/dgraph/actions/runs/5881148541). This
is probably due to the following sequence of events:
- GitHub Actions sends a notification to the developer who created or last
modified the cron job frequency
- if this person is no longer able to receive this notification (i.e. is no
longer receiving emails), the action raises this ghost job with an error
about an unverified email

This is a bug with github actions. After contacting github support they
suggested changing the cron job frequency in order to reset who receives
the notifications. This may solve the issue.

58589 of 87114 relevant lines covered (67.26%)

2221956.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.01
/dgraph/cmd/zero/zero.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/tls"
23
        "math"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/gogo/protobuf/proto"
29
        "github.com/golang/glog"
30
        "github.com/pkg/errors"
31
        otrace "go.opencensus.io/trace"
32

33
        "github.com/dgraph-io/dgo/v230/protos/api"
34
        "github.com/dgraph-io/dgraph/conn"
35
        "github.com/dgraph-io/dgraph/protos/pb"
36
        "github.com/dgraph-io/dgraph/telemetry"
37
        "github.com/dgraph-io/dgraph/x"
38
        "github.com/dgraph-io/ristretto/z"
39
)
40

41
var (
42
        // emptyConnectionState is a zero-valued connection state. Connect returns
        // its address together with an error on several of its failure paths.
        emptyConnectionState pb.ConnectionState
43
        // errServerShutDown is the error value carrying the shutdown message below.
        errServerShutDown    = errors.New("Server is being shut down")
44
)
45

46
// license is a record with a user name, a node limit and an expiry time,
// serialized to JSON under the keys "user", "max_nodes" and "expiry".
// NOTE(review): presumably the decoded enterprise license payload — confirm
// against the code that populates it.
type license struct {
47
        User     string    `json:"user"`
48
        MaxNodes uint64    `json:"max_nodes"`
49
        Expiry   time.Time `json:"expiry"`
50
}
51

52
// Server implements the zero server.
//
// The embedded x.SafeMutex is the lock most methods in this file take
// (Lock/RLock) around reads and writes of state and the other mutable fields.
53
type Server struct {
54
        x.SafeMutex
55
        Node *node
56
        orc  *Oracle
57

58
        // NumReplicas is the group size Connect fills groups up to before it
        // assigns a connecting member to another group.
        NumReplicas int
59
        // state is the membership state (groups, zeros, removed members) tracked
        // by this server.
        state       *pb.MembershipState
60
        // nextRaftId is the Raft ID Connect hands to a member that connects
        // without one.
        nextRaftId  uint64
61

62
        // nextUint is the uint64 which we can hand out next. See maxLease for the
63
        // max ID leased via Zero quorum.
64
        nextUint    map[pb.NumLeaseType]uint64
65
        readOnlyTs  uint64
66
        leaseLock   sync.Mutex // protects the UID, txn timestamp and namespace ID counters kept in nextUint, and corresponding proposals.
67
        rateLimiter *x.RateLimiter
68

69
        // groupMap    map[uint32]*Group
70
        // nextGroup is the group ID Connect assigns when no existing group needs
        // another member.
        nextGroup      uint32
71
        // leaderChangeCh is closed and replaced by triggerLeaderChange.
        leaderChangeCh chan struct{}
72
        closer         *z.Closer  // Used to tell stream to close.
73
        connectLock    sync.Mutex // Used to serialize connect requests from servers.
74

75
        // tls client config used to connect with zero internally
76
        tlsClientConfig *tls.Config
77

78
        // moveOngoing is created by Init with a buffer of one.
        // NOTE(review): presumably limits tablet moves to one at a time — confirm
        // against the tablet-move code.
        moveOngoing    chan struct{}
79
        // blockCommitsOn is the set of predicates registered through blockTablet
        // and queried by isBlocked.
        blockCommitsOn *sync.Map
80

81
        checkpointPerGroup map[uint32]uint64
82
}
83

84
// Init initializes the zero server.
85
func (s *Server) Init() {
66✔
86
        s.Lock()
66✔
87
        defer s.Unlock()
66✔
88

66✔
89
        s.orc = &Oracle{}
66✔
90
        s.orc.Init()
66✔
91
        s.state = &pb.MembershipState{
66✔
92
                Groups: make(map[uint32]*pb.Group),
66✔
93
                Zeros:  make(map[uint64]*pb.Member),
66✔
94
        }
66✔
95
        s.nextUint = make(map[pb.NumLeaseType]uint64)
66✔
96
        s.nextRaftId = 1
66✔
97
        s.nextUint[pb.Num_UID] = 1
66✔
98
        s.nextUint[pb.Num_TXN_TS] = 1
66✔
99
        s.nextUint[pb.Num_NS_ID] = 1
66✔
100
        s.nextGroup = 1
66✔
101
        s.leaderChangeCh = make(chan struct{}, 1)
66✔
102
        s.closer = z.NewCloser(2) // grpc and http
66✔
103
        s.blockCommitsOn = new(sync.Map)
66✔
104
        s.moveOngoing = make(chan struct{}, 1)
66✔
105
        s.checkpointPerGroup = make(map[uint32]uint64)
66✔
106
        if opts.limiterConfig.UidLeaseLimit > 0 {
67✔
107
                // rate limiting is not enabled when lease limit is set to zero.
1✔
108
                s.rateLimiter = x.NewRateLimiter(int64(opts.limiterConfig.UidLeaseLimit),
1✔
109
                        opts.limiterConfig.RefillAfter, s.closer)
1✔
110
        }
1✔
111

112
        go s.rebalanceTablets()
66✔
113
}
114

115
func (s *Server) periodicallyPostTelemetry() {
66✔
116
        glog.V(2).Infof("Starting telemetry data collection for zero...")
66✔
117
        start := time.Now()
66✔
118

66✔
119
        ticker := time.NewTicker(time.Minute * 10)
66✔
120
        defer ticker.Stop()
66✔
121

66✔
122
        var lastPostedAt time.Time
66✔
123
        for range ticker.C {
78✔
124
                if !s.Node.AmLeader() {
20✔
125
                        continue
8✔
126
                }
127
                if time.Since(lastPostedAt) < time.Hour {
4✔
128
                        continue
×
129
                }
130
                ms := s.membershipState()
4✔
131
                t := telemetry.NewZero(ms)
4✔
132
                if t == nil {
4✔
133
                        continue
×
134
                }
135
                t.SinceHours = int(time.Since(start).Hours())
4✔
136
                glog.V(2).Infof("Posting Telemetry data: %+v", t)
4✔
137

4✔
138
                err := t.Post()
4✔
139
                if err == nil {
4✔
140
                        lastPostedAt = time.Now()
×
141
                } else {
4✔
142
                        glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
4✔
143
                }
4✔
144
        }
145
}
146

147
func (s *Server) triggerLeaderChange() {
172✔
148
        s.Lock()
172✔
149
        defer s.Unlock()
172✔
150
        close(s.leaderChangeCh)
172✔
151
        s.leaderChangeCh = make(chan struct{}, 1)
172✔
152
}
172✔
153

154
func (s *Server) leaderChangeChannel() chan struct{} {
78✔
155
        s.RLock()
78✔
156
        defer s.RUnlock()
78✔
157
        return s.leaderChangeCh
78✔
158
}
78✔
159

160
func (s *Server) member(addr string) *pb.Member {
268✔
161
        s.AssertRLock()
268✔
162
        for _, m := range s.state.Zeros {
788✔
163
                if m.Addr == addr {
520✔
164
                        return m
×
165
                }
×
166
        }
167
        for _, g := range s.state.Groups {
544✔
168
                for _, m := range g.Members {
676✔
169
                        if m.Addr == addr {
509✔
170
                                return m
109✔
171
                        }
109✔
172
                }
173
        }
174
        return nil
159✔
175
}
176

177
// Leader returns a connection pool to the zero leader.
178
func (s *Server) Leader(gid uint32) *conn.Pool {
3✔
179
        s.RLock()
3✔
180
        defer s.RUnlock()
3✔
181
        if s.state == nil {
3✔
182
                return nil
×
183
        }
×
184
        var members map[uint64]*pb.Member
3✔
185
        if gid == 0 {
3✔
186
                members = s.state.Zeros
×
187
        } else {
3✔
188
                group := s.state.Groups[gid]
3✔
189
                if group == nil {
3✔
190
                        return nil
×
191
                }
×
192
                members = group.Members
3✔
193
        }
194
        var healthyPool *conn.Pool
3✔
195
        for _, m := range members {
6✔
196
                if pl, err := conn.GetPools().Get(m.Addr); err == nil {
6✔
197
                        healthyPool = pl
3✔
198
                        if m.Leader {
6✔
199
                                return pl
3✔
200
                        }
3✔
201
                }
202
        }
203
        return healthyPool
×
204
}
205

206
// KnownGroups returns a list of the known groups.
207
func (s *Server) KnownGroups() []uint32 {
1,009✔
208
        var groups []uint32
1,009✔
209
        s.RLock()
1,009✔
210
        defer s.RUnlock()
1,009✔
211
        for group := range s.state.Groups {
2,031✔
212
                groups = append(groups, group)
1,022✔
213
        }
1,022✔
214
        return groups
1,009✔
215
}
216

217
func (s *Server) hasLeader(gid uint32) bool {
5✔
218
        s.AssertRLock()
5✔
219
        if s.state == nil {
5✔
220
                return false
×
221
        }
×
222
        group := s.state.Groups[gid]
5✔
223
        if group == nil {
5✔
224
                return false
×
225
        }
×
226
        for _, m := range group.Members {
12✔
227
                if m.Leader {
12✔
228
                        return true
5✔
229
                }
5✔
230
        }
231
        return false
×
232
}
233

234
// SetMembershipState updates the membership state to the given one.
235
func (s *Server) SetMembershipState(state *pb.MembershipState) {
×
236
        s.Lock()
×
237
        defer s.Unlock()
×
238

×
239
        s.state = state
×
240
        s.nextRaftId = x.Max(s.nextRaftId, s.state.MaxRaftId+1)
×
241

×
242
        if state.Zeros == nil {
×
243
                state.Zeros = make(map[uint64]*pb.Member)
×
244
        }
×
245
        if state.Groups == nil {
×
246
                state.Groups = make(map[uint32]*pb.Group)
×
247
        }
×
248

249
        // Create connections to all members.
250
        for _, g := range state.Groups {
×
251
                for _, m := range g.Members {
×
252
                        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
253
                }
×
254

255
                if g.Tablets == nil {
×
256
                        g.Tablets = make(map[string]*pb.Tablet)
×
257
                }
×
258
        }
259

260
        s.nextGroup = uint32(len(state.Groups) + 1)
×
261
}
262

263
// MarshalMembershipState returns the marshaled membership state.
264
func (s *Server) MarshalMembershipState() ([]byte, error) {
×
265
        s.Lock()
×
266
        defer s.Unlock()
×
267
        return s.state.Marshal()
×
268
}
×
269

270
func (s *Server) membershipState() *pb.MembershipState {
29,850✔
271
        s.RLock()
29,850✔
272
        defer s.RUnlock()
29,850✔
273
        return proto.Clone(s.state).(*pb.MembershipState)
29,850✔
274
}
29,850✔
275

276
func (s *Server) groupChecksums() map[uint32]uint64 {
152,427✔
277
        s.RLock()
152,427✔
278
        defer s.RUnlock()
152,427✔
279
        m := make(map[uint32]uint64)
152,427✔
280
        for gid, g := range s.state.GetGroups() {
512,144✔
281
                m[gid] = g.Checksum
359,717✔
282
        }
359,717✔
283
        return m
152,427✔
284
}
285

286
func (s *Server) storeZero(m *pb.Member) {
106✔
287
        s.Lock()
106✔
288
        defer s.Unlock()
106✔
289

106✔
290
        s.state.Zeros[m.Id] = m
106✔
291
}
106✔
292

293
func (s *Server) updateZeroLeader() {
1,441✔
294
        s.Lock()
1,441✔
295
        defer s.Unlock()
1,441✔
296
        leader := s.Node.Raft().Status().Lead
1,441✔
297
        for _, m := range s.state.Zeros {
5,018✔
298
                m.Leader = m.Id == leader
3,577✔
299
        }
3,577✔
300
}
301

302
func (s *Server) removeZero(nodeId uint64) {
×
303
        s.Lock()
×
304
        defer s.Unlock()
×
305
        m, has := s.state.Zeros[nodeId]
×
306
        if !has {
×
307
                return
×
308
        }
×
309
        delete(s.state.Zeros, nodeId)
×
310
        s.state.Removed = append(s.state.Removed, m)
×
311
}
312

313
// ServingTablet returns the Tablet called tablet.
314
func (s *Server) ServingTablet(tablet string) *pb.Tablet {
36,419✔
315
        s.RLock()
36,419✔
316
        defer s.RUnlock()
36,419✔
317

36,419✔
318
        for _, group := range s.state.Groups {
81,015✔
319
                if tab, ok := group.Tablets[tablet]; ok {
74,539✔
320
                        return tab
29,943✔
321
                }
29,943✔
322
        }
323
        return nil
6,476✔
324
}
325

326
func (s *Server) blockTablet(pred string) func() {
3✔
327
        s.blockCommitsOn.Store(pred, struct{}{})
3✔
328
        return func() {
6✔
329
                s.blockCommitsOn.Delete(pred)
3✔
330
        }
3✔
331
}
332

333
func (s *Server) isBlocked(pred string) bool {
19,487✔
334
        _, blocked := s.blockCommitsOn.Load(pred)
19,487✔
335
        return blocked
19,487✔
336
}
19,487✔
337

338
func (s *Server) servingTablet(tablet string) *pb.Tablet {
8,031✔
339
        s.AssertRLock()
8,031✔
340

8,031✔
341
        for _, group := range s.state.Groups {
18,009✔
342
                if tab, ok := group.Tablets[tablet]; ok {
12,612✔
343
                        return tab
2,634✔
344
                }
2,634✔
345
        }
346
        return nil
5,397✔
347
}
348

349
func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
2,320✔
350
        var res []*pb.ZeroProposal
2,320✔
351
        if len(dst.Members) > 1 {
2,320✔
352
                return res, errors.Errorf("Create Proposal: Invalid group: %+v", dst)
×
353
        }
×
354

355
        s.RLock()
2,320✔
356
        defer s.RUnlock()
2,320✔
357
        // There is only one member. We use for loop because we don't know what the mid is.
2,320✔
358
        for mid, dstMember := range dst.Members {
4,640✔
359
                group, has := s.state.Groups[dstMember.GroupId]
2,320✔
360
                if !has {
2,320✔
361
                        return res, errors.Errorf("Unknown group for member: %+v", dstMember)
×
362
                }
×
363
                srcMember, has := group.Members[mid]
2,320✔
364
                if !has {
2,320✔
365
                        return res, errors.Errorf("Unknown member: %+v", dstMember)
×
366
                }
×
367
                if srcMember.Addr != dstMember.Addr ||
2,320✔
368
                        srcMember.Leader != dstMember.Leader {
2,400✔
369

80✔
370
                        proposal := &pb.ZeroProposal{
80✔
371
                                Member: dstMember,
80✔
372
                        }
80✔
373
                        res = append(res, proposal)
80✔
374
                }
80✔
375
                if !dstMember.Leader {
3,709✔
376
                        // Don't continue to tablets if request is not from the leader.
1,389✔
377
                        return res, nil
1,389✔
378
                }
1,389✔
379
                if dst.SnapshotTs > group.SnapshotTs {
985✔
380
                        res = append(res, &pb.ZeroProposal{
54✔
381
                                SnapshotTs: map[uint32]uint64{dstMember.GroupId: dst.SnapshotTs},
54✔
382
                        })
54✔
383
                }
54✔
384
        }
385

386
        var tablets []*pb.Tablet
931✔
387
        for key, dstTablet := range dst.Tablets {
931✔
388
                group, has := s.state.Groups[dstTablet.GroupId]
×
389
                if !has {
×
390
                        return res, errors.Errorf("Unknown group for tablet: %+v", dstTablet)
×
391
                }
×
392
                srcTablet, has := group.Tablets[key]
×
393
                if !has {
×
394
                        // Tablet moved to new group
×
395
                        continue
×
396
                }
397

398
                s := float64(srcTablet.OnDiskBytes)
×
399
                d := float64(dstTablet.OnDiskBytes)
×
400
                if dstTablet.Remove || (s == 0 && d > 0) || (s > 0 && math.Abs(d/s-1) > 0.1) {
×
401
                        dstTablet.Force = false
×
402
                        tablets = append(tablets, dstTablet)
×
403
                }
×
404
        }
405

406
        if len(tablets) > 0 {
931✔
407
                res = append(res, &pb.ZeroProposal{Tablets: tablets})
×
408
        }
×
409
        return res, nil
931✔
410
}
411

412
func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
1✔
413
        ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
1✔
414
        defer span.End()
1✔
415
        if req == nil || len(req.Tablets) == 0 {
1✔
416
                return nil, errors.Errorf("Tablets are empty in %+v", req)
×
417
        }
×
418

419
        if req.GroupId == 0 {
1✔
420
                return nil, errors.Errorf("Group ID is Zero in %+v", req)
×
421
        }
×
422

423
        tablets := make([]*pb.Tablet, 0)
1✔
424
        unknownTablets := make([]*pb.Tablet, 0)
1✔
425
        for _, t := range req.Tablets {
46✔
426
                tab := s.ServingTablet(t.Predicate)
45✔
427
                span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
45✔
428
                switch {
45✔
429
                case tab != nil && !t.Force:
×
430
                        tablets = append(tablets, t)
×
431
                case t.ReadOnly:
×
432
                        tablets = append(tablets, &pb.Tablet{})
×
433
                default:
45✔
434
                        unknownTablets = append(unknownTablets, t)
45✔
435
                }
436
        }
437

438
        if len(unknownTablets) == 0 {
1✔
439
                return &pb.TabletResponse{
×
440
                        Tablets: tablets,
×
441
                }, nil
×
442
        }
×
443

444
        // Set the tablet to be served by this server's group.
445
        var proposal pb.ZeroProposal
1✔
446
        proposal.Tablets = make([]*pb.Tablet, 0)
1✔
447
        for _, t := range unknownTablets {
46✔
448
                if x.IsReservedPredicate(t.Predicate) {
56✔
449
                        // Force all the reserved predicates to be allocated to group 1.
11✔
450
                        // This is to make it easier to stream ACL updates to all alpha servers
11✔
451
                        // since they only need to open one pipeline to receive updates for all
11✔
452
                        // ACL predicates.
11✔
453
                        // This will also make it easier to restore the reserved predicates after
11✔
454
                        // a DropAll operation.
11✔
455
                        t.GroupId = 1
11✔
456
                }
11✔
457
                proposal.Tablets = append(proposal.Tablets, t)
45✔
458
        }
459

460
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
1✔
461
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
462
                return nil, err
×
463
        }
×
464

465
        for _, t := range unknownTablets {
46✔
466
                tab := s.ServingTablet(t.Predicate)
45✔
467
                x.AssertTrue(tab != nil)
45✔
468
                span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
45✔
469
                tablets = append(tablets, tab)
45✔
470
        }
45✔
471

472
        return &pb.TabletResponse{
1✔
473
                Tablets: tablets,
1✔
474
        }, nil
1✔
475
}
476

477
// RemoveNode removes the given node from the given group.
478
// It's the user's responsibility to ensure that node doesn't come back again
479
// before calling the api.
480
func (s *Server) RemoveNode(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.Status, error) {
1,005✔
481
        if req.GroupId == 0 {
1,005✔
482
                return nil, s.Node.ProposePeerRemoval(ctx, req.NodeId)
×
483
        }
×
484
        zp := &pb.ZeroProposal{}
1,005✔
485
        zp.Member = &pb.Member{Id: req.NodeId, GroupId: req.GroupId, AmDead: true}
1,005✔
486
        if _, ok := s.state.Groups[req.GroupId]; !ok {
1,007✔
487
                return nil, errors.Errorf("No group with groupId %d found", req.GroupId)
2✔
488
        }
2✔
489
        if _, ok := s.state.Groups[req.GroupId].Members[req.NodeId]; !ok {
2,004✔
490
                return nil, errors.Errorf("No node with nodeId %d found in group %d", req.NodeId,
1,001✔
491
                        req.GroupId)
1,001✔
492
        }
1,001✔
493
        if len(s.state.Groups[req.GroupId].Members) == 1 && len(s.state.Groups[req.GroupId].
2✔
494
                Tablets) > 0 {
2✔
495
                return nil, errors.Errorf("Move all tablets from group %d before removing the last node",
×
496
                        req.GroupId)
×
497
        }
×
498
        if err := s.Node.proposeAndWait(ctx, zp); err != nil {
2✔
499
                return nil, err
×
500
        }
×
501

502
        return &pb.Status{}, nil
2✔
503
}
504

505
// Connect is used by Alpha nodes to connect the very first time with group zero.
506
func (s *Server) Connect(ctx context.Context,
507
        m *pb.Member) (resp *pb.ConnectionState, err error) {
597✔
508
        // Ensures that connect requests are always serialized
597✔
509
        s.connectLock.Lock()
597✔
510
        defer s.connectLock.Unlock()
597✔
511
        glog.Infof("Got connection request: %+v\n", m)
597✔
512
        defer glog.Infof("Connected: %+v\n", m)
597✔
513

597✔
514
        if ctx.Err() != nil {
597✔
515
                err := errors.Errorf("Context has error: %v\n", ctx.Err())
×
516
                return &emptyConnectionState, err
×
517
        }
×
518
        ms, err := s.latestMembershipState(ctx)
597✔
519
        if err != nil {
597✔
520
                return nil, err
×
521
        }
×
522

523
        if m.Learner && !ms.License.GetEnabled() {
597✔
524
                // Update the "ShouldCrash" function in x/x.go if you change the error message here.
×
525
                return nil, errors.New("ENTERPRISE_ONLY_LEARNER - Missing or expired Enterpise License. " +
×
526
                        "Cannot add Learner Node.")
×
527
        }
×
528

529
        if m.ClusterInfoOnly {
1,057✔
530
                // This request only wants to access the membership state, and nothing else. Most likely
460✔
531
                // from our clients.
460✔
532
                cs := &pb.ConnectionState{
460✔
533
                        State:      ms,
460✔
534
                        MaxPending: s.orc.MaxPending(),
460✔
535
                }
460✔
536
                return cs, err
460✔
537
        }
460✔
538
        if m.Addr == "" {
137✔
539
                return &emptyConnectionState, errors.Errorf("NO_ADDR: No address provided: %+v", m)
×
540
        }
×
541

542
        for _, member := range ms.Removed {
137✔
543
                // It is not recommended to reuse RAFT ids.
×
544
                if member.GroupId != 0 && m.Id == member.Id {
×
545
                        return &emptyConnectionState, errors.Errorf(
×
546
                                "REUSE_RAFTID: Duplicate Raft ID %d to removed member: %+v", m.Id, member)
×
547
                }
×
548
        }
549

550
        numberOfNodes := len(ms.Zeros)
137✔
551
        for _, group := range ms.Groups {
203✔
552
                for _, member := range group.Members {
168✔
553
                        switch {
102✔
554
                        case member.Addr == m.Addr && m.Id == 0:
×
555
                                glog.Infof("Found a member with the same address. Returning: %+v", member)
×
556
                                conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
×
557
                                return &pb.ConnectionState{
×
558
                                        State:  ms,
×
559
                                        Member: member,
×
560
                                }, nil
×
561

562
                        case member.Addr == m.Addr && member.Id != m.Id:
×
563
                                // Same address. Different Id. If Id is zero, then it might be trying to connect for
×
564
                                // the first time. We can just directly return the membership information.
×
565
                                return nil, errors.Errorf("REUSE_ADDR: Duplicate address to existing member: %+v."+
×
566
                                        " Self: +%v", member, m)
×
567

568
                        case member.Addr != m.Addr && member.Id == m.Id:
×
569
                                // Same Id. Different address.
×
570
                                if pl, err := conn.GetPools().Get(member.Addr); err == nil && pl.IsHealthy() {
×
571
                                        // Found a healthy connection.
×
572
                                        return nil, errors.Errorf("REUSE_RAFTID: Healthy connection to a member"+
×
573
                                                " with same ID: %+v", member)
×
574
                                }
×
575
                        }
576
                        numberOfNodes++
102✔
577
                }
578
        }
579

580
        // Create a connection and check validity of the address by doing an Echo.
581
        conn.GetPools().Connect(m.Addr, s.tlsClientConfig)
137✔
582

137✔
583
        createProposal := func() *pb.ZeroProposal {
274✔
584
                s.Lock()
137✔
585
                defer s.Unlock()
137✔
586

137✔
587
                proposal := new(pb.ZeroProposal)
137✔
588
                // Check if we already have this member.
137✔
589
                for _, group := range s.state.Groups {
203✔
590
                        if _, has := group.Members[m.Id]; has {
68✔
591
                                return nil
2✔
592
                        }
2✔
593
                }
594
                if m.Id == 0 {
212✔
595
                        // In certain situations, the proposal can be sent and return with an error.
77✔
596
                        // However,  Dgraph will keep retrying the proposal. To avoid assigning duplicating
77✔
597
                        // IDs, the counter is incremented every time a proposal is created.
77✔
598
                        m.Id = s.nextRaftId
77✔
599
                        s.nextRaftId += 1
77✔
600
                        proposal.MaxRaftId = m.Id
77✔
601
                } else if m.Id >= s.nextRaftId {
159✔
602
                        s.nextRaftId = m.Id + 1
24✔
603
                        proposal.MaxRaftId = m.Id
24✔
604
                }
24✔
605

606
                // We don't have this member. So, let's see if it has preference for a group.
607
                if m.GroupId > 0 {
160✔
608
                        group, has := s.state.Groups[m.GroupId]
25✔
609
                        if !has {
46✔
610
                                // We don't have this group. Add the server to this group.
21✔
611
                                proposal.Member = m
21✔
612
                                return proposal
21✔
613
                        }
21✔
614

615
                        if _, has := group.Members[m.Id]; has {
4✔
616
                                proposal.Member = m // Update in case some fields have changed, like address.
×
617
                                return proposal
×
618
                        }
×
619

620
                        if m.Learner {
4✔
621
                                // Give it the group it wants.
×
622
                                proposal.Member = m
×
623
                                return proposal
×
624
                        }
×
625

626
                        // We don't have this server in the list.
627
                        if len(group.Members) < s.NumReplicas {
8✔
628
                                // We need more servers here, so let's add it.
4✔
629
                                proposal.Member = m
4✔
630
                                return proposal
4✔
631
                        } else if m.ForceGroupId {
4✔
632
                                // If the group ID was taken from the group_id file, force the member
×
633
                                // to be in this group even if the group is at capacity. This should
×
634
                                // not happen if users properly initialize a cluster after a bulk load.
×
635
                                proposal.Member = m
×
636
                                return proposal
×
637
                        }
×
638
                        // Already have plenty of servers serving this group.
639
                }
640
                // Let's assign this server to a new group.
641
                for gid, group := range s.state.Groups {
159✔
642
                        if len(group.Members) < s.NumReplicas {
65✔
643
                                m.GroupId = gid
16✔
644
                                proposal.Member = m
16✔
645
                                return proposal
16✔
646
                        }
16✔
647
                }
648
                // We either don't have any groups, or don't have any groups which need another member.
649
                m.GroupId = s.nextGroup
94✔
650
                // We shouldn't increase nextGroup here as we don't know whether we have enough
94✔
651
                // replicas until proposal is committed and can cause issues due to race.
94✔
652
                proposal.Member = m
94✔
653
                return proposal
94✔
654
        }
655

656
        proposal := createProposal()
137✔
657
        if proposal == nil {
139✔
658
                return &pb.ConnectionState{
2✔
659
                        State: ms, Member: m,
2✔
660
                }, nil
2✔
661
        }
2✔
662

663
        maxNodes := s.state.GetLicense().GetMaxNodes()
135✔
664
        if s.state.GetLicense().GetEnabled() && uint64(numberOfNodes) >= maxNodes {
135✔
665
                return nil, errors.Errorf("ENTERPRISE_LIMIT_REACHED: You are already using the maximum "+
×
666
                        "number of nodes: [%v] permitted for your enterprise license.", maxNodes)
×
667
        }
×
668

669
        if err := s.Node.proposeAndWait(ctx, proposal); err != nil {
173✔
670
                return &emptyConnectionState, err
38✔
671
        }
38✔
672
        resp = &pb.ConnectionState{
97✔
673
                State:  s.membershipState(),
97✔
674
                Member: m,
97✔
675
        }
97✔
676
        return resp, nil
97✔
677
}
678

679
// DeleteNamespace removes the tablets for deleted namespace from the membership state.
680
func (s *Server) DeleteNamespace(ctx context.Context, in *pb.DeleteNsRequest) (*pb.Status, error) {
9✔
681
        err := s.Node.proposeAndWait(ctx, &pb.ZeroProposal{DeleteNs: in})
9✔
682
        return &pb.Status{}, err
9✔
683
}
9✔
684

685
// ShouldServe returns the tablet serving the predicate passed in the request.
686
func (s *Server) ShouldServe(
687
        ctx context.Context, tablet *pb.Tablet) (resp *pb.Tablet, err error) {
9,398✔
688
        ctx, span := otrace.StartSpan(ctx, "Zero.ShouldServe")
9,398✔
689
        defer span.End()
9,398✔
690

9,398✔
691
        if tablet.Predicate == "" {
9,398✔
692
                return resp, errors.Errorf("Tablet predicate is empty in %+v", tablet)
×
693
        }
×
694
        if tablet.GroupId == 0 && !tablet.ReadOnly {
9,398✔
695
                return resp, errors.Errorf("Group ID is Zero in %+v", tablet)
×
696
        }
×
697

698
        // Check who is serving this tablet.
699
        tab := s.ServingTablet(tablet.Predicate)
9,398✔
700
        span.Annotatef(nil, "Tablet for %s: %+v", tablet.Predicate, tab)
9,398✔
701
        if tab != nil && !tablet.Force {
9,793✔
702
                // Someone is serving this tablet. Could be the caller as well.
395✔
703
                // The caller should compare the returned group against the group it holds to check who's
395✔
704
                // serving.
395✔
705
                return tab, nil
395✔
706
        }
395✔
707

708
        // Read-only requests should return an empty tablet instead of asking zero
709
        // to serve the predicate.
710
        if tablet.ReadOnly {
10,573✔
711
                return &pb.Tablet{}, nil
1,570✔
712
        }
1,570✔
713

714
        // Set the tablet to be served by this server's group.
715
        var proposal pb.ZeroProposal
7,433✔
716

7,433✔
717
        if x.IsReservedPredicate(tablet.Predicate) {
13,164✔
718
                // Force all the reserved predicates to be allocated to group 1.
5,731✔
719
                // This is to make it easier to stream ACL updates to all alpha servers
5,731✔
720
                // since they only need to open one pipeline to receive updates for all
5,731✔
721
                // ACL predicates.
5,731✔
722
                // This will also make it easier to restore the reserved predicates after
5,731✔
723
                // a DropAll operation.
5,731✔
724
                tablet.GroupId = 1
5,731✔
725
        }
5,731✔
726
        proposal.Tablet = tablet
7,433✔
727
        if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
7,433✔
728
                span.Annotatef(nil, "While proposing tablet: %v", err)
×
729
                return tablet, err
×
730
        }
×
731
        tab = s.ServingTablet(tablet.Predicate)
7,433✔
732
        x.AssertTrue(tab != nil)
7,433✔
733
        span.Annotatef(nil, "Now serving tablet for %s: %+v", tablet.Predicate, tab)
7,433✔
734
        return tab, nil
7,433✔
735
}
736

737
// UpdateMembership updates the membership of the given group.
738
func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
2,320✔
739
        // Only Zero leader would get these membership updates.
2,320✔
740
        if ts := group.GetCheckpointTs(); ts > 0 {
2,922✔
741
                for _, m := range group.GetMembers() {
1,204✔
742
                        s.Lock()
602✔
743
                        s.checkpointPerGroup[m.GetGroupId()] = ts
602✔
744
                        s.Unlock()
602✔
745
                }
602✔
746
        }
747
        proposals, err := s.createProposals(group)
2,320✔
748
        if err != nil {
2,320✔
749
                // Sleep here so the caller doesn't keep on retrying indefinitely, creating a busy
×
750
                // wait.
×
751
                time.Sleep(time.Second)
×
752
                glog.Errorf("Error while creating proposals in Update: %v\n", err)
×
753
                return nil, err
×
754
        }
×
755

756
        ctx, cancel := context.WithCancel(ctx)
2,320✔
757
        defer cancel()
2,320✔
758

2,320✔
759
        errCh := make(chan error, len(proposals))
2,320✔
760
        for _, pr := range proposals {
2,454✔
761
                go func(pr *pb.ZeroProposal) {
268✔
762
                        errCh <- s.Node.proposeAndWait(ctx, pr)
134✔
763
                }(pr)
134✔
764
        }
765

766
        for range proposals {
2,454✔
767
                // We Don't care about these errors
134✔
768
                // Ideally shouldn't error out.
134✔
769
                if err := <-errCh; err != nil {
134✔
770
                        glog.Errorf("Error while applying proposal in Update stream: %v\n", err)
×
771
                        return nil, err
×
772
                }
×
773
        }
774

775
        if len(group.Members) == 0 {
2,320✔
776
                return &api.Payload{Data: []byte("OK")}, nil
×
777
        }
×
778
        select {
2,320✔
779
        case s.moveOngoing <- struct{}{}:
2,320✔
780
        default:
×
781
                // If a move is going on, don't do the next steps of deleting predicates.
×
782
                return &api.Payload{Data: []byte("OK")}, nil
×
783
        }
784
        defer func() {
4,640✔
785
                <-s.moveOngoing
2,320✔
786
        }()
2,320✔
787

788
        if err := s.deletePredicates(ctx, group); err != nil {
2,320✔
789
                glog.Warningf("While deleting predicates: %v", err)
×
790
        }
×
791
        return &api.Payload{Data: []byte("OK")}, nil
2,320✔
792
}
793

794
func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error {
2,320✔
795
        if group == nil || group.Tablets == nil {
4,640✔
796
                return nil
2,320✔
797
        }
2,320✔
798
        var gid uint32
×
799
        for _, tablet := range group.Tablets {
×
800
                gid = tablet.GroupId
×
801
                break
×
802
        }
803
        if gid == 0 {
×
804
                return errors.Errorf("Unable to find group")
×
805
        }
×
806
        state, err := s.latestMembershipState(ctx)
×
807
        if err != nil {
×
808
                return err
×
809
        }
×
810
        sg, ok := state.Groups[gid]
×
811
        if !ok {
×
812
                return errors.Errorf("Unable to find group: %d", gid)
×
813
        }
×
814

815
        pl := s.Leader(gid)
×
816
        if pl == nil {
×
817
                return errors.Errorf("Unable to reach leader of group: %d", gid)
×
818
        }
×
819
        wc := pb.NewWorkerClient(pl.Get())
×
820

×
821
        for pred := range group.Tablets {
×
822
                if _, found := sg.Tablets[pred]; found {
×
823
                        continue
×
824
                }
825
                glog.Infof("Tablet: %v does not belong to group: %d. Sending delete instruction.",
×
826
                        pred, gid)
×
827
                in := &pb.MovePredicatePayload{
×
828
                        Predicate: pred,
×
829
                        SourceGid: gid,
×
830
                        DestGid:   0,
×
831
                }
×
832
                if _, err := wc.MovePredicate(ctx, in); err != nil {
×
833
                        return err
×
834
                }
×
835
        }
836
        return nil
×
837
}
838

839
// StreamMembership periodically streams the membership state to the given stream.
840
func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error {
102✔
841
        // Send MembershipState right away. So, the connection is correctly established.
102✔
842
        ctx := stream.Context()
102✔
843
        ms, err := s.latestMembershipState(ctx)
102✔
844
        if err != nil {
105✔
845
                return err
3✔
846
        }
3✔
847
        if err := stream.Send(ms); err != nil {
99✔
848
                return err
×
849
        }
×
850

851
        ticker := time.NewTicker(time.Second)
99✔
852
        defer ticker.Stop()
99✔
853
        for {
22,974✔
854
                select {
22,875✔
855
                case <-ticker.C:
22,778✔
856
                        // Send an update every second.
22,778✔
857
                        ms, err := s.latestMembershipState(ctx)
22,778✔
858
                        if err != nil {
22,780✔
859
                                return err
2✔
860
                        }
2✔
861
                        if err := stream.Send(ms); err != nil {
22,776✔
862
                                return err
×
863
                        }
×
864
                case <-ctx.Done():
49✔
865
                        return ctx.Err()
49✔
866
                case <-s.closer.HasBeenClosed():
48✔
867
                        return errServerShutDown
48✔
868
                }
869
        }
870
}
871

872
func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState, error) {
29,330✔
873
        if err := s.Node.WaitLinearizableRead(ctx); err != nil {
29,348✔
874
                return nil, err
18✔
875
        }
18✔
876
        ms := s.membershipState()
29,312✔
877
        if ms == nil {
29,312✔
878
                return &pb.MembershipState{}, nil
×
879
        }
×
880
        return ms, nil
29,312✔
881
}
882

883
func (s *Server) ApplyLicense(ctx context.Context, req *pb.ApplyLicenseRequest) (*pb.Status,
884
        error) {
7✔
885
        var l license
7✔
886
        signedData := bytes.NewReader(req.License)
7✔
887
        if err := verifySignature(signedData, strings.NewReader(publicKey), &l); err != nil {
12✔
888
                return nil, errors.Wrapf(err, "while extracting enterprise details from the license")
5✔
889
        }
5✔
890

891
        numNodes := len(s.state.GetZeros())
2✔
892
        for _, group := range s.state.GetGroups() {
4✔
893
                numNodes += len(group.GetMembers())
2✔
894
        }
2✔
895
        if uint64(numNodes) > l.MaxNodes {
2✔
896
                return nil, errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. "+
×
897
                        "You have: [%v].", l.MaxNodes, numNodes)
×
898
        }
×
899

900
        proposal := &pb.ZeroProposal{
2✔
901
                License: &pb.License{
2✔
902
                        User:     l.User,
2✔
903
                        MaxNodes: l.MaxNodes,
2✔
904
                        ExpiryTs: l.Expiry.Unix(),
2✔
905
                },
2✔
906
        }
2✔
907

2✔
908
        err := s.Node.proposeAndWait(ctx, proposal)
2✔
909
        if err != nil {
2✔
910
                return nil, errors.Wrapf(err, "while proposing enterprise license state to cluster")
×
911
        }
×
912
        glog.Infof("Enterprise license proposed to the cluster %+v", proposal)
2✔
913
        return &pb.Status{}, nil
2✔
914
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc