• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-vshard-router / 14330428590

08 Apr 2025 09:46AM UTC coverage: 69.071% (-0.7%) from 69.723%
14330428590

push

github

KaymeKaydex
hotfix: cmake for testing

1130 of 1636 relevant lines covered (69.07%)

12940.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.7
/api.go
1
package vshard_router //nolint:revive
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "time"
8

9
        "github.com/tarantool/go-tarantool/v2"
10
        "github.com/tarantool/go-tarantool/v2/pool"
11
        "github.com/vmihailenco/msgpack/v5"
12
        "github.com/vmihailenco/msgpack/v5/msgpcode"
13
)
14

15
// --------------------------------------------------------------------------------
16
// -- API
17
// --------------------------------------------------------------------------------
18

19
type VshardMode string
20

21
const (
22
        ReadMode  VshardMode = "read"
23
        WriteMode VshardMode = "write"
24

25
        // callTimeoutDefault is a default timeout when no timeout is provided
26
        callTimeoutDefault = 500 * time.Millisecond
27
)
28

29
func (c VshardMode) String() string {
2✔
30
        return string(c)
2✔
31
}
2✔
32

33
type vshardStorageCallResponseProto struct {
34
        AssertError *assertError            // not nil if there is assert error
35
        VshardError *StorageCallVShardError // not nil if there is vshard response
36
        CallResp    VshardRouterCallResp
37
}
38

39
func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
92,305✔
40
        /* vshard.storage.call(func) response has the next 4 possbile formats:
92,305✔
41
        See: https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130
92,305✔
42
        1. vshard error has occurred:
92,305✔
43
                array[nil, vshard_error]
92,305✔
44
        2. User method has finished with some error:
92,305✔
45
                array[false, assert error]
92,305✔
46
        3. User mehod has finished successfully
92,305✔
47
                a) but has not returned anything
92,305✔
48
                        array[true]
92,305✔
49
                b) has returned 1 element
92,305✔
50
                        array[true, elem1]
92,305✔
51
                c) has returned 2 element
92,305✔
52
                        array[true, elem1, elem2]
92,305✔
53
                d) has returned 3 element
92,305✔
54
                        array[true, elem1, elem2, elem3]
92,305✔
55
        */
92,305✔
56

92,305✔
57
        // Ensure it is an array and get array len for protocol violation check
92,305✔
58
        respArrayLen, err := d.DecodeArrayLen()
92,305✔
59
        if err != nil {
92,306✔
60
                return err
1✔
61
        }
1✔
62

63
        if respArrayLen <= 0 {
92,304✔
64
                return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
×
65
        }
×
66

67
        // we need peek code to make our check faster than decode interface
68
        // later we will check if code nil or bool
69
        code, err := d.PeekCode()
92,304✔
70
        if err != nil {
92,304✔
71
                return err
×
72
        }
×
73

74
        // this is storage error
75
        if code == msgpcode.Nil {
92,307✔
76
                err = d.DecodeNil()
3✔
77
                if err != nil {
3✔
78
                        return err
×
79
                }
×
80

81
                if respArrayLen != 2 {
3✔
82
                        return fmt.Errorf("protocol violation: length is %d on vshard error case", respArrayLen)
×
83
                }
×
84

85
                var vshardError StorageCallVShardError
3✔
86

3✔
87
                err = d.Decode(&vshardError)
3✔
88
                if err != nil {
3✔
89
                        return fmt.Errorf("failed to decode storage vshard error: %w", err)
×
90
                }
×
91

92
                r.VshardError = &vshardError
3✔
93

3✔
94
                return nil
3✔
95
        }
96

97
        isVShardRespOk, err := d.DecodeBool()
92,301✔
98
        if err != nil {
92,302✔
99
                return err
1✔
100
        }
1✔
101

102
        if !isVShardRespOk {
92,305✔
103
                // that means we have an assert errors and response is not ok
5✔
104
                if respArrayLen != 2 {
6✔
105
                        return fmt.Errorf("protocol violation: length is %d on assert error case", respArrayLen)
1✔
106
                }
1✔
107

108
                var assertError assertError
4✔
109
                err = d.Decode(&assertError)
4✔
110
                if err != nil {
4✔
111
                        return fmt.Errorf("failed to decode storage assert error: %w", err)
×
112
                }
×
113

114
                r.AssertError = &assertError
4✔
115

4✔
116
                return nil
4✔
117
        }
118

119
        // isVShardRespOk is true
120
        buf := bytes.NewBuffer(nil)
92,295✔
121

92,295✔
122
        switch outputLen := respArrayLen - 1; {
92,295✔
123
        case outputLen < 16:
92,295✔
124
                // This way of encoding len for msgpack array works only for len < 16.
92,295✔
125
                // The case when len >= 16 is not possible currently, since it is limited by 3
92,295✔
126
                // due to lua vshard storage implementation. See:
92,295✔
127
                // https://github.com/tarantool/vshard/blob/76b3ad19b539150bf597a5ffec91b97758b69a00/vshard/storage/init.lua#L3168
92,295✔
128
                // However, it may change over time, so we should be ready to this.
92,295✔
129
                err = buf.WriteByte(msgpcode.FixedArrayLow | byte(outputLen))
92,295✔
130
                if err != nil {
92,295✔
131
                        return fmt.Errorf("can't buf.WriteByte to encode outputLen: %w", err)
×
132
                }
×
133
        default:
×
134
                return fmt.Errorf("unexpected outputLen: %d", outputLen)
×
135
        }
136

137
        _, err = buf.ReadFrom(d.Buffered())
92,295✔
138
        if err != nil {
92,295✔
139
                return fmt.Errorf("can't buf.ReadFrom: %w", err)
×
140
        }
×
141

142
        r.CallResp.buf = buf
92,295✔
143

92,295✔
144
        return nil
92,295✔
145
}
146

147
type assertError struct {
148
        Code     int         `msgpack:"code"`
149
        BaseType string      `msgpack:"base_type"`
150
        Type     string      `msgpack:"type"`
151
        Message  string      `msgpack:"message"`
152
        Trace    interface{} `msgpack:"trace"`
153
}
154

155
func (s assertError) Error() string {
3✔
156
        // Just print struct as is, use hack with alias type to avoid recursion:
3✔
157
        // %v attempts to call Error() method for s, which is recursion.
3✔
158
        // This alias doesn't have method Error().
3✔
159
        type alias assertError
3✔
160
        return fmt.Sprintf("%+v", alias(s))
3✔
161
}
3✔
162

163
type StorageCallVShardError struct {
164
        BucketID uint64 `msgpack:"bucket_id"`
165
        Reason   string `msgpack:"reason"`
166
        Code     int    `msgpack:"code"`
167
        Type     string `msgpack:"type"`
168
        Message  string `msgpack:"message"`
169
        Name     string `msgpack:"name"`
170
        // These 3 fields below are send as string by vshard storage, so we decode them into string, not uuid.UUID type
171
        // Example: 00000000-0000-0002-0002-000000000000
172
        MasterUUID     string `msgpack:"master"`
173
        ReplicasetUUID string `msgpack:"replicaset"`
174
        ReplicaUUID    string `msgpack:"replica"`
175
        Destination    string `msgpack:"destination"`
176
}
177

178
func (s StorageCallVShardError) Error() string {
2✔
179
        // Just print struct as is, use hack with alias type to avoid recursion:
2✔
180
        // %v attempts to call Error() method for s, which is recursion.
2✔
181
        // This alias doesn't have method Error().
2✔
182
        type alias StorageCallVShardError
2✔
183
        return fmt.Sprintf("%+v", alias(s))
2✔
184
}
2✔
185

186
type CallOpts struct {
187
        Timeout time.Duration
188
}
189

190
// CallMode is a type to represent call mode for Router.Call method.
191
type CallMode int
192

193
const (
194
        // CallModeRO sets a read-only mode for Router.Call.
195
        CallModeRO CallMode = iota
196
        // CallModeRW sets a read-write mode for Router.Call.
197
        CallModeRW
198
        // CallModeRE acts like CallModeRO
199
        // with preference for a replica rather than a master.
200
        // This mode is not supported yet.
201
        CallModeRE
202
        // CallModeBRO acts like CallModeRO with balancing.
203
        CallModeBRO
204
        // CallModeBRE acts like CallModeRO with balancing
205
        // and preference for a replica rather than a master.
206
        CallModeBRE
207
)
208

209
// VshardRouterCallResp represents a response from Router.Call[XXX] methods.
210
type VshardRouterCallResp struct {
211
        buf *bytes.Buffer
212
}
213

214
// Get returns a response from user defined function as []interface{}.
215
func (r VshardRouterCallResp) Get() ([]interface{}, error) {
5✔
216
        var result []interface{}
5✔
217
        err := r.GetTyped(&result)
5✔
218

5✔
219
        return result, err
5✔
220
}
5✔
221

222
// GetTyped decodes a response from user defined function into custom values.
223
func (r VshardRouterCallResp) GetTyped(result interface{}) error {
10✔
224
        return msgpack.Unmarshal(r.buf.Bytes(), result)
10✔
225
}
10✔
226

227
// Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'.
228
func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
229
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
92,334✔
230
        const vshardStorageClientCall = "vshard.storage.call"
92,334✔
231

92,334✔
232
        if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
92,336✔
233
                return VshardRouterCallResp{}, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
2✔
234
        }
2✔
235

236
        var poolMode pool.Mode
92,332✔
237
        var vshardMode VshardMode
92,332✔
238

92,332✔
239
        switch mode {
92,332✔
240
        case CallModeRO:
4✔
241
                poolMode, vshardMode = pool.RO, ReadMode
4✔
242
        case CallModeRW:
5✔
243
                poolMode, vshardMode = pool.RW, WriteMode
5✔
244
        case CallModeRE:
×
245
                // poolMode, vshardMode = pool.PreferRO, ReadMode
×
246
                // since go-tarantool always use balance=true politic,
×
247
                // we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400
×
248
                return VshardRouterCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet")
×
249
        case CallModeBRO:
92,323✔
250
                poolMode, vshardMode = pool.ANY, ReadMode
92,323✔
251
        case CallModeBRE:
×
252
                poolMode, vshardMode = pool.PreferRO, ReadMode
×
253
        default:
×
254
                return VshardRouterCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode)
×
255
        }
256

257
        timeout := callTimeoutDefault
92,332✔
258
        if opts.Timeout > 0 {
92,332✔
259
                timeout = opts.Timeout
×
260
        }
×
261

262
        ctx, cancel := context.WithTimeout(ctx, timeout)
92,332✔
263
        defer cancel()
92,332✔
264

92,332✔
265
        tntReq := tarantool.NewCallRequest(vshardStorageClientCall).
92,332✔
266
                Context(ctx).
92,332✔
267
                Args([]interface{}{
92,332✔
268
                        bucketID,
92,332✔
269
                        vshardMode,
92,332✔
270
                        fnc,
92,332✔
271
                        args,
92,332✔
272
                })
92,332✔
273

92,332✔
274
        requestStartTime := time.Now()
92,332✔
275

92,332✔
276
        var err error
92,332✔
277

92,332✔
278
        for {
184,665✔
279
                if spent := time.Since(requestStartTime); spent > timeout {
92,333✔
280
                        r.metrics().RequestDuration(spent, fnc, false, false)
×
281

×
282
                        r.log().Debugf(ctx, "Return result on timeout; spent %s of timeout %s", spent, timeout)
×
283
                        if err == nil {
×
284
                                err = fmt.Errorf("cant get call cause call impl timeout")
×
285
                        }
×
286

287
                        return VshardRouterCallResp{}, err
×
288
                }
289

290
                var rs *Replicaset
92,333✔
291

92,333✔
292
                rs, err = r.Route(ctx, bucketID)
92,333✔
293
                if err != nil {
92,333✔
294
                        r.metrics().RetryOnCall("bucket_resolve_error")
×
295

×
296
                        // this error will be returned to a caller in case of timeout
×
297
                        err = fmt.Errorf("cant resolve bucket %d: %w", bucketID, err)
×
298

×
299
                        // TODO: lua vshard router just yields here and retires, no pause is applied.
×
300
                        // https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L713
×
301
                        // So we also retry here. But I guess we should add some pause here.
×
302
                        continue
×
303
                }
304

305
                r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)
92,333✔
306

92,333✔
307
                storageCallResponse := vshardStorageCallResponseProto{}
92,333✔
308

92,333✔
309
                err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse)
92,333✔
310
                if err != nil {
92,368✔
311
                        return VshardRouterCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err)
35✔
312
                }
35✔
313

314
                r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse)
92,298✔
315

92,298✔
316
                if storageCallResponse.AssertError != nil {
92,301✔
317
                        return VshardRouterCallResp{}, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
3✔
318
                }
3✔
319

320
                if storageCallResponse.VshardError != nil {
92,296✔
321
                        vshardError := storageCallResponse.VshardError
1✔
322

1✔
323
                        switch vshardError.Name {
1✔
324
                        case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked:
1✔
325
                                // We reproduce here behavior in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
1✔
326
                                r.BucketReset(bucketID)
1✔
327

1✔
328
                                destination := vshardError.Destination
1✔
329
                                if destination != "" {
1✔
330
                                        var loggedOnce bool
×
331
                                        for {
×
332
                                                nameToReplicasetRef := r.getNameToReplicaset()
×
333

×
334
                                                // In some cases destination contains UUID (prior to tnt 3.x), in some cases it contains replicaset name.
×
335
                                                // So, at this point we don't know what destination is: a name or an UUID.
×
336
                                                // But we need a name to access values in nameToReplicasetRef map, so let's find it out.
×
337
                                                var destinationName string
×
338

×
339
                                                _, destinationExists := nameToReplicasetRef[destination]
×
340
                                                if destinationExists {
×
341
                                                        destinationName = destination
×
342
                                                } else {
×
343
                                                        // for older logic with uuid we must support backward compatibility
×
344
                                                        // if destination is uuid and not name, lets find it too
×
345
                                                        for rsName, rs := range nameToReplicasetRef {
×
346
                                                                if rs.info.UUID.String() == destination {
×
347
                                                                        destinationExists = true
×
348
                                                                        destinationName = rsName
×
349
                                                                        break
×
350
                                                                }
351
                                                        }
352
                                                }
353

354
                                                if destinationExists {
×
355
                                                        _, err := r.BucketSet(bucketID, destinationName)
×
356
                                                        if err == nil {
×
357
                                                                break // breaks loop
×
358
                                                        }
359
                                                        r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationName, err)
×
360
                                                }
361

362
                                                if !loggedOnce {
×
363
                                                        r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+
×
364
                                                                "update configuration", destinationName)
×
365
                                                        loggedOnce = true
×
366
                                                }
×
367

368
                                                const defaultPoolingPause = 50 * time.Millisecond
×
369
                                                time.Sleep(defaultPoolingPause)
×
370

×
371
                                                if spent := time.Since(requestStartTime); spent > timeout {
×
372
                                                        return VshardRouterCallResp{}, vshardError
×
373
                                                }
×
374
                                        }
375
                                }
376

377
                                // retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked
378

379
                                r.metrics().RetryOnCall("bucket_migrate")
1✔
380

1✔
381
                                r.log().Debugf(ctx, "Retrying fnc '%s' cause got vshard error: %v", fnc, vshardError)
1✔
382

1✔
383
                                // this vshardError will be returned to a caller in case of timeout
1✔
384
                                err = vshardError
1✔
385
                                continue
1✔
386
                        case VShardErrNameTransferIsInProgress:
×
387
                                // Since lua vshard router doesn't retry here, we don't retry too.
×
388
                                // There is a comment why lua vshard router doesn't retry:
×
389
                                // https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
×
390
                                r.BucketReset(bucketID)
×
391
                                return VshardRouterCallResp{}, vshardError
×
392
                        case VShardErrNameNonMaster:
×
393
                                // vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
×
394
                                // See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
×
395
                                // Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
×
396
                                // we just return this error as is.
×
397
                                return VshardRouterCallResp{}, vshardError
×
398
                        default:
×
399
                                return VshardRouterCallResp{}, vshardError
×
400
                        }
401
                }
402

403
                r.metrics().RequestDuration(time.Since(requestStartTime), fnc, true, false)
92,294✔
404

92,294✔
405
                return storageCallResponse.CallResp, nil
92,294✔
406
        }
407
}
408

409
// CallRO is an alias for Call with CallModeRO.
410
func (r *Router) CallRO(ctx context.Context, bucketID uint64,
411
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
×
412
        return r.Call(ctx, bucketID, CallModeRO, fnc, args, opts)
×
413
}
×
414

415
// CallRW is an alias for Call with CallModeRW.
416
func (r *Router) CallRW(ctx context.Context, bucketID uint64,
417
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
5✔
418
        return r.Call(ctx, bucketID, CallModeRW, fnc, args, opts)
5✔
419
}
5✔
420

421
// CallRE is an alias for Call with CallModeRE.
422
func (r *Router) CallRE(ctx context.Context, bucketID uint64,
423
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
×
424
        return r.Call(ctx, bucketID, CallModeRE, fnc, args, opts)
×
425
}
×
426

427
// CallBRO is an alias for Call with CallModeBRO.
428
func (r *Router) CallBRO(ctx context.Context, bucketID uint64,
429
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
×
430
        return r.Call(ctx, bucketID, CallModeBRO, fnc, args, opts)
×
431
}
×
432

433
// CallBRE is an alias for Call with CallModeBRE.
434
func (r *Router) CallBRE(ctx context.Context, bucketID uint64,
435
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
×
436
        return r.Call(ctx, bucketID, CallModeBRE, fnc, args, opts)
×
437
}
×
438

439
// RouterMapCallRWOptions sets options for RouterMapCallRW.
440
type RouterMapCallRWOptions struct {
441
        // Timeout defines timeout for RouterMapCallRW.
442
        Timeout time.Duration
443
}
444

445
type storageMapResponseProto[T any] struct {
446
        ok    bool
447
        value T
448
        err   StorageCallVShardError
449
}
450

451
func (r *storageMapResponseProto[T]) DecodeMsgpack(d *msgpack.Decoder) error {
808✔
452
        // proto for 'storage_map' method
808✔
453
        // https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/storage/init.lua#L3158
808✔
454
        respArrayLen, err := d.DecodeArrayLen()
808✔
455
        if err != nil {
808✔
456
                return err
×
457
        }
×
458

459
        if respArrayLen <= 0 {
808✔
460
                return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
×
461
        }
×
462

463
        code, err := d.PeekCode()
808✔
464
        if err != nil {
808✔
465
                return err
×
466
        }
×
467

468
        if code == msgpcode.Nil {
810✔
469
                err = d.DecodeNil()
2✔
470
                if err != nil {
2✔
471
                        return err
×
472
                }
×
473

474
                if respArrayLen != 2 {
2✔
475
                        return fmt.Errorf("protocol violation: length is %d on vshard error case", respArrayLen)
×
476
                }
×
477

478
                err = d.Decode(&r.err)
2✔
479
                if err != nil {
2✔
480
                        return fmt.Errorf("failed to decode storage vshard error: %w", err)
×
481
                }
×
482

483
                return nil
2✔
484
        }
485

486
        isOk, err := d.DecodeBool()
806✔
487
        if err != nil {
806✔
488
                return err
×
489
        }
×
490

491
        if !isOk {
806✔
492
                return fmt.Errorf("protocol violation: isOk=false")
×
493
        }
×
494

495
        switch respArrayLen {
806✔
496
        case 1:
2✔
497
                break
2✔
498
        case 2:
804✔
499
                err = d.Decode(&r.value)
804✔
500
                if err != nil {
804✔
501
                        return fmt.Errorf("can't decode value %T: %w", r.value, err)
×
502
                }
×
503
        default:
×
504
                return fmt.Errorf("protocol violation: invalid array length when no vshard error: %d", respArrayLen)
×
505
        }
506

507
        r.ok = true
806✔
508

806✔
509
        return nil
806✔
510
}
511

512
type storageRefResponseProto struct {
513
        err         error
514
        bucketCount uint64
515
}
516

517
func (r *storageRefResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
813✔
518
        respArrayLen, err := d.DecodeArrayLen()
813✔
519
        if err != nil {
813✔
520
                return err
×
521
        }
×
522

523
        if respArrayLen <= 0 {
813✔
524
                return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
×
525
        }
×
526

527
        code, err := d.PeekCode()
813✔
528
        if err != nil {
813✔
529
                return err
×
530
        }
×
531

532
        if code == msgpcode.Nil {
813✔
533
                err = d.DecodeNil()
×
534
                if err != nil {
×
535
                        return err
×
536
                }
×
537

538
                if respArrayLen != 2 {
×
539
                        return fmt.Errorf("protocol violation: length is %d on error case", respArrayLen)
×
540
                }
×
541

542
                // The possible variations of error here are fully unknown yet for us, e.g:
543
                // vshard error, assert error or some other type of error. So this question requires research.
544
                // So we do not decode it to some known error format, because we don't use it anyway.
545
                decodedError, err := d.DecodeInterface()
×
546
                if err != nil {
×
547
                        return err
×
548
                }
×
549

550
                // convert empty interface into error
551
                r.err = fmt.Errorf("%v", decodedError)
×
552

×
553
                return nil
×
554
        }
555

556
        r.bucketCount, err = d.DecodeUint64()
813✔
557
        if err != nil {
813✔
558
                return err
×
559
        }
×
560

561
        return nil
813✔
562
}
563

564
type replicasetFuture struct {
565
        // replicaset name
566
        name   string
567
        future *tarantool.Future
568
}
569

570
// RouterMapCallRW is a consistent Map-Reduce. The given function is called on all masters in the
571
// cluster with a guarantee that in case of success it was executed with all
572
// buckets being accessible for reads and writes.
573
// T is a return type of user defined function 'fnc'.
574
// We define it as a distinct function, not a Router method, because golang limitations,
575
// see: https://github.com/golang/go/issues/49085.
576
func RouterMapCallRW[T any](r *Router, ctx context.Context,
577
        fnc string, args interface{}, opts RouterMapCallRWOptions,
578
) (map[string]T, error) {
407✔
579
        const vshardStorageServiceCall = "vshard.storage._call"
407✔
580

407✔
581
        timeout := callTimeoutDefault
407✔
582
        if opts.Timeout > 0 {
408✔
583
                timeout = opts.Timeout
1✔
584
        }
1✔
585

586
        timeStart := time.Now()
407✔
587
        refID := r.refID.Add(1)
407✔
588

407✔
589
        nameToReplicasetRef := r.getNameToReplicaset()
407✔
590

407✔
591
        defer func() {
814✔
592
                // call function "storage_unref" if map_callrw is failed or successed
407✔
593
                storageUnrefReq := tarantool.NewCallRequest(vshardStorageServiceCall).
407✔
594
                        Args([]interface{}{"storage_unref", refID})
407✔
595

407✔
596
                for _, rs := range nameToReplicasetRef {
1,220✔
597
                        future := rs.conn.Do(storageUnrefReq, pool.RW)
813✔
598
                        future.SetError(nil) // TODO: does it cancel the request above or not?
813✔
599
                }
813✔
600
        }()
601

602
        ctx, cancel := context.WithTimeout(ctx, timeout)
407✔
603
        defer cancel()
407✔
604

407✔
605
        // ref stage
407✔
606

407✔
607
        storageRefReq := tarantool.NewCallRequest(vshardStorageServiceCall).
407✔
608
                Context(ctx).
407✔
609
                Args([]interface{}{"storage_ref", refID, timeout})
407✔
610

407✔
611
        var rsFutures = make([]replicasetFuture, 0, len(nameToReplicasetRef))
407✔
612

407✔
613
        // ref stage: send concurrent ref requests
407✔
614
        for name, rs := range nameToReplicasetRef {
1,220✔
615
                rsFutures = append(rsFutures, replicasetFuture{
813✔
616
                        name:   name,
813✔
617
                        future: rs.conn.Do(storageRefReq, pool.RW),
813✔
618
                })
813✔
619
        }
813✔
620

621
        // ref stage: get their responses
622
        var totalBucketCount uint64
407✔
623
        // proto for 'storage_ref' method:
407✔
624
        // https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3137
407✔
625
        for _, rsFuture := range rsFutures {
1,220✔
626
                var storageRefResponse storageRefResponseProto
813✔
627

813✔
628
                if err := rsFuture.future.GetTyped(&storageRefResponse); err != nil {
813✔
629
                        return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.name, err)
×
630
                }
×
631

632
                if storageRefResponse.err != nil {
813✔
633
                        return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.name, storageRefResponse.err)
×
634
                }
×
635

636
                totalBucketCount += storageRefResponse.bucketCount
813✔
637
        }
638

639
        if totalBucketCount != r.cfg.TotalBucketCount {
408✔
640
                return nil, fmt.Errorf("total bucket count got %d, expected %d", totalBucketCount, r.cfg.TotalBucketCount)
1✔
641
        }
1✔
642

643
        // map stage
644

645
        storageMapReq := tarantool.NewCallRequest(vshardStorageServiceCall).
406✔
646
                Context(ctx).
406✔
647
                Args([]interface{}{"storage_map", refID, fnc, args})
406✔
648

406✔
649
        // reuse the same slice again
406✔
650
        rsFutures = rsFutures[0:0]
406✔
651

406✔
652
        // map stage: send concurrent map requests
406✔
653
        for name, rs := range nameToReplicasetRef {
1,218✔
654
                rsFutures = append(rsFutures, replicasetFuture{
812✔
655
                        name:   name,
812✔
656
                        future: rs.conn.Do(storageMapReq, pool.RW),
812✔
657
                })
812✔
658
        }
812✔
659

660
        // map stage: get their responses
661
        nameToResult := make(map[string]T)
406✔
662
        for _, rsFuture := range rsFutures {
1,215✔
663
                storageMapResponse := storageMapResponseProto[T]{}
809✔
664

809✔
665
                err := rsFuture.future.GetTyped(&storageMapResponse)
809✔
666
                if err != nil {
810✔
667
                        return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.name, err)
1✔
668
                }
1✔
669

670
                if !storageMapResponse.ok {
810✔
671
                        return nil, fmt.Errorf("storage_map failed on %v: %+v", rsFuture.name, storageMapResponse.err)
2✔
672
                }
2✔
673

674
                nameToResult[rsFuture.name] = storageMapResponse.value
806✔
675
        }
676

677
        r.metrics().RequestDuration(time.Since(timeStart), fnc, true, true)
403✔
678

403✔
679
        return nameToResult, nil
403✔
680
}
681

682
// RouteAll return map of all replicasets.
683
func (r *Router) RouteAll() map[string]*Replicaset {
7✔
684
        nameToReplicasetRef := r.getNameToReplicaset()
7✔
685

7✔
686
        // Do not expose the original map to prevent unauthorized modification.
7✔
687
        return copyMap(nameToReplicasetRef)
7✔
688
}
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc