• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-vshard-router / 14330428590

08 Apr 2025 09:46AM UTC coverage: 69.071% (-0.7%) from 69.723%
14330428590

push

github

KaymeKaydex
hotfix: cmake for testing

1130 of 1636 relevant lines covered (69.07%)

12940.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.56
/replicaset.go
1
package vshard_router //nolint:revive
2

3
import (
4
        "context"
5
        "fmt"
6
        "math"
7
        "time"
8

9
        "github.com/google/uuid"
10
        "github.com/tarantool/go-tarantool/v2"
11
        "github.com/tarantool/go-tarantool/v2/pool"
12
        "github.com/vmihailenco/msgpack/v5"
13
        "github.com/vmihailenco/msgpack/v5/msgpcode"
14
)
15

16
// ReplicasetInfo represents information about a replicaset, including its name, unique identifier, weight, and state.
17
type ReplicasetInfo struct {
18
        // Name — the name of the replicaset.
19
        // This string is required and is used to identify the replicaset.
20
        Name string
21
        // UUID — the unique identifier of the replica.
22
        // This is an optional value that can be used to uniquely distinguish each replicaset.
23
        UUID uuid.UUID
24
        // Weight — the weight of the replicaset.
25
        // This floating-point number may be used to determine the importance or priority of the replicaset.
26
        Weight float64
27
        // PinnedCount — the number of pinned items.
28
        // This value indicates how many items or tasks are associated with the replicaset.
29
        PinnedCount uint64
30
        // IgnoreDisbalance — a flag indicating whether to ignore load imbalance when distributing tasks.
31
        // If true, the replicaset will be excluded from imbalance checks.
32
        IgnoreDisbalance bool
33
}
34

35
func (ri ReplicasetInfo) Validate() error {
201,463✔
36
        if ri.Name == "" {
201,465✔
37
                return fmt.Errorf("%w: rsInfo.Name is empty", ErrInvalidReplicasetInfo)
2✔
38
        }
2✔
39

40
        return nil
201,461✔
41
}
42

43
func (ri ReplicasetInfo) String() string {
4✔
44
        return fmt.Sprintf("{name: %s, uuid: %s}", ri.Name, ri.UUID)
4✔
45
}
4✔
46

47
type ReplicasetCallOpts struct {
48
        PoolMode pool.Mode
49
        Timeout  time.Duration
50
}
51

52
// Pooler is an interface for the tarantool.Pool wrapper,
53
// which is necessary for the correct operation of the library.
54
type Pooler interface {
55
        pool.Pooler
56
        // GetInfo is an addition to the standard interface that allows you
57
        // to get the current state of the connection pool.
58
        // This is necessary for proper operation with topology providers
59
        // for adding or removing instances.
60
        GetInfo() map[string]pool.ConnectionInfo
61
}
62

63
type Replicaset struct {
64
        conn              Pooler
65
        info              ReplicasetInfo
66
        EtalonBucketCount uint64
67
}
68

69
func (rs *Replicaset) Pooler() pool.Pooler {
2✔
70
        return rs.conn
2✔
71
}
2✔
72

73
func (rs *Replicaset) String() string {
1✔
74
        return rs.info.String()
1✔
75
}
1✔
76

77
func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error) {
2✔
78
        future := rs.bucketStatAsync(ctx, bucketID)
2✔
79

2✔
80
        return bucketStatWait(future)
2✔
81
}
2✔
82

83
func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tarantool.Future {
6✔
84
        const bucketStatFnc = "vshard.storage.bucket_stat"
6✔
85

6✔
86
        return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RO}, bucketStatFnc, []interface{}{bucketID})
6✔
87
}
6✔
88

89
type vshardStorageBucketStatResponseProto struct {
90
        ok   bool
91
        info BucketStatInfo
92
        err  StorageCallVShardError
93
}
94

95
func (r *vshardStorageBucketStatResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
2✔
96
        // bucket_stat returns pair: stat, err
2✔
97
        // https://github.com/tarantool/vshard/blob/e1c806e1d3d2ce8a4e6b4d498c09051bf34ab92a/vshard/storage/init.lua#L1413
2✔
98

2✔
99
        respArrayLen, err := d.DecodeArrayLen()
2✔
100
        if err != nil {
2✔
101
                return err
×
102
        }
×
103

104
        if respArrayLen <= 0 {
2✔
105
                return fmt.Errorf("protocol violation bucketStatWait: respArrayLen=%d", respArrayLen)
×
106
        }
×
107

108
        code, err := d.PeekCode()
2✔
109
        if err != nil {
2✔
110
                return err
×
111
        }
×
112

113
        if code == msgpcode.Nil {
2✔
114
                err = d.DecodeNil()
×
115
                if err != nil {
×
116
                        return err
×
117
                }
×
118

119
                if respArrayLen != 2 {
×
120
                        return fmt.Errorf("protocol violation bucketStatWait: length is %d on vshard error case", respArrayLen)
×
121
                }
×
122

123
                err = d.Decode(&r.err)
×
124
                if err != nil {
×
125
                        return fmt.Errorf("failed to decode storage vshard error: %w", err)
×
126
                }
×
127

128
                return nil
×
129
        }
130

131
        err = d.Decode(&r.info)
2✔
132
        if err != nil {
2✔
133
                return fmt.Errorf("failed to decode bucket stat info: %w", err)
×
134
        }
×
135

136
        r.ok = true
2✔
137

2✔
138
        return nil
2✔
139
}
140

141
func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
4✔
142
        var bucketStatResponse vshardStorageBucketStatResponseProto
4✔
143

4✔
144
        err := future.GetTyped(&bucketStatResponse)
4✔
145
        if err != nil {
6✔
146
                return BucketStatInfo{}, err
2✔
147
        }
2✔
148

149
        if !bucketStatResponse.ok {
2✔
150
                return BucketStatInfo{}, bucketStatResponse.err
×
151
        }
×
152

153
        return bucketStatResponse.info, nil
2✔
154
}
155

156
// CallAsync sends async request to remote storage
157
func (rs *Replicaset) CallAsync(ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}) *tarantool.Future {
149✔
158
        if opts.Timeout > 0 {
150✔
159
                // Don't set any timeout by default, parent context timeout would be inherited in this case.
1✔
160
                // Don't call cancel in defer, because this we send request asynchronously,
1✔
161
                // and wait for result outside from this function.
1✔
162
                // suppress linter warning: lostcancel: the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak (govet)
1✔
163
                //nolint:govet
1✔
164
                ctx, _ = context.WithTimeout(ctx, opts.Timeout)
1✔
165
        }
1✔
166

167
        req := tarantool.NewCallRequest(fnc).
149✔
168
                Context(ctx).
149✔
169
                Args(args)
149✔
170

149✔
171
        return rs.conn.Do(req, opts.PoolMode)
149✔
172
}
173

174
func (rs *Replicaset) bucketsDiscoveryAsync(ctx context.Context, from uint64) *tarantool.Future {
31✔
175
        const bucketsDiscoveryFnc = "vshard.storage.buckets_discovery"
31✔
176

31✔
177
        var bucketsDiscoveryPaginationRequest = struct {
31✔
178
                From uint64 `msgpack:"from"`
31✔
179
        }{From: from}
31✔
180

31✔
181
        return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.PreferRO}, bucketsDiscoveryFnc,
31✔
182
                []interface{}{bucketsDiscoveryPaginationRequest})
31✔
183
}
31✔
184

185
// bucketsDiscoveryResp is the decoded reply of
// vshard.storage.buckets_discovery.
type bucketsDiscoveryResp struct {
	// Buckets is decoded from the "buckets" field of the reply.
	Buckets []uint64 `msgpack:"buckets"`
	// NextFrom is decoded from the "next_from" field of the reply.
	NextFrom uint64 `msgpack:"next_from"`
}
189

190
func bucketsDiscoveryWait(future *tarantool.Future) (bucketsDiscoveryResp, error) {
31✔
191
        // We intentionally don't support old vshard storages that mentioned here:
31✔
192
        // https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/router/init.lua#L343
31✔
193
        var resp bucketsDiscoveryResp
31✔
194

31✔
195
        err := future.GetTyped(&[]interface{}{&resp})
31✔
196
        if err != nil {
32✔
197
                return resp, fmt.Errorf("future.GetTyped() failed: %v", err)
1✔
198
        }
1✔
199

200
        return resp, nil
30✔
201
}
202

203
func (rs *Replicaset) bucketsDiscovery(ctx context.Context, from uint64) (bucketsDiscoveryResp, error) {
27✔
204
        future := rs.bucketsDiscoveryAsync(ctx, from)
27✔
205

27✔
206
        return bucketsDiscoveryWait(future)
27✔
207
}
27✔
208

209
// CalculateEtalonBalance computes the ideal bucket count for each replicaset.
210
// This iterative algorithm seeks the optimal balance within a cluster by
211
// calculating the ideal bucket count for each replicaset at every step.
212
// If the ideal count cannot be achieved due to pinned buckets, the algorithm
213
// makes a best effort to approximate balance by ignoring the replicaset with
214
// pinned buckets and its associated pinned count. After each iteration, a new
215
// balance is recalculated. However, this can lead to scenarios where the
216
// conditions are still unmet; ignoring pinned buckets in overloaded
217
// replicasets can reduce the ideal bucket count in others, potentially
218
// causing new values to fall below their pinned count.
219
//
220
// At each iteration, the algorithm either concludes or disregards at least
221
// one new overloaded replicaset. Therefore, its time complexity is O(N^2),
222
// where N is the number of replicasets.
223
// based on https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L1358
224
func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error {
5✔
225
        isBalanceFound := false
5✔
226
        weightSum := 0.0
5✔
227
        stepCount := 0
5✔
228
        replicasetCount := len(replicasets)
5✔
229

5✔
230
        // Calculate total weight
5✔
231
        for _, replicaset := range replicasets {
16✔
232
                weightSum += replicaset.info.Weight
11✔
233
        }
11✔
234

235
        // Balance calculation loop
236
        for !isBalanceFound {
11✔
237
                stepCount++
6✔
238
                if weightSum <= 0 {
7✔
239
                        return fmt.Errorf("weightSum should be greater than 0")
1✔
240
                }
1✔
241

242
                bucketPerWeight := float64(bucketCount) / weightSum
5✔
243
                bucketsCalculated := uint64(0)
5✔
244

5✔
245
                // Calculate etalon bucket count for each replicaset
5✔
246
                for i := range replicasets {
16✔
247
                        if !replicasets[i].info.IgnoreDisbalance {
21✔
248
                                replicasets[i].EtalonBucketCount = uint64(math.Ceil(replicasets[i].info.Weight * bucketPerWeight))
10✔
249
                                bucketsCalculated += replicasets[i].EtalonBucketCount
10✔
250
                        }
10✔
251
                }
252

253
                bucketsRest := bucketsCalculated - bucketCount
5✔
254
                isBalanceFound = true
5✔
255

5✔
256
                // Spread disbalance and check for pinned buckets
5✔
257
                for i := range replicasets {
16✔
258
                        if !replicasets[i].info.IgnoreDisbalance {
21✔
259
                                if bucketsRest > 0 {
11✔
260
                                        n := replicasets[i].info.Weight * bucketPerWeight
1✔
261
                                        ceil := math.Ceil(n)
1✔
262
                                        floor := math.Floor(n)
1✔
263
                                        if replicasets[i].EtalonBucketCount > 0 && ceil != floor {
2✔
264
                                                replicasets[i].EtalonBucketCount--
1✔
265
                                                bucketsRest--
1✔
266
                                        }
1✔
267
                                }
268

269
                                // Handle pinned buckets
270
                                pinned := replicasets[i].info.PinnedCount
10✔
271
                                if pinned > 0 && replicasets[i].EtalonBucketCount < pinned {
11✔
272
                                        isBalanceFound = false
1✔
273
                                        bucketCount -= pinned
1✔
274
                                        replicasets[i].EtalonBucketCount = pinned
1✔
275
                                        replicasets[i].info.IgnoreDisbalance = true
1✔
276
                                        weightSum -= replicasets[i].info.Weight
1✔
277
                                }
1✔
278
                        }
279
                }
280

281
                if bucketsRest != 0 {
5✔
282
                        return fmt.Errorf("bucketsRest should be 0")
×
283
                }
×
284

285
                // Safety check to prevent infinite loops
286
                if stepCount > replicasetCount {
5✔
287
                        return fmt.Errorf("[PANIC]: the rebalancer is broken")
×
288
                }
×
289
        }
290

291
        return nil
4✔
292
}
293

294
func (rs *Replicaset) BucketsCount(ctx context.Context) (uint64, error) {
×
295
        const bucketCountFnc = "vshard.storage.buckets_count"
×
296

×
297
        var bucketCount uint64
×
298

×
299
        fut := rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.ANY}, bucketCountFnc, nil)
×
300
        err := fut.GetTyped(&[]interface{}{&bucketCount})
×
301

×
302
        return bucketCount, err
×
303
}
×
304

305
func (rs *Replicaset) BucketForceCreate(ctx context.Context, firstBucketID, count uint64) error {
×
306
        const bucketForceCreateFnc = "vshard.storage.bucket_force_create"
×
307

×
308
        fut := rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RW}, bucketForceCreateFnc, []interface{}{firstBucketID, count})
×
309
        _, err := fut.GetResponse()
×
310

×
311
        return err
×
312
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc