• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 19479805971

18 Nov 2025 08:09PM UTC coverage: 88.483% (-0.06%) from 88.539%
19479805971

push

github

ita-sammann
TNTP-2109: Process bucket_ref errors in crud.router

7 of 11 new or added lines in 1 file covered. (63.64%)

1 existing line in 1 file now uncovered.

5063 of 5722 relevant lines covered (88.48%)

6252.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.4
/crud/common/call.lua
1
local errors = require('errors')
258✔
2
local vshard = require('vshard')
258✔
3

4
local call_cache = require('crud.common.call_cache')
258✔
5
local dev_checks = require('crud.common.dev_checks')
258✔
6
local utils = require('crud.common.utils')
258✔
7
local sharding_utils = require('crud.common.sharding.utils')
258✔
8
local fiber = require('fiber')
258✔
9
local const = require('crud.common.const')
258✔
10
local rebalance = require('crud.common.rebalance')
258✔
11

12
local BaseIterator = require('crud.common.map_call_cases.base_iter')
258✔
13
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
258✔
14

15
local CallError = errors.new_class('CallError')
258✔
16

17
local CALL_FUNC_NAME = 'call_on_storage'
258✔
18
local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)
258✔
19
local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/'
258✔
20

21
local call = {}
258✔
22

23
-- Release one reference on every bucket in `bucket_ids`.
--
-- A failed unref does not stop the loop: the remaining buckets are still
-- processed and only the error of the most recent failure is reported.
--
-- Returns `true, nil` when every unref succeeded, `nil, err` otherwise.
local function bucket_unref_many(bucket_ids, mode)
    local failed = false
    local failure = nil

    for _, id in pairs(bucket_ids) do
        local unreffed, unref_err = vshard.storage.bucket_unref(id, mode)
        if not unreffed then
            failed = true
            failure = unref_err
        end
    end

    if failed then
        return nil, failure
    end
    return true, failure
end
35

36
-- Take one reference on every bucket in `bucket_ids`.
--
-- All-or-nothing: on the first failed ref, the buckets referenced so far are
-- released again (the result of that rollback is ignored) and the ref error
-- is returned.
--
-- Returns `true, nil` on success, `nil, err` on failure.
local function bucket_ref_many(bucket_ids, mode)
    local acquired = {}

    for _, id in pairs(bucket_ids) do
        local reffed, ref_err = vshard.storage.bucket_ref(id, mode)
        if reffed then
            acquired[#acquired + 1] = id
        else
            bucket_unref_many(acquired, mode)
            return nil, ref_err
        end
    end

    return true, nil
end
48

49
-- Storage-side dispatcher that keeps the involved buckets referenced for the
-- duration of the call.
--
-- Renames the current fiber to `CRUD_CALL_FIBER_NAME .. func_name`, takes a
-- reference on every bucket in `bucket_ids`, runs the function resolved from
-- `func_name` as `run_as_user` and releases the references afterwards.
--
-- Returns the function's results, or `nil, err` when taking or releasing the
-- bucket references fails. An error raised by the function is re-raised, but
-- only after the references have been released.
local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...)
    fiber.name(CRUD_CALL_FIBER_NAME .. func_name)

    local ok, ref_err = bucket_ref_many(bucket_ids, mode)
    if not ok then
        return nil, ref_err
    end

    -- Run under pcall: if the function (or its lookup) raises, the references
    -- taken above still have to be released, otherwise they are never dropped.
    local res = {pcall(function(...)
        return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
    end, ...)}
    local res_count = table.maxn(res)

    ok, ref_err = bucket_unref_many(bucket_ids, mode)

    if not res[1] then
        -- Level 0 keeps the original error value/message untouched.
        error(res[2], 0)
    end

    if not ok then
        return nil, ref_err
    end

    -- res[1] is the pcall status; the function's own results start at index 2.
    return unpack(res, 2, res_count)
end
66

67
-- Storage-side dispatcher without bucket referencing.
--
-- Accepts the same positional arguments as `call_on_storage_safe`; the
-- bucket ids and the mode (2nd and 3rd arguments) are ignored. Renames the
-- current fiber and runs the function resolved from `func_name` as
-- `run_as_user`, returning whatever it returns.
local function call_on_storage_fast(run_as_user, _, _, func_name, ...)
    fiber.name(CRUD_CALL_FIBER_NAME .. func_name)

    local target = call_cache.func_name_to_func(func_name)
    return box.session.su(run_as_user, target, ...)
end
72

73
-- Currently active storage-side dispatcher. Picked from the rebalance
-- safe-mode flag at load time and reassigned by the safe-mode hooks below.
local call_on_storage
if rebalance.safe_mode then
    call_on_storage = call_on_storage_safe
else
    call_on_storage = call_on_storage_fast
end
258✔
74

75
-- Safe-mode "enable" hook: switch to the bucket-referencing dispatcher and
-- kill every fiber whose name contains the CRUD call fiber prefix, i.e. the
-- calls that were started by one of the dispatchers in this module.
local function safe_mode_enable()
    call_on_storage = call_on_storage_safe

    for fb_id, fb in pairs(fiber.info()) do
        -- Plain search (4th argument): the prefix is a literal fiber name, not
        -- a Lua pattern, so magic characters in it (such as `.`) must match
        -- only themselves.
        if string.find(fb.name, CRUD_CALL_FIBER_NAME, 1, true) then
            fiber.kill(fb_id)
        end
    end
end
84

85
-- Safe-mode "disable" hook: go back to the dispatcher that does not
-- reference buckets. Fibers that are already running are left alone.
local function safe_mode_disable()
    call_on_storage = call_on_storage_fast
end
88

89
rebalance.register_safe_mode_enable_hook(safe_mode_enable)
258✔
90
rebalance.register_safe_mode_disable_hook(safe_mode_disable)
258✔
91

92
-- Export a thin dispatcher instead of the function value itself: the table
-- would otherwise keep whichever implementation was selected at load time,
-- and later reassignments of `call_on_storage` by the safe-mode hooks would
-- never reach the exported entry. The upvalue is read on every call.
call.storage_api = {
    [CALL_FUNC_NAME] = function(...)
        return call_on_storage(...)
    end,
}
258✔
93

94
-- Map a call mode and read-routing preferences to a vshard call method name.
--
--   mode == 'write'                      -> 'callrw'
--   mode == 'read', no preferences       -> 'callro'
--   mode == 'read', balance only         -> 'callbro'
--   mode == 'read', prefer_replica only  -> 'callre'
--   mode == 'read', both                 -> 'callbre'
--
-- Returns the method name, or `nil, CallError` for any other mode.
function call.get_vshard_call_name(mode, prefer_replica, balance)
    dev_checks('string', '?boolean', '?boolean')

    if mode == 'write' then
        return 'callrw'
    end

    if mode ~= 'read' then
        return nil, CallError:new("Unknown call mode: %s", mode)
    end

    if prefer_replica then
        if balance then
            return 'callbre'
        end
        return 'callre'
    end

    if balance then
        return 'callbro'
    end
    return 'callro'
end
120

121
-- Turn an error returned by a storage call into an error for the caller.
--
-- ShardingHashMismatchError is only passed through `errors.wrap` so that its
-- class is preserved. Any other error gets its description updated with the
-- function name and replicaset id and is re-created as a CallError.
--
-- When `replicaset_id` is nil it is resolved by routing `bucket_id`; if that
-- fails, a CallError saying so (and carrying the original error) is returned.
local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, bucket_id)
    local is_hash_mismatch = err.class_name == sharding_utils.ShardingHashMismatchError.name
    if is_hash_mismatch then
        -- Do not rewrite ShardingHashMismatchError class.
        return errors.wrap(err)
    end

    if replicaset_id == nil then
        local routed = vshard_router:route(bucket_id)
        if routed == nil then
            return CallError:new(
                "Function returned an error, but we couldn't figure out the replicaset: %s", err
            )
        end

        replicaset_id = utils.get_replicaset_id(vshard_router, routed)
        if replicaset_id == nil then
            return CallError:new(
                "Function returned an error, but we couldn't figure out the replicaset id: %s", err
            )
        end
    end

    local described = utils.update_storage_call_error_description(err, func_name, replicaset_id)
    local wrapped = errors.wrap(described)

    return CallError:new(utils.format_replicaset_error(
        replicaset_id, "Function returned an error: %s", wrapped
    ))
end
151

152
-- Invoke the storage-side dispatcher on `replicaset` via `method`, retrying
-- at most once after an error.
--
-- The storage call receives the effective user, `bucket_ids`, `mode` and
-- `func_name`, followed by `func_args`.
--
-- On WRONG_BUCKET / BUCKET_IS_LOCKED / TRANSFER_IS_IN_PROGRESS the router's
-- cached route for the bucket is reset; the retry goes to the replicaset
-- named by `err.destination`, but only when the router knows it and the call
-- covers exactly one bucket -- otherwise `nil, err` is returned without a
-- retry. On MISSING_MASTER the master is located first (when the replicaset
-- supports it). Every other error is retried on the same replicaset as is.
local function retry_call_with_master_discovery(vshard_router, replicaset, method, func_name, func_args,
                                                call_opts, mode, bucket_ids)
    local storage_args = utils.append_array(
        { box.session.effective_user(), bucket_ids, mode, func_name },
        func_args
    )

    -- In case cluster was just bootstrapped with auto master discovery,
    -- replicaset may miss master.
    local resp, err = replicaset[method](replicaset, CRUD_CALL_FUNC_NAME, storage_args, call_opts)
    if err == nil then
        return resp, err
    end

    local err_name = err.name
    local is_bucket_err = err_name == 'WRONG_BUCKET'
        or err_name == 'BUCKET_IS_LOCKED'
        or err_name == 'TRANSFER_IS_IN_PROGRESS'

    if is_bucket_err then
        vshard_router:_bucket_reset(err.bucket_id)

        -- Substitute replicaset only for single bucket_id calls.
        local destination = err.destination
        local can_redirect = destination
            and vshard_router.replicasets[destination]
            and #bucket_ids == 1
        if not can_redirect then
            return nil, err
        end
        replicaset = vshard_router.replicasets[destination]
    elseif err_name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
        replicaset:locate_master()
    end

    -- Retry only once: should be enough for initial discovery,
    -- otherwise force user fix up cluster bootstrap.
    return replicaset[method](replicaset, CRUD_CALL_FUNC_NAME, storage_args, call_opts)
end
182

183
-- Call `func_name` on several replicasets asynchronously and gather the
-- results through a postprocessor.
--
-- The iterator (`opts.iter`, or a BaseIterator built from `func_args` and
-- `opts.replicasets`) yields the per-replicaset arguments. One async request
-- is started per iteration; then every future is awaited against a single
-- shared deadline of `opts.timeout` (or the default vshard call timeout).
-- Results and errors are fed to the postprocessor (`opts.postprocessor`, or a
-- BasePostprocessor), and whatever `postprocessor:get()` returns is returned.
--
-- If starting a request fails, the error is collected and the function
-- returns right away without awaiting the requests already started.
function call.map(vshard_router, func_name, func_args, opts)
    dev_checks('table', 'string', '?table', {
        mode = 'string',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        request_timeout = '?number',
        replicasets = '?table',
        iter = '?table',
        postprocessor = '?table',
    })
    opts = opts or {}

    local vshard_call_name, err = call.get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance)
    if err ~= nil then
        return nil, err
    end

    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

    local iter = opts.iter
    if iter == nil then
        iter, err = BaseIterator:new({
            func_args = func_args,
            replicasets = opts.replicasets,
            vshard_router = vshard_router,
        })
        if err ~= nil then
            return nil, err
        end
    end

    local postprocessor = opts.postprocessor
    if postprocessor == nil then
        postprocessor = BasePostprocessor:new()
    end

    -- Error descriptor handed to the postprocessor together with each result.
    local function make_err_info(call_err, replicaset_id)
        return {
            err_wrapper = wrap_vshard_err,
            err = call_err,
            wrapper_args = {func_name, replicaset_id},
        }
    end

    -- request_timeout is forwarded for 'read' calls only.
    local request_timeout = opts.mode == 'read' and opts.request_timeout or nil
    local call_opts = {
        is_async = true,
        request_timeout = request_timeout,
    }

    local pending = {}
    while iter:has_next() do
        local args, replicaset, replicaset_id, bucket_ids = iter:get()

        local future, call_err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
            func_name, args, call_opts, opts.mode, bucket_ids)

        if call_err ~= nil then
            -- Enforce early exit on futures build fail.
            postprocessor:collect(
                {key = replicaset_id, value = nil},
                make_err_info(call_err, replicaset_id)
            )
            return postprocessor:get()
        end

        pending[replicaset_id] = future
    end

    local deadline = fiber.clock() + timeout
    for replicaset_id, future in pairs(pending) do
        -- Past the deadline, still poll the future with a zero timeout.
        local remaining = deadline - fiber.clock()
        if remaining < 0 then
            remaining = 0
        end

        local result, wait_err = future:wait_result(remaining)

        local early_exit = postprocessor:collect(
            {key = replicaset_id, value = result},
            make_err_info(wait_err, replicaset_id)
        )
        if early_exit then
            break
        end
    end

    return postprocessor:get()
end
279

280
-- Call `func_name` on the replicaset that owns `bucket_id`.
--
-- `opts.mode` ('read' or 'write') together with `opts.prefer_replica` and
-- `opts.balance` selects the vshard call method. `opts.timeout` defaults to
-- the default vshard call timeout; `opts.request_timeout` is forwarded for
-- 'read' calls only.
--
-- Returns the call result (a box.NULL result is normalized to nil), or
-- `nil, err` when the mode is unknown, routing fails or the call fails.
function call.single(vshard_router, bucket_id, func_name, func_args, opts)
    dev_checks('table', 'number', 'string', '?table', {
        mode = 'string',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        request_timeout = '?number',
    })
    -- Same defaulting as in call.map: without it a missing `opts` fails on
    -- indexing nil instead of producing the "Unknown call mode" error below.
    opts = opts or {}

    local vshard_call_name, err = call.get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance)
    if err ~= nil then
        return nil, err
    end

    local replicaset, err = vshard_router:route(bucket_id)
    if err ~= nil then
        return nil, CallError:new("Failed to get router replicaset: %s", tostring(err))
    end

    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
    local request_timeout = opts.mode == 'read' and opts.request_timeout or nil

    local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
        func_name, func_args, {timeout = timeout, request_timeout = request_timeout},
        opts.mode, {bucket_id})
    if err ~= nil then
        -- replicaset_id is passed as nil: wrap_vshard_err resolves it from bucket_id.
        return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
    end

    if res == box.NULL then
        return nil
    end

    return res
end
315

316
-- Call `func_name` on one arbitrary replicaset: the first one that `next`
-- yields from `vshard_router:routeall()`.
--
-- The request goes through the replicaset's generic `call` method with
-- `opts.timeout` (default: the default vshard call timeout), in 'read' mode
-- and with an empty bucket list.
--
-- Returns the call result (a box.NULL result is normalized to nil), or
-- `nil, err` when no replicaset is available or the call fails.
function call.any(vshard_router, func_name, func_args, opts)
    dev_checks('table', 'string', '?table', {
        timeout = '?number',
    })
    -- Every option is optional, so `opts` itself may be omitted.
    opts = opts or {}

    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

    local replicasets, err = vshard_router:routeall()
    if replicasets == nil then
        return nil, CallError:new("Failed to get router replicasets: %s", err.err)
    end

    local replicaset_id, replicaset = next(replicasets)
    if replicaset == nil then
        -- An empty replicaset list would otherwise fail on indexing nil below.
        return nil, CallError:new("Failed to get router replicasets: %s", "no replicasets found")
    end

    local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call',
        func_name, func_args, {timeout = timeout},
        'read', {})
    if err ~= nil then
        return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
    end

    if res == box.NULL then
        return nil
    end

    return res
end
342

343
return call
258✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc