• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 19510155373

19 Nov 2025 05:17PM UTC coverage: 88.472% (-0.5%) from 88.969%
19510155373

Pull #462

github

ita-sammann
TNTP-2109: Multiple minor fixes
Pull Request #462: TNTP-2109: Switch to "safe" mode on vshard rebalance

124 of 171 new or added lines in 8 files covered. (72.51%)

1 existing line in 1 file now uncovered.

5050 of 5708 relevant lines covered (88.47%)

6039.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.4
/crud/common/call.lua
1
local errors = require('errors')
258✔
2
local vshard = require('vshard')
258✔
3

4
local call_cache = require('crud.common.call_cache')
258✔
5
local dev_checks = require('crud.common.dev_checks')
258✔
6
local utils = require('crud.common.utils')
258✔
7
local sharding_utils = require('crud.common.sharding.utils')
258✔
8
local fiber = require('fiber')
258✔
9
local const = require('crud.common.const')
258✔
10
local rebalance = require('crud.common.rebalance')
258✔
11

12
local BaseIterator = require('crud.common.map_call_cases.base_iter')
258✔
13
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
258✔
14

15
-- Module table; public functions and the storage API are attached to it below.
local call = {}

-- Error class used for every error this module creates itself.
local CallError = errors.new_class('CallError')

-- Short name of the storage-side entry point and the full name returned by
-- utils.get_storage_call() for it.
local CALL_FUNC_NAME = 'call_on_storage'
local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)

-- Prefix of the fiber name set by the storage-side handlers; the handlers
-- append 'safe/' or 'fast/' and the called function name to it.
local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/'
258✔
22

23
-- Calls vshard.storage.bucket_unref(bucket_id, mode) for every value in
-- bucket_ids. A failure does not stop the loop: every bucket gets an attempt.
--
-- Returns `true, nil` when all unrefs succeeded, otherwise `nil` and the
-- error returned by the last failed unref.
local function bucket_unref_many(bucket_ids, mode)
    local failed = false
    local failure = nil

    for _, id in pairs(bucket_ids) do
        local unref_ok, unref_err = vshard.storage.bucket_unref(id, mode)
        if not unref_ok then
            failed = true
            failure = unref_err
        end
    end

    if failed then
        return nil, failure
    end
    return true, nil
end
35

36
-- Calls vshard.storage.bucket_ref(bucket_id, mode) for every value in
-- bucket_ids. On the first failure the buckets referenced so far are
-- unreferenced again (the result of that rollback is ignored) and the
-- failure is reported.
--
-- Returns `true, nil` on success, `nil, err` on the first failed ref.
local function bucket_ref_many(bucket_ids, mode)
    local acquired = {}

    for _, id in pairs(bucket_ids) do
        local ref_ok, ref_err = vshard.storage.bucket_ref(id, mode)
        if ref_ok then
            acquired[#acquired + 1] = id
        else
            bucket_unref_many(acquired, mode)
            return nil, ref_err
        end
    end

    return true, nil
end
48

49
-- Resolves func_name through call_cache and runs the function as
-- run_as_user. Kept as a separate function so that both the lookup and the
-- call can be wrapped in a single pcall.
local function run_func_as_user(run_as_user, func_name, ...)
    return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
end

-- Storage-side handler used in safe mode: references every bucket in
-- bucket_ids before running the function and unreferences them afterwards.
--
-- Parameters:
--   run_as_user - user passed to box.session.su()
--   bucket_ids  - table of bucket ids to reference for the call duration
--   mode        - passed through to bucket_ref / bucket_unref
--   func_name   - name resolved via call_cache.func_name_to_func()
--   ...         - arguments forwarded to the function
--
-- Returns the function results, or `nil, err` when referencing or
-- unreferencing the buckets failed. An error raised by the function itself
-- is re-raised after the buckets have been unreferenced.
local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...)
    fiber.name(CRUD_CALL_FIBER_NAME .. 'safe/' .. func_name)

    local ok, ref_err = bucket_ref_many(bucket_ids, mode)
    if not ok then
        return nil, ref_err
    end

    -- Protected call: without it an error raised by the function would skip
    -- the unref below and leave the buckets referenced.
    local res = {pcall(run_func_as_user, run_as_user, func_name, ...)}
    local call_ok = res[1]

    local unref_ok, unref_err = bucket_unref_many(bucket_ids, mode)

    if not call_ok then
        -- Re-raise the original error value unchanged (level 0 adds no
        -- position information to string errors).
        error(res[2], 0)
    end

    if not unref_ok then
        return nil, unref_err
    end

    -- res[1] is the pcall status; the function results start at index 2.
    return unpack(res, 2, table.maxn(res))
end
66

67
-- Storage-side handler used outside safe mode: runs the function as
-- run_as_user without touching bucket references. The second and third
-- parameters (bucket ids and mode in the safe handler) are ignored here.
local function call_on_storage_fast(run_as_user, _, _, func_name, ...)
    local fiber_name = CRUD_CALL_FIBER_NAME .. 'fast/' .. func_name
    fiber.name(fiber_name)

    local func = call_cache.func_name_to_func(func_name)
    return box.session.su(run_as_user, func, ...)
end
72

73
-- Handler currently in effect. The initial choice follows rebalance.safe_mode
-- at load time; safe_mode_enable() / safe_mode_disable() reassign it later.
local call_on_storage
if rebalance.safe_mode then
    call_on_storage = call_on_storage_safe
else
    call_on_storage = call_on_storage_fast
end
258✔
74

75
-- Safe-mode enable hook: switches the active handler to the safe variant and
-- kills every fiber whose name contains the crud call prefix, i.e. calls that
-- are still in flight.
local function safe_mode_enable()
    call_on_storage = call_on_storage_safe

    for fb_id, fb in pairs(fiber.info()) do
        -- Plain-text search (4th argument): the prefix is built from
        -- utils.get_storage_call() and may contain characters that are magic
        -- in Lua patterns (such as '.'), which must be matched literally.
        if string.find(fb.name, CRUD_CALL_FIBER_NAME, 1, true) then
            fiber.kill(fb_id)
        end
    end
end

-- Safe-mode disable hook: switches the active handler back to the fast
-- variant. Running fibers are left alone.
local function safe_mode_disable()
    call_on_storage = call_on_storage_fast
end
88

89
rebalance.register_safe_mode_enable_hook(safe_mode_enable)
rebalance.register_safe_mode_disable_hook(safe_mode_disable)

-- Export a dispatcher rather than the current value of call_on_storage.
-- A table field stores the function value it was given, so exporting
-- call_on_storage directly would pin the handler chosen at load time and the
-- reassignments done by the safe-mode hooks would never take effect.
-- The tail call keeps the dispatcher out of the call stack.
call.storage_api = {
    [CALL_FUNC_NAME] = function(...)
        return call_on_storage(...)
    end,
}
258✔
93

94
-- Maps a call mode and replica preferences to the vshard call method name.
--
--   mode == 'write'                          -> 'callrw'
--   mode == 'read', no preferences           -> 'callro'
--   mode == 'read', balance only             -> 'callbro'
--   mode == 'read', prefer_replica only      -> 'callre'
--   mode == 'read', prefer_replica + balance -> 'callbre'
--
-- Returns the method name, or `nil, CallError` for any other mode.
function call.get_vshard_call_name(mode, prefer_replica, balance)
    dev_checks('string', '?boolean', '?boolean')

    if mode == 'write' then
        return 'callrw'
    end

    if mode ~= 'read' then
        return nil, CallError:new("Unknown call mode: %s", mode)
    end

    if prefer_replica then
        if balance then
            return 'callbre'
        end
        return 'callre'
    end

    if balance then
        return 'callbro'
    end
    return 'callro'
end
120

121
-- Turns an error returned by a storage call into the error reported to the
-- caller.
--
-- An error whose class_name equals ShardingHashMismatchError's name is only
-- passed through errors.wrap(). Any other error gets its description updated
-- with the function name and replicaset id and is wrapped into a CallError.
-- When replicaset_id is nil it is looked up by routing bucket_id; if that
-- lookup fails, a CallError describing the lookup failure is returned.
local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, bucket_id)
    -- Do not rewrite ShardingHashMismatchError class.
    local is_hash_mismatch = err.class_name == sharding_utils.ShardingHashMismatchError.name
    if is_hash_mismatch then
        return errors.wrap(err)
    end

    if replicaset_id == nil then
        local routed_replicaset = vshard_router:route(bucket_id)
        if routed_replicaset == nil then
            return CallError:new(
                "Function returned an error, but we couldn't figure out the replicaset: %s", err
            )
        end

        replicaset_id = utils.get_replicaset_id(vshard_router, routed_replicaset)
        if replicaset_id == nil then
            return CallError:new(
                "Function returned an error, but we couldn't figure out the replicaset id: %s", err
            )
        end
    end

    local described_err = utils.update_storage_call_error_description(err, func_name, replicaset_id)
    local wrapped_err = errors.wrap(described_err)

    return CallError:new(utils.format_replicaset_error(
        replicaset_id, "Function returned an error: %s", wrapped_err
    ))
end
151

152
-- Calls the storage-side crud entry point on `replicaset` through the vshard
-- replicaset method named `method`, and retries once on error.
--
-- The storage function receives the effective user, bucket_ids, mode and
-- func_name followed by func_args.
--
-- Before the retry:
--   * on WRONG_BUCKET / BUCKET_IS_LOCKED / TRANSFER_IS_IN_PROGRESS the bucket
--     route is reset; the retry goes to err.destination, and only for calls
--     with exactly one bucket id. Otherwise the error is returned as is;
--   * on MISSING_MASTER the master is located first, if the replicaset
--     supports it.
--
-- Returns whatever the (last) replicaset call returned.
local function retry_call_with_master_discovery(vshard_router, replicaset,
                                                method, func_name, func_args,
                                                call_opts, mode, bucket_ids)
    -- NOTE(review): bucket_ids is placed positionally into this array; if a
    -- caller passes nil the literal has a hole. Confirm utils.append_array
    -- handles that as intended.
    local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args)

    -- In case cluster was just bootstrapped with auto master discovery,
    -- replicaset may miss master.
    local resp, err = replicaset[method](replicaset, CRUD_CALL_FUNC_NAME, func_args_ext, call_opts)

    if err == nil then
        return resp, err
    end

    -- This is a partial copy of error handling from vshard.router.router_call_impl()
    -- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
    if err.name == 'WRONG_BUCKET' or
            err.name == 'BUCKET_IS_LOCKED' or
            err.name == 'TRANSFER_IS_IN_PROGRESS' then
        vshard_router:_bucket_reset(err.bucket_id)

        -- Substitute replicaset only for single bucket_id calls.
        -- bucket_ids may be nil (call.map takes it from the iterator), so it
        -- is checked before the length operator is applied.
        local is_single_bucket = bucket_ids ~= nil and #bucket_ids == 1
        local destination = err.destination and vshard_router.replicasets[err.destination]
        if not (destination and is_single_bucket) then
            return nil, err
        end
        replicaset = destination
    elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
        replicaset:locate_master()
    end

    -- Retry only once: should be enough for initial discovery,
    -- otherwise force user fix up cluster bootstrap.
    return replicaset[method](replicaset, CRUD_CALL_FUNC_NAME, func_args_ext, call_opts)
end
186

187
-- Calls func_name on every replicaset produced by the iterator and feeds the
-- results to the postprocessor.
--
-- Requests are sent asynchronously first; afterwards the futures are awaited
-- against one shared deadline (opts.timeout or the default vshard call
-- timeout). opts.iter and opts.postprocessor default to BaseIterator and
-- BasePostprocessor. opts.request_timeout is only forwarded in 'read' mode.
--
-- Returns `nil, err` when the mode is unknown or the default iterator cannot
-- be built; otherwise returns whatever postprocessor:get() returns. A failure
-- to send a request is collected and ends the call immediately.
function call.map(vshard_router, func_name, func_args, opts)
    dev_checks('table', 'string', '?table', {
        mode = 'string',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        request_timeout = '?number',
        replicasets = '?table',
        iter = '?table',
        postprocessor = '?table',
    })
    opts = opts or {}

    local vshard_call_name, err = call.get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance)
    if err ~= nil then
        return nil, err
    end

    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

    local iter = opts.iter
    if iter == nil then
        local iter_opts = {
            func_args = func_args,
            replicasets = opts.replicasets,
            vshard_router = vshard_router,
        }
        iter, err = BaseIterator:new(iter_opts)
        if err ~= nil then
            return nil, err
        end
    end

    local postprocessor = opts.postprocessor
    if postprocessor == nil then
        postprocessor = BasePostprocessor:new()
    end

    -- Hands one replicaset outcome to the postprocessor and returns its
    -- early-exit flag.
    local function collect(replicaset_id, value, call_err)
        local result_info = {
            key = replicaset_id,
            value = value,
        }
        local err_info = {
            err_wrapper = wrap_vshard_err,
            err = call_err,
            wrapper_args = {func_name, replicaset_id},
        }
        return postprocessor:collect(result_info, err_info)
    end

    local call_opts = {
        is_async = true,
        request_timeout = opts.mode == 'read' and opts.request_timeout or nil,
    }

    -- Phase 1: send every request, remembering the future per replicaset.
    local pending = {}
    while iter:has_next() do
        local args, replicaset, replicaset_id, bucket_ids = iter:get()

        local future, send_err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
            func_name, args, call_opts, opts.mode, bucket_ids)

        if send_err ~= nil then
            -- Enforce early exit on futures build fail.
            collect(replicaset_id, nil, send_err)
            return postprocessor:get()
        end

        pending[replicaset_id] = future
    end

    -- Phase 2: await the futures; the time left never goes below zero.
    local deadline = fiber.clock() + timeout
    for replicaset_id, future in pairs(pending) do
        local wait_timeout = math.max(deadline - fiber.clock(), 0)
        local result, wait_err = future:wait_result(wait_timeout)

        if collect(replicaset_id, result, wait_err) then
            break
        end
    end

    return postprocessor:get()
end
283

284
-- Calls func_name on the replicaset that owns bucket_id.
--
-- opts.mode selects the vshard call method together with opts.prefer_replica
-- and opts.balance; opts.timeout defaults to the vshard call timeout and
-- opts.request_timeout is only forwarded in 'read' mode.
--
-- Returns the call result (nil when the storage returned box.NULL), or
-- `nil, err` when the mode is unknown, routing fails or the call fails.
function call.single(vshard_router, bucket_id, func_name, func_args, opts)
    dev_checks('table', 'number', 'string', '?table', {
        mode = 'string',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        request_timeout = '?number',
    })
    -- Same default as in call.map: a missing opts table then ends in the
    -- "Unknown call mode" error below instead of an index-nil error.
    opts = opts or {}

    local vshard_call_name, err = call.get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance)
    if err ~= nil then
        return nil, err
    end

    local replicaset, err = vshard_router:route(bucket_id)
    if err ~= nil then
        return nil, CallError:new("Failed to get router replicaset: %s", tostring(err))
    end

    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
    local request_timeout = opts.mode == 'read' and opts.request_timeout or nil

    local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
        func_name, func_args, {timeout = timeout, request_timeout = request_timeout},
        opts.mode, {bucket_id})
    if err ~= nil then
        -- replicaset_id is passed as nil so it is derived from bucket_id.
        return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
    end

    if res == box.NULL then
        return nil
    end

    return res
end
319

320
-- Calls func_name on one replicaset: the first one next() yields from
-- vshard_router:routeall(). It uses the replicaset 'call' method and passes
-- 'read' and an empty bucket id list to the storage-side handler.
--
-- opts.timeout defaults to the vshard call timeout.
--
-- Returns the call result (nil when the storage returned box.NULL), or
-- `nil, err` when no replicaset is available or the call fails.
function call.any(vshard_router, func_name, func_args, opts)
    dev_checks('table', 'string', '?table', {
        timeout = '?number',
    })
    -- Every option is optional, so opts itself may be omitted.
    opts = opts or {}

    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

    local replicasets, err = vshard_router:routeall()
    if replicasets == nil then
        return nil, CallError:new("Failed to get router replicasets: %s", err.err)
    end

    local replicaset_id, replicaset = next(replicasets)
    if replicaset == nil then
        -- An empty map would otherwise surface as an "attempt to index a nil
        -- value" error inside the call helper.
        return nil, CallError:new("Failed to get router replicasets: no replicasets found")
    end

    local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call',
        func_name, func_args, {timeout = timeout},
        'read', {})
    if err ~= nil then
        return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
    end

    if res == box.NULL then
        return nil
    end

    return res
end
346

347
return call
258✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc