• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5228265546

10 Jun 2023 05:08AM UTC coverage: 67.303% (+0.07%) from 67.23%
5228265546

push

web-flow
Merge dba1461bb into ab3769797

16 of 16 new or added lines in 2 files covered. (100.0%)

58428 of 86814 relevant lines covered (67.3%)

2257715.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.94
/worker/groups.go
1
/*
2
 * Copyright 2016-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package worker
18

19
import (
20
        "context"
21
        "fmt"
22
        "io"
23
        "sort"
24
        "sync"
25
        "sync/atomic"
26
        "time"
27

28
        "github.com/golang/glog"
29
        "github.com/golang/protobuf/proto"
30
        "github.com/pkg/errors"
31

32
        badgerpb "github.com/dgraph-io/badger/v4/pb"
33
        "github.com/dgraph-io/dgo/v230/protos/api"
34
        "github.com/dgraph-io/dgraph/conn"
35
        "github.com/dgraph-io/dgraph/ee/enc"
36
        "github.com/dgraph-io/dgraph/protos/pb"
37
        "github.com/dgraph-io/dgraph/raftwal"
38
        "github.com/dgraph-io/dgraph/schema"
39
        "github.com/dgraph-io/dgraph/x"
40
        "github.com/dgraph-io/ristretto/z"
41
)
42

43
type groupi struct {
44
        x.SafeMutex
45
        state        *pb.MembershipState
46
        Node         *node
47
        gid          uint32
48
        tablets      map[string]*pb.Tablet
49
        triggerCh    chan struct{} // Used to trigger membership sync
50
        blockDeletes *sync.Mutex   // Ensure that deletion won't happen when move is going on.
51
        closer       *z.Closer
52

53
        // Group checksum is used to determine if the tablets served by the groups have changed from
54
        // the membership information that the Alpha has. If so, Alpha cannot service a read.
55
        deltaChecksum      uint64 // Checksum received by OracleDelta.
56
        membershipChecksum uint64 // Checksum received by MembershipState.
57
}
58

59
var gr = &groupi{
60
        blockDeletes: new(sync.Mutex),
61
        tablets:      make(map[string]*pb.Tablet),
62
        closer:       z.NewCloser(3), // Match CLOSER:1 in this file.
63
}
64

65
func groups() *groupi {
3,192,342✔
66
        return gr
3,192,342✔
67
}
3,192,342✔
68

69
// StartRaftNodes reads the WAL dir, creates the Raft group, and either starts or
// restarts the Raft node. This function triggers Raft nodes to be created and is the
// entrance to the Raft world from main.go.
//
// Before touching Raft it registers this Alpha with the Zero leader. It keeps
// retrying until Zero answers, or until Zero returns an error that x.ShouldCrash
// considers fatal. It then applies the membership state Zero returned, persists the
// Raft and group IDs in walStore, starts the node and the background membership and
// oracle goroutines, and finally upserts the initial schema and types.
func StartRaftNodes(walStore *raftwal.DiskStorage, bindall bool) {
	if x.WorkerConfig.MyAddr == "" {
		x.WorkerConfig.MyAddr = fmt.Sprintf("localhost:%d", workerPort())
	} else {
		// check if address is valid or not
		x.Check(x.ValidateAddress(x.WorkerConfig.MyAddr))
		if !bindall {
			glog.Errorln("--my flag is provided without bindall, Did you forget to specify bindall?")
		}
	}

	x.AssertTruef(len(x.WorkerConfig.ZeroAddr) > 0, "Providing dgraphzero address is mandatory.")
	for _, zeroAddr := range x.WorkerConfig.ZeroAddr {
		x.AssertTruef(zeroAddr != x.WorkerConfig.MyAddr,
			"Dgraph Zero address %s and Dgraph address (IP:Port) %s can't be the same.",
			zeroAddr, x.WorkerConfig.MyAddr)
	}

	raftIdx := x.WorkerConfig.Raft.GetUint64("idx")
	if raftIdx == 0 {
		raftIdx = walStore.Uint(raftwal.RaftId)

		// If the w directory already contains raft information, ignore the proposed
		// group ID stored inside the p directory.
		if raftIdx > 0 {
			x.WorkerConfig.ProposedGroupId = 0
		}
	}
	glog.Infof("Current Raft Id: %#x\n", raftIdx)

	if x.WorkerConfig.ProposedGroupId == 0 {
		x.WorkerConfig.ProposedGroupId = x.WorkerConfig.Raft.GetUint32("group")
	}
	// Successfully connect with dgraphzero, before doing anything else.
	// Connect with Zero leader and figure out what group we should belong to.
	m := &pb.Member{
		Id:      raftIdx,
		GroupId: x.WorkerConfig.ProposedGroupId,
		Addr:    x.WorkerConfig.MyAddr,
		Learner: x.WorkerConfig.Raft.GetBool("learner"),
	}
	if m.GroupId > 0 {
		m.ForceGroupId = true
	}
	glog.Infof("Sending member request to Zero: %+v\n", m)
	var connState *pb.ConnectionState
	var err error

	for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
		pl := gr.connToZeroLeader()
		if pl == nil {
			// connToZeroLeader can return nil (e.g. once gr's closer is closed). Back off
			// instead of spinning on an immediate retry.
			time.Sleep(connBaseDelay)
			continue
		}
		zc := pb.NewZeroClient(pl.Get())
		connState, err = zc.Connect(gr.Ctx(), m)
		if err == nil || x.ShouldCrash(err) {
			break
		}
		// Transient error: make it visible and back off briefly so we don't hammer Zero.
		glog.Warningf("Error while connecting to Zero: %v. Retrying...", err)
		time.Sleep(connBaseDelay)
	}
	x.CheckfNoTrace(err)
	if connState.GetMember() == nil || connState.GetState() == nil {
		x.Fatalf("Unable to join cluster via dgraphzero")
	}
	glog.Infof("Connected to group zero. Assigned group: %+v\n", connState.GetMember().GetGroupId())
	raftIdx = connState.GetMember().GetId()
	glog.Infof("Raft Id after connection to Zero: %#x\n", raftIdx)

	// This timestamp would be used for reading during snapshot after bulk load.
	// The stream is async, we need this information before we start or else replica might
	// not get any data.
	gr.applyState(raftIdx, connState.GetState())

	gid := gr.groupId()
	gr.triggerCh = make(chan struct{}, 1)

	// Initialize DiskStorage and pass it along.
	walStore.SetUint(raftwal.RaftId, raftIdx)
	walStore.SetUint(raftwal.GroupId, uint64(gid))

	gr.Node = newNode(walStore, gid, raftIdx, x.WorkerConfig.MyAddr)

	x.Checkf(schema.LoadFromDb(context.Background()), "Error while initializing schema")
	glog.Infof("Load schema from DB: OK")
	raftServer.UpdateNode(gr.Node.Node)
	gr.Node.InitAndStartNode()
	glog.Infof("Init and start Raft node: OK")

	go gr.sendMembershipUpdates()
	go gr.receiveMembershipUpdates()
	go gr.processOracleDeltaStream()

	gr.informZeroAboutTablets()
	glog.Infof("Informed Zero about tablets I have: OK")
	gr.applyInitialSchema()
	gr.applyInitialTypes()
	glog.Infof("Upserted Schema and Types: OK")

	x.UpdateHealthStatus(true)
	glog.Infof("Server is ready: OK")
}
173

174
func (g *groupi) Ctx() context.Context {
145,942✔
175
        return g.closer.Ctx()
145,942✔
176
}
145,942✔
177

178
func (g *groupi) IsClosed() bool {
279✔
179
        return g.closer.Ctx().Err() != nil
279✔
180
}
279✔
181

182
func (g *groupi) informZeroAboutTablets() {
91✔
183
        // Before we start this Alpha, let's pick up all the predicates we have in our postings
91✔
184
        // directory, and ask Zero if we are allowed to serve it. Do this irrespective of whether
91✔
185
        // this node is the leader or the follower, because this early on, we might not have
91✔
186
        // figured that out.
91✔
187
        ticker := time.NewTicker(time.Second)
91✔
188
        defer ticker.Stop()
91✔
189

91✔
190
        for range ticker.C {
182✔
191
                preds := schema.State().Predicates()
91✔
192
                if _, err := g.Inform(preds); err != nil {
91✔
193
                        glog.Errorf("Error while getting tablet for preds %v", err)
×
194
                } else {
91✔
195
                        glog.V(1).Infof("Done informing Zero about the %d tablets I have", len(preds))
91✔
196
                        return
91✔
197
                }
91✔
198
        }
199
}
200

201
func (g *groupi) applyInitialTypes() {
91✔
202
        initialTypes := schema.InitialTypes(x.GalaxyNamespace)
91✔
203
        for _, t := range initialTypes {
357✔
204
                if _, ok := schema.State().GetType(t.TypeName); ok {
268✔
205
                        continue
2✔
206
                }
207
                // It is okay to write initial types at ts=1.
208
                if err := updateType(t.GetTypeName(), *t, 1); err != nil {
264✔
209
                        glog.Errorf("Error while applying initial type: %s", err)
×
210
                }
×
211
        }
212
}
213

214
func (g *groupi) applyInitialSchema() {
91✔
215
        if g.groupId() != 1 {
121✔
216
                return
30✔
217
        }
30✔
218
        initialSchema := schema.InitialSchema(x.GalaxyNamespace)
61✔
219
        ctx := g.Ctx()
61✔
220

61✔
221
        apply := func(s *pb.SchemaUpdate) {
459✔
222
                // There are 2 cases: either the alpha is fresh or it restarted. If it is fresh cluster
398✔
223
                // then we can write the schema at ts=1. If alpha restarted, then we will already have the
398✔
224
                // schema at higher version and this operation will be a no-op.
398✔
225
                if err := applySchema(s, 1); err != nil {
398✔
226
                        glog.Errorf("Error while applying initial schema: %s", err)
×
227
                }
×
228
        }
229

230
        for _, s := range initialSchema {
474✔
231
                if gid, err := g.BelongsToReadOnly(s.Predicate, 0); err != nil {
413✔
232
                        glog.Errorf("Error getting tablet for predicate %s. Will force schema proposal.",
×
233
                                s.Predicate)
×
234
                        apply(s)
×
235
                } else if gid == 0 {
717✔
236
                        // The tablet is not being served currently.
304✔
237
                        apply(s)
304✔
238
                } else if curr, _ := schema.State().Get(ctx, s.Predicate); gid == g.groupId() &&
413✔
239
                        !proto.Equal(s, &curr) {
203✔
240
                        // If this tablet is served to the group, do not upsert the schema unless the
94✔
241
                        // stored schema and the proposed one are different.
94✔
242
                        apply(s)
94✔
243
                } else {
109✔
244
                        // The schema for this predicate has already been proposed.
15✔
245
                        glog.V(1).Infof("Schema found for predicate %s: %+v", s.Predicate, curr)
15✔
246
                        continue
15✔
247
                }
248
        }
249
}
250

251
func applySchema(s *pb.SchemaUpdate, ts uint64) error {
8,922✔
252
        if err := updateSchema(s, ts); err != nil {
8,922✔
253
                return err
×
254
        }
×
255
        if servesTablet, err := groups().ServesTablet(s.Predicate); err != nil {
8,922✔
256
                return err
×
257
        } else if !servesTablet {
8,922✔
258
                return errors.Errorf("group 1 should always serve reserved predicate %s", s.Predicate)
×
259
        }
×
260
        return nil
8,922✔
261
}
262

263
// No locks are acquired while accessing this function.
264
// Don't acquire RW lock during this, otherwise we might deadlock.
265
func (g *groupi) groupId() uint32 {
1,133,471✔
266
        return atomic.LoadUint32(&g.gid)
1,133,471✔
267
}
1,133,471✔
268

269
// MaxLeaseId returns the maximum UID that has been leased.
270
func MaxLeaseId() uint64 {
342,333✔
271
        g := groups()
342,333✔
272
        g.RLock()
342,333✔
273
        defer g.RUnlock()
342,333✔
274
        if g.state == nil {
342,333✔
275
                return 0
×
276
        }
×
277
        return g.state.MaxUID
342,333✔
278
}
279

280
// GetMembershipState returns the current membership state.
281
func GetMembershipState() *pb.MembershipState {
816✔
282
        g := groups()
816✔
283
        g.RLock()
816✔
284
        defer g.RUnlock()
816✔
285
        return proto.Clone(g.state).(*pb.MembershipState)
816✔
286
}
816✔
287

288
// UpdateMembershipState contacts zero for an update on membership state.
289
func UpdateMembershipState(ctx context.Context) error {
206✔
290
        g := groups()
206✔
291
        p := g.Leader(0)
206✔
292
        if p == nil {
206✔
293
                return errors.Errorf("don't have the address of any dgraph zero leader")
×
294
        }
×
295

296
        c := pb.NewZeroClient(p.Get())
206✔
297
        state, err := c.Connect(ctx, &pb.Member{ClusterInfoOnly: true})
206✔
298
        if err != nil {
206✔
299
                return err
×
300
        }
×
301
        g.applyState(g.Node.Id, state.GetState())
206✔
302
        return nil
206✔
303
}
304

305
func (g *groupi) applyState(myId uint64, state *pb.MembershipState) {
21,410✔
306
        x.AssertTrue(state != nil)
21,410✔
307
        g.Lock()
21,410✔
308
        defer g.Unlock()
21,410✔
309
        // We don't update state if we get any old state. Counter stores the raftindex of
21,410✔
310
        // last update. For leader changes at zero since we don't propose, state can get
21,410✔
311
        // updated at same counter value. So ignore only if counter is less.
21,410✔
312
        if g.state != nil && g.state.Counter > state.Counter {
21,410✔
313
                return
×
314
        }
×
315

316
        invalid := state.License != nil && !state.License.Enabled
21,410✔
317
        if g.Node != nil && g.Node.RaftContext.IsLearner && invalid {
21,410✔
318
                glog.Errorf("ENTERPRISE_ONLY_LEARNER: License Expired. Cannot run learner nodes.")
×
319
                x.ServerCloser.Signal()
×
320
                return
×
321
        }
×
322

323
        oldState := g.state
21,410✔
324
        g.state = state
21,410✔
325

21,410✔
326
        // Sometimes this can cause us to lose latest tablet info, but that shouldn't cause any issues.
21,410✔
327
        var foundSelf bool
21,410✔
328
        g.tablets = make(map[string]*pb.Tablet)
21,410✔
329
        for gid, group := range g.state.Groups {
63,318✔
330
                for _, member := range group.Members {
158,102✔
331
                        if myId == member.Id {
137,604✔
332
                                foundSelf = true
21,410✔
333
                                atomic.StoreUint32(&g.gid, gid)
21,410✔
334
                        }
21,410✔
335
                        if x.WorkerConfig.MyAddr != member.Addr {
210,978✔
336
                                conn.GetPools().Connect(member.Addr, x.WorkerConfig.TLSClientConfig)
94,784✔
337
                        }
94,784✔
338
                }
339
                for _, tablet := range group.Tablets {
1,586,076✔
340
                        g.tablets[tablet.Predicate] = tablet
1,544,168✔
341
                }
1,544,168✔
342
                if gid == g.groupId() {
63,318✔
343
                        glog.V(3).Infof("group %d checksum: %d", g.groupId(), group.Checksum)
21,410✔
344
                        atomic.StoreUint64(&g.membershipChecksum, group.Checksum)
21,410✔
345
                }
21,410✔
346
        }
347
        for _, member := range g.state.Zeros {
78,664✔
348
                if x.WorkerConfig.MyAddr != member.Addr {
114,508✔
349
                        conn.GetPools().Connect(member.Addr, x.WorkerConfig.TLSClientConfig)
57,254✔
350
                }
57,254✔
351
        }
352
        if !foundSelf {
21,410✔
353
                // I'm not part of this cluster. I should crash myself.
×
354
                glog.Fatalf("Unable to find myself [id:%d group:%d] in membership state: %+v. Goodbye!",
×
355
                        myId, g.groupId(), state)
×
356
        }
×
357

358
        // While restarting we fill Node information after retrieving initial state.
359
        if g.Node != nil {
42,729✔
360
                // Lets have this block before the one that adds the new members, else we may end up
21,319✔
361
                // removing a freshly added node.
21,319✔
362

21,319✔
363
                for _, member := range g.state.GetRemoved() {
21,319✔
364
                        // TODO: This leader check can be done once instead of repeatedly.
×
365
                        if member.GetGroupId() == g.Node.gid && g.Node.AmLeader() {
×
366
                                member := member // capture range variable
×
367
                                go func() {
×
368
                                        // Don't try to remove a member if it's already marked as removed in
×
369
                                        // the membership state and is not a current peer of the node.
×
370
                                        _, isPeer := g.Node.Peer(member.GetId())
×
371
                                        // isPeer should only be true if the removed node is not the same as this node.
×
372
                                        isPeer = isPeer && member.GetId() != g.Node.RaftContext.Id
×
373

×
374
                                        for _, oldMember := range oldState.GetRemoved() {
×
375
                                                if oldMember.GetId() == member.GetId() && !isPeer {
×
376
                                                        return
×
377
                                                }
×
378
                                        }
379

380
                                        if err := g.Node.ProposePeerRemoval(g.Ctx(), member.GetId()); err != nil {
×
381
                                                glog.Errorf("Error while proposing node removal: %+v", err)
×
382
                                        }
×
383
                                }()
384
                        }
385
                }
386
                conn.GetPools().RemoveInvalid(g.state)
21,319✔
387
        }
388
}
389

390
func (g *groupi) ServesGroup(gid uint32) bool {
173,485✔
391
        return g.groupId() == gid
173,485✔
392
}
173,485✔
393

394
func (g *groupi) ChecksumsMatch(ctx context.Context) error {
84,010✔
395
        if atomic.LoadUint64(&g.deltaChecksum) == atomic.LoadUint64(&g.membershipChecksum) {
167,214✔
396
                return nil
83,204✔
397
        }
83,204✔
398
        t := time.NewTicker(100 * time.Millisecond)
806✔
399
        defer t.Stop()
806✔
400
        for {
6,905✔
401
                select {
6,099✔
402
                case <-t.C:
6,091✔
403
                        if atomic.LoadUint64(&g.deltaChecksum) == atomic.LoadUint64(&g.membershipChecksum) {
6,889✔
404
                                return nil
798✔
405
                        }
798✔
406
                case <-ctx.Done():
8✔
407
                        return errors.Errorf("Group checksum mismatch for id: %d", g.groupId())
8✔
408
                }
409
        }
410
}
411

412
func (g *groupi) BelongsTo(key string) (uint32, error) {
269,785✔
413
        if tablet, err := g.Tablet(key); err != nil {
269,785✔
414
                return 0, err
×
415
        } else if tablet != nil {
539,570✔
416
                return tablet.GroupId, nil
269,785✔
417
        }
269,785✔
418
        return 0, nil
×
419
}
420

421
// BelongsToReadOnly acts like BelongsTo except it does not ask zero to serve
422
// the tablet for key if no group is currently serving it.
423
// The ts passed should be the start ts of the query, so this method can compare that against a
424
// tablet move timestamp. If the tablet was moved to this group after the start ts of the query, we
425
// should reject that query.
426
func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) {
1,037,456✔
427
        g.RLock()
1,037,456✔
428
        tablet := g.tablets[key]
1,037,456✔
429
        g.RUnlock()
1,037,456✔
430
        if tablet != nil {
2,073,637✔
431
                if ts > 0 && ts < tablet.MoveTs {
1,036,181✔
432
                        return 0, errors.Errorf("StartTs: %d is from before MoveTs: %d for pred: %q",
×
433
                                ts, tablet.MoveTs, key)
×
434
                }
×
435
                return tablet.GetGroupId(), nil
1,036,181✔
436
        }
437

438
        // We don't know about this tablet. Talk to dgraphzero to find out who is
439
        // serving this tablet.
440
        pl := g.connToZeroLeader()
1,275✔
441
        zc := pb.NewZeroClient(pl.Get())
1,275✔
442

1,275✔
443
        tablet = &pb.Tablet{
1,275✔
444
                Predicate: key,
1,275✔
445
                ReadOnly:  true,
1,275✔
446
        }
1,275✔
447
        out, err := zc.ShouldServe(g.Ctx(), tablet)
1,275✔
448
        if err != nil {
1,275✔
449
                glog.Errorf("Error while ShouldServe grpc call %v", err)
×
450
                return 0, err
×
451
        }
×
452
        if out.GetGroupId() == 0 {
2,522✔
453
                return 0, nil
1,247✔
454
        }
1,247✔
455

456
        g.Lock()
28✔
457
        defer g.Unlock()
28✔
458
        g.tablets[key] = out
28✔
459
        if out != nil && ts > 0 && ts < out.MoveTs {
28✔
460
                return 0, errors.Errorf("StartTs: %d is from before MoveTs: %d for pred: %q",
×
461
                        ts, out.MoveTs, key)
×
462
        }
×
463
        return out.GetGroupId(), nil
28✔
464
}
465

466
func (g *groupi) ServesTablet(key string) (bool, error) {
9,381✔
467
        if tablet, err := g.Tablet(key); err != nil {
9,381✔
468
                return false, err
×
469
        } else if tablet != nil && tablet.GroupId == groups().groupId() {
18,758✔
470
                return true, nil
9,377✔
471
        }
9,377✔
472
        return false, nil
4✔
473
}
474

475
func (g *groupi) sendTablet(tablet *pb.Tablet) (*pb.Tablet, error) {
7,654✔
476
        pl := g.connToZeroLeader()
7,654✔
477
        zc := pb.NewZeroClient(pl.Get())
7,654✔
478

7,654✔
479
        out, err := zc.ShouldServe(g.Ctx(), tablet)
7,654✔
480
        if err != nil {
7,654✔
481
                glog.Errorf("Error while ShouldServe grpc call %v", err)
×
482
                return nil, err
×
483
        }
×
484

485
        // Do not store tablets with group ID 0, as they are just dummy tablets for
486
        // predicates that do no exist.
487
        if out.GroupId > 0 {
15,308✔
488
                g.Lock()
7,654✔
489
                g.tablets[out.GetPredicate()] = out
7,654✔
490
                g.Unlock()
7,654✔
491
        }
7,654✔
492

493
        if out.GroupId == groups().groupId() {
15,235✔
494
                glog.Infof("Serving tablet for: %v\n", tablet.GetPredicate())
7,581✔
495
        }
7,581✔
496
        return out, nil
7,654✔
497
}
498

499
func (g *groupi) Inform(preds []string) ([]*pb.Tablet, error) {
91✔
500
        unknownPreds := make([]*pb.Tablet, 0)
91✔
501
        tablets := make([]*pb.Tablet, 0)
91✔
502
        g.RLock()
91✔
503
        for _, p := range preds {
189✔
504
                if len(p) == 0 {
98✔
505
                        continue
×
506
                }
507

508
                if tab, ok := g.tablets[p]; !ok {
188✔
509
                        unknownPreds = append(unknownPreds, &pb.Tablet{GroupId: g.groupId(), Predicate: p})
90✔
510
                } else {
98✔
511
                        tablets = append(tablets, tab)
8✔
512
                }
8✔
513
        }
514
        g.RUnlock()
91✔
515

91✔
516
        if len(unknownPreds) == 0 {
180✔
517
                return nil, nil
89✔
518
        }
89✔
519

520
        pl := g.connToZeroLeader()
2✔
521
        zc := pb.NewZeroClient(pl.Get())
2✔
522
        out, err := zc.Inform(g.Ctx(), &pb.TabletRequest{
2✔
523
                Tablets: unknownPreds,
2✔
524
                GroupId: g.groupId(),
2✔
525
        })
2✔
526
        if err != nil {
2✔
527
                glog.Errorf("Error while Inform grpc call %v", err)
×
528
                return nil, err
×
529
        }
×
530

531
        // Do not store tablets with group ID 0, as they are just dummy tablets for
532
        // predicates that do no exist.
533
        g.Lock()
2✔
534
        for _, t := range out.Tablets {
92✔
535
                if t.GroupId > 0 {
180✔
536
                        g.tablets[t.GetPredicate()] = t
90✔
537
                        tablets = append(tablets, t)
90✔
538
                }
90✔
539

540
                if t.GroupId == groups().groupId() {
180✔
541
                        glog.Infof("Serving tablet for: %v\n", t.GetPredicate())
90✔
542
                }
90✔
543
        }
544
        g.Unlock()
2✔
545
        return tablets, nil
2✔
546
}
547

548
// Do not modify the returned Tablet
549
func (g *groupi) Tablet(key string) (*pb.Tablet, error) {
557,643✔
550
        // TODO: Remove all this later, create a membership state and apply it
557,643✔
551
        g.RLock()
557,643✔
552
        tablet, ok := g.tablets[key]
557,643✔
553
        g.RUnlock()
557,643✔
554
        if ok {
1,111,666✔
555
                return tablet, nil
554,023✔
556
        }
554,023✔
557

558
        // We don't know about this tablet.
559
        // Check with dgraphzero if we can serve it.
560
        tablet = &pb.Tablet{GroupId: g.groupId(), Predicate: key}
3,620✔
561
        return g.sendTablet(tablet)
3,620✔
562
}
563

564
func (g *groupi) ForceTablet(key string) (*pb.Tablet, error) {
4,034✔
565
        return g.sendTablet(&pb.Tablet{GroupId: g.groupId(), Predicate: key, Force: true})
4,034✔
566
}
4,034✔
567

568
func (g *groupi) HasMeInState() bool {
×
569
        g.RLock()
×
570
        defer g.RUnlock()
×
571
        if g.state == nil {
×
572
                return false
×
573
        }
×
574

575
        group, has := g.state.Groups[g.groupId()]
×
576
        if !has {
×
577
                return false
×
578
        }
×
579
        _, has = group.Members[g.Node.Id]
×
580
        return has
×
581
}
582

583
// Returns 0, 1, or 2 valid server addrs.
584
func (g *groupi) AnyTwoServers(gid uint32) []string {
19,309✔
585
        g.RLock()
19,309✔
586
        defer g.RUnlock()
19,309✔
587

19,309✔
588
        if g.state == nil {
19,654✔
589
                return []string{}
345✔
590
        }
345✔
591
        group, has := g.state.Groups[gid]
18,964✔
592
        if !has {
18,964✔
593
                return []string{}
×
594
        }
×
595
        var res []string
18,964✔
596
        for _, m := range group.Members {
55,736✔
597
                // map iteration gives us members in no particular order.
36,772✔
598
                res = append(res, m.Addr)
36,772✔
599
                if len(res) >= 2 {
54,580✔
600
                        break
17,808✔
601
                }
602
        }
603
        return res
18,964✔
604
}
605

606
func (g *groupi) members(gid uint32) map[uint64]*pb.Member {
120,108✔
607
        g.RLock()
120,108✔
608
        defer g.RUnlock()
120,108✔
609

120,108✔
610
        if g.state == nil {
120,501✔
611
                return nil
393✔
612
        }
393✔
613
        if gid == 0 {
221,264✔
614
                return g.state.Zeros
101,549✔
615
        }
101,549✔
616
        group, has := g.state.Groups[gid]
18,166✔
617
        if !has {
18,166✔
618
                return nil
×
619
        }
×
620
        return group.Members
18,166✔
621
}
622

623
func (g *groupi) AnyServer(gid uint32) *conn.Pool {
273✔
624
        members := g.members(gid)
273✔
625
        for _, m := range members {
330✔
626
                pl, err := conn.GetPools().Get(m.Addr)
57✔
627
                if err == nil {
114✔
628
                        return pl
57✔
629
                }
57✔
630
        }
631
        return nil
216✔
632
}
633

634
func (g *groupi) MyPeer() (uint64, bool) {
272✔
635
        members := g.members(g.groupId())
272✔
636
        for _, m := range members {
584✔
637
                if m.Id != g.Node.Id {
391✔
638
                        return m.Id, true
79✔
639
                }
79✔
640
        }
641
        return 0, false
193✔
642
}
643

644
// Leader will try to return the leader of a given group, based on membership information.
645
// There is currently no guarantee that the returned server is the leader of the group.
646
func (g *groupi) Leader(gid uint32) *conn.Pool {
119,562✔
647
        members := g.members(gid)
119,562✔
648
        if members == nil {
119,739✔
649
                return nil
177✔
650
        }
177✔
651
        for _, m := range members {
305,499✔
652
                if m.Leader {
305,481✔
653
                        if pl, err := conn.GetPools().Get(m.Addr); err == nil {
238,733✔
654
                                return pl
119,366✔
655
                        }
119,366✔
656
                }
657
        }
658
        return nil
19✔
659
}
660

661
func (g *groupi) KnownGroups() (gids []uint32) {
18,288✔
662
        g.RLock()
18,288✔
663
        defer g.RUnlock()
18,288✔
664
        if g.state == nil {
18,288✔
665
                return
×
666
        }
×
667
        for gid := range g.state.Groups {
53,673✔
668
                gids = append(gids, gid)
35,385✔
669
        }
35,385✔
670
        return
18,288✔
671
}
672

673
// KnownGroups returns the known groups using the global groupi instance.
674
func KnownGroups() []uint32 {
×
675
        return groups().KnownGroups()
×
676
}
×
677

678
// GroupId returns the group to which this worker belongs to.
679
func GroupId() uint32 {
137✔
680
        return groups().groupId()
137✔
681
}
137✔
682

683
// NodeId returns the raft id of the node.
684
func NodeId() uint64 {
2✔
685
        return groups().Node.Id
2✔
686
}
2✔
687

688
func (g *groupi) triggerMembershipSync() {
2,233✔
689
        // It's ok if we miss the trigger, periodic membership sync runs every minute.
2,233✔
690
        select {
2,233✔
691
        case g.triggerCh <- struct{}{}:
2,151✔
692
        // It's ok to ignore it, since we would be sending update of a later state
693
        default:
82✔
694
        }
695
}
696

697
// connBaseDelay is the initial back-off between attempts to discover the Zero
// leader. connToZeroLeader doubles it after each attempt while it is at most one second.
const connBaseDelay time.Duration = 100 * time.Millisecond
698

699
func (g *groupi) connToZeroLeader() *conn.Pool {
75,872✔
700
        pl := g.Leader(0)
75,872✔
701
        if pl != nil {
151,567✔
702
                return pl
75,695✔
703
        }
75,695✔
704
        glog.V(1).Infof("No healthy Zero leader found. Trying to find a Zero leader...")
177✔
705

177✔
706
        getLeaderConn := func(zc pb.ZeroClient) *conn.Pool {
393✔
707
                ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second)
216✔
708
                defer cancel()
216✔
709

216✔
710
                connState, err := zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true})
216✔
711
                if err != nil || connState == nil {
255✔
712
                        glog.V(1).Infof("While retrieving Zero leader info. Error: %v. Retrying...", err)
39✔
713
                        return nil
39✔
714
                }
39✔
715
                for _, mz := range connState.State.GetZeros() {
417✔
716
                        if mz.Leader {
417✔
717
                                return conn.GetPools().Connect(mz.GetAddr(), x.WorkerConfig.TLSClientConfig)
177✔
718
                        }
177✔
719
                }
720
                return nil
×
721
        }
722

723
        // No leader found. Let's get the latest membership state from Zero.
724
        delay := connBaseDelay
177✔
725
        maxHalfDelay := time.Second
177✔
726
        for i := 0; ; i++ { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
393✔
727
                if g.IsClosed() {
216✔
728
                        return nil
×
729
                }
×
730

731
                time.Sleep(delay)
216✔
732
                if delay <= maxHalfDelay {
432✔
733
                        delay *= 2
216✔
734
                }
216✔
735

736
                zAddrList := x.WorkerConfig.ZeroAddr
216✔
737
                // Pick addresses in round robin manner.
216✔
738
                addr := zAddrList[i%len(zAddrList)]
216✔
739

216✔
740
                pl := g.AnyServer(0)
216✔
741
                if pl == nil {
432✔
742
                        pl = conn.GetPools().Connect(addr, x.WorkerConfig.TLSClientConfig)
216✔
743
                }
216✔
744
                if pl == nil {
216✔
745
                        glog.V(1).Infof("No healthy Zero server found. Retrying...")
×
746
                        continue
×
747
                }
748
                zc := pb.NewZeroClient(pl.Get())
216✔
749
                if pl := getLeaderConn(zc); pl != nil {
393✔
750
                        glog.V(1).Infof("Found connection to leader: %s", pl.Addr)
177✔
751
                        return pl
177✔
752
                }
177✔
753
                glog.V(1).Infof("Unable to connect to a healthy Zero leader. Retrying...")
39✔
754
        }
755
}
756

757
func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error {
2,143✔
758
        leader := g.Node.AmLeader()
2,143✔
759
        member := &pb.Member{
2,143✔
760
                Id:         g.Node.Id,
2,143✔
761
                GroupId:    g.groupId(),
2,143✔
762
                Addr:       x.WorkerConfig.MyAddr,
2,143✔
763
                Leader:     leader,
2,143✔
764
                LastUpdate: uint64(time.Now().Unix()),
2,143✔
765
        }
2,143✔
766
        group := &pb.Group{
2,143✔
767
                Members: make(map[uint64]*pb.Member),
2,143✔
768
        }
2,143✔
769
        group.Members[member.Id] = member
2,143✔
770
        if leader {
2,977✔
771
                // Do not send tablet information, if I'm not the leader.
834✔
772
                group.Tablets = tablets
834✔
773
                if snap, err := g.Node.Snapshot(); err == nil {
1,668✔
774
                        group.SnapshotTs = snap.ReadTs
834✔
775
                }
834✔
776
                group.CheckpointTs = atomic.LoadUint64(&g.Node.checkpointTs)
834✔
777
        }
778

779
        pl := g.connToZeroLeader()
2,143✔
780
        if pl == nil {
2,143✔
781
                return errNoConnection
×
782
        }
×
783
        c := pb.NewZeroClient(pl.Get())
2,143✔
784
        ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second)
2,143✔
785
        defer cancel()
2,143✔
786
        reply, err := c.UpdateMembership(ctx, group)
2,143✔
787
        if err != nil {
2,150✔
788
                return err
7✔
789
        }
7✔
790
        if string(reply.GetData()) == "OK" {
4,272✔
791
                return nil
2,136✔
792
        }
2,136✔
793
        return errors.Errorf(string(reply.GetData()))
×
794
}
795

796
// sendMembershipUpdates is the long-running loop that pushes this node's membership to the
// Zero leader via doSendMembership.
//
// A send happens whenever triggerMembershipSync is called. Independently, a one-second
// ticker schedules a send if nothing has been sent successfully for more than 10s, so Zero
// keeps hearing from this node. Updates are always sent with a nil tablet map. The loop
// exits when g.closer is signalled and then marks the closer done.
func (g *groupi) sendMembershipUpdates() {
	defer func() {
		glog.Infoln("Closing sendMembershipUpdates")
		g.closer.Done() // CLOSER:1
	}()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// The ticker's first tick is a second away, so queue an initial sync right now.
	g.triggerMembershipSync()

	var lastSent time.Time
	for {
		select {
		case <-g.closer.HasBeenClosed():
			return

		case <-ticker.C:
			// Keep Zero informed that this node is alive: if nothing has been sent
			// successfully for 10s, schedule a send.
			if time.Since(lastSent) > 10*time.Second {
				g.triggerMembershipSync()
			}

		case <-g.triggerCh:
			// Collapse every trigger that piled up meanwhile into this single send. Non-leaders
			// also send, so Zero knows this node is still active.
		drain:
			for {
				select {
				case <-g.triggerCh:
				default:
					break drain
				}
			}

			// Tablet information is not computed here, so no tablet map is sent.
			err := g.doSendMembership(nil)
			if err != nil {
				glog.Errorf("While sending membership update: %v", err)
				continue
			}
			lastSent = time.Now()
		}
	}
}
841

842
// receiveMembershipUpdates receives membership updates from ANY Zero server. This is the main
843
// connection which tells Alpha about the state of the cluster, including the latest Zero leader.
844
// All the other connections to Zero, are made only to the leader.
845
func (g *groupi) receiveMembershipUpdates() {
91✔
846
        defer func() {
182✔
847
                glog.Infoln("Closing receiveMembershipUpdates")
91✔
848
                g.closer.Done() // CLOSER:1
91✔
849
        }()
91✔
850

851
        ticker := time.NewTicker(10 * time.Second)
91✔
852
        defer ticker.Stop()
91✔
853

91✔
854
START:
91✔
855
        select {
91✔
856
        case <-g.closer.HasBeenClosed():
91✔
857
                return
91✔
858
        default:
132✔
859
        }
860

861
        pl := g.connToZeroLeader()
132✔
862
        // We should always have some connection to dgraphzero.
132✔
863
        if pl == nil {
132✔
864
                glog.Warningln("Membership update: No Zero server known.")
×
865
                time.Sleep(time.Second)
×
866
                goto START
×
867
        }
868
        glog.Infof("Got address of a Zero leader: %s", pl.Addr)
132✔
869

132✔
870
        c := pb.NewZeroClient(pl.Get())
132✔
871
        ctx, cancel := context.WithCancel(g.Ctx())
132✔
872
        stream, err := c.StreamMembership(ctx, &api.Payload{})
132✔
873
        if err != nil {
168✔
874
                cancel()
36✔
875
                glog.Errorf("Error while calling update %v\n", err)
36✔
876
                time.Sleep(time.Second)
36✔
877
                goto START
36✔
878
        }
879

880
        stateCh := make(chan *pb.MembershipState, 10)
96✔
881
        go func() {
192✔
882
                glog.Infof("Starting a new membership stream receive from %s.", pl.Addr)
96✔
883
                for i := 0; ; i++ {
21,305✔
884
                        // Blocking, should return if sending on stream fails(Need to verify).
21,209✔
885
                        state, err := stream.Recv()
21,209✔
886
                        if err != nil || state == nil {
21,305✔
887
                                if err == io.EOF {
96✔
888
                                        glog.Infoln("Membership sync stream closed.")
×
889
                                } else {
96✔
890
                                        glog.Errorf("Unable to sync memberships. Error: %v. State: %v", err, state)
96✔
891
                                }
96✔
892
                                // If zero server is lagging behind leader.
893
                                if ctx.Err() == nil {
126✔
894
                                        cancel()
30✔
895
                                }
30✔
896
                                return
96✔
897
                        }
898
                        if i == 0 {
21,204✔
899
                                glog.Infof("Received first state update from Zero: %+v", state)
91✔
900
                                x.WriteCidFile(state.Cid)
91✔
901
                        }
91✔
902
                        select {
21,113✔
903
                        case stateCh <- state:
21,113✔
904
                        case <-ctx.Done():
×
905
                                return
×
906
                        }
907
                }
908
        }()
909

910
        lastRecv := time.Now()
96✔
911
OUTER:
96✔
912
        for {
23,369✔
913
                select {
23,273✔
914
                case <-g.closer.HasBeenClosed():
66✔
915
                        if err := stream.CloseSend(); err != nil {
66✔
916
                                glog.Errorf("Error closing send stream: %+v", err)
×
917
                        }
×
918
                        break OUTER
66✔
919
                case <-ctx.Done():
30✔
920
                        if err := stream.CloseSend(); err != nil {
30✔
921
                                glog.Errorf("Error closing send stream: %+v", err)
×
922
                        }
×
923
                        break OUTER
30✔
924
                case state := <-stateCh:
21,113✔
925
                        lastRecv = time.Now()
21,113✔
926
                        g.applyState(g.Node.Id, state)
21,113✔
927
                case <-ticker.C:
2,064✔
928
                        if time.Since(lastRecv) > 10*time.Second {
2,064✔
929
                                // Zero might have gone under partition. We should recreate our connection.
×
930
                                glog.Warningf("No membership update for 10s. Closing connection to Zero.")
×
931
                                if err := stream.CloseSend(); err != nil {
×
932
                                        glog.Errorf("Error closing send stream: %+v", err)
×
933
                                }
×
934
                                break OUTER
×
935
                        }
936
                }
937
        }
938
        cancel()
96✔
939
        goto START
96✔
940
}
941

942
// processOracleDeltaStream is used to process oracle delta stream from Zero.
// Zero sends information about aborted/committed transactions and maxPending.
//
// On every one-second tick where this node leads its group, it opens an Oracle stream to
// the Zero leader, batches the received deltas, and proposes each batch to the group via
// proposeAndWait. The stream is dropped, and re-opened on a later tick, when:
//   - the stream fails;
//   - the Zero leader address changes;
//   - this node stops being the group leader.
// The function returns once g.closer is signalled and then marks the closer done.
func (g *groupi) processOracleDeltaStream() {
	defer func() {
		glog.Infoln("Closing processOracleDeltaStream")
		g.closer.Done() // CLOSER:1
	}()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// waitBeforeRetry pauses for a second before the next attempt, but returns immediately
	// once the closer is signalled so shutdown is not delayed by the back-off.
	waitBeforeRetry := func() {
		select {
		case <-time.After(time.Second):
		case <-g.closer.HasBeenClosed():
		}
	}

	blockingReceiveAndPropose := func() {
		glog.Infof("Leader idx=%#x of group=%d is connecting to Zero for txn updates\n",
			g.Node.Id, g.groupId())

		pl := g.connToZeroLeader()
		if pl == nil {
			glog.Warningln("Oracle delta stream: No Zero leader known.")
			if g.IsClosed() {
				return
			}
			waitBeforeRetry()
			return
		}
		glog.Infof("Got Zero leader: %s", pl.Addr)

		// The following code creates a stream. Then runs a goroutine to pick up events from the
		// stream and pushes them to a channel. The main loop loops over the channel, doing smart
		// batching. Once a batch is created, it gets proposed. Thus, we can reduce the number of
		// times proposals happen, which is a great optimization to have (and a common one in our
		// code base).
		ctx, cancel := context.WithCancel(g.Ctx())
		defer cancel()

		c := pb.NewZeroClient(pl.Get())
		stream, err := c.Oracle(ctx, &api.Payload{})
		if err != nil {
			glog.Errorf("Error while calling Oracle %v\n", err)
			waitBeforeRetry()
			return
		}

		// deltaCh only ever carries non-nil deltas; it is closed when the stream ends, so a
		// nil receive means "stream finished".
		deltaCh := make(chan *pb.OracleDelta, 100)
		go func() {
			// This would exit when either a Recv() returns error. Or, cancel() is called by
			// something outside of this goroutine.
			defer func() {
				if err := stream.CloseSend(); err != nil {
					glog.Errorf("Error closing send stream: %+v", err)
				}
			}()
			defer close(deltaCh)

			for {
				delta, err := stream.Recv()
				if err != nil || delta == nil {
					glog.Errorf("Error in oracle delta stream. Error: %v", err)
					return
				}

				select {
				case deltaCh <- delta:
				case <-ctx.Done():
					return
				}
			}
		}()

		for {
			var delta *pb.OracleDelta
			var batch int
			select {
			case delta = <-deltaCh:
				if delta == nil {
					return
				}
				batch++
			case <-ticker.C:
				newLead := g.Leader(0)
				if newLead == nil || newLead.Addr != pl.Addr {
					// NOTE(review): deltas still buffered in deltaCh are discarded here;
					// presumably the new stream makes up for them — confirm.
					glog.Infof("Zero leadership changed. Renewing oracle delta stream.")
					return
				}
				continue

			case <-ctx.Done():
				return
			case <-g.closer.HasBeenClosed():
				return
			}

			// Merge every delta already waiting in the channel into this one, so they are
			// proposed together.
		SLURP:
			for {
				select {
				case more := <-deltaCh:
					if more == nil {
						// The stream has ended. Still propose what has already been taken off
						// the channel instead of silently dropping it. The next receive in the
						// outer loop sees the closed channel and returns.
						break SLURP
					}
					batch++
					if delta.GroupChecksums == nil {
						delta.GroupChecksums = make(map[uint32]uint64)
					}
					delta.Txns = append(delta.Txns, more.Txns...)
					delta.MaxAssigned = x.Max(delta.MaxAssigned, more.MaxAssigned)
					for gid, checksum := range more.GroupChecksums {
						delta.GroupChecksums[gid] = checksum
					}
				default:
					break SLURP
				}
			}

			// Only the leader needs to propose the oracleDelta retrieved from Zero.
			// The leader and the followers would not directly apply or use the
			// oracleDelta streaming in from Zero. They would wait for the proposal to
			// go through and be applied via node.Run.  This saves us from many edge
			// cases around network partitions and race conditions between prewrites and
			// commits, etc.
			if !g.Node.AmLeader() {
				glog.Errorf("No longer the leader of group %d. Exiting", g.groupId())
				return
			}

			// We should always sort the txns before applying. Otherwise, we might lose some of
			// these updates, because we never write over a new version.
			sort.Slice(delta.Txns, func(i, j int) bool {
				return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs
			})
			if len(delta.Txns) > 0 {
				last := delta.Txns[len(delta.Txns)-1]
				// Update MaxAssigned on commit so best effort queries can get back latest data.
				delta.MaxAssigned = x.Max(delta.MaxAssigned, last.CommitTs)
			}
			if glog.V(3) {
				glog.Infof("Batched %d updates. Max Assigned: %d. Proposing Deltas:",
					batch, delta.MaxAssigned)
				for _, txn := range delta.Txns {
					if txn.CommitTs == 0 {
						glog.Infof("Aborted: %d", txn.StartTs)
					} else {
						glog.Infof("Committed: %d -> %d", txn.StartTs, txn.CommitTs)
					}
				}
			}
			for {
				// Block forever trying to propose this. Also this proposal should not be counted
				// towards num pending proposals and be proposed right away.
				err := g.Node.proposeAndWait(g.Ctx(), &pb.Proposal{Delta: delta})
				if err == nil {
					break
				}
				if g.Ctx().Err() != nil {
					break
				}
				glog.Errorf("While proposing delta with MaxAssigned: %d and num txns: %d."+
					" Error=%v. Retrying...\n", delta.MaxAssigned, len(delta.Txns), err)
			}
		}
	}

	for {
		select {
		case <-g.closer.HasBeenClosed():
			return
		case <-ticker.C:
			// Only the leader needs to connect to Zero and get transaction
			// updates.
			if g.Node.AmLeader() {
				blockingReceiveAndPropose()
			}
		}
	}
}
1115

1116
// GetEEFeaturesList returns a list of Enterprise Features that are available.
1117
func GetEEFeaturesList() []string {
159✔
1118
        if !EnterpriseEnabled() {
186✔
1119
                return nil
27✔
1120
        }
27✔
1121
        var ee []string
132✔
1122
        if len(Config.HmacSecret) > 0 {
188✔
1123
                ee = append(ee, "acl")
56✔
1124
                ee = append(ee, "multi_tenancy")
56✔
1125
        }
56✔
1126
        if x.WorkerConfig.EncryptionKey != nil {
183✔
1127
                ee = append(ee, "encryption_at_rest", "encrypted_backup_restore", "encrypted_export")
51✔
1128
        } else {
132✔
1129
                ee = append(ee, "backup_restore")
81✔
1130
        }
81✔
1131
        if x.WorkerConfig.Audit {
134✔
1132
                ee = append(ee, "audit")
2✔
1133
        }
2✔
1134
        if Config.ChangeDataConf != "" {
264✔
1135
                ee = append(ee, "cdc")
132✔
1136
        }
132✔
1137
        return ee
132✔
1138
}
1139

1140
// EnterpriseEnabled returns whether enterprise features can be used or not.
1141
func EnterpriseEnabled() bool {
707✔
1142
        if !enc.EeBuild {
707✔
1143
                return false
×
1144
        }
×
1145
        state := GetMembershipState()
707✔
1146
        if state == nil {
729✔
1147
                return groups().askZeroForEE()
22✔
1148
        }
22✔
1149
        return state.GetLicense().GetEnabled()
685✔
1150
}
1151

1152
// askZeroForEE asks the Zero leader for the cluster state and reports whether the cluster's
// enterprise license is enabled.
//
// It retries once a second until a reply carries license information and the Connect call
// either succeeded or returned an error that x.ShouldCrash flags. It also stops when g is
// closed. If it stops because g was closed, the answer comes from the last reply received,
// or is false if there was none.
func (g *groupi) askZeroForEE() bool {
	var connState *pb.ConnectionState

	// tryConnect performs one Connect round-trip to the Zero leader, stores the reply in
	// connState, and reports whether the caller can stop retrying.
	tryConnect := func() bool {
		pl := g.connToZeroLeader()
		if pl == nil {
			return false
		}
		zc := pb.NewZeroClient(pl.Get())

		ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second)
		defer cancel()

		var err error
		connState, err = zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true})
		// The generated getters are nil-safe, so this also covers a nil reply or nil state.
		if connState.GetState().GetLicense() == nil {
			// Include the RPC error, if any, so a persistent failure can be diagnosed.
			glog.Infof("Retry Zero Connection. Error: %v", err)
			return false
		}
		return err == nil || x.ShouldCrash(err)
	}

retry:
	for !g.IsClosed() {
		if tryConnect() {
			break
		}
		// Wait before retrying, but stop right away if the closer is signalled meanwhile.
		select {
		case <-time.After(time.Second):
		case <-g.closer.HasBeenClosed():
			break retry
		}
	}
	return connState.GetState().GetLicense().GetEnabled()
}
1187

1188
// SubscribeForUpdates will listen for updates for the given group.
//
// It opens a Subscribe stream on a server of the given group. The stream's matches are
// built by x.PrefixesToMatches(prefixes, ignore), and cb is invoked for every KVList
// received. When the subscription fails, the error is logged and the subscription retried
// after a second, until closer is signalled. closer.Done() is called on return.
func SubscribeForUpdates(prefixes [][]byte, ignore string, cb func(kvs *badgerpb.KVList),
	group uint32, closer *z.Closer) {
	var prefix []byte
	if len(prefixes) > 0 {
		prefix = prefixes[0]
	}
	defer func() {
		glog.Infof("SubscribeForUpdates closing for prefix: %q\n", prefix)
		closer.Done()
	}()

	listen := func() error {
		// Connect to one of the servers of the requested group.
		members := groups().AnyTwoServers(group)
		// There may be a lag while starting so keep retrying.
		if len(members) == 0 {
			return fmt.Errorf("unable to find any servers for group: %d", group)
		}
		pool := conn.GetPools().Connect(members[0], x.WorkerConfig.TLSClientConfig)
		client := pb.NewWorkerClient(pool.Get())

		// Get Subscriber stream.
		stream, err := client.Subscribe(closer.Ctx(),
			&pb.SubscriptionRequest{Matches: x.PrefixesToMatches(prefixes, ignore)})
		if err != nil {
			return errors.Wrap(err, "error from client.subscribe")
		}
		for {
			// Listen for updates.
			kvs, err := stream.Recv()
			if err != nil {
				return errors.Wrap(err, "while receiving from stream")
			}
			cb(kvs)
		}
	}

	for {
		if err := listen(); err != nil {
			glog.Errorf("Error during SubscribeForUpdates for prefix %q: %v. closer err: %v\n",
				prefix, err, closer.Ctx().Err())
		}
		if closer.Ctx().Err() != nil {
			return
		}
		// Wait before retrying, but stop right away if the closer is signalled meanwhile.
		select {
		case <-time.After(time.Second):
		case <-closer.HasBeenClosed():
			return
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc