• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5985667653

26 Aug 2023 03:46PM UTC coverage: 66.655% (-0.5%) from 67.169%
5985667653

push

web-flow
chore(deps): bump github.com/apache/thrift from 0.12.0 to 0.13.0 (#8982)

* Bumps github.com/apache/thrift from 0.12.0 to 0.13.0

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

58142 of 87228 relevant lines covered (66.66%)

2209533.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.79
/dgraph/cmd/zero/tablet.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "context"
21
        "fmt"
22
        "sort"
23
        "time"
24

25
        humanize "github.com/dustin/go-humanize"
26
        "github.com/golang/glog"
27
        "github.com/pkg/errors"
28
        otrace "go.opencensus.io/trace"
29

30
        "github.com/dgraph-io/dgraph/protos/pb"
31
        "github.com/dgraph-io/dgraph/x"
32
)
33

34
// predicateMoveTimeout is the upper bound on how long a single predicate move may run;
// movePredicate derives its context deadline from it.
const predicateMoveTimeout = 2 * time.Hour
37

38
/*
39
Steps to move predicate p from g1 to g2.
40
Design change:
41
• If you’re not the leader, don’t talk to Zero.
42
• Let the leader send you updates via proposals.
43

44
Move:
45
• Dgraph zero would decide that G1 should not serve P, G2 should serve it.
46
• Zero would propose that G1 is read-only for predicate P. This would propagate to the cluster.
47

48
• Zero would tell G1 to move P to G2 (Endpoint: Zero → G1)
49

50
This would trigger G1 to get the latest state. Wait for it.
51
• G1 would propose this state to its followers.
52
• G1 after proposing would do a call to G2, and start streaming.
53
• Before G2 starts accepting, it should delete any current keys for P.
54
• It should tell Zero whether it succeeded or failed. (Endpoint: G1 → Zero)
55

56
• Zero would then propose that G2 is serving P (or that G1 still is, if the move failed above), making P read-write again.
57
• G1 gets this, G2 gets this.
58
• Both propagate this to their followers.
59

60
*/
61

62
// TODO: Have a event log for everything.
63
func (s *Server) rebalanceTablets() {
66✔
64
        ticker := time.NewTicker(opts.rebalanceInterval)
66✔
65
        for range ticker.C {
81✔
66
                predicate, srcGroup, dstGroup := s.chooseTablet()
15✔
67
                if len(predicate) == 0 {
30✔
68
                        continue
15✔
69
                }
70
                if err := s.movePredicate(predicate, srcGroup, dstGroup); err != nil {
×
71
                        glog.Errorln(err)
×
72
                }
×
73
        }
74
}
75

76
// MoveTablet can be used to move a tablet to a specific group.
77
// It takes in tablet and destination group as argument.
78
// It returns a *pb.Status to be used by the `/moveTablet` HTTP handler in Zero.
79
func (s *Server) MoveTablet(ctx context.Context, req *pb.MoveTabletRequest) (*pb.Status, error) {
1,009✔
80
        if !s.Node.AmLeader() {
1,009✔
81
                return &pb.Status{Code: 1, Msg: x.Error}, errNotLeader
×
82
        }
×
83

84
        knownGroups := s.KnownGroups()
1,009✔
85
        var isKnown bool
1,009✔
86
        for _, grp := range knownGroups {
2,026✔
87
                if grp == req.DstGroup {
1,025✔
88
                        isKnown = true
8✔
89
                        break
8✔
90
                }
91
        }
92
        if !isKnown {
2,010✔
93
                return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest},
1,001✔
94
                        fmt.Errorf("group: [%d] is not a known group", req.DstGroup)
1,001✔
95
        }
1,001✔
96

97
        tablet := x.NamespaceAttr(req.Namespace, req.Tablet)
8✔
98
        tab := s.ServingTablet(tablet)
8✔
99
        if tab == nil {
8✔
100
                return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest},
×
101
                        fmt.Errorf("namespace: %d. No tablet found for: %s", req.Namespace, req.Tablet)
×
102
        }
×
103

104
        srcGroup := tab.GroupId
8✔
105
        if srcGroup == req.DstGroup {
13✔
106
                return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest},
5✔
107
                        fmt.Errorf("namespace: %d. Tablet: [%s] is already being served by group: [%d]",
5✔
108
                                req.Namespace, req.Tablet, srcGroup)
5✔
109
        }
5✔
110

111
        if err := s.movePredicate(tablet, srcGroup, req.DstGroup); err != nil {
3✔
112
                glog.Errorf("namespace: %d. While moving predicate %s from %d -> %d. Error: %v",
×
113
                        req.Namespace, req.Tablet, srcGroup, req.DstGroup, err)
×
114
                return &pb.Status{Code: 1, Msg: x.Error}, err
×
115
        }
×
116

117
        return &pb.Status{Code: 0, Msg: fmt.Sprintf("namespace: %d. "+
3✔
118
                "Predicate: [%s] moved from group [%d] to [%d]", req.Namespace, req.Tablet, srcGroup,
3✔
119
                req.DstGroup)}, nil
3✔
120
}
121

122
// movePredicate is the main entry point for move predicate logic. This Zero must remain the leader
123
// for the entire duration of predicate move. If this Zero stops being the leader, the final
124
// proposal of reassigning the tablet to the destination would fail automatically.
125
func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) error {
3✔
126
        s.moveOngoing <- struct{}{}
3✔
127
        defer func() {
6✔
128
                <-s.moveOngoing
3✔
129
        }()
3✔
130

131
        ctx, cancel := context.WithTimeout(context.Background(), predicateMoveTimeout)
3✔
132
        defer cancel()
3✔
133

3✔
134
        ctx, span := otrace.StartSpan(ctx, "Zero.MovePredicate")
3✔
135
        defer span.End()
3✔
136

3✔
137
        // Ensure that reserved predicates cannot be moved.
3✔
138
        if x.IsReservedPredicate(predicate) {
3✔
139
                return errors.Errorf("Unable to move reserved predicate %s", predicate)
×
140
        }
×
141

142
        // Ensure that I'm connected to the rest of the Zero group, and am the leader.
143
        if _, err := s.latestMembershipState(ctx); err != nil {
3✔
144
                return errors.Wrapf(err, "unable to reach quorum")
×
145
        }
×
146
        if !s.Node.AmLeader() {
3✔
147
                return errors.Errorf("I am not the Zero leader")
×
148
        }
×
149
        tab := s.ServingTablet(predicate)
3✔
150
        if tab == nil {
3✔
151
                return errors.Errorf("Tablet to be moved: [%v] is not being served", predicate)
×
152
        }
×
153
        msg := fmt.Sprintf("Going to move predicate: [%v], size: [ondisk: %v, uncompressed: %v]"+
3✔
154
                " from group %d to %d\n", predicate, humanize.IBytes(uint64(tab.OnDiskBytes)),
3✔
155
                humanize.IBytes(uint64(tab.UncompressedBytes)), srcGroup, dstGroup)
3✔
156
        glog.Info(msg)
3✔
157
        span.Annotate([]otrace.Attribute{otrace.StringAttribute("tablet", predicate)}, msg)
3✔
158

3✔
159
        // Block all commits on this predicate. Keep them blocked until we return from this function.
3✔
160
        unblock := s.blockTablet(predicate)
3✔
161
        defer unblock()
3✔
162

3✔
163
        // Get a new timestamp, beyond which we are sure that no new txns would be committed for this
3✔
164
        // predicate. Source Alpha leader must reach this timestamp before streaming the data.
3✔
165
        ids, err := s.Timestamps(ctx, &pb.Num{Val: 1})
3✔
166
        if err != nil || ids.StartId == 0 {
3✔
167
                return errors.Wrapf(err, "while leasing txn timestamp. Id: %+v", ids)
×
168
        }
×
169

170
        // Get connection to leader of source group.
171
        pl := s.Leader(srcGroup)
3✔
172
        if pl == nil {
3✔
173
                return errors.Errorf("No healthy connection found to leader of group %d", srcGroup)
×
174
        }
×
175
        wc := pb.NewWorkerClient(pl.Get())
3✔
176
        in := &pb.MovePredicatePayload{
3✔
177
                Predicate: predicate,
3✔
178
                SourceGid: srcGroup,
3✔
179
                DestGid:   dstGroup,
3✔
180
                TxnTs:     ids.StartId,
3✔
181
        }
3✔
182
        span.Annotatef(nil, "Starting move: %+v", in)
3✔
183
        glog.Infof("Starting move: %+v", in)
3✔
184
        if _, err := wc.MovePredicate(ctx, in); err != nil {
3✔
185
                return errors.Wrapf(err, "while calling MovePredicate")
×
186
        }
×
187

188
        p := &pb.ZeroProposal{}
3✔
189
        p.Tablet = &pb.Tablet{
3✔
190
                GroupId:           dstGroup,
3✔
191
                Predicate:         predicate,
3✔
192
                OnDiskBytes:       tab.OnDiskBytes,
3✔
193
                UncompressedBytes: tab.UncompressedBytes,
3✔
194
                Force:             true,
3✔
195
                MoveTs:            in.TxnTs,
3✔
196
        }
3✔
197
        msg = fmt.Sprintf("Move at Alpha done. Now proposing: %+v", p)
3✔
198
        span.Annotate(nil, msg)
3✔
199
        glog.Info(msg)
3✔
200
        if err := s.Node.proposeAndWait(ctx, p); err != nil {
3✔
201
                return errors.Wrapf(err, "while proposing tablet reassignment. Proposal: %+v", p)
×
202
        }
×
203
        msg = fmt.Sprintf("Predicate move done for: [%v] from group %d to %d\n",
3✔
204
                predicate, srcGroup, dstGroup)
3✔
205
        glog.Info(msg)
3✔
206
        span.Annotate(nil, msg)
3✔
207

3✔
208
        // Now that the move has happened, we can delete the predicate from the source group. But before
3✔
209
        // doing that, we should ensure the source group understands that the predicate is now being
3✔
210
        // served by the destination group. For that, we pass in the expected checksum for the source
3✔
211
        // group. Only once the source group membership checksum matches, would the source group delete
3✔
212
        // the predicate. This ensures that it does not service any transaction after deletion of data.
3✔
213
        checksums := s.groupChecksums()
3✔
214
        in.ExpectedChecksum = checksums[in.SourceGid]
3✔
215
        in.DestGid = 0 // Indicates deletion of predicate in the source group.
3✔
216
        if _, err := wc.MovePredicate(ctx, in); err != nil {
3✔
217
                msg = fmt.Sprintf("While deleting predicate [%v] in group %d. Error: %v",
×
218
                        in.Predicate, in.SourceGid, err)
×
219
                span.Annotate(nil, msg)
×
220
                glog.Warningf(msg)
×
221
        } else {
3✔
222
                msg = fmt.Sprintf("Deleted predicate %v in group %d", in.Predicate, in.SourceGid)
3✔
223
                span.Annotate(nil, msg)
3✔
224
                glog.V(1).Infof(msg)
3✔
225
        }
3✔
226
        return nil
3✔
227
}
228

229
func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uint32) {
15✔
230
        s.RLock()
15✔
231
        defer s.RUnlock()
15✔
232
        if s.state == nil {
15✔
233
                return
×
234
        }
×
235
        numGroups := len(s.state.Groups)
15✔
236
        if !s.Node.AmLeader() || numGroups <= 1 {
25✔
237
                return
10✔
238
        }
10✔
239

240
        // Sort all groups by their sizes.
241
        type kv struct {
5✔
242
                gid  uint32
5✔
243
                size int64 // in bytes
5✔
244
        }
5✔
245
        var groups []kv
5✔
246
        for k, v := range s.state.Groups {
15✔
247
                space := int64(0)
10✔
248
                for _, tab := range v.Tablets {
420✔
249
                        space += tab.OnDiskBytes
410✔
250
                }
410✔
251
                groups = append(groups, kv{k, space})
10✔
252
        }
253
        sort.Slice(groups, func(i, j int) bool {
10✔
254
                return groups[i].size < groups[j].size
5✔
255
        })
5✔
256

257
        glog.Infof("\n\nGroups sorted by size: %+v\n\n", groups)
5✔
258
        for lastGroup := numGroups - 1; lastGroup > 0; lastGroup-- {
10✔
259
                srcGroup = groups[lastGroup].gid
5✔
260
                dstGroup = groups[0].gid
5✔
261
                sizeDiff := groups[lastGroup].size - groups[0].size
5✔
262
                glog.Infof("size_diff %v\n", sizeDiff)
5✔
263
                // Don't move a node unless you receive atleast one update regarding tablet size.
5✔
264
                // Tablet size would have come up with leader update.
5✔
265
                if !s.hasLeader(dstGroup) {
5✔
266
                        return
×
267
                }
×
268
                // We move the predicate only if the difference between size of both machines is
269
                // atleast 10% of dst group.
270
                if float64(sizeDiff) < 0.1*float64(groups[0].size) {
5✔
271
                        continue
×
272
                }
273

274
                // Try to find a predicate which we can move.
275
                size := int64(0)
5✔
276
                group := s.state.Groups[srcGroup]
5✔
277
                for _, tab := range group.Tablets {
5✔
278
                        // Reserved predicates should always be in group 1 so do not re-balance them.
×
279
                        if x.IsReservedPredicate(tab.Predicate) {
×
280
                                continue
×
281
                        }
282

283
                        // Finds a tablet as big a possible such that on moving it dstGroup's size is
284
                        // less than or equal to srcGroup.
285
                        if tab.OnDiskBytes <= sizeDiff/2 && tab.OnDiskBytes > size {
×
286
                                predicate = tab.Predicate
×
287
                                size = tab.OnDiskBytes
×
288
                        }
×
289
                }
290
                if len(predicate) > 0 {
5✔
291
                        return
×
292
                }
×
293
        }
294
        return
5✔
295
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc