• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5078820494

25 May 2023 11:19AM UTC coverage: 67.259% (-0.009%) from 67.268%
5078820494

push

GitHub
dgraphtest: print container logs if the test fails (#8829)

58396 of 86823 relevant lines covered (67.26%)

2270884.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.59
/worker/groups.go
1
/*
2
 * Copyright 2016-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package worker
18

19
import (
20
        "context"
21
        "fmt"
22
        "io"
23
        "sort"
24
        "sync"
25
        "sync/atomic"
26
        "time"
27

28
        "github.com/golang/glog"
29
        "github.com/golang/protobuf/proto"
30
        "github.com/pkg/errors"
31

32
        badgerpb "github.com/dgraph-io/badger/v4/pb"
33
        "github.com/dgraph-io/dgo/v230/protos/api"
34
        "github.com/dgraph-io/dgraph/conn"
35
        "github.com/dgraph-io/dgraph/ee/enc"
36
        "github.com/dgraph-io/dgraph/protos/pb"
37
        "github.com/dgraph-io/dgraph/raftwal"
38
        "github.com/dgraph-io/dgraph/schema"
39
        "github.com/dgraph-io/dgraph/x"
40
        "github.com/dgraph-io/ristretto/z"
41
)
42

43
type groupi struct {
44
        x.SafeMutex
45
        state        *pb.MembershipState
46
        Node         *node
47
        gid          uint32
48
        tablets      map[string]*pb.Tablet
49
        triggerCh    chan struct{} // Used to trigger membership sync
50
        blockDeletes *sync.Mutex   // Ensure that deletion won't happen when move is going on.
51
        closer       *z.Closer
52

53
        // Group checksum is used to determine if the tablets served by the groups have changed from
54
        // the membership information that the Alpha has. If so, Alpha cannot service a read.
55
        deltaChecksum      uint64 // Checksum received by OracleDelta.
56
        membershipChecksum uint64 // Checksum received by MembershipState.
57
}
58

59
var gr = &groupi{
60
        blockDeletes: new(sync.Mutex),
61
        tablets:      make(map[string]*pb.Tablet),
62
        closer:       z.NewCloser(3), // Match CLOSER:1 in this file.
63
}
64

65
func groups() *groupi {
3,196,917✔
66
        return gr
3,196,917✔
67
}
3,196,917✔
68

69
// StartRaftNodes will read the WAL dir, create the RAFT groups,
70
// and either start or restart RAFT nodes.
71
// This function triggers RAFT nodes to be created, and is the entrance to the RAFT
72
// world from main.go.
73
func StartRaftNodes(walStore *raftwal.DiskStorage, bindall bool) {
93✔
74
        if x.WorkerConfig.MyAddr == "" {
93✔
75
                x.WorkerConfig.MyAddr = fmt.Sprintf("localhost:%d", workerPort())
×
76
        } else {
93✔
77
                // check if address is valid or not
93✔
78
                x.Check(x.ValidateAddress(x.WorkerConfig.MyAddr))
93✔
79
                if !bindall {
93✔
80
                        glog.Errorln("--my flag is provided without bindall, Did you forget to specify bindall?")
×
81
                }
×
82
        }
83

84
        x.AssertTruef(len(x.WorkerConfig.ZeroAddr) > 0, "Providing dgraphzero address is mandatory.")
93✔
85
        for _, zeroAddr := range x.WorkerConfig.ZeroAddr {
232✔
86
                x.AssertTruef(zeroAddr != x.WorkerConfig.MyAddr,
139✔
87
                        "Dgraph Zero address %s and Dgraph address (IP:Port) %s can't be the same.",
139✔
88
                        zeroAddr, x.WorkerConfig.MyAddr)
139✔
89
        }
139✔
90

91
        raftIdx := x.WorkerConfig.Raft.GetUint64("idx")
93✔
92
        if raftIdx == 0 {
152✔
93
                raftIdx = walStore.Uint(raftwal.RaftId)
59✔
94

59✔
95
                // If the w directory already contains raft information, ignore the proposed
59✔
96
                // group ID stored inside the p directory.
59✔
97
                if raftIdx > 0 {
59✔
98
                        x.WorkerConfig.ProposedGroupId = 0
×
99
                }
×
100
        }
101
        glog.Infof("Current Raft Id: %#x\n", raftIdx)
93✔
102

93✔
103
        if x.WorkerConfig.ProposedGroupId == 0 {
184✔
104
                x.WorkerConfig.ProposedGroupId = x.WorkerConfig.Raft.GetUint32("group")
91✔
105
        }
91✔
106
        // Successfully connect with dgraphzero, before doing anything else.
107
        // Connect with Zero leader and figure out what group we should belong to.
108
        m := &pb.Member{
93✔
109
                Id:      raftIdx,
93✔
110
                GroupId: x.WorkerConfig.ProposedGroupId,
93✔
111
                Addr:    x.WorkerConfig.MyAddr,
93✔
112
                Learner: x.WorkerConfig.Raft.GetBool("learner"),
93✔
113
        }
93✔
114
        if m.GroupId > 0 {
116✔
115
                m.ForceGroupId = true
23✔
116
        }
23✔
117
        glog.Infof("Sending member request to Zero: %+v\n", m)
93✔
118
        var connState *pb.ConnectionState
93✔
119
        var err error
93✔
120

93✔
121
        for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
208✔
122
                pl := gr.connToZeroLeader()
115✔
123
                if pl == nil {
115✔
124
                        continue
×
125
                }
126
                zc := pb.NewZeroClient(pl.Get())
115✔
127
                connState, err = zc.Connect(gr.Ctx(), m)
115✔
128
                if err == nil || x.ShouldCrash(err) {
208✔
129
                        break
93✔
130
                }
131
        }
132
        x.CheckfNoTrace(err)
93✔
133
        if connState.GetMember() == nil || connState.GetState() == nil {
93✔
134
                x.Fatalf("Unable to join cluster via dgraphzero")
×
135
        }
×
136
        glog.Infof("Connected to group zero. Assigned group: %+v\n", connState.GetMember().GetGroupId())
93✔
137
        raftIdx = connState.GetMember().GetId()
93✔
138
        glog.Infof("Raft Id after connection to Zero: %#x\n", raftIdx)
93✔
139

93✔
140
        // This timestamp would be used for reading during snapshot after bulk load.
93✔
141
        // The stream is async, we need this information before we start or else replica might
93✔
142
        // not get any data.
93✔
143
        gr.applyState(raftIdx, connState.GetState())
93✔
144

93✔
145
        gid := gr.groupId()
93✔
146
        gr.triggerCh = make(chan struct{}, 1)
93✔
147

93✔
148
        // Initialize DiskStorage and pass it along.
93✔
149
        walStore.SetUint(raftwal.RaftId, raftIdx)
93✔
150
        walStore.SetUint(raftwal.GroupId, uint64(gid))
93✔
151

93✔
152
        gr.Node = newNode(walStore, gid, raftIdx, x.WorkerConfig.MyAddr)
93✔
153

93✔
154
        x.Checkf(schema.LoadFromDb(context.Background()), "Error while initializing schema")
93✔
155
        glog.Infof("Load schema from DB: OK")
93✔
156
        raftServer.UpdateNode(gr.Node.Node)
93✔
157
        gr.Node.InitAndStartNode()
93✔
158
        glog.Infof("Init and start Raft node: OK")
93✔
159

93✔
160
        go gr.sendMembershipUpdates()
93✔
161
        go gr.receiveMembershipUpdates()
93✔
162
        go gr.processOracleDeltaStream()
93✔
163

93✔
164
        gr.informZeroAboutTablets()
93✔
165
        glog.Infof("Informed Zero about tablets I have: OK")
93✔
166
        gr.applyInitialSchema()
93✔
167
        gr.applyInitialTypes()
93✔
168
        glog.Infof("Upserted Schema and Types: OK")
93✔
169

93✔
170
        x.UpdateHealthStatus(true)
93✔
171
        glog.Infof("Server is ready: OK")
93✔
172
}
173

174
func (g *groupi) Ctx() context.Context {
149,798✔
175
        return g.closer.Ctx()
149,798✔
176
}
149,798✔
177

178
func (g *groupi) IsClosed() bool {
272✔
179
        return g.closer.Ctx().Err() != nil
272✔
180
}
272✔
181

182
func (g *groupi) informZeroAboutTablets() {
93✔
183
        // Before we start this Alpha, let's pick up all the predicates we have in our postings
93✔
184
        // directory, and ask Zero if we are allowed to serve it. Do this irrespective of whether
93✔
185
        // this node is the leader or the follower, because this early on, we might not have
93✔
186
        // figured that out.
93✔
187
        ticker := time.NewTicker(time.Second)
93✔
188
        defer ticker.Stop()
93✔
189

93✔
190
        for range ticker.C {
186✔
191
                preds := schema.State().Predicates()
93✔
192
                if _, err := g.Inform(preds); err != nil {
93✔
193
                        glog.Errorf("Error while getting tablet for preds %v", err)
×
194
                } else {
93✔
195
                        glog.V(1).Infof("Done informing Zero about the %d tablets I have", len(preds))
93✔
196
                        return
93✔
197
                }
93✔
198
        }
199
}
200

201
func (g *groupi) applyInitialTypes() {
93✔
202
        initialTypes := schema.InitialTypes(x.GalaxyNamespace)
93✔
203
        for _, t := range initialTypes {
363✔
204
                if _, ok := schema.State().GetType(t.TypeName); ok {
272✔
205
                        continue
2✔
206
                }
207
                // It is okay to write initial types at ts=1.
208
                if err := updateType(t.GetTypeName(), *t, 1); err != nil {
268✔
209
                        glog.Errorf("Error while applying initial type: %s", err)
×
210
                }
×
211
        }
212
}
213

214
func (g *groupi) applyInitialSchema() {
93✔
215
        if g.groupId() != 1 {
124✔
216
                return
31✔
217
        }
31✔
218
        initialSchema := schema.InitialSchema(x.GalaxyNamespace)
62✔
219
        ctx := g.Ctx()
62✔
220

62✔
221
        apply := func(s *pb.SchemaUpdate) {
465✔
222
                // There are 2 cases: either the alpha is fresh or it restarted. If it is fresh cluster
403✔
223
                // then we can write the schema at ts=1. If alpha restarted, then we will already have the
403✔
224
                // schema at higher version and this operation will be a no-op.
403✔
225
                if err := applySchema(s, 1); err != nil {
403✔
226
                        glog.Errorf("Error while applying initial schema: %s", err)
×
227
                }
×
228
        }
229

230
        for _, s := range initialSchema {
480✔
231
                if gid, err := g.BelongsToReadOnly(s.Predicate, 0); err != nil {
418✔
232
                        glog.Errorf("Error getting tablet for predicate %s. Will force schema proposal.",
×
233
                                s.Predicate)
×
234
                        apply(s)
×
235
                } else if gid == 0 {
734✔
236
                        // The tablet is not being served currently.
316✔
237
                        apply(s)
316✔
238
                } else if curr, _ := schema.State().Get(ctx, s.Predicate); gid == g.groupId() &&
418✔
239
                        !proto.Equal(s, &curr) {
189✔
240
                        // If this tablet is served to the group, do not upsert the schema unless the
87✔
241
                        // stored schema and the proposed one are different.
87✔
242
                        apply(s)
87✔
243
                } else {
102✔
244
                        // The schema for this predicate has already been proposed.
15✔
245
                        glog.V(1).Infof("Schema found for predicate %s: %+v", s.Predicate, curr)
15✔
246
                        continue
15✔
247
                }
248
        }
249
}
250

251
func applySchema(s *pb.SchemaUpdate, ts uint64) error {
8,921✔
252
        if err := updateSchema(s, ts); err != nil {
8,921✔
253
                return err
×
254
        }
×
255
        if servesTablet, err := groups().ServesTablet(s.Predicate); err != nil {
8,921✔
256
                return err
×
257
        } else if !servesTablet {
8,921✔
258
                return errors.Errorf("group 1 should always serve reserved predicate %s", s.Predicate)
×
259
        }
×
260
        return nil
8,921✔
261
}
262

263
// No locks are acquired while accessing this function.
264
// Don't acquire RW lock during this, otherwise we might deadlock.
265
func (g *groupi) groupId() uint32 {
1,141,700✔
266
        return atomic.LoadUint32(&g.gid)
1,141,700✔
267
}
1,141,700✔
268

269
// MaxLeaseId returns the maximum UID that has been leased.
270
func MaxLeaseId() uint64 {
342,146✔
271
        g := groups()
342,146✔
272
        g.RLock()
342,146✔
273
        defer g.RUnlock()
342,146✔
274
        if g.state == nil {
342,146✔
275
                return 0
×
276
        }
×
277
        return g.state.MaxUID
342,146✔
278
}
279

280
// GetMembershipState returns the current membership state.
281
func GetMembershipState() *pb.MembershipState {
823✔
282
        g := groups()
823✔
283
        g.RLock()
823✔
284
        defer g.RUnlock()
823✔
285
        return proto.Clone(g.state).(*pb.MembershipState)
823✔
286
}
823✔
287

288
// UpdateMembershipState contacts zero for an update on membership state.
289
func UpdateMembershipState(ctx context.Context) error {
213✔
290
        g := groups()
213✔
291
        p := g.Leader(0)
213✔
292
        if p == nil {
213✔
293
                return errors.Errorf("don't have the address of any dgraph zero leader")
×
294
        }
×
295

296
        c := pb.NewZeroClient(p.Get())
213✔
297
        state, err := c.Connect(ctx, &pb.Member{ClusterInfoOnly: true})
213✔
298
        if err != nil {
213✔
299
                return err
×
300
        }
×
301
        g.applyState(g.Node.Id, state.GetState())
213✔
302
        return nil
213✔
303
}
304

305
func (g *groupi) applyState(myId uint64, state *pb.MembershipState) {
21,234✔
306
        x.AssertTrue(state != nil)
21,234✔
307
        g.Lock()
21,234✔
308
        defer g.Unlock()
21,234✔
309
        // We don't update state if we get any old state. Counter stores the raftindex of
21,234✔
310
        // last update. For leader changes at zero since we don't propose, state can get
21,234✔
311
        // updated at same counter value. So ignore only if counter is less.
21,234✔
312
        if g.state != nil && g.state.Counter > state.Counter {
21,234✔
313
                return
×
314
        }
×
315

316
        invalid := state.License != nil && !state.License.Enabled
21,234✔
317
        if g.Node != nil && g.Node.RaftContext.IsLearner && invalid {
21,234✔
318
                glog.Errorf("ENTERPRISE_ONLY_LEARNER: License Expired. Cannot run learner nodes.")
×
319
                x.ServerCloser.Signal()
×
320
                return
×
321
        }
×
322

323
        oldState := g.state
21,234✔
324
        g.state = state
21,234✔
325

21,234✔
326
        // Sometimes this can cause us to lose latest tablet info, but that shouldn't cause any issues.
21,234✔
327
        var foundSelf bool
21,234✔
328
        g.tablets = make(map[string]*pb.Tablet)
21,234✔
329
        for gid, group := range g.state.Groups {
62,831✔
330
                for _, member := range group.Members {
156,794✔
331
                        if myId == member.Id {
136,431✔
332
                                foundSelf = true
21,234✔
333
                                atomic.StoreUint32(&g.gid, gid)
21,234✔
334
                        }
21,234✔
335
                        if x.WorkerConfig.MyAddr != member.Addr {
209,160✔
336
                                conn.GetPools().Connect(member.Addr, x.WorkerConfig.TLSClientConfig)
93,963✔
337
                        }
93,963✔
338
                }
339
                for _, tablet := range group.Tablets {
1,576,433✔
340
                        g.tablets[tablet.Predicate] = tablet
1,534,836✔
341
                }
1,534,836✔
342
                if gid == g.groupId() {
62,831✔
343
                        glog.V(3).Infof("group %d checksum: %d", g.groupId(), group.Checksum)
21,234✔
344
                        atomic.StoreUint64(&g.membershipChecksum, group.Checksum)
21,234✔
345
                }
21,234✔
346
        }
347
        for _, member := range g.state.Zeros {
77,955✔
348
                if x.WorkerConfig.MyAddr != member.Addr {
113,442✔
349
                        conn.GetPools().Connect(member.Addr, x.WorkerConfig.TLSClientConfig)
56,721✔
350
                }
56,721✔
351
        }
352
        if !foundSelf {
21,234✔
353
                // I'm not part of this cluster. I should crash myself.
×
354
                glog.Fatalf("Unable to find myself [id:%d group:%d] in membership state: %+v. Goodbye!",
×
355
                        myId, g.groupId(), state)
×
356
        }
×
357

358
        // While restarting we fill Node information after retrieving initial state.
359
        if g.Node != nil {
42,375✔
360
                // Lets have this block before the one that adds the new members, else we may end up
21,141✔
361
                // removing a freshly added node.
21,141✔
362

21,141✔
363
                for _, member := range g.state.GetRemoved() {
21,141✔
364
                        // TODO: This leader check can be done once instead of repeatedly.
×
365
                        if member.GetGroupId() == g.Node.gid && g.Node.AmLeader() {
×
366
                                member := member // capture range variable
×
367
                                go func() {
×
368
                                        // Don't try to remove a member if it's already marked as removed in
×
369
                                        // the membership state and is not a current peer of the node.
×
370
                                        _, isPeer := g.Node.Peer(member.GetId())
×
371
                                        // isPeer should only be true if the removed node is not the same as this node.
×
372
                                        isPeer = isPeer && member.GetId() != g.Node.RaftContext.Id
×
373

×
374
                                        for _, oldMember := range oldState.GetRemoved() {
×
375
                                                if oldMember.GetId() == member.GetId() && !isPeer {
×
376
                                                        return
×
377
                                                }
×
378
                                        }
379

380
                                        if err := g.Node.ProposePeerRemoval(g.Ctx(), member.GetId()); err != nil {
×
381
                                                glog.Errorf("Error while proposing node removal: %+v", err)
×
382
                                        }
×
383
                                }()
384
                        }
385
                }
386
                conn.GetPools().RemoveInvalid(g.state)
21,141✔
387
        }
388
}
389

390
func (g *groupi) ServesGroup(gid uint32) bool {
173,356✔
391
        return g.groupId() == gid
173,356✔
392
}
173,356✔
393

394
func (g *groupi) ChecksumsMatch(ctx context.Context) error {
84,012✔
395
        if atomic.LoadUint64(&g.deltaChecksum) == atomic.LoadUint64(&g.membershipChecksum) {
167,223✔
396
                return nil
83,211✔
397
        }
83,211✔
398
        t := time.NewTicker(100 * time.Millisecond)
801✔
399
        defer t.Stop()
801✔
400
        for {
6,783✔
401
                select {
5,982✔
402
                case <-t.C:
5,978✔
403
                        if atomic.LoadUint64(&g.deltaChecksum) == atomic.LoadUint64(&g.membershipChecksum) {
6,775✔
404
                                return nil
797✔
405
                        }
797✔
406
                case <-ctx.Done():
4✔
407
                        return errors.Errorf("Group checksum mismatch for id: %d", g.groupId())
4✔
408
                }
409
        }
410
}
411

412
func (g *groupi) BelongsTo(key string) (uint32, error) {
269,548✔
413
        if tablet, err := g.Tablet(key); err != nil {
269,548✔
414
                return 0, err
×
415
        } else if tablet != nil {
539,096✔
416
                return tablet.GroupId, nil
269,548✔
417
        }
269,548✔
418
        return 0, nil
×
419
}
420

421
// BelongsToReadOnly acts like BelongsTo except it does not ask zero to serve
422
// the tablet for key if no group is currently serving it.
423
// The ts passed should be the start ts of the query, so this method can compare that against a
424
// tablet move timestamp. If the tablet was moved to this group after the start ts of the query, we
425
// should reject that query.
426
func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) {
1,037,352✔
427
        g.RLock()
1,037,352✔
428
        tablet := g.tablets[key]
1,037,352✔
429
        g.RUnlock()
1,037,352✔
430
        if tablet != nil {
2,073,396✔
431
                if ts > 0 && ts < tablet.MoveTs {
1,036,044✔
432
                        return 0, errors.Errorf("StartTs: %d is from before MoveTs: %d for pred: %q",
×
433
                                ts, tablet.MoveTs, key)
×
434
                }
×
435
                return tablet.GetGroupId(), nil
1,036,044✔
436
        }
437

438
        // We don't know about this tablet. Talk to dgraphzero to find out who is
439
        // serving this tablet.
440
        pl := g.connToZeroLeader()
1,308✔
441
        zc := pb.NewZeroClient(pl.Get())
1,308✔
442

1,308✔
443
        tablet = &pb.Tablet{
1,308✔
444
                Predicate: key,
1,308✔
445
                ReadOnly:  true,
1,308✔
446
        }
1,308✔
447
        out, err := zc.ShouldServe(g.Ctx(), tablet)
1,308✔
448
        if err != nil {
1,308✔
449
                glog.Errorf("Error while ShouldServe grpc call %v", err)
×
450
                return 0, err
×
451
        }
×
452
        if out.GetGroupId() == 0 {
2,589✔
453
                return 0, nil
1,281✔
454
        }
1,281✔
455

456
        g.Lock()
27✔
457
        defer g.Unlock()
27✔
458
        g.tablets[key] = out
27✔
459
        if out != nil && ts > 0 && ts < out.MoveTs {
27✔
460
                return 0, errors.Errorf("StartTs: %d is from before MoveTs: %d for pred: %q",
×
461
                        ts, out.MoveTs, key)
×
462
        }
×
463
        return out.GetGroupId(), nil
27✔
464
}
465

466
func (g *groupi) ServesTablet(key string) (bool, error) {
9,392✔
467
        if tablet, err := g.Tablet(key); err != nil {
9,392✔
468
                return false, err
×
469
        } else if tablet != nil && tablet.GroupId == groups().groupId() {
18,780✔
470
                return true, nil
9,388✔
471
        }
9,388✔
472
        return false, nil
4✔
473
}
474

475
func (g *groupi) sendTablet(tablet *pb.Tablet) (*pb.Tablet, error) {
7,612✔
476
        pl := g.connToZeroLeader()
7,612✔
477
        zc := pb.NewZeroClient(pl.Get())
7,612✔
478

7,612✔
479
        out, err := zc.ShouldServe(g.Ctx(), tablet)
7,612✔
480
        if err != nil {
7,612✔
481
                glog.Errorf("Error while ShouldServe grpc call %v", err)
×
482
                return nil, err
×
483
        }
×
484

485
        // Do not store tablets with group ID 0, as they are just dummy tablets for
486
        // predicates that do no exist.
487
        if out.GroupId > 0 {
15,224✔
488
                g.Lock()
7,612✔
489
                g.tablets[out.GetPredicate()] = out
7,612✔
490
                g.Unlock()
7,612✔
491
        }
7,612✔
492

493
        if out.GroupId == groups().groupId() {
15,153✔
494
                glog.Infof("Serving tablet for: %v\n", tablet.GetPredicate())
7,541✔
495
        }
7,541✔
496
        return out, nil
7,612✔
497
}
498

499
func (g *groupi) Inform(preds []string) ([]*pb.Tablet, error) {
93✔
500
        unknownPreds := make([]*pb.Tablet, 0)
93✔
501
        tablets := make([]*pb.Tablet, 0)
93✔
502
        g.RLock()
93✔
503
        for _, p := range preds {
191✔
504
                if len(p) == 0 {
98✔
505
                        continue
×
506
                }
507

508
                if tab, ok := g.tablets[p]; !ok {
188✔
509
                        unknownPreds = append(unknownPreds, &pb.Tablet{GroupId: g.groupId(), Predicate: p})
90✔
510
                } else {
98✔
511
                        tablets = append(tablets, tab)
8✔
512
                }
8✔
513
        }
514
        g.RUnlock()
93✔
515

93✔
516
        if len(unknownPreds) == 0 {
184✔
517
                return nil, nil
91✔
518
        }
91✔
519

520
        pl := g.connToZeroLeader()
2✔
521
        zc := pb.NewZeroClient(pl.Get())
2✔
522
        out, err := zc.Inform(g.Ctx(), &pb.TabletRequest{
2✔
523
                Tablets: unknownPreds,
2✔
524
                GroupId: g.groupId(),
2✔
525
        })
2✔
526
        if err != nil {
2✔
527
                glog.Errorf("Error while Inform grpc call %v", err)
×
528
                return nil, err
×
529
        }
×
530

531
        // Do not store tablets with group ID 0, as they are just dummy tablets for
532
        // predicates that do no exist.
533
        g.Lock()
2✔
534
        for _, t := range out.Tablets {
92✔
535
                if t.GroupId > 0 {
180✔
536
                        g.tablets[t.GetPredicate()] = t
90✔
537
                        tablets = append(tablets, t)
90✔
538
                }
90✔
539

540
                if t.GroupId == groups().groupId() {
180✔
541
                        glog.Infof("Serving tablet for: %v\n", t.GetPredicate())
90✔
542
                }
90✔
543
        }
544
        g.Unlock()
2✔
545
        return tablets, nil
2✔
546
}
547

548
// Do not modify the returned Tablet
549
func (g *groupi) Tablet(key string) (*pb.Tablet, error) {
557,126✔
550
        // TODO: Remove all this later, create a membership state and apply it
557,126✔
551
        g.RLock()
557,126✔
552
        tablet, ok := g.tablets[key]
557,126✔
553
        g.RUnlock()
557,126✔
554
        if ok {
1,110,674✔
555
                return tablet, nil
553,548✔
556
        }
553,548✔
557

558
        // We don't know about this tablet.
559
        // Check with dgraphzero if we can serve it.
560
        tablet = &pb.Tablet{GroupId: g.groupId(), Predicate: key}
3,578✔
561
        return g.sendTablet(tablet)
3,578✔
562
}
563

564
func (g *groupi) ForceTablet(key string) (*pb.Tablet, error) {
4,034✔
565
        return g.sendTablet(&pb.Tablet{GroupId: g.groupId(), Predicate: key, Force: true})
4,034✔
566
}
4,034✔
567

568
func (g *groupi) HasMeInState() bool {
×
569
        g.RLock()
×
570
        defer g.RUnlock()
×
571
        if g.state == nil {
×
572
                return false
×
573
        }
×
574

575
        group, has := g.state.Groups[g.groupId()]
×
576
        if !has {
×
577
                return false
×
578
        }
×
579
        _, has = group.Members[g.Node.Id]
×
580
        return has
×
581
}
582

583
// Returns 0, 1, or 2 valid server addrs.
584
func (g *groupi) AnyTwoServers(gid uint32) []string {
19,497✔
585
        g.RLock()
19,497✔
586
        defer g.RUnlock()
19,497✔
587

19,497✔
588
        if g.state == nil {
19,850✔
589
                return []string{}
353✔
590
        }
353✔
591
        group, has := g.state.Groups[gid]
19,144✔
592
        if !has {
19,144✔
593
                return []string{}
×
594
        }
×
595
        var res []string
19,144✔
596
        for _, m := range group.Members {
56,284✔
597
                // map iteration gives us members in no particular order.
37,140✔
598
                res = append(res, m.Addr)
37,140✔
599
                if len(res) >= 2 {
55,136✔
600
                        break
17,996✔
601
                }
602
        }
603
        return res
19,144✔
604
}
605

606
func (g *groupi) members(gid uint32) map[uint64]*pb.Member {
121,456✔
607
        g.RLock()
121,456✔
608
        defer g.RUnlock()
121,456✔
609

121,456✔
610
        if g.state == nil {
121,839✔
611
                return nil
383✔
612
        }
383✔
613
        if gid == 0 {
224,080✔
614
                return g.state.Zeros
103,007✔
615
        }
103,007✔
616
        group, has := g.state.Groups[gid]
18,066✔
617
        if !has {
18,066✔
618
                return nil
×
619
        }
×
620
        return group.Members
18,066✔
621
}
622

623
func (g *groupi) AnyServer(gid uint32) *conn.Pool {
265✔
624
        members := g.members(gid)
265✔
625
        for _, m := range members {
322✔
626
                pl, err := conn.GetPools().Get(m.Addr)
57✔
627
                if err == nil {
114✔
628
                        return pl
57✔
629
                }
57✔
630
        }
631
        return nil
208✔
632
}
633

634
func (g *groupi) MyPeer() (uint64, bool) {
278✔
635
        members := g.members(g.groupId())
278✔
636
        for _, m := range members {
587✔
637
                if m.Id != g.Node.Id {
386✔
638
                        return m.Id, true
77✔
639
                }
77✔
640
        }
641
        return 0, false
201✔
642
}
643

644
// Leader will try to return the leader of a given group, based on membership information.
645
// There is currently no guarantee that the returned server is the leader of the group.
646
func (g *groupi) Leader(gid uint32) *conn.Pool {
120,912✔
647
        members := g.members(gid)
120,912✔
648
        if members == nil {
121,087✔
649
                return nil
175✔
650
        }
175✔
651
        for _, m := range members {
306,994✔
652
                if m.Leader {
306,970✔
653
                        if pl, err := conn.GetPools().Get(m.Addr); err == nil {
241,422✔
654
                                return pl
120,709✔
655
                        }
120,709✔
656
                }
657
        }
658
        return nil
28✔
659
}
660

661
func (g *groupi) KnownGroups() (gids []uint32) {
18,210✔
662
        g.RLock()
18,210✔
663
        defer g.RUnlock()
18,210✔
664
        if g.state == nil {
18,210✔
665
                return
×
666
        }
×
667
        for gid := range g.state.Groups {
53,460✔
668
                gids = append(gids, gid)
35,250✔
669
        }
35,250✔
670
        return
18,210✔
671
}
672

673
// KnownGroups returns the known groups using the global groupi instance.
674
func KnownGroups() []uint32 {
×
675
        return groups().KnownGroups()
×
676
}
×
677

678
// GroupId returns the group to which this worker belongs to.
679
func GroupId() uint32 {
141✔
680
        return groups().groupId()
141✔
681
}
141✔
682

683
// NodeId returns the raft id of the node.
684
func NodeId() uint64 {
2✔
685
        return groups().Node.Id
2✔
686
}
2✔
687

688
func (g *groupi) triggerMembershipSync() {
2,248✔
689
        // It's ok if we miss the trigger, periodic membership sync runs every minute.
2,248✔
690
        select {
2,248✔
691
        case g.triggerCh <- struct{}{}:
2,155✔
692
        // It's ok to ignore it, since we would be sending update of a later state
693
        default:
93✔
694
        }
695
}
696

697
// connBaseDelay is the initial back-off used by connToZeroLeader while it looks
// for a Zero leader. The delay doubles on each retry, up to a cap.
const connBaseDelay = 100 * time.Millisecond
698

699
func (g *groupi) connToZeroLeader() *conn.Pool {
77,174✔
700
        pl := g.Leader(0)
77,174✔
701
        if pl != nil {
154,171✔
702
                return pl
76,997✔
703
        }
76,997✔
704
        glog.V(1).Infof("No healthy Zero leader found. Trying to find a Zero leader...")
177✔
705

177✔
706
        getLeaderConn := func(zc pb.ZeroClient) *conn.Pool {
385✔
707
                ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second)
208✔
708
                defer cancel()
208✔
709

208✔
710
                connState, err := zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true})
208✔
711
                if err != nil || connState == nil {
241✔
712
                        glog.V(1).Infof("While retrieving Zero leader info. Error: %v. Retrying...", err)
33✔
713
                        return nil
33✔
714
                }
33✔
715
                for _, mz := range connState.State.GetZeros() {
409✔
716
                        if mz.Leader {
409✔
717
                                return conn.GetPools().Connect(mz.GetAddr(), x.WorkerConfig.TLSClientConfig)
175✔
718
                        }
175✔
719
                }
720
                return nil
×
721
        }
722

723
        // No leader found. Let's get the latest membership state from Zero.
724
        delay := connBaseDelay
177✔
725
        maxHalfDelay := time.Second
177✔
726
        for i := 0; ; i++ { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
387✔
727
                if g.IsClosed() {
212✔
728
                        return nil
2✔
729
                }
2✔
730

731
                time.Sleep(delay)
208✔
732
                if delay <= maxHalfDelay {
416✔
733
                        delay *= 2
208✔
734
                }
208✔
735

736
                zAddrList := x.WorkerConfig.ZeroAddr
208✔
737
                // Pick addresses in round robin manner.
208✔
738
                addr := zAddrList[i%len(zAddrList)]
208✔
739

208✔
740
                pl := g.AnyServer(0)
208✔
741
                if pl == nil {
416✔
742
                        pl = conn.GetPools().Connect(addr, x.WorkerConfig.TLSClientConfig)
208✔
743
                }
208✔
744
                if pl == nil {
208✔
745
                        glog.V(1).Infof("No healthy Zero server found. Retrying...")
×
746
                        continue
×
747
                }
748
                zc := pb.NewZeroClient(pl.Get())
208✔
749
                if pl := getLeaderConn(zc); pl != nil {
383✔
750
                        glog.V(1).Infof("Found connection to leader: %s", pl.Addr)
175✔
751
                        return pl
175✔
752
                }
175✔
753
                glog.V(1).Infof("Unable to connect to a healthy Zero leader. Retrying...")
33✔
754
        }
755
}
756

757
func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error {
2,141✔
758
        leader := g.Node.AmLeader()
2,141✔
759
        member := &pb.Member{
2,141✔
760
                Id:         g.Node.Id,
2,141✔
761
                GroupId:    g.groupId(),
2,141✔
762
                Addr:       x.WorkerConfig.MyAddr,
2,141✔
763
                Leader:     leader,
2,141✔
764
                LastUpdate: uint64(time.Now().Unix()),
2,141✔
765
        }
2,141✔
766
        group := &pb.Group{
2,141✔
767
                Members: make(map[uint64]*pb.Member),
2,141✔
768
        }
2,141✔
769
        group.Members[member.Id] = member
2,141✔
770
        if leader {
3,000✔
771
                // Do not send tablet information, if I'm not the leader.
859✔
772
                group.Tablets = tablets
859✔
773
                if snap, err := g.Node.Snapshot(); err == nil {
1,718✔
774
                        group.SnapshotTs = snap.ReadTs
859✔
775
                }
859✔
776
                group.CheckpointTs = atomic.LoadUint64(&g.Node.checkpointTs)
859✔
777
        }
778

779
        pl := g.connToZeroLeader()
2,141✔
780
        if pl == nil {
2,141✔
781
                return errNoConnection
×
782
        }
×
783
        c := pb.NewZeroClient(pl.Get())
2,141✔
784
        ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second)
2,141✔
785
        defer cancel()
2,141✔
786
        reply, err := c.UpdateMembership(ctx, group)
2,141✔
787
        if err != nil {
2,154✔
788
                return err
13✔
789
        }
13✔
790
        if string(reply.GetData()) == "OK" {
4,256✔
791
                return nil
2,128✔
792
        }
2,128✔
793
        return errors.Errorf(string(reply.GetData()))
×
794
}
795

796
// sendMembershipUpdates sends the membership update to Zero leader. If this Alpha is the leader, it
797
// would also calculate the tablet sizes and send them to Zero.
798
func (g *groupi) sendMembershipUpdates() {
93✔
799
        defer func() {
186✔
800
                glog.Infoln("Closing sendMembershipUpdates")
93✔
801
                g.closer.Done() // CLOSER:1
93✔
802
        }()
93✔
803

804
        ticker := time.NewTicker(time.Second)
93✔
805
        defer ticker.Stop()
93✔
806

93✔
807
        consumeTriggers := func() {
2,234✔
808
                for {
4,289✔
809
                        select {
2,148✔
810
                        case <-g.triggerCh:
7✔
811
                        default:
2,141✔
812
                                return
2,141✔
813
                        }
814
                }
815
        }
816

817
        g.triggerMembershipSync() // Ticker doesn't start immediately
93✔
818
        var lastSent time.Time
93✔
819
        for {
23,185✔
820
                select {
23,092✔
821
                case <-g.closer.HasBeenClosed():
93✔
822
                        return
93✔
823
                case <-ticker.C:
20,858✔
824
                        if time.Since(lastSent) > 10*time.Second {
22,732✔
825
                                // On start of node if it becomes a leader, we would send tablets size for sure.
1,874✔
826
                                g.triggerMembershipSync()
1,874✔
827
                        }
1,874✔
828
                case <-g.triggerCh:
2,141✔
829
                        // Let's send update even if not leader, zero will know that this node is still active.
2,141✔
830
                        // We don't need to send tablet information everytime. So, let's only send it when we
2,141✔
831
                        // calculate it.
2,141✔
832
                        consumeTriggers()
2,141✔
833
                        if err := g.doSendMembership(nil); err != nil {
2,154✔
834
                                glog.Errorf("While sending membership update: %v", err)
13✔
835
                        } else {
2,141✔
836
                                lastSent = time.Now()
2,128✔
837
                        }
2,128✔
838
                }
839
        }
840
}
841

842
// receiveMembershipUpdates receives membership updates from ANY Zero server. This is the main
843
// connection which tells Alpha about the state of the cluster, including the latest Zero leader.
844
// All the other connections to Zero, are made only to the leader.
845
func (g *groupi) receiveMembershipUpdates() {
93✔
846
        defer func() {
186✔
847
                glog.Infoln("Closing receiveMembershipUpdates")
93✔
848
                g.closer.Done() // CLOSER:1
93✔
849
        }()
93✔
850

851
        ticker := time.NewTicker(10 * time.Second)
93✔
852
        defer ticker.Stop()
93✔
853

93✔
854
START:
93✔
855
        select {
93✔
856
        case <-g.closer.HasBeenClosed():
93✔
857
                return
93✔
858
        default:
141✔
859
        }
860

861
        pl := g.connToZeroLeader()
141✔
862
        // We should always have some connection to dgraphzero.
141✔
863
        if pl == nil {
141✔
864
                glog.Warningln("Membership update: No Zero server known.")
×
865
                time.Sleep(time.Second)
×
866
                goto START
×
867
        }
868
        glog.Infof("Got address of a Zero leader: %s", pl.Addr)
141✔
869

141✔
870
        c := pb.NewZeroClient(pl.Get())
141✔
871
        ctx, cancel := context.WithCancel(g.Ctx())
141✔
872
        stream, err := c.StreamMembership(ctx, &api.Payload{})
141✔
873
        if err != nil {
184✔
874
                cancel()
43✔
875
                glog.Errorf("Error while calling update %v\n", err)
43✔
876
                time.Sleep(time.Second)
43✔
877
                goto START
43✔
878
        }
879

880
        stateCh := make(chan *pb.MembershipState, 10)
98✔
881
        go func() {
196✔
882
                glog.Infof("Starting a new membership stream receive from %s.", pl.Addr)
98✔
883
                for i := 0; ; i++ {
21,124✔
884
                        // Blocking, should return if sending on stream fails(Need to verify).
21,026✔
885
                        state, err := stream.Recv()
21,026✔
886
                        if err != nil || state == nil {
21,124✔
887
                                if err == io.EOF {
98✔
888
                                        glog.Infoln("Membership sync stream closed.")
×
889
                                } else {
98✔
890
                                        glog.Errorf("Unable to sync memberships. Error: %v. State: %v", err, state)
98✔
891
                                }
98✔
892
                                // If zero server is lagging behind leader.
893
                                if ctx.Err() == nil {
126✔
894
                                        cancel()
28✔
895
                                }
28✔
896
                                return
98✔
897
                        }
898
                        if i == 0 {
21,021✔
899
                                glog.Infof("Received first state update from Zero: %+v", state)
93✔
900
                                x.WriteCidFile(state.Cid)
93✔
901
                        }
93✔
902
                        select {
20,928✔
903
                        case stateCh <- state:
20,928✔
904
                        case <-ctx.Done():
×
905
                                return
×
906
                        }
907
                }
908
        }()
909

910
        lastRecv := time.Now()
98✔
911
OUTER:
98✔
912
        for {
23,167✔
913
                select {
23,069✔
914
                case <-g.closer.HasBeenClosed():
70✔
915
                        if err := stream.CloseSend(); err != nil {
70✔
916
                                glog.Errorf("Error closing send stream: %+v", err)
×
917
                        }
×
918
                        break OUTER
70✔
919
                case <-ctx.Done():
28✔
920
                        if err := stream.CloseSend(); err != nil {
28✔
921
                                glog.Errorf("Error closing send stream: %+v", err)
×
922
                        }
×
923
                        break OUTER
28✔
924
                case state := <-stateCh:
20,928✔
925
                        lastRecv = time.Now()
20,928✔
926
                        g.applyState(g.Node.Id, state)
20,928✔
927
                case <-ticker.C:
2,043✔
928
                        if time.Since(lastRecv) > 10*time.Second {
2,043✔
929
                                // Zero might have gone under partition. We should recreate our connection.
×
930
                                glog.Warningf("No membership update for 10s. Closing connection to Zero.")
×
931
                                if err := stream.CloseSend(); err != nil {
×
932
                                        glog.Errorf("Error closing send stream: %+v", err)
×
933
                                }
×
934
                                break OUTER
×
935
                        }
936
                }
937
        }
938
        cancel()
98✔
939
        goto START
98✔
940
}
941

942
// processOracleDeltaStream is used to process oracle delta stream from Zero.
943
// Zero sends information about aborted/committed transactions and maxPending.
944
func (g *groupi) processOracleDeltaStream() {
93✔
945
        defer func() {
186✔
946
                glog.Infoln("Closing processOracleDeltaStream")
93✔
947
                g.closer.Done() // CLOSER:1
93✔
948
        }()
93✔
949

950
        ticker := time.NewTicker(time.Second)
93✔
951
        defer ticker.Stop()
93✔
952

93✔
953
        blockingReceiveAndPropose := func() {
196✔
954
                glog.Infof("Leader idx=%#x of group=%d is connecting to Zero for txn updates\n",
103✔
955
                        g.Node.Id, g.groupId())
103✔
956

103✔
957
                pl := g.connToZeroLeader()
103✔
958
                if pl == nil {
105✔
959
                        glog.Warningln("Oracle delta stream: No Zero leader known.")
2✔
960
                        if g.IsClosed() {
4✔
961
                                return
2✔
962
                        }
2✔
963
                        time.Sleep(time.Second)
×
964
                        return
×
965
                }
966
                glog.Infof("Got Zero leader: %s", pl.Addr)
101✔
967

101✔
968
                // The following code creates a stream. Then runs a goroutine to pick up events from the
101✔
969
                // stream and pushes them to a channel. The main loop loops over the channel, doing smart
101✔
970
                // batching. Once a batch is created, it gets proposed. Thus, we can reduce the number of
101✔
971
                // times proposals happen, which is a great optimization to have (and a common one in our
101✔
972
                // code base).
101✔
973
                ctx, cancel := context.WithCancel(g.Ctx())
101✔
974
                defer cancel()
101✔
975

101✔
976
                c := pb.NewZeroClient(pl.Get())
101✔
977
                stream, err := c.Oracle(ctx, &api.Payload{})
101✔
978
                if err != nil {
130✔
979
                        glog.Errorf("Error while calling Oracle %v\n", err)
29✔
980
                        time.Sleep(time.Second)
29✔
981
                        return
29✔
982
                }
29✔
983

984
                deltaCh := make(chan *pb.OracleDelta, 100)
72✔
985
                go func() {
144✔
986
                        // This would exit when either a Recv() returns error. Or, cancel() is called by
72✔
987
                        // something outside of this goroutine.
72✔
988
                        defer func() {
144✔
989
                                if err := stream.CloseSend(); err != nil {
72✔
990
                                        glog.Errorf("Error closing send stream: %+v", err)
×
991
                                }
×
992
                        }()
993
                        defer close(deltaCh)
72✔
994

72✔
995
                        for {
152,727✔
996
                                delta, err := stream.Recv()
152,655✔
997
                                if err != nil || delta == nil {
152,727✔
998
                                        glog.Errorf("Error in oracle delta stream. Error: %v", err)
72✔
999
                                        return
72✔
1000
                                }
72✔
1001

1002
                                select {
152,583✔
1003
                                case deltaCh <- delta:
152,583✔
1004
                                case <-ctx.Done():
×
1005
                                        return
×
1006
                                }
1007
                        }
1008
                }()
1009

1010
                for {
146,677✔
1011
                        var delta *pb.OracleDelta
146,605✔
1012
                        var batch int
146,605✔
1013
                        select {
146,605✔
1014
                        case delta = <-deltaCh:
138,061✔
1015
                                if delta == nil {
138,083✔
1016
                                        return
22✔
1017
                                }
22✔
1018
                                batch++
138,039✔
1019
                        case <-ticker.C:
8,494✔
1020
                                newLead := g.Leader(0)
8,494✔
1021
                                if newLead == nil || newLead.Addr != pl.Addr {
8,494✔
1022
                                        glog.Infof("Zero leadership changed. Renewing oracle delta stream.")
×
1023
                                        return
×
1024
                                }
×
1025
                                continue
8,494✔
1026

1027
                        case <-ctx.Done():
×
1028
                                return
×
1029
                        case <-g.closer.HasBeenClosed():
50✔
1030
                                return
50✔
1031
                        }
1032

1033
                SLURP:
138,039✔
1034
                        for {
290,622✔
1035
                                select {
152,583✔
1036
                                case more := <-deltaCh:
14,544✔
1037
                                        if more == nil {
14,544✔
1038
                                                return
×
1039
                                        }
×
1040
                                        batch++
14,544✔
1041
                                        if delta.GroupChecksums == nil {
14,544✔
1042
                                                delta.GroupChecksums = make(map[uint32]uint64)
×
1043
                                        }
×
1044
                                        delta.Txns = append(delta.Txns, more.Txns...)
14,544✔
1045
                                        delta.MaxAssigned = x.Max(delta.MaxAssigned, more.MaxAssigned)
14,544✔
1046
                                        for gid, checksum := range more.GroupChecksums {
42,439✔
1047
                                                delta.GroupChecksums[gid] = checksum
27,895✔
1048
                                        }
27,895✔
1049
                                default:
138,039✔
1050
                                        break SLURP
138,039✔
1051
                                }
1052
                        }
1053

1054
                        // Only the leader needs to propose the oracleDelta retrieved from Zero.
1055
                        // The leader and the followers would not directly apply or use the
1056
                        // oracleDelta streaming in from Zero. They would wait for the proposal to
1057
                        // go through and be applied via node.Run.  This saves us from many edge
1058
                        // cases around network partitions and race conditions between prewrites and
1059
                        // commits, etc.
1060
                        if !g.Node.AmLeader() {
138,039✔
1061
                                glog.Errorf("No longer the leader of group %d. Exiting", g.groupId())
×
1062
                                return
×
1063
                        }
×
1064

1065
                        // We should always sort the txns before applying. Otherwise, we might lose some of
1066
                        // these updates, because we never write over a new version.
1067
                        sort.Slice(delta.Txns, func(i, j int) bool {
141,505✔
1068
                                return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs
3,466✔
1069
                        })
3,466✔
1070
                        if len(delta.Txns) > 0 {
157,326✔
1071
                                last := delta.Txns[len(delta.Txns)-1]
19,287✔
1072
                                // Update MaxAssigned on commit so best effort queries can get back latest data.
19,287✔
1073
                                delta.MaxAssigned = x.Max(delta.MaxAssigned, last.CommitTs)
19,287✔
1074
                        }
19,287✔
1075
                        if glog.V(3) {
142,447✔
1076
                                glog.Infof("Batched %d updates. Max Assigned: %d. Proposing Deltas:",
4,408✔
1077
                                        batch, delta.MaxAssigned)
4,408✔
1078
                                for _, txn := range delta.Txns {
5,148✔
1079
                                        if txn.CommitTs == 0 {
776✔
1080
                                                glog.Infof("Aborted: %d", txn.StartTs)
36✔
1081
                                        } else {
740✔
1082
                                                glog.Infof("Committed: %d -> %d", txn.StartTs, txn.CommitTs)
704✔
1083
                                        }
704✔
1084
                                }
1085
                        }
1086
                        for {
276,078✔
1087
                                // Block forever trying to propose this. Also this proposal should not be counted
138,039✔
1088
                                // towards num pending proposals and be proposed right away.
138,039✔
1089
                                err := g.Node.proposeAndWait(g.Ctx(), &pb.Proposal{Delta: delta})
138,039✔
1090
                                if err == nil {
276,078✔
1091
                                        break
138,039✔
1092
                                }
1093
                                if g.Ctx().Err() != nil {
×
1094
                                        break
×
1095
                                }
1096
                                glog.Errorf("While proposing delta with MaxAssigned: %d and num txns: %d."+
×
1097
                                        " Error=%v. Retrying...\n", delta.MaxAssigned, len(delta.Txns), err)
×
1098
                        }
1099
                }
1100
        }
1101

1102
        for {
12,544✔
1103
                select {
12,451✔
1104
                case <-g.closer.HasBeenClosed():
93✔
1105
                        return
93✔
1106
                case <-ticker.C:
12,358✔
1107
                        // Only the leader needs to connect to Zero and get transaction
12,358✔
1108
                        // updates.
12,358✔
1109
                        if g.Node.AmLeader() {
12,461✔
1110
                                blockingReceiveAndPropose()
103✔
1111
                        }
103✔
1112
                }
1113
        }
1114
}
1115

1116
// GetEEFeaturesList returns a list of Enterprise Features that are available.
1117
func GetEEFeaturesList() []string {
163✔
1118
        if !EnterpriseEnabled() {
195✔
1119
                return nil
32✔
1120
        }
32✔
1121
        var ee []string
131✔
1122
        if len(Config.HmacSecret) > 0 {
187✔
1123
                ee = append(ee, "acl")
56✔
1124
                ee = append(ee, "multi_tenancy")
56✔
1125
        }
56✔
1126
        if x.WorkerConfig.EncryptionKey != nil {
179✔
1127
                ee = append(ee, "encryption_at_rest", "encrypted_backup_restore", "encrypted_export")
48✔
1128
        } else {
131✔
1129
                ee = append(ee, "backup_restore")
83✔
1130
        }
83✔
1131
        if x.WorkerConfig.Audit {
133✔
1132
                ee = append(ee, "audit")
2✔
1133
        }
2✔
1134
        if Config.ChangeDataConf != "" {
262✔
1135
                ee = append(ee, "cdc")
131✔
1136
        }
131✔
1137
        return ee
131✔
1138
}
1139

1140
// EnterpriseEnabled returns whether enterprise features can be used or not.
1141
func EnterpriseEnabled() bool {
714✔
1142
        if !enc.EeBuild {
714✔
1143
                return false
×
1144
        }
×
1145
        state := GetMembershipState()
714✔
1146
        if state == nil {
736✔
1147
                return groups().askZeroForEE()
22✔
1148
        }
22✔
1149
        return state.GetLicense().GetEnabled()
692✔
1150
}
1151

1152
func (g *groupi) askZeroForEE() bool {
22✔
1153
        var err error
22✔
1154
        var connState *pb.ConnectionState
22✔
1155

22✔
1156
        createConn := func() bool {
82✔
1157
                pl := g.connToZeroLeader()
60✔
1158
                if pl == nil {
60✔
1159
                        return false
×
1160
                }
×
1161
                zc := pb.NewZeroClient(pl.Get())
60✔
1162

60✔
1163
                ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second)
60✔
1164
                defer cancel()
60✔
1165

60✔
1166
                connState, err = zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true})
60✔
1167
                if connState == nil ||
60✔
1168
                        connState.GetState() == nil ||
60✔
1169
                        connState.GetState().GetLicense() == nil {
98✔
1170
                        glog.Info("Retry Zero Connection")
38✔
1171
                        return false
38✔
1172
                }
38✔
1173
                if err == nil || x.ShouldCrash(err) {
44✔
1174
                        return true
22✔
1175
                }
22✔
1176
                return false
×
1177
        }
1178

1179
        for !g.IsClosed() {
82✔
1180
                if createConn() {
82✔
1181
                        break
22✔
1182
                }
1183
                time.Sleep(time.Second)
38✔
1184
        }
1185
        return connState.GetState().GetLicense().GetEnabled()
22✔
1186
}
1187

1188
// SubscribeForUpdates will listen for updates for the given group.
1189
func SubscribeForUpdates(prefixes [][]byte, ignore string, cb func(kvs *badgerpb.KVList),
1190
        group uint32, closer *z.Closer) {
121✔
1191

121✔
1192
        var prefix []byte
121✔
1193
        if len(prefixes) > 0 {
242✔
1194
                prefix = prefixes[0]
121✔
1195
        }
121✔
1196
        defer func() {
242✔
1197
                glog.Infof("SubscribeForUpdates closing for prefix: %q\n", prefix)
121✔
1198
                closer.Done()
121✔
1199
        }()
121✔
1200

1201
        listen := func() error {
649✔
1202
                // Connect to any of the group 1 nodes.
528✔
1203
                members := groups().AnyTwoServers(group)
528✔
1204
                // There may be a lag while starting so keep retrying.
528✔
1205
                if len(members) == 0 {
881✔
1206
                        return fmt.Errorf("unable to find any servers for group: %d", group)
353✔
1207
                }
353✔
1208
                pool := conn.GetPools().Connect(members[0], x.WorkerConfig.TLSClientConfig)
175✔
1209
                client := pb.NewWorkerClient(pool.Get())
175✔
1210

175✔
1211
                // Get Subscriber stream.
175✔
1212
                stream, err := client.Subscribe(closer.Ctx(),
175✔
1213
                        &pb.SubscriptionRequest{Matches: x.PrefixesToMatches(prefixes, ignore)})
175✔
1214
                if err != nil {
218✔
1215
                        return errors.Wrapf(err, "error from client.subscribe")
43✔
1216
                }
43✔
1217
                for {
15,797✔
1218
                        // Listen for updates.
15,665✔
1219
                        kvs, err := stream.Recv()
15,665✔
1220
                        if err != nil {
15,797✔
1221
                                return errors.Wrapf(err, "while receiving from stream")
132✔
1222
                        }
132✔
1223
                        cb(kvs)
15,533✔
1224
                }
1225
        }
1226

1227
        for {
649✔
1228
                if err := listen(); err != nil {
1,056✔
1229
                        glog.Errorf("Error during SubscribeForUpdates for prefix %q: %v. closer err: %v\n",
528✔
1230
                                prefix, err, closer.Ctx().Err())
528✔
1231
                }
528✔
1232
                if closer.Ctx().Err() != nil {
649✔
1233
                        return
121✔
1234
                }
121✔
1235
                time.Sleep(time.Second)
407✔
1236
        }
1237
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc