• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5078820494

25 May 2023 11:19AM UTC coverage: 67.259% (-0.009%) from 67.268%
5078820494

push

GitHub
dgraphtest: print container logs if the test fails (#8829)

58396 of 86823 relevant lines covered (67.26%)

2270884.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.79
/dgraph/cmd/zero/tablet.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "context"
21
        "fmt"
22
        "sort"
23
        "time"
24

25
        humanize "github.com/dustin/go-humanize"
26
        "github.com/golang/glog"
27
        "github.com/pkg/errors"
28
        otrace "go.opencensus.io/trace"
29

30
        "github.com/dgraph-io/dgraph/protos/pb"
31
        "github.com/dgraph-io/dgraph/x"
32
)
33

34
const (
	// predicateMoveTimeout bounds the context used by movePredicate for a whole
	// predicate move: leasing the timestamp, streaming via the source group,
	// proposing the reassignment and asking the source group to delete the data.
	predicateMoveTimeout = 2 * time.Hour
)
37

38
/*
39
Steps to move predicate p from g1 to g2.
40
Design change:
41
• If you’re not the leader, don’t talk to zero.
42
• Let the leader send you updates via proposals.
43

44
Move:
45
• Dgraph Zero would decide that G1 should not serve P, and that G2 should serve it.
46
• Zero would propose that G1 is read-only for predicate P. This would propagate to the cluster.
47

48
• Zero would tell G1 to move P to G2 (Endpoint: Zero → G1)
49

50
This would trigger G1 to get latest state. Wait for it.
51
• G1 would propose this state to its followers.
52
• G1 after proposing would do a call to G2, and start streaming.
53
• Before G2 starts accepting, it should delete any current keys for P.
54
• It should tell Zero whether it succeeded or failed. (Endpoint: G1 → Zero)
55

56
• Zero would then propose that G2 is serving P (or that G1 still is, if the move failed above), and P would become read-write again.
57
• G1 gets this, G2 gets this.
58
• Both propagate this to their followers.
59

60
*/
61

62
// TODO: Have a event log for everything.
63
func (s *Server) rebalanceTablets() {
66✔
64
        ticker := time.NewTicker(opts.rebalanceInterval)
66✔
65
        for range ticker.C {
81✔
66
                predicate, srcGroup, dstGroup := s.chooseTablet()
15✔
67
                if len(predicate) == 0 {
30✔
68
                        continue
15✔
69
                }
70
                if err := s.movePredicate(predicate, srcGroup, dstGroup); err != nil {
×
71
                        glog.Errorln(err)
×
72
                }
×
73
        }
74
}
75

76
// MoveTablet can be used to move a tablet to a specific group.
77
// It takes in tablet and destination group as argument.
78
// It returns a *pb.Status to be used by the `/moveTablet` HTTP handler in Zero.
79
func (s *Server) MoveTablet(ctx context.Context, req *pb.MoveTabletRequest) (*pb.Status, error) {
1,009✔
80
        if !s.Node.AmLeader() {
1,009✔
81
                return &pb.Status{Code: 1, Msg: x.Error}, errNotLeader
×
82
        }
×
83

84
        knownGroups := s.KnownGroups()
1,009✔
85
        var isKnown bool
1,009✔
86
        for _, grp := range knownGroups {
2,023✔
87
                if grp == req.DstGroup {
1,022✔
88
                        isKnown = true
8✔
89
                        break
8✔
90
                }
91
        }
92
        if !isKnown {
2,010✔
93
                return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest},
1,001✔
94
                        fmt.Errorf("group: [%d] is not a known group", req.DstGroup)
1,001✔
95
        }
1,001✔
96

97
        tablet := x.NamespaceAttr(req.Namespace, req.Tablet)
8✔
98
        tab := s.ServingTablet(tablet)
8✔
99
        if tab == nil {
8✔
100
                return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest},
×
101
                        fmt.Errorf("namespace: %d. No tablet found for: %s", req.Namespace, req.Tablet)
×
102
        }
×
103

104
        srcGroup := tab.GroupId
8✔
105
        if srcGroup == req.DstGroup {
14✔
106
                return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest},
6✔
107
                        fmt.Errorf("namespace: %d. Tablet: [%s] is already being served by group: [%d]",
6✔
108
                                req.Namespace, req.Tablet, srcGroup)
6✔
109
        }
6✔
110

111
        if err := s.movePredicate(tablet, srcGroup, req.DstGroup); err != nil {
2✔
112
                glog.Errorf("namespace: %d. While moving predicate %s from %d -> %d. Error: %v",
×
113
                        req.Namespace, req.Tablet, srcGroup, req.DstGroup, err)
×
114
                return &pb.Status{Code: 1, Msg: x.Error}, err
×
115
        }
×
116

117
        return &pb.Status{Code: 0, Msg: fmt.Sprintf("namespace: %d. "+
2✔
118
                "Predicate: [%s] moved from group [%d] to [%d]", req.Namespace, req.Tablet, srcGroup,
2✔
119
                req.DstGroup)}, nil
2✔
120
}
121

122
// movePredicate is the main entry point for move predicate logic. This Zero must remain the leader
123
// for the entire duration of predicate move. If this Zero stops being the leader, the final
124
// proposal of reassigning the tablet to the destination would fail automatically.
125
func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) error {
2✔
126
        s.moveOngoing <- struct{}{}
2✔
127
        defer func() {
4✔
128
                <-s.moveOngoing
2✔
129
        }()
2✔
130

131
        ctx, cancel := context.WithTimeout(context.Background(), predicateMoveTimeout)
2✔
132
        defer cancel()
2✔
133

2✔
134
        ctx, span := otrace.StartSpan(ctx, "Zero.MovePredicate")
2✔
135
        defer span.End()
2✔
136

2✔
137
        // Ensure that reserved predicates cannot be moved.
2✔
138
        if x.IsReservedPredicate(predicate) {
2✔
139
                return errors.Errorf("Unable to move reserved predicate %s", predicate)
×
140
        }
×
141

142
        // Ensure that I'm connected to the rest of the Zero group, and am the leader.
143
        if _, err := s.latestMembershipState(ctx); err != nil {
2✔
144
                return errors.Wrapf(err, "unable to reach quorum")
×
145
        }
×
146
        if !s.Node.AmLeader() {
2✔
147
                return errors.Errorf("I am not the Zero leader")
×
148
        }
×
149
        tab := s.ServingTablet(predicate)
2✔
150
        if tab == nil {
2✔
151
                return errors.Errorf("Tablet to be moved: [%v] is not being served", predicate)
×
152
        }
×
153
        msg := fmt.Sprintf("Going to move predicate: [%v], size: [ondisk: %v, uncompressed: %v]"+
2✔
154
                " from group %d to %d\n", predicate, humanize.IBytes(uint64(tab.OnDiskBytes)),
2✔
155
                humanize.IBytes(uint64(tab.UncompressedBytes)), srcGroup, dstGroup)
2✔
156
        glog.Info(msg)
2✔
157
        span.Annotate([]otrace.Attribute{otrace.StringAttribute("tablet", predicate)}, msg)
2✔
158

2✔
159
        // Block all commits on this predicate. Keep them blocked until we return from this function.
2✔
160
        unblock := s.blockTablet(predicate)
2✔
161
        defer unblock()
2✔
162

2✔
163
        // Get a new timestamp, beyond which we are sure that no new txns would be committed for this
2✔
164
        // predicate. Source Alpha leader must reach this timestamp before streaming the data.
2✔
165
        ids, err := s.Timestamps(ctx, &pb.Num{Val: 1})
2✔
166
        if err != nil || ids.StartId == 0 {
2✔
167
                return errors.Wrapf(err, "while leasing txn timestamp. Id: %+v", ids)
×
168
        }
×
169

170
        // Get connection to leader of source group.
171
        pl := s.Leader(srcGroup)
2✔
172
        if pl == nil {
2✔
173
                return errors.Errorf("No healthy connection found to leader of group %d", srcGroup)
×
174
        }
×
175
        wc := pb.NewWorkerClient(pl.Get())
2✔
176
        in := &pb.MovePredicatePayload{
2✔
177
                Predicate: predicate,
2✔
178
                SourceGid: srcGroup,
2✔
179
                DestGid:   dstGroup,
2✔
180
                TxnTs:     ids.StartId,
2✔
181
        }
2✔
182
        span.Annotatef(nil, "Starting move: %+v", in)
2✔
183
        glog.Infof("Starting move: %+v", in)
2✔
184
        if _, err := wc.MovePredicate(ctx, in); err != nil {
2✔
185
                return errors.Wrapf(err, "while calling MovePredicate")
×
186
        }
×
187

188
        p := &pb.ZeroProposal{}
2✔
189
        p.Tablet = &pb.Tablet{
2✔
190
                GroupId:           dstGroup,
2✔
191
                Predicate:         predicate,
2✔
192
                OnDiskBytes:       tab.OnDiskBytes,
2✔
193
                UncompressedBytes: tab.UncompressedBytes,
2✔
194
                Force:             true,
2✔
195
                MoveTs:            in.TxnTs,
2✔
196
        }
2✔
197
        msg = fmt.Sprintf("Move at Alpha done. Now proposing: %+v", p)
2✔
198
        span.Annotate(nil, msg)
2✔
199
        glog.Info(msg)
2✔
200
        if err := s.Node.proposeAndWait(ctx, p); err != nil {
2✔
201
                return errors.Wrapf(err, "while proposing tablet reassignment. Proposal: %+v", p)
×
202
        }
×
203
        msg = fmt.Sprintf("Predicate move done for: [%v] from group %d to %d\n",
2✔
204
                predicate, srcGroup, dstGroup)
2✔
205
        glog.Info(msg)
2✔
206
        span.Annotate(nil, msg)
2✔
207

2✔
208
        // Now that the move has happened, we can delete the predicate from the source group. But before
2✔
209
        // doing that, we should ensure the source group understands that the predicate is now being
2✔
210
        // served by the destination group. For that, we pass in the expected checksum for the source
2✔
211
        // group. Only once the source group membership checksum matches, would the source group delete
2✔
212
        // the predicate. This ensures that it does not service any transaction after deletion of data.
2✔
213
        checksums := s.groupChecksums()
2✔
214
        in.ExpectedChecksum = checksums[in.SourceGid]
2✔
215
        in.DestGid = 0 // Indicates deletion of predicate in the source group.
2✔
216
        if _, err := wc.MovePredicate(ctx, in); err != nil {
2✔
217
                msg = fmt.Sprintf("While deleting predicate [%v] in group %d. Error: %v",
×
218
                        in.Predicate, in.SourceGid, err)
×
219
                span.Annotate(nil, msg)
×
220
                glog.Warningf(msg)
×
221
        } else {
2✔
222
                msg = fmt.Sprintf("Deleted predicate %v in group %d", in.Predicate, in.SourceGid)
2✔
223
                span.Annotate(nil, msg)
2✔
224
                glog.V(1).Infof(msg)
2✔
225
        }
2✔
226
        return nil
2✔
227
}
228

229
func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uint32) {
15✔
230
        s.RLock()
15✔
231
        defer s.RUnlock()
15✔
232
        if s.state == nil {
15✔
233
                return
×
234
        }
×
235
        numGroups := len(s.state.Groups)
15✔
236
        if !s.Node.AmLeader() || numGroups <= 1 {
25✔
237
                return
10✔
238
        }
10✔
239

240
        // Sort all groups by their sizes.
241
        type kv struct {
5✔
242
                gid  uint32
5✔
243
                size int64 // in bytes
5✔
244
        }
5✔
245
        var groups []kv
5✔
246
        for k, v := range s.state.Groups {
15✔
247
                space := int64(0)
10✔
248
                for _, tab := range v.Tablets {
427✔
249
                        space += tab.OnDiskBytes
417✔
250
                }
417✔
251
                groups = append(groups, kv{k, space})
10✔
252
        }
253
        sort.Slice(groups, func(i, j int) bool {
10✔
254
                return groups[i].size < groups[j].size
5✔
255
        })
5✔
256

257
        glog.Infof("\n\nGroups sorted by size: %+v\n\n", groups)
5✔
258
        for lastGroup := numGroups - 1; lastGroup > 0; lastGroup-- {
10✔
259
                srcGroup = groups[lastGroup].gid
5✔
260
                dstGroup = groups[0].gid
5✔
261
                sizeDiff := groups[lastGroup].size - groups[0].size
5✔
262
                glog.Infof("size_diff %v\n", sizeDiff)
5✔
263
                // Don't move a node unless you receive atleast one update regarding tablet size.
5✔
264
                // Tablet size would have come up with leader update.
5✔
265
                if !s.hasLeader(dstGroup) {
5✔
266
                        return
×
267
                }
×
268
                // We move the predicate only if the difference between size of both machines is
269
                // atleast 10% of dst group.
270
                if float64(sizeDiff) < 0.1*float64(groups[0].size) {
5✔
271
                        continue
×
272
                }
273

274
                // Try to find a predicate which we can move.
275
                size := int64(0)
5✔
276
                group := s.state.Groups[srcGroup]
5✔
277
                for _, tab := range group.Tablets {
5✔
278
                        // Reserved predicates should always be in group 1 so do not re-balance them.
×
279
                        if x.IsReservedPredicate(tab.Predicate) {
×
280
                                continue
×
281
                        }
282

283
                        // Finds a tablet as big a possible such that on moving it dstGroup's size is
284
                        // less than or equal to srcGroup.
285
                        if tab.OnDiskBytes <= sizeDiff/2 && tab.OnDiskBytes > size {
×
286
                                predicate = tab.Predicate
×
287
                                size = tab.OnDiskBytes
×
288
                        }
×
289
                }
290
                if len(predicate) > 0 {
5✔
291
                        return
×
292
                }
×
293
        }
294
        return
5✔
295
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc