• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-vshard-router / 13093206091

01 Feb 2025 10:54PM UTC coverage: 71.875% (-0.7%) from 72.558%
13093206091

Pull #45

github

maksim.konovalov
Optimize crc32 hash calculation
Pull Request #45: Optimize crc32 hash calculation

1 of 1 new or added line in 1 file covered. (100.0%)

13 existing lines in 3 files now uncovered.

1150 of 1600 relevant lines covered (71.88%)

108536.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.58
/api.go
1
package vshard_router //nolint:revive
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "time"
8

9
        "github.com/tarantool/go-tarantool/v2"
10
        "github.com/tarantool/go-tarantool/v2/pool"
11
        "github.com/vmihailenco/msgpack/v5"
12
        "github.com/vmihailenco/msgpack/v5/msgpcode"
13
)
14

15
// --------------------------------------------------------------------------------
16
// -- API
17
// --------------------------------------------------------------------------------
18

19
// VshardMode is the access mode that Router.Call sends to vshard.storage.call
// as a plain string.
type VshardMode string

const (
	// ReadMode is the mode used for read-only calls.
	ReadMode VshardMode = "read"
	// WriteMode is the mode used for read-write calls.
	WriteMode VshardMode = "write"
)

// callTimeoutDefault is a default timeout when no timeout is provided
const callTimeoutDefault = 500 * time.Millisecond

// String returns the mode as its underlying string value.
func (m VshardMode) String() string {
	return string(m)
}
2✔
32

33
type vshardStorageCallResponseProto struct {
34
        AssertError *assertError            // not nil if there is assert error
35
        VshardError *StorageCallVShardError // not nil if there is vshard response
36
        CallResp    VshardRouterCallResp
37
}
38

39
func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
184,862✔
40
        /* vshard.storage.call(func) response has the next 4 possbile formats:
184,862✔
41
        See: https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130
184,862✔
42
        1. vshard error has occurred:
184,862✔
43
                array[nil, vshard_error]
184,862✔
44
        2. User method has finished with some error:
184,862✔
45
                array[false, assert error]
184,862✔
46
        3. User mehod has finished successfully
184,862✔
47
                a) but has not returned anything
184,862✔
48
                        array[true]
184,862✔
49
                b) has returned 1 element
184,862✔
50
                        array[true, elem1]
184,862✔
51
                c) has returned 2 element
184,862✔
52
                        array[true, elem1, elem2]
184,862✔
53
                d) has returned 3 element
184,862✔
54
                        array[true, elem1, elem2, elem3]
184,862✔
55
        */
184,862✔
56

184,862✔
57
        // Ensure it is an array and get array len for protocol violation check
184,862✔
58
        respArrayLen, err := d.DecodeArrayLen()
184,862✔
59
        if err != nil {
184,862✔
60
                return err
×
61
        }
×
62

63
        if respArrayLen == 0 {
184,862✔
64
                return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
×
65
        }
×
66

67
        // we need peek code to make our check faster than decode interface
68
        // later we will check if code nil or bool
69
        code, err := d.PeekCode()
184,862✔
70
        if err != nil {
184,862✔
71
                return err
×
72
        }
×
73

74
        // this is storage error
75
        if code == msgpcode.Nil {
184,863✔
76
                err = d.DecodeNil()
1✔
77
                if err != nil {
1✔
78
                        return err
×
79
                }
×
80

81
                if respArrayLen != 2 {
1✔
82
                        return fmt.Errorf("protocol violation: length is %d on vshard error case", respArrayLen)
×
83
                }
×
84

85
                var vshardError StorageCallVShardError
1✔
86

1✔
87
                err = d.Decode(&vshardError)
1✔
88
                if err != nil {
1✔
89
                        return fmt.Errorf("failed to decode storage vshard error: %w", err)
×
90
                }
×
91

92
                r.VshardError = &vshardError
1✔
93

1✔
94
                return nil
1✔
95
        }
96

97
        isVShardRespOk, err := d.DecodeBool()
184,861✔
98
        if err != nil {
184,861✔
99
                return err
×
100
        }
×
101

102
        if !isVShardRespOk {
184,864✔
103
                // that means we have an assert errors and response is not ok
3✔
104
                if respArrayLen != 2 {
3✔
105
                        return fmt.Errorf("protocol violation: length is %d on assert error case", respArrayLen)
×
106
                }
×
107

108
                var assertError assertError
3✔
109
                err = d.Decode(&assertError)
3✔
110
                if err != nil {
3✔
111
                        return fmt.Errorf("failed to decode storage assert error: %w", err)
×
112
                }
×
113

114
                r.AssertError = &assertError
3✔
115

3✔
116
                return nil
3✔
117
        }
118

119
        // isVShardRespOk is true
120
        buf := bytes.NewBuffer(nil)
184,858✔
121

184,858✔
122
        buf.WriteByte(msgpcode.FixedArrayLow | byte(respArrayLen-1))
184,858✔
123

184,858✔
124
        _, err = buf.ReadFrom(d.Buffered())
184,858✔
125
        if err != nil {
184,858✔
126
                return err
×
127
        }
×
128

129
        r.CallResp.buf = buf
184,858✔
130

184,858✔
131
        return nil
184,858✔
132
}
133

134
// assertError is the error payload decoded from a vshard.storage.call response
// when the user function itself has failed.
type assertError struct {
	Code     int    `msgpack:"code"`
	BaseType string `msgpack:"base_type"`
	Type     string `msgpack:"type"`
	Message  string `msgpack:"message"`
	Trace    any    `msgpack:"trace"`
}

// Error renders every field of the error together with its field name.
func (s assertError) Error() string {
	// Formatting s directly with %v would call Error() again and recurse
	// forever, so we convert to a method-less twin type first.
	type plain assertError

	view := plain(s)

	return fmt.Sprintf("%+v", view)
}
3✔
149

150
// StorageCallVShardError is a vshard error decoded from a storage response.
type StorageCallVShardError struct {
	BucketID uint64 `msgpack:"bucket_id"`
	Reason   string `msgpack:"reason"`
	Code     int    `msgpack:"code"`
	Type     string `msgpack:"type"`
	Message  string `msgpack:"message"`
	Name     string `msgpack:"name"`
	// The three UUID fields below arrive from vshard storage as strings,
	// so they are kept as strings rather than uuid.UUID values.
	// Example: 00000000-0000-0002-0002-000000000000
	MasterUUID     string `msgpack:"master"`
	ReplicasetUUID string `msgpack:"replicaset"`
	ReplicaUUID    string `msgpack:"replica"`
	Destination    string `msgpack:"destination"`
}

// Error renders every field of the error together with its field name.
func (s StorageCallVShardError) Error() string {
	// Formatting s directly with %v would call Error() again and recurse
	// forever, so we convert to a method-less twin type first.
	type plain StorageCallVShardError

	view := plain(s)

	return fmt.Sprintf("%+v", view)
}
2✔
172

173
// CallOpts sets options for Router.Call and its CallXX shorthands.
type CallOpts struct {
	// Timeout limits the whole call, retries included.
	// A value that is not positive selects the default call timeout.
	Timeout time.Duration
}
176

177
// CallMode selects how Router.Call picks an instance and which access mode it requests.
type CallMode int

const (
	// CallModeRO makes Router.Call work in a read-only mode.
	CallModeRO = CallMode(iota)
	// CallModeRW makes Router.Call work in a read-write mode.
	CallModeRW
	// CallModeRE behaves like CallModeRO,
	// but prefers a replica over a master.
	// This mode is not supported yet.
	CallModeRE
	// CallModeBRO behaves like CallModeRO with balancing.
	CallModeBRO
	// CallModeBRE behaves like CallModeRO with balancing,
	// and prefers a replica over a master.
	CallModeBRE
)
195

196
// VshardRouterCallResp represents a response from Router.Call[XXX] methods.
197
type VshardRouterCallResp struct {
198
        buf *bytes.Buffer
199
}
200

201
// Get returns a response from user defined function as []interface{}.
202
func (r VshardRouterCallResp) Get() ([]interface{}, error) {
4✔
203
        var result []interface{}
4✔
204
        err := r.GetTyped(&result)
4✔
205

4✔
206
        return result, err
4✔
207
}
4✔
208

209
// GetTyped decodes a response from user defined function into custom values.
210
func (r VshardRouterCallResp) GetTyped(result interface{}) error {
9✔
211
        return msgpack.Unmarshal(r.buf.Bytes(), result)
9✔
212
}
9✔
213

214
// Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'.
215
func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
216
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
184,864✔
217
        const vshardStorageClientCall = "vshard.storage.call"
184,864✔
218

184,864✔
219
        if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
184,866✔
220
                return VshardRouterCallResp{}, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
2✔
221
        }
2✔
222

223
        var poolMode pool.Mode
184,862✔
224
        var vshardMode VshardMode
184,862✔
225

184,862✔
226
        switch mode {
184,862✔
227
        case CallModeRO:
4✔
228
                poolMode, vshardMode = pool.RO, ReadMode
4✔
229
        case CallModeRW:
5✔
230
                poolMode, vshardMode = pool.RW, WriteMode
5✔
231
        case CallModeRE:
×
232
                // poolMode, vshardMode = pool.PreferRO, ReadMode
×
233
                // since go-tarantool always use balance=true politic,
×
234
                // we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400
×
235
                return VshardRouterCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet")
×
236
        case CallModeBRO:
184,853✔
237
                poolMode, vshardMode = pool.ANY, ReadMode
184,853✔
238
        case CallModeBRE:
×
239
                poolMode, vshardMode = pool.PreferRO, ReadMode
×
240
        default:
×
241
                return VshardRouterCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode)
×
242
        }
243

244
        timeout := callTimeoutDefault
184,862✔
245
        if opts.Timeout > 0 {
184,862✔
246
                timeout = opts.Timeout
×
247
        }
×
248

249
        ctx, cancel := context.WithTimeout(ctx, timeout)
184,862✔
250
        defer cancel()
184,862✔
251

184,862✔
252
        tntReq := tarantool.NewCallRequest(vshardStorageClientCall).
184,862✔
253
                Context(ctx).
184,862✔
254
                Args([]interface{}{
184,862✔
255
                        bucketID,
184,862✔
256
                        vshardMode,
184,862✔
257
                        fnc,
184,862✔
258
                        args,
184,862✔
259
                })
184,862✔
260

184,862✔
261
        requestStartTime := time.Now()
184,862✔
262

184,862✔
263
        var err error
184,862✔
264

184,862✔
265
        for {
369,725✔
266
                if spent := time.Since(requestStartTime); spent > timeout {
184,863✔
267
                        r.metrics().RequestDuration(spent, false, false)
×
268

×
269
                        r.log().Debugf(ctx, "Return result on timeout; spent %s of timeout %s", spent, timeout)
×
270
                        if err == nil {
×
271
                                err = fmt.Errorf("cant get call cause call impl timeout")
×
272
                        }
×
273

274
                        return VshardRouterCallResp{}, err
×
275
                }
276

277
                var rs *Replicaset
184,863✔
278

184,863✔
279
                rs, err = r.Route(ctx, bucketID)
184,863✔
280
                if err != nil {
184,863✔
281
                        r.metrics().RetryOnCall("bucket_resolve_error")
×
282

×
283
                        // this error will be returned to a caller in case of timeout
×
284
                        err = fmt.Errorf("cant resolve bucket %d: %w", bucketID, err)
×
285

×
286
                        // TODO: lua vshard router just yields here and retires, no pause is applied.
×
287
                        // https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L713
×
288
                        // So we also retry here. But I guess we should add some pause here.
×
289
                        continue
×
290
                }
291

292
                r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)
184,863✔
293

184,863✔
294
                var storageCallResponse vshardStorageCallResponseProto
184,863✔
295
                err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse)
184,863✔
296
                if err != nil {
184,864✔
297
                        return VshardRouterCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err)
1✔
298
                }
1✔
299

300
                r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse)
184,862✔
301

184,862✔
302
                if storageCallResponse.AssertError != nil {
184,865✔
303
                        return VshardRouterCallResp{}, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
3✔
304
                }
3✔
305

306
                if storageCallResponse.VshardError != nil {
184,860✔
307
                        vshardError := storageCallResponse.VshardError
1✔
308

1✔
309
                        switch vshardError.Name {
1✔
310
                        case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked:
1✔
311
                                // We reproduce here behavior in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
1✔
312
                                r.BucketReset(bucketID)
1✔
313

1✔
314
                                if destination := vshardError.Destination; destination != "" {
1✔
315
                                        var loggedOnce bool
×
316
                                        for {
×
317
                                                nameToReplicasetRef := r.getNameToReplicaset()
×
318

×
319
                                                _, destinationExists := nameToReplicasetRef[destination]
×
320

×
321
                                                if !destinationExists {
×
322
                                                        // for older logic with uuid we must support backward compatibility
×
323
                                                        // if destination is uuid and not name, lets find it too
×
324
                                                        for _, rsRef := range nameToReplicasetRef {
×
325
                                                                if rsRef.info.UUID.String() == destination {
×
326
                                                                        destinationExists = true
×
327
                                                                        break
×
328
                                                                }
329
                                                        }
330
                                                }
331

332
                                                if destinationExists {
×
333
                                                        _, err := r.BucketSet(bucketID, destination)
×
334
                                                        if err == nil {
×
335
                                                                break // breaks loop
×
336
                                                        }
337
                                                        r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destination, err)
×
338
                                                }
339

340
                                                if !loggedOnce {
×
341
                                                        r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+
×
342
                                                                "update configuration", destination)
×
343
                                                        loggedOnce = true
×
344
                                                }
×
345

346
                                                const defaultPoolingPause = 50 * time.Millisecond
×
347
                                                time.Sleep(defaultPoolingPause)
×
348

×
349
                                                if spent := time.Since(requestStartTime); spent > timeout {
×
350
                                                        return VshardRouterCallResp{}, vshardError
×
351
                                                }
×
352
                                        }
353
                                }
354

355
                                // retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked
356

357
                                r.metrics().RetryOnCall("bucket_migrate")
1✔
358

1✔
359
                                r.log().Debugf(ctx, "Retrying fnc '%s' cause got vshard error: %v", fnc, vshardError)
1✔
360

1✔
361
                                // this vshardError will be returned to a caller in case of timeout
1✔
362
                                err = vshardError
1✔
363
                                continue
1✔
364
                        case VShardErrNameTransferIsInProgress:
×
365
                                // Since lua vshard router doesn't retry here, we don't retry too.
×
366
                                // There is a comment why lua vshard router doesn't retry:
×
367
                                // https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
×
368
                                r.BucketReset(bucketID)
×
369
                                return VshardRouterCallResp{}, vshardError
×
370
                        case VShardErrNameNonMaster:
×
371
                                // vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
×
372
                                // See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
×
373
                                // Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
×
374
                                // we just return this error as is.
×
375
                                return VshardRouterCallResp{}, vshardError
×
376
                        default:
×
377
                                return VshardRouterCallResp{}, vshardError
×
378
                        }
379
                }
380

381
                r.metrics().RequestDuration(time.Since(requestStartTime), true, false)
184,858✔
382

184,858✔
383
                return storageCallResponse.CallResp, nil
184,858✔
384
        }
385
}
386

387
// CallRO is an alias for Call with CallModeRO.
388
func (r *Router) CallRO(ctx context.Context, bucketID uint64,
389
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
×
390
        return r.Call(ctx, bucketID, CallModeRO, fnc, args, opts)
×
391
}
×
392

393
// CallRW is an alias for Call with CallModeRW.
394
func (r *Router) CallRW(ctx context.Context, bucketID uint64,
395
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
5✔
396
        return r.Call(ctx, bucketID, CallModeRW, fnc, args, opts)
5✔
397
}
5✔
398

399
// CallRE is an alias for Call with CallModeRE.
400
func (r *Router) CallRE(ctx context.Context, bucketID uint64,
401
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
×
402
        return r.Call(ctx, bucketID, CallModeRE, fnc, args, opts)
×
403
}
×
404

405
// CallBRO is an alias for Call with CallModeBRO.
406
func (r *Router) CallBRO(ctx context.Context, bucketID uint64,
407
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
×
408
        return r.Call(ctx, bucketID, CallModeBRO, fnc, args, opts)
×
409
}
×
410

411
// CallBRE is an alias for Call with CallModeBRE.
412
func (r *Router) CallBRE(ctx context.Context, bucketID uint64,
413
        fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) {
×
414
        return r.Call(ctx, bucketID, CallModeBRE, fnc, args, opts)
×
415
}
×
416

417
// RouterMapCallRWOptions carries the options accepted by RouterMapCallRW.
type RouterMapCallRWOptions struct {
	// Timeout limits the whole RouterMapCallRW invocation.
	// A value that is not positive selects the default call timeout.
	Timeout time.Duration
}
422

423
type storageMapResponseProto[T any] struct {
424
        ok    bool
425
        value T
426
        err   StorageCallVShardError
427
}
428

429
func (r *storageMapResponseProto[T]) DecodeMsgpack(d *msgpack.Decoder) error {
245,482✔
430
        // proto for 'storage_map' method
245,482✔
431
        // https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/storage/init.lua#L3158
245,482✔
432
        respArrayLen, err := d.DecodeArrayLen()
245,482✔
433
        if err != nil {
245,482✔
434
                return err
×
435
        }
×
436

437
        if respArrayLen == 0 {
245,482✔
438
                return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
×
439
        }
×
440

441
        code, err := d.PeekCode()
245,482✔
442
        if err != nil {
245,482✔
443
                return err
×
444
        }
×
445

446
        if code == msgpcode.Nil {
245,484✔
447
                err = d.DecodeNil()
2✔
448
                if err != nil {
2✔
449
                        return err
×
450
                }
×
451

452
                if respArrayLen != 2 {
2✔
453
                        return fmt.Errorf("protocol violation: length is %d on vshard error case", respArrayLen)
×
454
                }
×
455

456
                err = d.Decode(&r.err)
2✔
457
                if err != nil {
2✔
458
                        return fmt.Errorf("failed to decode storage vshard error: %w", err)
×
459
                }
×
460

461
                return nil
2✔
462
        }
463

464
        isOk, err := d.DecodeBool()
245,480✔
465
        if err != nil {
245,480✔
466
                return err
×
467
        }
×
468

469
        if !isOk {
245,480✔
470
                return fmt.Errorf("protocol violation: isOk=false")
×
471
        }
×
472

473
        switch respArrayLen {
245,480✔
474
        case 1:
2✔
475
                break
2✔
476
        case 2:
245,478✔
477
                err = d.Decode(&r.value)
245,478✔
478
                if err != nil {
245,478✔
479
                        return fmt.Errorf("can't decode value %T: %w", r.value, err)
×
480
                }
×
481
        default:
×
482
                return fmt.Errorf("protocol violation: invalid array length when no vshard error: %d", respArrayLen)
×
483
        }
484

485
        r.ok = true
245,480✔
486

245,480✔
487
        return nil
245,480✔
488
}
489

490
type storageRefResponseProto struct {
491
        err         error
492
        bucketCount uint64
493
}
494

495
func (r *storageRefResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
245,485✔
496
        respArrayLen, err := d.DecodeArrayLen()
245,485✔
497
        if err != nil {
245,485✔
498
                return err
×
499
        }
×
500

501
        if respArrayLen == 0 {
245,485✔
502
                return fmt.Errorf("protocol violation: invalid array length: %d", respArrayLen)
×
503
        }
×
504

505
        code, err := d.PeekCode()
245,485✔
506
        if err != nil {
245,485✔
507
                return err
×
508
        }
×
509

510
        if code == msgpcode.Nil {
245,485✔
511
                err = d.DecodeNil()
×
512
                if err != nil {
×
513
                        return err
×
514
                }
×
515

516
                if respArrayLen != 2 {
×
517
                        return fmt.Errorf("protocol violation: length is %d on error case", respArrayLen)
×
518
                }
×
519

520
                // The possible variations of error here are fully unknown yet for us, e.g:
521
                // vshard error, assert error or some other type of error. So this question requires research.
522
                // So we do not decode it to some known error format, because we don't use it anyway.
523
                decodedError, err := d.DecodeInterface()
×
524
                if err != nil {
×
525
                        return err
×
526
                }
×
527

528
                // convert empty interface into error
529
                r.err = fmt.Errorf("%v", decodedError)
×
530

×
531
                return nil
×
532
        }
533

534
        r.bucketCount, err = d.DecodeUint64()
245,485✔
535
        if err != nil {
245,485✔
536
                return err
×
537
        }
×
538

539
        return nil
245,485✔
540
}
541

542
type replicasetFuture struct {
543
        // replicaset name
544
        name   string
545
        future *tarantool.Future
546
}
547

548
// RouterMapCallRW is a consistent Map-Reduce. The given function is called on all masters in the
549
// cluster with a guarantee that in case of success it was executed with all
550
// buckets being accessible for reads and writes.
551
// T is a return type of user defined function 'fnc'.
552
// We define it as a distinct function, not a Router method, because golang limitations,
553
// see: https://github.com/golang/go/issues/49085.
554
func RouterMapCallRW[T any](r *Router, ctx context.Context,
555
        fnc string, args interface{}, opts RouterMapCallRWOptions,
556
) (map[string]T, error) {
49,103✔
557
        const vshardStorageServiceCall = "vshard.storage._call"
49,103✔
558

49,103✔
559
        timeout := callTimeoutDefault
49,103✔
560
        if opts.Timeout > 0 {
49,104✔
561
                timeout = opts.Timeout
1✔
562
        }
1✔
563

564
        timeStart := time.Now()
49,103✔
565
        refID := r.refID.Add(1)
49,103✔
566

49,103✔
567
        nameToReplicasetRef := r.getNameToReplicaset()
49,103✔
568

49,103✔
569
        defer func() {
98,206✔
570
                // call function "storage_unref" if map_callrw is failed or successed
49,103✔
571
                storageUnrefReq := tarantool.NewCallRequest(vshardStorageServiceCall).
49,103✔
572
                        Args([]interface{}{"storage_unref", refID})
49,103✔
573

49,103✔
574
                for _, rs := range nameToReplicasetRef {
294,593✔
575
                        future := rs.conn.Do(storageUnrefReq, pool.RW)
245,490✔
576
                        future.SetError(nil) // TODO: does it cancel the request above or not?
245,490✔
577
                }
245,490✔
578
        }()
579

580
        ctx, cancel := context.WithTimeout(ctx, timeout)
49,103✔
581
        defer cancel()
49,103✔
582

49,103✔
583
        // ref stage
49,103✔
584

49,103✔
585
        storageRefReq := tarantool.NewCallRequest(vshardStorageServiceCall).
49,103✔
586
                Context(ctx).
49,103✔
587
                Args([]interface{}{"storage_ref", refID, timeout})
49,103✔
588

49,103✔
589
        var rsFutures = make([]replicasetFuture, 0, len(nameToReplicasetRef))
49,103✔
590

49,103✔
591
        // ref stage: send concurrent ref requests
49,103✔
592
        for name, rs := range nameToReplicasetRef {
294,593✔
593
                rsFutures = append(rsFutures, replicasetFuture{
245,490✔
594
                        name:   name,
245,490✔
595
                        future: rs.conn.Do(storageRefReq, pool.RW),
245,490✔
596
                })
245,490✔
597
        }
245,490✔
598

599
        // ref stage: get their responses
600
        var totalBucketCount uint64
49,103✔
601
        // proto for 'storage_ref' method:
49,103✔
602
        // https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3137
49,103✔
603
        for _, rsFuture := range rsFutures {
294,589✔
604
                var storageRefResponse storageRefResponseProto
245,486✔
605

245,486✔
606
                if err := rsFuture.future.GetTyped(&storageRefResponse); err != nil {
245,487✔
607
                        return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.name, err)
1✔
608
                }
1✔
609

610
                if storageRefResponse.err != nil {
245,485✔
611
                        return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.name, storageRefResponse.err)
×
612
                }
×
613

614
                totalBucketCount += storageRefResponse.bucketCount
245,485✔
615
        }
616

617
        if totalBucketCount != r.cfg.TotalBucketCount {
49,103✔
618
                return nil, fmt.Errorf("total bucket count got %d, expected %d", totalBucketCount, r.cfg.TotalBucketCount)
1✔
619
        }
1✔
620

621
        // map stage
622

623
        storageMapReq := tarantool.NewCallRequest(vshardStorageServiceCall).
49,101✔
624
                Context(ctx).
49,101✔
625
                Args([]interface{}{"storage_map", refID, fnc, args})
49,101✔
626

49,101✔
627
        // reuse the same slice again
49,101✔
628
        rsFutures = rsFutures[0:0]
49,101✔
629

49,101✔
630
        // map stage: send concurrent map requests
49,101✔
631
        for name, rs := range nameToReplicasetRef {
294,585✔
632
                rsFutures = append(rsFutures, replicasetFuture{
245,484✔
633
                        name:   name,
245,484✔
634
                        future: rs.conn.Do(storageMapReq, pool.RW),
245,484✔
635
                })
245,484✔
636
        }
245,484✔
637

638
        // map stage: get their responses
639
        nameToResult := make(map[string]T)
49,101✔
640
        for _, rsFuture := range rsFutures {
294,583✔
641
                var storageMapResponse storageMapResponseProto[T]
245,482✔
642

245,482✔
643
                err := rsFuture.future.GetTyped(&storageMapResponse)
245,482✔
644
                if err != nil {
245,482✔
UNCOV
645
                        return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.name, err)
×
UNCOV
646
                }
×
647

648
                if !storageMapResponse.ok {
245,484✔
649
                        return nil, fmt.Errorf("storage_map failed on %v: %+v", rsFuture.name, storageMapResponse.err)
2✔
650
                }
2✔
651

652
                nameToResult[rsFuture.name] = storageMapResponse.value
245,480✔
653
        }
654

655
        r.metrics().RequestDuration(time.Since(timeStart), true, true)
49,099✔
656

49,099✔
657
        return nameToResult, nil
49,099✔
658
}
659

660
// RouteAll return map of all replicasets.
661
func (r *Router) RouteAll() map[string]*Replicaset {
8✔
662
        nameToReplicasetRef := r.getNameToReplicaset()
8✔
663

8✔
664
        // Do not expose the original map to prevent unauthorized modification.
8✔
665
        nameToReplicasetCopy := make(map[string]*Replicaset)
8✔
666

8✔
667
        for k, v := range nameToReplicasetRef {
25✔
668
                nameToReplicasetCopy[k] = v
17✔
669
        }
17✔
670

671
        return nameToReplicasetCopy
8✔
672
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc