• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 4763653586

21 Apr 2023 10:46AM UTC coverage: 66.83% (-0.09%) from 66.924%
4763653586

push

GitHub
fix(vscode): fixed Jaeger parameters (#8801)

58024 of 86823 relevant lines covered (66.83%)

2239173.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.56
/worker/groups.go
1
/*
2
 * Copyright 2016-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package worker
18

19
import (
20
        "context"
21
        "fmt"
22
        "io"
23
        "sort"
24
        "sync"
25
        "sync/atomic"
26
        "time"
27

28
        "github.com/golang/glog"
29
        "github.com/golang/protobuf/proto"
30
        "github.com/pkg/errors"
31

32
        badgerpb "github.com/dgraph-io/badger/v4/pb"
33
        "github.com/dgraph-io/dgo/v210/protos/api"
34
        "github.com/dgraph-io/dgraph/conn"
35
        "github.com/dgraph-io/dgraph/ee/enc"
36
        "github.com/dgraph-io/dgraph/protos/pb"
37
        "github.com/dgraph-io/dgraph/raftwal"
38
        "github.com/dgraph-io/dgraph/schema"
39
        "github.com/dgraph-io/dgraph/x"
40
        "github.com/dgraph-io/ristretto/z"
41
)
42

43
type groupi struct {
44
        x.SafeMutex
45
        state        *pb.MembershipState
46
        Node         *node
47
        gid          uint32
48
        tablets      map[string]*pb.Tablet
49
        triggerCh    chan struct{} // Used to trigger membership sync
50
        blockDeletes *sync.Mutex   // Ensure that deletion won't happen when move is going on.
51
        closer       *z.Closer
52

53
        // Group checksum is used to determine if the tablets served by the groups have changed from
54
        // the membership information that the Alpha has. If so, Alpha cannot service a read.
55
        deltaChecksum      uint64 // Checksum received by OracleDelta.
56
        membershipChecksum uint64 // Checksum received by MembershipState.
57
}
58

59
var gr = &groupi{
60
        blockDeletes: new(sync.Mutex),
61
        tablets:      make(map[string]*pb.Tablet),
62
        closer:       z.NewCloser(3), // Match CLOSER:1 in this file.
63
}
64

65
func groups() *groupi {
3,165,934✔
66
        return gr
3,165,934✔
67
}
3,165,934✔
68

69
// StartRaftNodes will read the WAL dir, create the RAFT groups,
70
// and either start or restart RAFT nodes.
71
// This function triggers RAFT nodes to be created, and is the entrance to the RAFT
72
// world from main.go.
73
func StartRaftNodes(walStore *raftwal.DiskStorage, bindall bool) {
96✔
74
        if x.WorkerConfig.MyAddr == "" {
96✔
75
                x.WorkerConfig.MyAddr = fmt.Sprintf("localhost:%d", workerPort())
×
76
        } else {
96✔
77
                // check if address is valid or not
96✔
78
                x.Check(x.ValidateAddress(x.WorkerConfig.MyAddr))
96✔
79
                if !bindall {
96✔
80
                        glog.Errorln("--my flag is provided without bindall, Did you forget to specify bindall?")
×
81
                }
×
82
        }
83

84
        x.AssertTruef(len(x.WorkerConfig.ZeroAddr) > 0, "Providing dgraphzero address is mandatory.")
96✔
85
        for _, zeroAddr := range x.WorkerConfig.ZeroAddr {
244✔
86
                x.AssertTruef(zeroAddr != x.WorkerConfig.MyAddr,
148✔
87
                        "Dgraph Zero address %s and Dgraph address (IP:Port) %s can't be the same.",
148✔
88
                        zeroAddr, x.WorkerConfig.MyAddr)
148✔
89
        }
148✔
90

91
        raftIdx := x.WorkerConfig.Raft.GetUint64("idx")
96✔
92
        if raftIdx == 0 {
158✔
93
                raftIdx = walStore.Uint(raftwal.RaftId)
62✔
94

62✔
95
                // If the w directory already contains raft information, ignore the proposed
62✔
96
                // group ID stored inside the p directory.
62✔
97
                if raftIdx > 0 {
62✔
98
                        x.WorkerConfig.ProposedGroupId = 0
×
99
                }
×
100
        }
101
        glog.Infof("Current Raft Id: %#x\n", raftIdx)
96✔
102

96✔
103
        if x.WorkerConfig.ProposedGroupId == 0 {
190✔
104
                x.WorkerConfig.ProposedGroupId = x.WorkerConfig.Raft.GetUint32("group")
94✔
105
        }
94✔
106
        // Successfully connect with dgraphzero, before doing anything else.
107
        // Connect with Zero leader and figure out what group we should belong to.
108
        m := &pb.Member{
96✔
109
                Id:      raftIdx,
96✔
110
                GroupId: x.WorkerConfig.ProposedGroupId,
96✔
111
                Addr:    x.WorkerConfig.MyAddr,
96✔
112
                Learner: x.WorkerConfig.Raft.GetBool("learner"),
96✔
113
        }
96✔
114
        if m.GroupId > 0 {
119✔
115
                m.ForceGroupId = true
23✔
116
        }
23✔
117
        glog.Infof("Sending member request to Zero: %+v\n", m)
96✔
118
        var connState *pb.ConnectionState
96✔
119
        var err error
96✔
120

96✔
121
        for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
242✔
122
                pl := gr.connToZeroLeader()
146✔
123
                if pl == nil {
146✔
124
                        continue
×
125
                }
126
                zc := pb.NewZeroClient(pl.Get())
146✔
127
                connState, err = zc.Connect(gr.Ctx(), m)
146✔
128
                if err == nil || x.ShouldCrash(err) {
242✔
129
                        break
96✔
130
                }
131
        }
132
        x.CheckfNoTrace(err)
96✔
133
        if connState.GetMember() == nil || connState.GetState() == nil {
96✔
134
                x.Fatalf("Unable to join cluster via dgraphzero")
×
135
        }
×
136
        glog.Infof("Connected to group zero. Assigned group: %+v\n", connState.GetMember().GetGroupId())
96✔
137
        raftIdx = connState.GetMember().GetId()
96✔
138
        glog.Infof("Raft Id after connection to Zero: %#x\n", raftIdx)
96✔
139

96✔
140
        // This timestamp would be used for reading during snapshot after bulk load.
96✔
141
        // The stream is async, we need this information before we start or else replica might
96✔
142
        // not get any data.
96✔
143
        gr.applyState(raftIdx, connState.GetState())
96✔
144

96✔
145
        gid := gr.groupId()
96✔
146
        gr.triggerCh = make(chan struct{}, 1)
96✔
147

96✔
148
        // Initialize DiskStorage and pass it along.
96✔
149
        walStore.SetUint(raftwal.RaftId, raftIdx)
96✔
150
        walStore.SetUint(raftwal.GroupId, uint64(gid))
96✔
151

96✔
152
        gr.Node = newNode(walStore, gid, raftIdx, x.WorkerConfig.MyAddr)
96✔
153

96✔
154
        x.Checkf(schema.LoadFromDb(context.Background()), "Error while initializing schema")
96✔
155
        glog.Infof("Load schema from DB: OK")
96✔
156
        raftServer.UpdateNode(gr.Node.Node)
96✔
157
        gr.Node.InitAndStartNode()
96✔
158
        glog.Infof("Init and start Raft node: OK")
96✔
159

96✔
160
        go gr.sendMembershipUpdates()
96✔
161
        go gr.receiveMembershipUpdates()
96✔
162
        go gr.processOracleDeltaStream()
96✔
163

96✔
164
        gr.informZeroAboutTablets()
96✔
165
        glog.Infof("Informed Zero about tablets I have: OK")
96✔
166
        gr.applyInitialSchema()
96✔
167
        gr.applyInitialTypes()
96✔
168
        glog.Infof("Upserted Schema and Types: OK")
96✔
169

96✔
170
        x.UpdateHealthStatus(true)
96✔
171
        glog.Infof("Server is ready: OK")
96✔
172
}
173

174
func (g *groupi) Ctx() context.Context {
146,813✔
175
        return g.closer.Ctx()
146,813✔
176
}
146,813✔
177

178
func (g *groupi) IsClosed() bool {
358✔
179
        return g.closer.Ctx().Err() != nil
358✔
180
}
358✔
181

182
func (g *groupi) informZeroAboutTablets() {
96✔
183
        // Before we start this Alpha, let's pick up all the predicates we have in our postings
96✔
184
        // directory, and ask Zero if we are allowed to serve it. Do this irrespective of whether
96✔
185
        // this node is the leader or the follower, because this early on, we might not have
96✔
186
        // figured that out.
96✔
187
        ticker := time.NewTicker(time.Second)
96✔
188
        defer ticker.Stop()
96✔
189

96✔
190
        for range ticker.C {
192✔
191
                preds := schema.State().Predicates()
96✔
192
                if _, err := g.Inform(preds); err != nil {
96✔
193
                        glog.Errorf("Error while getting tablet for preds %v", err)
×
194
                } else {
96✔
195
                        glog.V(1).Infof("Done informing Zero about the %d tablets I have", len(preds))
96✔
196
                        return
96✔
197
                }
96✔
198
        }
199
}
200

201
func (g *groupi) applyInitialTypes() {
96✔
202
        initialTypes := schema.InitialTypes(x.GalaxyNamespace)
96✔
203
        for _, t := range initialTypes {
372✔
204
                if _, ok := schema.State().GetType(t.TypeName); ok {
278✔
205
                        continue
2✔
206
                }
207
                // It is okay to write initial types at ts=1.
208
                if err := updateType(t.GetTypeName(), *t, 1); err != nil {
274✔
209
                        glog.Errorf("Error while applying initial type: %s", err)
×
210
                }
×
211
        }
212
}
213

214
func (g *groupi) applyInitialSchema() {
96✔
215
        if g.groupId() != 1 {
129✔
216
                return
33✔
217
        }
33✔
218
        initialSchema := schema.InitialSchema(x.GalaxyNamespace)
63✔
219
        ctx := g.Ctx()
63✔
220

63✔
221
        apply := func(s *pb.SchemaUpdate) {
471✔
222
                // There are 2 cases: either the alpha is fresh or it restarted. If it is fresh cluster
408✔
223
                // then we can write the schema at ts=1. If alpha restarted, then we will already have the
408✔
224
                // schema at higher version and this operation will be a no-op.
408✔
225
                if err := applySchema(s, 1); err != nil {
408✔
226
                        glog.Errorf("Error while applying initial schema: %s", err)
×
227
                }
×
228
        }
229

230
        for _, s := range initialSchema {
486✔
231
                if gid, err := g.BelongsToReadOnly(s.Predicate, 0); err != nil {
423✔
232
                        glog.Errorf("Error getting tablet for predicate %s. Will force schema proposal.",
×
233
                                s.Predicate)
×
234
                        apply(s)
×
235
                } else if gid == 0 {
742✔
236
                        // The tablet is not being served currently.
319✔
237
                        apply(s)
319✔
238
                } else if curr, _ := schema.State().Get(ctx, s.Predicate); gid == g.groupId() &&
423✔
239
                        !proto.Equal(s, &curr) {
193✔
240
                        // If this tablet is served to the group, do not upsert the schema unless the
89✔
241
                        // stored schema and the proposed one are different.
89✔
242
                        apply(s)
89✔
243
                } else {
104✔
244
                        // The schema for this predicate has already been proposed.
15✔
245
                        glog.V(1).Infof("Schema found for predicate %s: %+v", s.Predicate, curr)
15✔
246
                        continue
15✔
247
                }
248
        }
249
}
250

251
func applySchema(s *pb.SchemaUpdate, ts uint64) error {
8,936✔
252
        if err := updateSchema(s, ts); err != nil {
8,936✔
253
                return err
×
254
        }
×
255
        if servesTablet, err := groups().ServesTablet(s.Predicate); err != nil {
8,936✔
256
                return err
×
257
        } else if !servesTablet {
8,936✔
258
                return errors.Errorf("group 1 should always serve reserved predicate %s", s.Predicate)
×
259
        }
×
260
        return nil
8,936✔
261
}
262

263
// No locks are acquired while accessing this function.
264
// Don't acquire RW lock during this, otherwise we might deadlock.
265
func (g *groupi) groupId() uint32 {
1,127,652✔
266
        return atomic.LoadUint32(&g.gid)
1,127,652✔
267
}
1,127,652✔
268

269
// MaxLeaseId returns the maximum UID that has been leased.
270
func MaxLeaseId() uint64 {
338,452✔
271
        g := groups()
338,452✔
272
        g.RLock()
338,452✔
273
        defer g.RUnlock()
338,452✔
274
        if g.state == nil {
338,452✔
275
                return 0
×
276
        }
×
277
        return g.state.MaxUID
338,452✔
278
}
279

280
// GetMembershipState returns the current membership state.
281
func GetMembershipState() *pb.MembershipState {
833✔
282
        g := groups()
833✔
283
        g.RLock()
833✔
284
        defer g.RUnlock()
833✔
285
        return proto.Clone(g.state).(*pb.MembershipState)
833✔
286
}
833✔
287

288
// UpdateMembershipState contacts zero for an update on membership state.
289
func UpdateMembershipState(ctx context.Context) error {
212✔
290
        g := groups()
212✔
291
        p := g.Leader(0)
212✔
292
        if p == nil {
212✔
293
                return errors.Errorf("don't have the address of any dgraph zero leader")
×
294
        }
×
295

296
        c := pb.NewZeroClient(p.Get())
212✔
297
        state, err := c.Connect(ctx, &pb.Member{ClusterInfoOnly: true})
212✔
298
        if err != nil {
212✔
299
                return err
×
300
        }
×
301
        g.applyState(g.Node.Id, state.GetState())
212✔
302
        return nil
212✔
303
}
304

305
func (g *groupi) applyState(myId uint64, state *pb.MembershipState) {
21,368✔
306
        x.AssertTrue(state != nil)
21,368✔
307
        g.Lock()
21,368✔
308
        defer g.Unlock()
21,368✔
309
        // We don't update state if we get any old state. Counter stores the raftindex of
21,368✔
310
        // last update. For leader changes at zero since we don't propose, state can get
21,368✔
311
        // updated at same counter value. So ignore only if counter is less.
21,368✔
312
        if g.state != nil && g.state.Counter > state.Counter {
21,368✔
313
                return
×
314
        }
×
315

316
        invalid := state.License != nil && !state.License.Enabled
21,368✔
317
        if g.Node != nil && g.Node.RaftContext.IsLearner && invalid {
21,368✔
318
                glog.Errorf("ENTERPRISE_ONLY_LEARNER: License Expired. Cannot run learner nodes.")
×
319
                x.ServerCloser.Signal()
×
320
                return
×
321
        }
×
322

323
        oldState := g.state
21,368✔
324
        g.state = state
21,368✔
325

21,368✔
326
        // Sometimes this can cause us to lose latest tablet info, but that shouldn't cause any issues.
21,368✔
327
        var foundSelf bool
21,368✔
328
        g.tablets = make(map[string]*pb.Tablet)
21,368✔
329
        for gid, group := range g.state.Groups {
63,231✔
330
                for _, member := range group.Members {
157,713✔
331
                        if myId == member.Id {
137,218✔
332
                                foundSelf = true
21,368✔
333
                                atomic.StoreUint32(&g.gid, gid)
21,368✔
334
                        }
21,368✔
335
                        if x.WorkerConfig.MyAddr != member.Addr {
210,332✔
336
                                conn.GetPools().Connect(member.Addr, x.WorkerConfig.TLSClientConfig)
94,482✔
337
                        }
94,482✔
338
                }
339
                for _, tablet := range group.Tablets {
1,576,566✔
340
                        g.tablets[tablet.Predicate] = tablet
1,534,703✔
341
                }
1,534,703✔
342
                if gid == g.groupId() {
63,231✔
343
                        glog.V(3).Infof("group %d checksum: %d", g.groupId(), group.Checksum)
21,368✔
344
                        atomic.StoreUint64(&g.membershipChecksum, group.Checksum)
21,368✔
345
                }
21,368✔
346
        }
347
        for _, member := range g.state.Zeros {
78,452✔
348
                if x.WorkerConfig.MyAddr != member.Addr {
114,168✔
349
                        conn.GetPools().Connect(member.Addr, x.WorkerConfig.TLSClientConfig)
57,084✔
350
                }
57,084✔
351
        }
352
        if !foundSelf {
21,368✔
353
                // I'm not part of this cluster. I should crash myself.
×
354
                glog.Fatalf("Unable to find myself [id:%d group:%d] in membership state: %+v. Goodbye!",
×
355
                        myId, g.groupId(), state)
×
356
        }
×
357

358
        // While restarting we fill Node information after retrieving initial state.
359
        if g.Node != nil {
42,640✔
360
                // Lets have this block before the one that adds the new members, else we may end up
21,272✔
361
                // removing a freshly added node.
21,272✔
362

21,272✔
363
                for _, member := range g.state.GetRemoved() {
21,272✔
364
                        // TODO: This leader check can be done once instead of repeatedly.
×
365
                        if member.GetGroupId() == g.Node.gid && g.Node.AmLeader() {
×
366
                                member := member // capture range variable
×
367
                                go func() {
×
368
                                        // Don't try to remove a member if it's already marked as removed in
×
369
                                        // the membership state and is not a current peer of the node.
×
370
                                        _, isPeer := g.Node.Peer(member.GetId())
×
371
                                        // isPeer should only be true if the removed node is not the same as this node.
×
372
                                        isPeer = isPeer && member.GetId() != g.Node.RaftContext.Id
×
373

×
374
                                        for _, oldMember := range oldState.GetRemoved() {
×
375
                                                if oldMember.GetId() == member.GetId() && !isPeer {
×
376
                                                        return
×
377
                                                }
×
378
                                        }
379

380
                                        if err := g.Node.ProposePeerRemoval(g.Ctx(), member.GetId()); err != nil {
×
381
                                                glog.Errorf("Error while proposing node removal: %+v", err)
×
382
                                        }
×
383
                                }()
384
                        }
385
                }
386
                conn.GetPools().RemoveInvalid(g.state)
21,272✔
387
        }
388
}
389

390
func (g *groupi) ServesGroup(gid uint32) bool {
172,430✔
391
        return g.groupId() == gid
172,430✔
392
}
172,430✔
393

394
func (g *groupi) ChecksumsMatch(ctx context.Context) error {
83,423✔
395
        if atomic.LoadUint64(&g.deltaChecksum) == atomic.LoadUint64(&g.membershipChecksum) {
166,104✔
396
                return nil
82,681✔
397
        }
82,681✔
398
        t := time.NewTicker(100 * time.Millisecond)
742✔
399
        defer t.Stop()
742✔
400
        for {
6,241✔
401
                select {
5,499✔
402
                case <-t.C:
5,498✔
403
                        if atomic.LoadUint64(&g.deltaChecksum) == atomic.LoadUint64(&g.membershipChecksum) {
6,239✔
404
                                return nil
741✔
405
                        }
741✔
406
                case <-ctx.Done():
1✔
407
                        return errors.Errorf("Group checksum mismatch for id: %d", g.groupId())
1✔
408
                }
409
        }
410
}
411

412
func (g *groupi) BelongsTo(key string) (uint32, error) {
267,008✔
413
        if tablet, err := g.Tablet(key); err != nil {
267,008✔
414
                return 0, err
×
415
        } else if tablet != nil {
534,016✔
416
                return tablet.GroupId, nil
267,008✔
417
        }
267,008✔
418
        return 0, nil
×
419
}
420

421
// BelongsToReadOnly acts like BelongsTo except it does not ask zero to serve
422
// the tablet for key if no group is currently serving it.
423
// The ts passed should be the start ts of the query, so this method can compare that against a
424
// tablet move timestamp. If the tablet was moved to this group after the start ts of the query, we
425
// should reject that query.
426
func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) {
1,028,139✔
427
        g.RLock()
1,028,139✔
428
        tablet := g.tablets[key]
1,028,139✔
429
        g.RUnlock()
1,028,139✔
430
        if tablet != nil {
2,054,933✔
431
                if ts > 0 && ts < tablet.MoveTs {
1,026,794✔
432
                        return 0, errors.Errorf("StartTs: %d is from before MoveTs: %d for pred: %q",
×
433
                                ts, tablet.MoveTs, key)
×
434
                }
×
435
                return tablet.GetGroupId(), nil
1,026,794✔
436
        }
437

438
        // We don't know about this tablet. Talk to dgraphzero to find out who is
439
        // serving this tablet.
440
        pl := g.connToZeroLeader()
1,345✔
441
        zc := pb.NewZeroClient(pl.Get())
1,345✔
442

1,345✔
443
        tablet = &pb.Tablet{
1,345✔
444
                Predicate: key,
1,345✔
445
                ReadOnly:  true,
1,345✔
446
        }
1,345✔
447
        out, err := zc.ShouldServe(g.Ctx(), tablet)
1,345✔
448
        if err != nil {
1,345✔
449
                glog.Errorf("Error while ShouldServe grpc call %v", err)
×
450
                return 0, err
×
451
        }
×
452
        if out.GetGroupId() == 0 {
2,644✔
453
                return 0, nil
1,299✔
454
        }
1,299✔
455

456
        g.Lock()
46✔
457
        defer g.Unlock()
46✔
458
        g.tablets[key] = out
46✔
459
        if out != nil && ts > 0 && ts < out.MoveTs {
46✔
460
                return 0, errors.Errorf("StartTs: %d is from before MoveTs: %d for pred: %q",
×
461
                        ts, out.MoveTs, key)
×
462
        }
×
463
        return out.GetGroupId(), nil
46✔
464
}
465

466
func (g *groupi) ServesTablet(key string) (bool, error) {
9,400✔
467
        if tablet, err := g.Tablet(key); err != nil {
9,400✔
468
                return false, err
×
469
        } else if tablet != nil && tablet.GroupId == groups().groupId() {
18,796✔
470
                return true, nil
9,396✔
471
        }
9,396✔
472
        return false, nil
4✔
473
}
474

475
func (g *groupi) sendTablet(tablet *pb.Tablet) (*pb.Tablet, error) {
7,616✔
476
        pl := g.connToZeroLeader()
7,616✔
477
        zc := pb.NewZeroClient(pl.Get())
7,616✔
478

7,616✔
479
        out, err := zc.ShouldServe(g.Ctx(), tablet)
7,616✔
480
        if err != nil {
7,616✔
481
                glog.Errorf("Error while ShouldServe grpc call %v", err)
×
482
                return nil, err
×
483
        }
×
484

485
        // Do not store tablets with group ID 0, as they are just dummy tablets for
486
        // predicates that do no exist.
487
        if out.GroupId > 0 {
15,232✔
488
                g.Lock()
7,616✔
489
                g.tablets[out.GetPredicate()] = out
7,616✔
490
                g.Unlock()
7,616✔
491
        }
7,616✔
492

493
        if out.GroupId == groups().groupId() {
15,159✔
494
                glog.Infof("Serving tablet for: %v\n", tablet.GetPredicate())
7,543✔
495
        }
7,543✔
496
        return out, nil
7,616✔
497
}
498

499
func (g *groupi) Inform(preds []string) ([]*pb.Tablet, error) {
96✔
500
        unknownPreds := make([]*pb.Tablet, 0)
96✔
501
        tablets := make([]*pb.Tablet, 0)
96✔
502
        g.RLock()
96✔
503
        for _, p := range preds {
194✔
504
                if len(p) == 0 {
98✔
505
                        continue
×
506
                }
507

508
                if tab, ok := g.tablets[p]; !ok {
188✔
509
                        unknownPreds = append(unknownPreds, &pb.Tablet{GroupId: g.groupId(), Predicate: p})
90✔
510
                } else {
98✔
511
                        tablets = append(tablets, tab)
8✔
512
                }
8✔
513
        }
514
        g.RUnlock()
96✔
515

96✔
516
        if len(unknownPreds) == 0 {
190✔
517
                return nil, nil
94✔
518
        }
94✔
519

520
        pl := g.connToZeroLeader()
2✔
521
        zc := pb.NewZeroClient(pl.Get())
2✔
522
        out, err := zc.Inform(g.Ctx(), &pb.TabletRequest{
2✔
523
                Tablets: unknownPreds,
2✔
524
                GroupId: g.groupId(),
2✔
525
        })
2✔
526
        if err != nil {
2✔
527
                glog.Errorf("Error while Inform grpc call %v", err)
×
528
                return nil, err
×
529
        }
×
530

531
        // Do not store tablets with group ID 0, as they are just dummy tablets for
532
        // predicates that do no exist.
533
        g.Lock()
2✔
534
        for _, t := range out.Tablets {
92✔
535
                if t.GroupId > 0 {
180✔
536
                        g.tablets[t.GetPredicate()] = t
90✔
537
                        tablets = append(tablets, t)
90✔
538
                }
90✔
539

540
                if t.GroupId == groups().groupId() {
180✔
541
                        glog.Infof("Serving tablet for: %v\n", t.GetPredicate())
90✔
542
                }
90✔
543
        }
544
        g.Unlock()
2✔
545
        return tablets, nil
2✔
546
}
547

548
// Do not modify the returned Tablet
549
func (g *groupi) Tablet(key string) (*pb.Tablet, error) {
552,056✔
550
        // TODO: Remove all this later, create a membership state and apply it
552,056✔
551
        g.RLock()
552,056✔
552
        tablet, ok := g.tablets[key]
552,056✔
553
        g.RUnlock()
552,056✔
554
        if ok {
1,100,530✔
555
                return tablet, nil
548,474✔
556
        }
548,474✔
557

558
        // We don't know about this tablet.
559
        // Check with dgraphzero if we can serve it.
560
        tablet = &pb.Tablet{GroupId: g.groupId(), Predicate: key}
3,582✔
561
        return g.sendTablet(tablet)
3,582✔
562
}
563

564
func (g *groupi) ForceTablet(key string) (*pb.Tablet, error) {
4,034✔
565
        return g.sendTablet(&pb.Tablet{GroupId: g.groupId(), Predicate: key, Force: true})
4,034✔
566
}
4,034✔
567

568
func (g *groupi) HasMeInState() bool {
×
569
        g.RLock()
×
570
        defer g.RUnlock()
×
571
        if g.state == nil {
×
572
                return false
×
573
        }
×
574

575
        group, has := g.state.Groups[g.groupId()]
×
576
        if !has {
×
577
                return false
×
578
        }
×
579
        _, has = group.Members[g.Node.Id]
×
580
        return has
×
581
}
582

583
// Returns 0, 1, or 2 valid server addrs.
584
func (g *groupi) AnyTwoServers(gid uint32) []string {
18,887✔
585
        g.RLock()
18,887✔
586
        defer g.RUnlock()
18,887✔
587

18,887✔
588
        if g.state == nil {
19,249✔
589
                return []string{}
362✔
590
        }
362✔
591
        group, has := g.state.Groups[gid]
18,525✔
592
        if !has {
18,525✔
593
                return []string{}
×
594
        }
×
595
        var res []string
18,525✔
596
        for _, m := range group.Members {
54,577✔
597
                // map iteration gives us members in no particular order.
36,052✔
598
                res = append(res, m.Addr)
36,052✔
599
                if len(res) >= 2 {
53,579✔
600
                        break
17,527✔
601
                }
602
        }
603
        return res
18,525✔
604
}
605

606
func (g *groupi) members(gid uint32) map[uint64]*pb.Member {
120,361✔
607
        g.RLock()
120,361✔
608
        defer g.RUnlock()
120,361✔
609

120,361✔
610
        if g.state == nil {
120,821✔
611
                return nil
460✔
612
        }
460✔
613
        if gid == 0 {
221,620✔
614
                return g.state.Zeros
101,719✔
615
        }
101,719✔
616
        group, has := g.state.Groups[gid]
18,182✔
617
        if !has {
18,182✔
618
                return nil
×
619
        }
×
620
        return group.Members
18,182✔
621
}
622

623
func (g *groupi) AnyServer(gid uint32) *conn.Pool {
348✔
624
        members := g.members(gid)
348✔
625
        for _, m := range members {
436✔
626
                pl, err := conn.GetPools().Get(m.Addr)
88✔
627
                if err == nil {
145✔
628
                        return pl
57✔
629
                }
57✔
630
        }
631
        return nil
291✔
632
}
633

634
func (g *groupi) MyPeer() (uint64, bool) {
287✔
635
        members := g.members(g.groupId())
287✔
636
        for _, m := range members {
602✔
637
                if m.Id != g.Node.Id {
392✔
638
                        return m.Id, true
77✔
639
                }
77✔
640
        }
641
        return 0, false
210✔
642
}
643

644
// Leader will try to return the leader of a given group, based on membership information.
645
// There is currently no guarantee that the returned server is the leader of the group.
646
func (g *groupi) Leader(gid uint32) *conn.Pool {
119,725✔
647
        members := g.members(gid)
119,725✔
648
        if members == nil {
119,925✔
649
                return nil
200✔
650
        }
200✔
651
        for _, m := range members {
303,271✔
652
                if m.Leader {
303,247✔
653
                        if pl, err := conn.GetPools().Get(m.Addr); err == nil {
238,992✔
654
                                return pl
119,491✔
655
                        }
119,491✔
656
                }
657
        }
658
        return nil
34✔
659
}
660

661
func (g *groupi) KnownGroups() (gids []uint32) {
18,264✔
662
        g.RLock()
18,264✔
663
        defer g.RUnlock()
18,264✔
664
        if g.state == nil {
18,264✔
665
                return
×
666
        }
×
667
        for gid := range g.state.Groups {
53,652✔
668
                gids = append(gids, gid)
35,388✔
669
        }
35,388✔
670
        return
18,264✔
671
}
672

673
// KnownGroups returns the known groups using the global groupi instance.
674
func KnownGroups() []uint32 {
×
675
        return groups().KnownGroups()
×
676
}
×
677

678
// GroupId returns the group to which this worker belongs to.
679
func GroupId() uint32 {
152✔
680
        return groups().groupId()
152✔
681
}
152✔
682

683
// NodeId returns the raft id of the node.
684
func NodeId() uint64 {
2✔
685
        return groups().Node.Id
2✔
686
}
2✔
687

688
func (g *groupi) triggerMembershipSync() {
2,298✔
689
        // It's ok if we miss the trigger, periodic membership sync runs every minute.
2,298✔
690
        select {
2,298✔
691
        case g.triggerCh <- struct{}{}:
2,201✔
692
        // It's ok to ignore it, since we would be sending update of a later state
693
        default:
97✔
694
        }
695
}
696

697
// connBaseDelay is the initial back-off used by connToZeroLeader while it searches for
// a Zero leader. The delay doubles after each attempt while it is at most one second.
const connBaseDelay time.Duration = 100 * time.Millisecond
698

699
func (g *groupi) connToZeroLeader() *conn.Pool {
75,872✔
700
        pl := g.Leader(0)
75,872✔
701
        if pl != nil {
151,535✔
702
                return pl
75,663✔
703
        }
75,663✔
704
        glog.V(1).Infof("No healthy Zero leader found. Trying to find a Zero leader...")
209✔
705

209✔
706
        getLeaderConn := func(zc pb.ZeroClient) *conn.Pool {
500✔
707
                ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second)
291✔
708
                defer cancel()
291✔
709

291✔
710
                connState, err := zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true})
291✔
711
                if err != nil || connState == nil {
382✔
712
                        glog.V(1).Infof("While retrieving Zero leader info. Error: %v. Retrying...", err)
91✔
713
                        return nil
91✔
714
                }
91✔
715
                for _, mz := range connState.State.GetZeros() {
463✔
716
                        if mz.Leader {
463✔
717
                                return conn.GetPools().Connect(mz.GetAddr(), x.WorkerConfig.TLSClientConfig)
200✔
718
                        }
200✔
719
                }
720
                return nil
×
721
        }
722

723
        // No leader found. Let's get the latest membership state from Zero.
724
        delay := connBaseDelay
209✔
725
        maxHalfDelay := time.Second
209✔
726
        for i := 0; ; i++ { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
509✔
727
                if g.IsClosed() {
309✔
728
                        return nil
9✔
729
                }
9✔
730

731
                time.Sleep(delay)
291✔
732
                if delay <= maxHalfDelay {
576✔
733
                        delay *= 2
285✔
734
                }
285✔
735

736
                zAddrList := x.WorkerConfig.ZeroAddr
291✔
737
                // Pick addresses in round robin manner.
291✔
738
                addr := zAddrList[i%len(zAddrList)]
291✔
739

291✔
740
                pl := g.AnyServer(0)
291✔
741
                if pl == nil {
582✔
742
                        pl = conn.GetPools().Connect(addr, x.WorkerConfig.TLSClientConfig)
291✔
743
                }
291✔
744
                if pl == nil {
291✔
745
                        glog.V(1).Infof("No healthy Zero server found. Retrying...")
×
746
                        continue
×
747
                }
748
                zc := pb.NewZeroClient(pl.Get())
291✔
749
                if pl := getLeaderConn(zc); pl != nil {
491✔
750
                        glog.V(1).Infof("Found connection to leader: %s", pl.Addr)
200✔
751
                        return pl
200✔
752
                }
200✔
753
                glog.V(1).Infof("Unable to connect to a healthy Zero leader. Retrying...")
91✔
754
        }
755
}
756

757
func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error {
2,181✔
758
        leader := g.Node.AmLeader()
2,181✔
759
        member := &pb.Member{
2,181✔
760
                Id:         g.Node.Id,
2,181✔
761
                GroupId:    g.groupId(),
2,181✔
762
                Addr:       x.WorkerConfig.MyAddr,
2,181✔
763
                Leader:     leader,
2,181✔
764
                LastUpdate: uint64(time.Now().Unix()),
2,181✔
765
        }
2,181✔
766
        group := &pb.Group{
2,181✔
767
                Members: make(map[uint64]*pb.Member),
2,181✔
768
        }
2,181✔
769
        group.Members[member.Id] = member
2,181✔
770
        if leader {
3,060✔
771
                // Do not send tablet information, if I'm not the leader.
879✔
772
                group.Tablets = tablets
879✔
773
                if snap, err := g.Node.Snapshot(); err == nil {
1,758✔
774
                        group.SnapshotTs = snap.ReadTs
879✔
775
                }
879✔
776
                group.CheckpointTs = atomic.LoadUint64(&g.Node.checkpointTs)
879✔
777
        }
778

779
        pl := g.connToZeroLeader()
2,181✔
780
        if pl == nil {
2,183✔
781
                return errNoConnection
2✔
782
        }
2✔
783
        c := pb.NewZeroClient(pl.Get())
2,179✔
784
        ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second)
2,179✔
785
        defer cancel()
2,179✔
786
        reply, err := c.UpdateMembership(ctx, group)
2,179✔
787
        if err != nil {
2,202✔
788
                return err
23✔
789
        }
23✔
790
        if string(reply.GetData()) == "OK" {
4,312✔
791
                return nil
2,156✔
792
        }
2,156✔
793
        return errors.Errorf(string(reply.GetData()))
×
794
}
795

796
// sendMembershipUpdates sends the membership update to Zero leader. If this Alpha is the leader, it
797
// would also calculate the tablet sizes and send them to Zero.
798
func (g *groupi) sendMembershipUpdates() {
96✔
799
        defer func() {
192✔
800
                glog.Infoln("Closing sendMembershipUpdates")
96✔
801
                g.closer.Done() // CLOSER:1
96✔
802
        }()
96✔
803

804
        ticker := time.NewTicker(time.Second)
96✔
805
        defer ticker.Stop()
96✔
806

96✔
807
        consumeTriggers := func() {
2,277✔
808
                for {
4,372✔
809
                        select {
2,191✔
810
                        case <-g.triggerCh:
10✔
811
                        default:
2,181✔
812
                                return
2,181✔
813
                        }
814
                }
815
        }
816

817
        g.triggerMembershipSync() // Ticker doesn't start immediately
96✔
818
        var lastSent time.Time
96✔
819
        for {
23,370✔
820
                select {
23,274✔
821
                case <-g.closer.HasBeenClosed():
96✔
822
                        return
96✔
823
                case <-ticker.C:
20,997✔
824
                        if time.Since(lastSent) > 10*time.Second {
22,898✔
825
                                // On start of node if it becomes a leader, we would send tablets size for sure.
1,901✔
826
                                g.triggerMembershipSync()
1,901✔
827
                        }
1,901✔
828
                case <-g.triggerCh:
2,181✔
829
                        // Let's send update even if not leader, zero will know that this node is still active.
2,181✔
830
                        // We don't need to send tablet information everytime. So, let's only send it when we
2,181✔
831
                        // calculate it.
2,181✔
832
                        consumeTriggers()
2,181✔
833
                        if err := g.doSendMembership(nil); err != nil {
2,206✔
834
                                glog.Errorf("While sending membership update: %v", err)
25✔
835
                        } else {
2,181✔
836
                                lastSent = time.Now()
2,156✔
837
                        }
2,156✔
838
                }
839
        }
840
}
841

842
// receiveMembershipUpdates receives membership updates from ANY Zero server. This is the main
843
// connection which tells Alpha about the state of the cluster, including the latest Zero leader.
844
// All the other connections to Zero, are made only to the leader.
845
func (g *groupi) receiveMembershipUpdates() {
96✔
846
        defer func() {
192✔
847
                glog.Infoln("Closing receiveMembershipUpdates")
96✔
848
                g.closer.Done() // CLOSER:1
96✔
849
        }()
96✔
850

851
        ticker := time.NewTicker(10 * time.Second)
96✔
852
        defer ticker.Stop()
96✔
853

96✔
854
START:
96✔
855
        select {
96✔
856
        case <-g.closer.HasBeenClosed():
96✔
857
                return
96✔
858
        default:
139✔
859
        }
860

861
        pl := g.connToZeroLeader()
139✔
862
        // We should always have some connection to dgraphzero.
139✔
863
        if pl == nil {
142✔
864
                glog.Warningln("Membership update: No Zero server known.")
3✔
865
                time.Sleep(time.Second)
3✔
866
                goto START
3✔
867
        }
868
        glog.Infof("Got address of a Zero leader: %s", pl.Addr)
136✔
869

136✔
870
        c := pb.NewZeroClient(pl.Get())
136✔
871
        ctx, cancel := context.WithCancel(g.Ctx())
136✔
872
        stream, err := c.StreamMembership(ctx, &api.Payload{})
136✔
873
        if err != nil {
169✔
874
                cancel()
33✔
875
                glog.Errorf("Error while calling update %v\n", err)
33✔
876
                time.Sleep(time.Second)
33✔
877
                goto START
33✔
878
        }
879

880
        stateCh := make(chan *pb.MembershipState, 10)
103✔
881
        go func() {
206✔
882
                glog.Infof("Starting a new membership stream receive from %s.", pl.Addr)
103✔
883
                for i := 0; ; i++ {
21,266✔
884
                        // Blocking, should return if sending on stream fails(Need to verify).
21,163✔
885
                        state, err := stream.Recv()
21,163✔
886
                        if err != nil || state == nil {
21,266✔
887
                                if err == io.EOF {
103✔
888
                                        glog.Infoln("Membership sync stream closed.")
×
889
                                } else {
103✔
890
                                        glog.Errorf("Unable to sync memberships. Error: %v. State: %v", err, state)
103✔
891
                                }
103✔
892
                                // If zero server is lagging behind leader.
893
                                if ctx.Err() == nil {
131✔
894
                                        cancel()
28✔
895
                                }
28✔
896
                                return
103✔
897
                        }
898
                        if i == 0 {
21,156✔
899
                                glog.Infof("Received first state update from Zero: %+v", state)
96✔
900
                                x.WriteCidFile(state.Cid)
96✔
901
                        }
96✔
902
                        select {
21,060✔
903
                        case stateCh <- state:
21,060✔
904
                        case <-ctx.Done():
×
905
                                return
×
906
                        }
907
                }
908
        }()
909

910
        lastRecv := time.Now()
103✔
911
OUTER:
103✔
912
        for {
23,321✔
913
                select {
23,218✔
914
                case <-g.closer.HasBeenClosed():
75✔
915
                        if err := stream.CloseSend(); err != nil {
75✔
916
                                glog.Errorf("Error closing send stream: %+v", err)
×
917
                        }
×
918
                        break OUTER
75✔
919
                case <-ctx.Done():
28✔
920
                        if err := stream.CloseSend(); err != nil {
28✔
921
                                glog.Errorf("Error closing send stream: %+v", err)
×
922
                        }
×
923
                        break OUTER
28✔
924
                case state := <-stateCh:
21,060✔
925
                        lastRecv = time.Now()
21,060✔
926
                        g.applyState(g.Node.Id, state)
21,060✔
927
                case <-ticker.C:
2,055✔
928
                        if time.Since(lastRecv) > 10*time.Second {
2,055✔
929
                                // Zero might have gone under partition. We should recreate our connection.
×
930
                                glog.Warningf("No membership update for 10s. Closing connection to Zero.")
×
931
                                if err := stream.CloseSend(); err != nil {
×
932
                                        glog.Errorf("Error closing send stream: %+v", err)
×
933
                                }
×
934
                                break OUTER
×
935
                        }
936
                }
937
        }
938
        cancel()
103✔
939
        goto START
103✔
940
}
941

942
// processOracleDeltaStream is used to process oracle delta stream from Zero.
943
// Zero sends information about aborted/committed transactions and maxPending.
944
func (g *groupi) processOracleDeltaStream() {
96✔
945
        defer func() {
192✔
946
                glog.Infoln("Closing processOracleDeltaStream")
96✔
947
                g.closer.Done() // CLOSER:1
96✔
948
        }()
96✔
949

950
        ticker := time.NewTicker(time.Second)
96✔
951
        defer ticker.Stop()
96✔
952

96✔
953
        blockingReceiveAndPropose := func() {
198✔
954
                glog.Infof("Leader idx=%#x of group=%d is connecting to Zero for txn updates\n",
102✔
955
                        g.Node.Id, g.groupId())
102✔
956

102✔
957
                pl := g.connToZeroLeader()
102✔
958
                if pl == nil {
106✔
959
                        glog.Warningln("Oracle delta stream: No Zero leader known.")
4✔
960
                        if g.IsClosed() {
8✔
961
                                return
4✔
962
                        }
4✔
963
                        time.Sleep(time.Second)
×
964
                        return
×
965
                }
966
                glog.Infof("Got Zero leader: %s", pl.Addr)
98✔
967

98✔
968
                // The following code creates a stream. Then runs a goroutine to pick up events from the
98✔
969
                // stream and pushes them to a channel. The main loop loops over the channel, doing smart
98✔
970
                // batching. Once a batch is created, it gets proposed. Thus, we can reduce the number of
98✔
971
                // times proposals happen, which is a great optimization to have (and a common one in our
98✔
972
                // code base).
98✔
973
                ctx, cancel := context.WithCancel(g.Ctx())
98✔
974
                defer cancel()
98✔
975

98✔
976
                c := pb.NewZeroClient(pl.Get())
98✔
977
                stream, err := c.Oracle(ctx, &api.Payload{})
98✔
978
                if err != nil {
120✔
979
                        glog.Errorf("Error while calling Oracle %v\n", err)
22✔
980
                        time.Sleep(time.Second)
22✔
981
                        return
22✔
982
                }
22✔
983

984
                deltaCh := make(chan *pb.OracleDelta, 100)
76✔
985
                go func() {
152✔
986
                        // This would exit when either a Recv() returns error. Or, cancel() is called by
76✔
987
                        // something outside of this goroutine.
76✔
988
                        defer func() {
152✔
989
                                if err := stream.CloseSend(); err != nil {
76✔
990
                                        glog.Errorf("Error closing send stream: %+v", err)
×
991
                                }
×
992
                        }()
993
                        defer close(deltaCh)
76✔
994

76✔
995
                        for {
149,216✔
996
                                delta, err := stream.Recv()
149,140✔
997
                                if err != nil || delta == nil {
149,216✔
998
                                        glog.Errorf("Error in oracle delta stream. Error: %v", err)
76✔
999
                                        return
76✔
1000
                                }
76✔
1001

1002
                                select {
149,064✔
1003
                                case deltaCh <- delta:
149,064✔
1004
                                case <-ctx.Done():
×
1005
                                        return
×
1006
                                }
1007
                        }
1008
                }()
1009

1010
                for {
143,566✔
1011
                        var delta *pb.OracleDelta
143,490✔
1012
                        var batch int
143,490✔
1013
                        select {
143,490✔
1014
                        case delta = <-deltaCh:
134,891✔
1015
                                if delta == nil {
134,909✔
1016
                                        return
18✔
1017
                                }
18✔
1018
                                batch++
134,873✔
1019
                        case <-ticker.C:
8,541✔
1020
                                newLead := g.Leader(0)
8,541✔
1021
                                if newLead == nil || newLead.Addr != pl.Addr {
8,541✔
1022
                                        glog.Infof("Zero leadership changed. Renewing oracle delta stream.")
×
1023
                                        return
×
1024
                                }
×
1025
                                continue
8,541✔
1026

1027
                        case <-ctx.Done():
1✔
1028
                                return
1✔
1029
                        case <-g.closer.HasBeenClosed():
57✔
1030
                                return
57✔
1031
                        }
1032

1033
                SLURP:
134,873✔
1034
                        for {
283,937✔
1035
                                select {
149,064✔
1036
                                case more := <-deltaCh:
14,191✔
1037
                                        if more == nil {
14,191✔
1038
                                                return
×
1039
                                        }
×
1040
                                        batch++
14,191✔
1041
                                        if delta.GroupChecksums == nil {
14,191✔
1042
                                                delta.GroupChecksums = make(map[uint32]uint64)
×
1043
                                        }
×
1044
                                        delta.Txns = append(delta.Txns, more.Txns...)
14,191✔
1045
                                        delta.MaxAssigned = x.Max(delta.MaxAssigned, more.MaxAssigned)
14,191✔
1046
                                        for gid, checksum := range more.GroupChecksums {
41,073✔
1047
                                                delta.GroupChecksums[gid] = checksum
26,882✔
1048
                                        }
26,882✔
1049
                                default:
134,873✔
1050
                                        break SLURP
134,873✔
1051
                                }
1052
                        }
1053

1054
                        // Only the leader needs to propose the oracleDelta retrieved from Zero.
1055
                        // The leader and the followers would not directly apply or use the
1056
                        // oracleDelta streaming in from Zero. They would wait for the proposal to
1057
                        // go through and be applied via node.Run.  This saves us from many edge
1058
                        // cases around network partitions and race conditions between prewrites and
1059
                        // commits, etc.
1060
                        if !g.Node.AmLeader() {
134,873✔
1061
                                glog.Errorf("No longer the leader of group %d. Exiting", g.groupId())
×
1062
                                return
×
1063
                        }
×
1064

1065
                        // We should always sort the txns before applying. Otherwise, we might lose some of
1066
                        // these updates, because we never write over a new version.
1067
                        sort.Slice(delta.Txns, func(i, j int) bool {
138,868✔
1068
                                return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs
3,995✔
1069
                        })
3,995✔
1070
                        if len(delta.Txns) > 0 {
154,074✔
1071
                                last := delta.Txns[len(delta.Txns)-1]
19,201✔
1072
                                // Update MaxAssigned on commit so best effort queries can get back latest data.
19,201✔
1073
                                delta.MaxAssigned = x.Max(delta.MaxAssigned, last.CommitTs)
19,201✔
1074
                        }
19,201✔
1075
                        if glog.V(3) {
139,268✔
1076
                                glog.Infof("Batched %d updates. Max Assigned: %d. Proposing Deltas:",
4,395✔
1077
                                        batch, delta.MaxAssigned)
4,395✔
1078
                                for _, txn := range delta.Txns {
5,135✔
1079
                                        if txn.CommitTs == 0 {
776✔
1080
                                                glog.Infof("Aborted: %d", txn.StartTs)
36✔
1081
                                        } else {
740✔
1082
                                                glog.Infof("Committed: %d -> %d", txn.StartTs, txn.CommitTs)
704✔
1083
                                        }
704✔
1084
                                }
1085
                        }
1086
                        for {
269,746✔
1087
                                // Block forever trying to propose this. Also this proposal should not be counted
134,873✔
1088
                                // towards num pending proposals and be proposed right away.
134,873✔
1089
                                err := g.Node.proposeAndWait(g.Ctx(), &pb.Proposal{Delta: delta})
134,873✔
1090
                                if err == nil {
269,745✔
1091
                                        break
134,872✔
1092
                                }
1093
                                if g.Ctx().Err() != nil {
2✔
1094
                                        break
1✔
1095
                                }
1096
                                glog.Errorf("While proposing delta with MaxAssigned: %d and num txns: %d."+
×
1097
                                        " Error=%v. Retrying...\n", delta.MaxAssigned, len(delta.Txns), err)
×
1098
                        }
1099
                }
1100
        }
1101

1102
        for {
12,633✔
1103
                select {
12,537✔
1104
                case <-g.closer.HasBeenClosed():
96✔
1105
                        return
96✔
1106
                case <-ticker.C:
12,441✔
1107
                        // Only the leader needs to connect to Zero and get transaction
12,441✔
1108
                        // updates.
12,441✔
1109
                        if g.Node.AmLeader() {
12,543✔
1110
                                blockingReceiveAndPropose()
102✔
1111
                        }
102✔
1112
                }
1113
        }
1114
}
1115

1116
// GetEEFeaturesList returns a list of Enterprise Features that are available.
1117
func GetEEFeaturesList() []string {
174✔
1118
        if !EnterpriseEnabled() {
208✔
1119
                return nil
34✔
1120
        }
34✔
1121
        var ee []string
140✔
1122
        if len(Config.HmacSecret) > 0 {
196✔
1123
                ee = append(ee, "acl")
56✔
1124
                ee = append(ee, "multi_tenancy")
56✔
1125
        }
56✔
1126
        if x.WorkerConfig.EncryptionKey != nil {
194✔
1127
                ee = append(ee, "encryption_at_rest", "encrypted_backup_restore", "encrypted_export")
54✔
1128
        } else {
140✔
1129
                ee = append(ee, "backup_restore")
86✔
1130
        }
86✔
1131
        if x.WorkerConfig.Audit {
142✔
1132
                ee = append(ee, "audit")
2✔
1133
        }
2✔
1134
        if Config.ChangeDataConf != "" {
280✔
1135
                ee = append(ee, "cdc")
140✔
1136
        }
140✔
1137
        return ee
140✔
1138
}
1139

1140
// EnterpriseEnabled returns whether enterprise features can be used or not.
1141
func EnterpriseEnabled() bool {
724✔
1142
        if !enc.EeBuild {
724✔
1143
                return false
×
1144
        }
×
1145
        state := GetMembershipState()
724✔
1146
        if state == nil {
746✔
1147
                return groups().askZeroForEE()
22✔
1148
        }
22✔
1149
        return state.GetLicense().GetEnabled()
702✔
1150
}
1151

1152
func (g *groupi) askZeroForEE() bool {
22✔
1153
        var err error
22✔
1154
        var connState *pb.ConnectionState
22✔
1155

22✔
1156
        createConn := func() bool {
76✔
1157
                pl := g.connToZeroLeader()
54✔
1158
                if pl == nil {
54✔
1159
                        return false
×
1160
                }
×
1161
                zc := pb.NewZeroClient(pl.Get())
54✔
1162

54✔
1163
                ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second)
54✔
1164
                defer cancel()
54✔
1165

54✔
1166
                connState, err = zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true})
54✔
1167
                if connState == nil ||
54✔
1168
                        connState.GetState() == nil ||
54✔
1169
                        connState.GetState().GetLicense() == nil {
86✔
1170
                        glog.Info("Retry Zero Connection")
32✔
1171
                        return false
32✔
1172
                }
32✔
1173
                if err == nil || x.ShouldCrash(err) {
44✔
1174
                        return true
22✔
1175
                }
22✔
1176
                return false
×
1177
        }
1178

1179
        for !g.IsClosed() {
76✔
1180
                if createConn() {
76✔
1181
                        break
22✔
1182
                }
1183
                time.Sleep(time.Second)
32✔
1184
        }
1185
        return connState.GetState().GetLicense().GetEnabled()
22✔
1186
}
1187

1188
// SubscribeForUpdates will listen for updates for the given group.
1189
func SubscribeForUpdates(prefixes [][]byte, ignore string, cb func(kvs *badgerpb.KVList),
1190
        group uint32, closer *z.Closer) {
124✔
1191

124✔
1192
        var prefix []byte
124✔
1193
        if len(prefixes) > 0 {
248✔
1194
                prefix = prefixes[0]
124✔
1195
        }
124✔
1196
        defer func() {
248✔
1197
                glog.Infof("SubscribeForUpdates closing for prefix: %q\n", prefix)
124✔
1198
                closer.Done()
124✔
1199
        }()
124✔
1200

1201
        listen := func() error {
653✔
1202
                // Connect to any of the group 1 nodes.
529✔
1203
                members := groups().AnyTwoServers(group)
529✔
1204
                // There may be a lag while starting so keep retrying.
529✔
1205
                if len(members) == 0 {
891✔
1206
                        return fmt.Errorf("unable to find any servers for group: %d", group)
362✔
1207
                }
362✔
1208
                pool := conn.GetPools().Connect(members[0], x.WorkerConfig.TLSClientConfig)
167✔
1209
                client := pb.NewWorkerClient(pool.Get())
167✔
1210

167✔
1211
                // Get Subscriber stream.
167✔
1212
                stream, err := client.Subscribe(closer.Ctx(),
167✔
1213
                        &pb.SubscriptionRequest{Matches: x.PrefixesToMatches(prefixes, ignore)})
167✔
1214
                if err != nil {
196✔
1215
                        return errors.Wrapf(err, "error from client.subscribe")
29✔
1216
                }
29✔
1217
                for {
15,321✔
1218
                        // Listen for updates.
15,183✔
1219
                        kvs, err := stream.Recv()
15,183✔
1220
                        if err != nil {
15,321✔
1221
                                return errors.Wrapf(err, "while receiving from stream")
138✔
1222
                        }
138✔
1223
                        cb(kvs)
15,045✔
1224
                }
1225
        }
1226

1227
        for {
653✔
1228
                if err := listen(); err != nil {
1,058✔
1229
                        glog.Errorf("Error during SubscribeForUpdates for prefix %q: %v. closer err: %v\n",
529✔
1230
                                prefix, err, closer.Ctx().Err())
529✔
1231
                }
529✔
1232
                if closer.Ctx().Err() != nil {
653✔
1233
                        return
124✔
1234
                }
124✔
1235
                time.Sleep(time.Second)
405✔
1236
        }
1237
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc