• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 27198360611

09 Jun 2026 09:54AM UTC coverage: 88.324% (-0.03%) from 88.349%
27198360611

Pull #497

github

p0rtale
fix: allow read-only operations when all masters are down

Read-only operations (get, select, pairs, count, len, min, max) used to
fail with connection errors if all masters in the cluster were unavailable,
even when healthy replicas were up and failover had not completed yet.

To resolve this, the following improvements were made:
- Introduced a `read_only` flag to `utils.get_space[s]` to fetch cluster
  schema from any healthy replica if masters are down.
- Updated `get`, `select`, `pairs`, `count`, `len`, `min`, `max` to use
  this new flag.
- Rewrote `call.any` to iterate through all replicasets and utilize
  vshard's `callro` instead of `call` to fetch metadata from replicas.
Pull Request #497: fix: allow read-only operations when all masters are down

29 of 30 new or added lines in 7 files covered. (96.67%)

3 existing lines in 2 files now uncovered.

5265 of 5961 relevant lines covered (88.32%)

12521.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.11
/crud/common/call.lua
1
local errors = require('errors')
643✔
2

3
local call_cache = require('crud.common.call_cache')
643✔
4
local dev_checks = require('crud.common.dev_checks')
643✔
5
local yield_checks = require('crud.common.yield_checks')
643✔
6
local utils = require('crud.common.utils')
643✔
7
local sharding_utils = require('crud.common.sharding.utils')
643✔
8
local fiber = require('fiber')
643✔
9
local fiber_clock = fiber.clock
643✔
10
local const = require('crud.common.const')
643✔
11
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
643✔
12

13
local BaseIterator = require('crud.common.map_call_cases.base_iter')
643✔
14
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
643✔
15

16
local CallError = errors.new_class('CallError')
643✔
17

18
local CALL_FUNC_NAME = 'call_on_storage'
643✔
19
local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)
643✔
20

21
local call = {}
643✔
22

23
--- Handler registered under CALL_FUNC_NAME in `call.storage_api`.
--- Resolves `func_name` through `call_cache.func_name_to_func` and hands
--- `box.session.su`, `run_as_user`, the resolved function and the remaining
--- arguments to `yield_checks.guard`, returning whatever the guard returns.
local function call_on_storage(run_as_user, func_name, ...)
    local target_func = call_cache.func_name_to_func(func_name)
    return yield_checks.guard(box.session.su, run_as_user, target_func, ...)
end

-- Maps CALL_FUNC_NAME to its handler.
call.storage_api = {}
call.storage_api[CALL_FUNC_NAME] = call_on_storage
643✔
28

29
--- Selects the name of the vshard call method for a request.
---
--- Mapping:
---   mode == 'write'                          -> 'callrw'
---   mode == 'read', no flags                 -> 'callro'
---   mode == 'read', balance only             -> 'callbro'
---   mode == 'read', prefer_replica only      -> 'callre'
---   mode == 'read', prefer_replica + balance -> 'callbre'
---
--- @param mode string, 'write' or 'read'
--- @param prefer_replica optional boolean
--- @param balance optional boolean
--- @return the method name, or `nil, CallError` when `mode` is neither
---         'write' nor 'read'
function call.get_vshard_call_name(mode, prefer_replica, balance)
    dev_checks('string', '?boolean', '?boolean')

    if mode == 'write' then
        return 'callrw'
    end

    if mode ~= 'read' then
        return nil, CallError:new("Unknown call mode: %s", mode)
    end

    -- Read mode: the two flags pick one of the four read-only methods.
    if prefer_replica then
        return balance and 'callbre' or 'callre'
    end

    return balance and 'callbro' or 'callro'
end
55

56
--- Turns an error produced by a storage call into the error object returned
--- to the caller.
---
--- A ShardingHashMismatchError is only passed through `errors.wrap`, so its
--- class is kept. Any other error gets its description updated with the
--- function name and replicaset id and is then wrapped into a CallError.
---
--- @param vshard_router router object; used to resolve the replicaset when
---        `replicaset_id` is nil
--- @param err the error to wrap (indexed for `class_name`)
--- @param func_name name of the called function
--- @param replicaset_id replicaset id, or nil to resolve it from `bucket_id`
--- @param bucket_id bucket id passed to `vshard_router:route` when
---        `replicaset_id` is nil
--- @return the wrapped error object
local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, bucket_id)
    -- Do not rewrite ShardingHashMismatchError class.
    local mismatch_class_name = sharding_utils.ShardingHashMismatchError.name
    if err.class_name == mismatch_class_name then
        return errors.wrap(err)
    end

    local rs_id = replicaset_id
    if rs_id == nil then
        -- No id supplied: resolve the replicaset from the bucket id first.
        local routed_replicaset = vshard_router:route(bucket_id)
        if routed_replicaset == nil then
            return CallError:new(
                "Function returned an error, but we couldn't figure out the replicaset: %s", err
            )
        end

        rs_id = utils.get_replicaset_id(vshard_router, routed_replicaset)
        if rs_id == nil then
            return CallError:new(
                "Function returned an error, but we couldn't figure out the replicaset id: %s", err
            )
        end
    end

    local described_err = utils.update_storage_call_error_description(err, func_name, rs_id)
    local wrapped_err = errors.wrap(described_err)

    return CallError:new(utils.format_replicaset_error(
        rs_id, "Function returned an error: %s", wrapped_err
    ))
end
86

87
-- Names of vshard errors for which the bucket is reset on the router
-- (through `_bucket_reset`) before the call is retried.
local BUCKET_RESET_ERR_NAMES = {
    WRONG_BUCKET = true,
    BUCKET_IS_LOCKED = true,
    TRANSFER_IS_IN_PROGRESS = true,
}

--- Performs a vshard call on a replicaset and, if it fails, retries it exactly
--- once after a recovery step: bucket reset, redirect to the destination
--- replicaset (single calls only), or master discovery.
---
--- @param vshard_router router object (`replicasets`, `_bucket_reset` are used)
--- @param replicaset replicaset object the call is issued on
--- @param method name of the replicaset call method (e.g. 'callro')
--- @param func_name name of the function to run; sent together with the
---        effective user in front of `func_args`
--- @param func_args arguments appended after the user and function name
--- @param call_opts options passed to the replicaset call method as is
--- @param is_single_call when truthy, a single bucket-ref error carrying a
---        known `destination` redirects the retry to that replicaset
--- @return the first attempt's result on success, otherwise everything the
---         retry returns
local function call_with_retry_and_recovery(vshard_router,
    replicaset, method, func_name, func_args, call_opts, is_single_call)
    local storage_args = utils.append_array({ box.session.effective_user(), func_name }, func_args)

    -- In case cluster was just bootstrapped with auto master discovery,
    -- replicaset may miss master, so the first attempt is allowed to fail.
    local resp, err = replicaset[method](replicaset, CRUD_CALL_FUNC_NAME, storage_args, call_opts)
    if err == nil then
        return resp, err
    end

    -- Replicaset used for the retry; changed only by a redirect below.
    local retry_target = replicaset

    -- This is a partial copy of error handling from vshard.router.router_call_impl().
    -- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
    if err.class_name == bucket_ref_unref.BucketRefError.name then
        local ref_errs = err.bucket_ref_errs

        if is_single_call and #ref_errs == 1 then
            local destination = ref_errs[1].vshard_err.destination
            local redirected = destination and vshard_router.replicasets[destination]
            if redirected then
                retry_target = redirected
            end
        end

        for _, ref_err in pairs(ref_errs) do
            if BUCKET_RESET_ERR_NAMES[ref_err.vshard_err.name] then
                vshard_router:_bucket_reset(ref_err.bucket_id)
            end
        end
    elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
        replicaset:locate_master()
    end

    -- Retry only once: that should be enough for initial discovery;
    -- otherwise the user has to fix the cluster bootstrap.
    return retry_target[method](retry_target, CRUD_CALL_FUNC_NAME, storage_args, call_opts)
end
129

130
--- Builds the result/error descriptors for one replicaset and passes them to
--- the postprocessor. Returns whatever `postprocessor:collect` returns.
local function collect_call_result(postprocessor, func_name, replicaset_id, value, err)
    local result_info = {
        key = replicaset_id,
        value = value,
    }

    local err_info = {
        err_wrapper = wrap_vshard_err,
        err = err,
        wrapper_args = {func_name, replicaset_id},
    }

    return postprocessor:collect(result_info, err_info)
end

--- Calls a function on every replicaset yielded by the iterator and gathers
--- the results through the postprocessor.
---
--- All calls are issued asynchronously first; the futures are then awaited
--- against one shared deadline of `timeout` seconds.
---
--- @param vshard_router router object
--- @param func_name name of the function to call
--- @param func_args optional table of arguments (used by the default iterator)
--- @param opts optional table:
---        mode ('write' or 'read'), prefer_replica, balance, timeout,
---        request_timeout (used only when mode is 'read'), replicasets,
---        iter (defaults to a BaseIterator), postprocessor (defaults to a
---        BasePostprocessor)
--- @return `postprocessor:get()`, or `nil, err` when the call mode is unknown
---         or the default iterator cannot be created
function call.map(vshard_router, func_name, func_args, opts)
    dev_checks('table', 'string', '?table', {
        mode = 'string',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        request_timeout = '?number',
        replicasets = '?table',
        iter = '?table',
        postprocessor = '?table',
    })
    opts = opts or {}

    local vshard_call_name, err = call.get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance)
    if err ~= nil then
        return nil, err
    end

    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

    local iter = opts.iter
    if iter == nil then
        iter, err = BaseIterator:new({
            func_args = func_args,
            replicasets = opts.replicasets,
            vshard_router = vshard_router,
        })
        if err ~= nil then
            return nil, err
        end
    end

    local postprocessor = opts.postprocessor
    if postprocessor == nil then
        postprocessor = BasePostprocessor:new()
    end

    -- request_timeout is forwarded for read calls only.
    local request_timeout = nil
    if opts.mode == 'read' and opts.request_timeout then
        request_timeout = opts.request_timeout
    end

    local call_opts = {
        is_async = true,
        request_timeout = request_timeout,
    }

    -- Phase 1: issue one asynchronous call per replicaset.
    local futures_by_replicasets = {}
    while iter:has_next() do
        local args, replicaset, replicaset_id = iter:get()

        local future, call_err = call_with_retry_and_recovery(vshard_router, replicaset,
            vshard_call_name, func_name, args, call_opts, false)

        if call_err ~= nil then
            -- Enforce early exit on futures build fail.
            collect_call_result(postprocessor, func_name, replicaset_id, nil, call_err)
            return postprocessor:get()
        end

        futures_by_replicasets[replicaset_id] = future
    end

    -- Phase 2: await the futures; all of them share a single deadline.
    local deadline = fiber_clock() + timeout
    for replicaset_id, future in pairs(futures_by_replicasets) do
        local wait_timeout = deadline - fiber_clock()
        if wait_timeout < 0 then
            wait_timeout = 0
        end

        local result, wait_err = future:wait_result(wait_timeout)

        local early_exit = collect_call_result(postprocessor, func_name, replicaset_id, result, wait_err)
        if early_exit then
            break
        end
    end

    return postprocessor:get()
end
226

227
--- Calls a function on the replicaset that owns `bucket_id`.
---
--- @param vshard_router router object
--- @param bucket_id bucket id used to route the call
--- @param func_name name of the function to call
--- @param func_args optional table of arguments
--- @param opts table of options:
---        mode ('write' or 'read'), prefer_replica, balance, timeout
---        (defaults to const.DEFAULT_VSHARD_CALL_TIMEOUT), request_timeout
---        (used only when mode is 'read')
--- @return the call result (box.NULL is normalised to nil), or `nil, err`
---         when the mode is unknown, routing fails, or the call fails
function call.single(vshard_router, bucket_id, func_name, func_args, opts)
    dev_checks('table', 'number', 'string', '?table', {
        mode = 'string',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        request_timeout = '?number',
    })
    -- Same normalisation as in call.map: without it a missing `opts` raises a
    -- raw "attempt to index" error instead of the CallError for an unknown
    -- mode returned below.
    opts = opts or {}

    local vshard_call_name, err = call.get_vshard_call_name(opts.mode, opts.prefer_replica, opts.balance)
    if err ~= nil then
        return nil, err
    end

    local replicaset, err = vshard_router:route(bucket_id)
    if err ~= nil then
        return nil, CallError:new("Failed to get router replicaset: %s", tostring(err))
    end

    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
    -- request_timeout is forwarded for read calls only.
    local request_timeout = opts.mode == 'read' and opts.request_timeout or nil

    local res, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
        func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, true)
    if err ~= nil then
        -- The replicaset id is not known here, so it is resolved from bucket_id.
        return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
    end

    if res == box.NULL then
        return nil
    end

    return res
end
261

262
--- Calls a function on any replicaset that is able to answer.
---
--- Replicasets returned by `vshard_router:routeall()` are tried one by one
--- (in `pairs` order) with 'callro'; the first successful result is returned.
--- If every replicaset fails, the last error is wrapped and returned.
---
--- @param vshard_router router object
--- @param func_name name of the function to call
--- @param func_args optional table of arguments
--- @param opts optional table: timeout (defaults to
---        const.DEFAULT_VSHARD_CALL_TIMEOUT)
--- @return the call result (box.NULL is normalised to nil), or `nil, err`
function call.any(vshard_router, func_name, func_args, opts)
    dev_checks('table', 'string', '?table', {
        timeout = '?number',
    })
    -- Every option is optional, so `opts` itself may be omitted.
    opts = opts or {}

    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

    local replicasets, err = vshard_router:routeall()
    if replicasets == nil then
        return nil, CallError:new("Failed to get router replicasets: %s", err.err)
    end

    local last_replicaset_id = nil
    local last_err = nil

    for replicaset_id, replicaset in pairs(replicasets) do
        local res, err = call_with_retry_and_recovery(vshard_router, replicaset, 'callro',
            func_name, func_args, {timeout = timeout}, false)

        if err == nil then
            if res == box.NULL then
                return nil
            end
            return res
        end

        last_replicaset_id = replicaset_id
        last_err = err
    end

    -- The loop body never ran: the router knows no replicasets. There is no
    -- error to wrap, and wrap_vshard_err would fail on a nil error.
    if last_err == nil then
        return nil, CallError:new("Failed to get router replicasets: no replicasets available")
    end

    return nil, wrap_vshard_err(vshard_router, last_err, func_name, last_replicaset_id)
end
294

295
return call
643✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc