• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-vshard-router / 14330428590

08 Apr 2025 09:46AM UTC coverage: 69.071% (-0.7%) from 69.723%
14330428590

push

github

KaymeKaydex
hotfix: cmake for testing

1130 of 1636 relevant lines covered (69.07%)

12940.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.5
/discovery.go
1
package vshard_router //nolint:revive
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "runtime/debug"
8
        "time"
9

10
        "golang.org/x/sync/errgroup"
11

12
        "github.com/tarantool/go-tarantool/v2"
13
)
14

15
// --------------------------------------------------------------------------------
16
// -- Discovery
17
// --------------------------------------------------------------------------------
18

19
// DiscoveryMode selects how the router keeps its bucket route map up to date.
type DiscoveryMode int

const (
	// DiscoveryModeOn is cron discovery with cron timeout: buckets are
	// rediscovered periodically, once per DiscoveryTimeout.
	DiscoveryModeOn DiscoveryMode = 0
	// DiscoveryModeOnce presumably performs discovery a single time instead of
	// running the periodic cron loop — NOTE(review): confirm against router setup.
	DiscoveryModeOnce DiscoveryMode = 1
)
26

27
// BucketsSearchMode is the policy Router.Route uses to locate a bucket that is
// not present in the route map. See type Config for further details.
type BucketsSearchMode int

const (
	// BucketsSearchLegacy mirrors the Lua router: a bucket_stat request is sent
	// to every replicaset and the first successful answer wins.
	BucketsSearchLegacy BucketsSearchMode = 0

	// BucketsSearchBatchedQuick sends buckets_discovery with from=bucketID to
	// every replicaset and looks for the wanted bucketID in the responses,
	// recording every other returned bucketID in the route map as a side effect.
	// It stops processing responses as soon as the wanted bucketID is found.
	BucketsSearchBatchedQuick BucketsSearchMode = 1

	// BucketsSearchBatchedFull behaves like BucketsSearchBatchedQuick, except
	// that it always processes the responses of all replicasets, even after the
	// wanted bucketID has been found.
	BucketsSearchBatchedFull BucketsSearchMode = 2
)
47

48
// Route get replicaset object by bucket identifier.
49
func (r *Router) Route(ctx context.Context, bucketID uint64) (*Replicaset, error) {
92,440✔
50
        if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
92,443✔
51
                return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
3✔
52
        }
3✔
53

54
        routeMap := r.getRouteMap()
92,437✔
55

92,437✔
56
        rs := routeMap[bucketID].Load()
92,437✔
57
        if rs != nil {
184,870✔
58
                nameToReplicasetRef := r.getNameToReplicaset()
92,433✔
59

92,433✔
60
                actualRs := nameToReplicasetRef[rs.info.Name]
92,433✔
61
                switch {
92,433✔
62
                case actualRs == nil:
×
63
                        // rs is outdated, can't use it -- let's discover bucket again
×
64
                        r.BucketReset(bucketID)
×
65
                case actualRs == rs:
92,433✔
66
                        return rs, nil
92,433✔
67
                default: // actualRs != rs
×
68
                        // update rs -> actualRs for this bucket
×
69
                        _, _ = r.BucketSet(bucketID, actualRs.info.Name)
×
70
                        return actualRs, nil
×
71
                }
72
        }
73

74
        // it`s ok if in the same time we have few active searches
75
        r.log().Infof(ctx, "Discovering bucket %d", bucketID)
4✔
76

4✔
77
        if r.cfg.BucketsSearchMode == BucketsSearchLegacy {
6✔
78
                return r.bucketSearchLegacy(ctx, bucketID)
2✔
79
        }
2✔
80

81
        return r.bucketSearchBatched(ctx, bucketID)
2✔
82
}
83

84
func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Replicaset, error) {
2✔
85
        nameToReplicasetRef := r.getNameToReplicaset()
2✔
86

2✔
87
        type rsFuture struct {
2✔
88
                rsName string
2✔
89
                future *tarantool.Future
2✔
90
        }
2✔
91

2✔
92
        var rsFutures = make([]rsFuture, 0, len(nameToReplicasetRef))
2✔
93
        // Send a bunch of parallel requests
2✔
94
        for rsName, rs := range nameToReplicasetRef {
6✔
95
                rsFutures = append(rsFutures, rsFuture{
4✔
96
                        rsName: rsName,
4✔
97
                        future: rs.bucketStatAsync(ctx, bucketID),
4✔
98
                })
4✔
99
        }
4✔
100

101
        for _, rsFuture := range rsFutures {
4✔
102
                if _, err := bucketStatWait(rsFuture.future); err != nil {
2✔
103
                        var vshardError StorageCallVShardError
×
104
                        if !errors.As(err, &vshardError) {
×
105
                                r.log().Errorf(ctx, "bucketSearchLegacy: bucketStatWait call error for %v: %v", rsFuture.rsName, err)
×
106
                        }
×
107
                        // just skip, bucket may not belong to this replicaset
108
                        continue
×
109
                }
110

111
                // It's ok if several replicasets return ok to bucket_stat command for the same bucketID, just pick any of them.
112
                rs, err := r.BucketSet(bucketID, rsFuture.rsName)
2✔
113
                if err != nil {
2✔
114
                        r.log().Errorf(ctx, "bucketSearchLegacy: can't set rsID %v for bucketID %d: %v", rsFuture.rsName, bucketID, err)
×
115
                        return nil, newVShardErrorNoRouteToBucket(bucketID)
×
116
                }
×
117

118
                // TODO: should we release resources for unhandled futures?
119
                return rs, nil
2✔
120
        }
121

122
        // All replicasets were scanned, but a bucket was not found anywhere, so most likely it does not exist.
123
        // It can be wrong, if rebalancing is in progress, and a bucket was found to be RECEIVING on one replicaset,
124
        // and was not found on other replicasets (it was sent during discovery).
125

126
        return nil, newVShardErrorNoRouteToBucket(bucketID)
×
127
}
128

129
// The approach in bucketSearchLegacy is very ineffective because
130
// we use N vshard.storage.bucket_stat requests over the network
131
// to find out the location of a single bucket. So, we use here vshard.storage.buckets_discovery request instead.
132
// As a result, we find the location for N * 1000 buckets while paying almost the same cost (CPU, network).
133
// P.S. 1000 is a batch size in response of buckets_discovery, see:
134
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L1700
135
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/consts.lua#L37
136
func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64) (*Replicaset, error) {
2✔
137
        nameToReplicasetRef := r.getNameToReplicaset()
2✔
138
        routeMap := r.getRouteMap()
2✔
139

2✔
140
        type rsFuture struct {
2✔
141
                rs     *Replicaset
2✔
142
                rsName string
2✔
143
                future *tarantool.Future
2✔
144
        }
2✔
145

2✔
146
        var rsFutures = make([]rsFuture, 0, len(nameToReplicasetRef))
2✔
147
        // Send a bunch of parallel requests
2✔
148
        for rsName, rs := range nameToReplicasetRef {
6✔
149
                rsFutures = append(rsFutures, rsFuture{
4✔
150
                        rs:     rs,
4✔
151
                        rsName: rsName,
4✔
152
                        future: rs.bucketsDiscoveryAsync(ctx, bucketIDToFind),
4✔
153
                })
4✔
154
        }
4✔
155

156
        var rs *Replicaset
2✔
157

2✔
158
        for _, rsFuture := range rsFutures {
6✔
159
                resp, err := bucketsDiscoveryWait(rsFuture.future)
4✔
160
                if err != nil {
4✔
161
                        r.log().Errorf(ctx, "bucketSearchBatched: bucketsDiscoveryWait error for %v: %v", rsFuture.rsName, err)
×
162
                        // just skip, we still may find our bucket in another replicaset
×
163
                        continue
×
164
                }
165

166
                for _, bucketID := range resp.Buckets {
172✔
167
                        if bucketID == bucketIDToFind {
170✔
168
                                // We found where bucketIDToFind is located
2✔
169
                                rs = rsFuture.rs
2✔
170
                        }
2✔
171

172
                        routeMap[bucketID].Store(rs)
168✔
173
                }
174

175
                if bucketIDWasFound := rs != nil; !bucketIDWasFound {
5✔
176
                        continue
1✔
177
                }
178

179
                if r.cfg.BucketsSearchMode == BucketsSearchBatchedQuick {
4✔
180
                        return rs, nil
1✔
181
                }
1✔
182
        }
183

184
        if rs == nil {
1✔
185
                return nil, newVShardErrorNoRouteToBucket(bucketIDToFind)
×
186
        }
×
187

188
        return rs, nil
1✔
189
}
190

191
// DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset.
192
func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64) {
×
193
        routeMap := r.getRouteMap()
×
194
        removedFrom := make(map[*Replicaset]int)
×
195

×
196
        for _, bucketID := range buckets {
×
197
                oldRs := routeMap[bucketID].Swap(rs)
×
198

×
199
                if oldRs == rs {
×
200
                        continue
×
201
                }
202

203
                // NOTE: oldRs and rs might have the same name, we intentionally don't check this case to keep the logic simple
204

205
                // We don't check oldRs for nil here, because it's a valid key too (if rs == nil, it means removed from unknown buckets set)
206
                removedFrom[oldRs]++
×
207
        }
208

209
        var addedToRs int
×
210
        for rs, removedFromRs := range removedFrom {
×
211
                addedToRs += removedFromRs
×
212

×
213
                switch rs {
×
214
                case nil:
×
215
                        r.log().Debugf(ctx, "Added new %d buckets to the cluster map", removedFromRs)
×
216
                default:
×
217
                        r.log().Debugf(ctx, "Removed %d buckets from replicaset %s", removedFromRs, rs.info.Name)
×
218
                }
219
        }
220

221
        r.log().Infof(ctx, "Added %d buckets to replicaset %s", addedToRs, rs.info.Name)
×
222
}
223

224
func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
13✔
225
        t := time.Now()
13✔
226

13✔
227
        r.log().Infof(ctx, "Start discovery all buckets")
13✔
228

13✔
229
        var errGr errgroup.Group
13✔
230

13✔
231
        routeMap := r.getRouteMap()
13✔
232
        nameToReplicasetRef := r.getNameToReplicaset()
13✔
233

13✔
234
        for _, rs := range nameToReplicasetRef {
40✔
235
                rs := rs
27✔
236

27✔
237
                errGr.Go(func() error {
54✔
238
                        var bucketsDiscoveryPaginationFrom uint64
27✔
239

27✔
240
                        for {
54✔
241
                                resp, err := rs.bucketsDiscovery(ctx, bucketsDiscoveryPaginationFrom)
27✔
242
                                if err != nil {
28✔
243
                                        r.log().Errorf(ctx, "Can't bucketsDiscovery for rs %s: %v", rs.info, err)
1✔
244
                                        return err
1✔
245
                                }
1✔
246

247
                                for _, bucketID := range resp.Buckets {
1,326✔
248
                                        if bucketID > r.cfg.TotalBucketCount {
1,300✔
249
                                                r.log().Errorf(ctx, "Ignoring got bucketID is out of range: %d (length %d)",
×
250
                                                        bucketID, r.cfg.TotalBucketCount)
×
251
                                                continue
×
252
                                        }
253

254
                                        routeMap[bucketID].Store(rs)
1,300✔
255
                                }
256

257
                                // There are no more buckets
258
                                // https://github.com/tarantool/vshard/blob/8d299bfe/vshard/storage/init.lua#L1730
259
                                // vshard.storage returns { buckets = [], next_from = nil } if there are no more buckets.
260
                                // Since next_from is always > 0. NextFrom = 0 means that we got next_from = nil, that has not been decoded.
261
                                if resp.NextFrom == 0 {
52✔
262
                                        return nil
26✔
263
                                }
26✔
264

265
                                bucketsDiscoveryPaginationFrom = resp.NextFrom
×
266

×
267
                                // Don't spam many requests at once. Give storages time to handle them and other requests.
×
268
                                // https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L308
×
269
                                time.Sleep(r.cfg.DiscoveryWorkStep)
×
270
                        }
271
                })
272
        }
273

274
        err := errGr.Wait()
13✔
275
        if err != nil {
14✔
276
                return fmt.Errorf("errGr.Wait() err: %w", err)
1✔
277
        }
1✔
278
        r.log().Infof(ctx, "Discovery done since: %s", time.Since(t))
12✔
279

12✔
280
        return nil
12✔
281
}
282

283
// cronDiscovery is discovery_service_f analog with goroutines instead fibers
284
func (r *Router) cronDiscovery(ctx context.Context) {
11✔
285
        var iterationCount uint64
11✔
286

11✔
287
        for {
24✔
288
                select {
13✔
289
                case <-ctx.Done():
×
290
                        r.metrics().CronDiscoveryEvent(false, 0, "ctx-cancel")
×
291
                        r.log().Infof(ctx, "[DISCOVERY] cron discovery has been stopped after %d iterations", iterationCount)
×
292
                        return
×
293
                case <-time.After(r.cfg.DiscoveryTimeout):
2✔
294
                        iterationCount++
2✔
295
                }
296

297
                // Since the current for loop should not stop until ctx->Done() event fires,
298
                // we should be able to continue execution even a panic occures.
299
                // Therefore, we should wrap everything into anonymous function that recovers after panic.
300
                // (Similar to pcall in lua/tarantool)
301
                func() {
4✔
302
                        defer func() {
4✔
303
                                if recovered := recover(); recovered != nil {
2✔
304
                                        // Another one panic may happen due to log function below (e.g. bug in log().Errorf), in this case we have two options:
×
305
                                        // 1. recover again and log nothing: panic will be muted and lost
×
306
                                        // 2. don't try to recover, we hope that the second panic will be logged somehow by go runtime
×
307
                                        // So, we desided to combine them in the third behavior: log in another goroutin
×
308
                                        iterationCount := iterationCount
×
309
                                        // get stacktrace in the current goroutine
×
310
                                        debugStack := string(debug.Stack())
×
311

×
312
                                        go func() {
×
313
                                                r.log().Errorf(ctx, "[DISCOVERY] something unexpected has happened in cronDiscovery(%d): panic %v, stacktrace: %s",
×
314
                                                        iterationCount, recovered, debugStack)
×
315
                                        }()
×
316
                                }
317
                        }()
318

319
                        r.log().Infof(ctx, "[DISCOVERY] started cron discovery iteration %d", iterationCount)
2✔
320

2✔
321
                        tStartDiscovery := time.Now()
2✔
322

2✔
323
                        if err := r.DiscoveryAllBuckets(ctx); err != nil {
2✔
324
                                r.metrics().CronDiscoveryEvent(false, time.Since(tStartDiscovery), "discovery-error")
×
325
                                r.log().Errorf(ctx, "[DISCOVERY] cant do cron discovery iteration %d with error: %s", iterationCount, err)
×
326
                                return
×
327
                        }
×
328

329
                        r.log().Infof(ctx, "[DISCOVERY] finished cron discovery iteration %d", iterationCount)
2✔
330

2✔
331
                        r.metrics().CronDiscoveryEvent(true, time.Since(tStartDiscovery), "ok")
2✔
332
                }()
333
        }
334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc