• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20467770584

23 Dec 2025 05:47PM UTC coverage: 88.51%. First build
20467770584

Pull #467

github

ita-sammann
[TNTP-2109] Add safe implementation of crud methods

For the most of crud methods, when safe mode is enabled, bucket_ref()
for appropriate bucket is called before calling the method itself.
This prevents from writing tuples to the wrong replicaset during or
after vshard rebalance.
Closes #448.

Co-authored-by: Satbek Turganbayev <s.turganbayev@corp.mail.ru>
Pull Request #467: [TNTP-2109] Switch to safe mode on vshard rebalance

282 of 347 new or added lines in 13 files covered. (81.27%)

5215 of 5892 relevant lines covered (88.51%)

13459.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/crud/update.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3

4
local call = require('crud.common.call')
593✔
5
local const = require('crud.common.const')
593✔
6
local utils = require('crud.common.utils')
593✔
7
local sharding = require('crud.common.sharding')
593✔
8
local sharding_key_module = require('crud.common.sharding.sharding_key')
593✔
9
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
593✔
10
local dev_checks = require('crud.common.dev_checks')
593✔
11
local schema = require('crud.common.schema')
593✔
12
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
13

14
local UpdateError = errors.new_class('UpdateError', {capture_stack = false})
593✔
15

16
local update = {}
593✔
17

18
local UPDATE_FUNC_NAME = 'update_on_storage'
593✔
19
local CRUD_UPDATE_FUNC_NAME = utils.get_storage_call(UPDATE_FUNC_NAME)
593✔
20

21
local function update_on_storage(space_name, key, operations, field_names, opts)
22
    dev_checks('string', '?', 'table', '?table', {
128✔
23
        bucket_id = 'number|cdata',
24
        sharding_key_hash = '?number',
25
        sharding_func_hash = '?number',
26
        skip_sharding_hash_check = '?boolean',
27
        noreturn = '?boolean',
28
        fetch_latest_metadata = '?boolean',
29
    })
30

31
    opts = opts or {}
128✔
32

33
    local space = box.space[space_name]
128✔
34
    if space == nil then
128✔
35
        return nil, UpdateError:new("Space %q doesn't exist", space_name)
×
36
    end
37

38
    local _, err = sharding.check_sharding_hash(space_name,
256✔
39
                                                opts.sharding_func_hash,
128✔
40
                                                opts.sharding_key_hash,
128✔
41
                                                opts.skip_sharding_hash_check)
128✔
42

43
    if err ~= nil then
128✔
44
        return nil, err
4✔
45
    end
46

47
    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(opts.bucket_id)
124✔
48
    if not ref_ok then
124✔
NEW
49
        return nil, bucket_ref_err
×
50
    end
51

52
    -- add_space_schema_hash is false because
53
    -- reloading space format on router can't avoid update error on storage
54
    local res, err = schema.wrap_func_result(space, space.update, {
248✔
55
        add_space_schema_hash = false,
56
        field_names = field_names,
124✔
57
        noreturn = opts.noreturn,
124✔
58
        fetch_latest_metadata = opts.fetch_latest_metadata,
124✔
59
    }, space, key, operations)
124✔
60

61
    if err == nil and res.err ~= nil and utils.is_field_not_found(res.err.code) then
160✔
62
        -- Relevant for Tarantool older than 2.8.1.
63
        -- We can only add fields to end of the tuple.
64
        -- If schema is updated and nullable fields are added, then we will get error.
65
        -- Therefore, we need to add filling of intermediate nullable fields.
66
        -- More details: https://github.com/tarantool/tarantool/issues/3378
67
        operations = utils.add_intermediate_nullable_fields(operations, space:format(), space:get(key))
8✔
68
        res, err = schema.wrap_func_result(space, space.update, {
4✔
69
            add_space_schema_hash = false,
70
            field_names = field_names,
2✔
71
            noreturn = opts.noreturn,
2✔
72
            fetch_latest_metadata = opts.fetch_latest_metadata,
2✔
73
        }, space, key, operations)
4✔
74
    end
75

76
    local unref_ok, err_unref = unref(opts.bucket_id)
124✔
77
    if not unref_ok then
124✔
NEW
78
        return nil, err_unref
×
79
    end
80

81
    return res, err
124✔
82
end
83

84
update.storage_api = {[UPDATE_FUNC_NAME] = update_on_storage}
593✔
85

86
-- returns result, err, need_reload
87
-- need_reload indicates if reloading schema could help
88
-- see crud.common.schema.wrap_func_reload()
89
local function call_update_on_router(vshard_router, space_name, key, user_operations, opts)
90
    dev_checks('table', 'string', '?', 'table', {
144✔
91
        timeout = '?number',
92
        bucket_id = '?',
93
        fields = '?table',
94
        vshard_router = '?string|table',
95
        noreturn = '?boolean',
96
        fetch_latest_metadata = '?boolean',
97
    })
98

99
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
144✔
100
    if err ~= nil then
144✔
101
        return nil, UpdateError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
×
102
    end
103
    if space == nil then
144✔
104
        return nil, UpdateError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
24✔
105
    end
106

107
    local space_format = space:format()
132✔
108

109
    if box.tuple.is(key) then
264✔
110
        key = key:totable()
×
111
    end
112

113
    local sharding_key = key
132✔
114
    local sharding_key_hash = nil
132✔
115
    local skip_sharding_hash_check = nil
116

117
    if opts.bucket_id == nil then
132✔
118
        local primary_index_parts = space.index[0].parts
120✔
119

120
        local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
120✔
121
        if err ~= nil then
120✔
122
            return nil, err
×
123
        end
124

125
        sharding_key, err = sharding_key_module.extract_from_pk(vshard_router,
240✔
126
                                                                space_name,
120✔
127
                                                                sharding_key_data.value,
120✔
128
                                                                primary_index_parts, key)
240✔
129
        if err ~= nil then
120✔
130
            return nil, err
4✔
131
        end
132

133
        sharding_key_hash = sharding_key_data.hash
116✔
134
    else
135
        skip_sharding_hash_check = true
12✔
136
    end
137

138
    local operations = user_operations
128✔
139
    if not utils.tarantool_supports_fieldpaths() then
256✔
140
        operations, err = utils.convert_operations(user_operations, space_format)
×
141
        if err ~= nil then
×
142
            return nil, UpdateError:new("Wrong operations are specified: %s", err), const.NEED_SCHEMA_RELOAD
×
143
        end
144
    end
145

146
    local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name, sharding_key, opts.bucket_id)
128✔
147
    if err ~= nil then
128✔
148
        return nil, err
2✔
149
    end
150

151
    -- When the sharding index (bucket_id) is the primary index, bucket_id can be passed as box.NULL.
152
    sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id)
126✔
153

154
    local update_on_storage_opts = {
126✔
155
        bucket_id = bucket_id_data.bucket_id,
126✔
156
        sharding_func_hash = bucket_id_data.sharding_func_hash,
126✔
157
        sharding_key_hash = sharding_key_hash,
126✔
158
        skip_sharding_hash_check = skip_sharding_hash_check,
126✔
159
        noreturn = opts.noreturn,
126✔
160
        fetch_latest_metadata = opts.fetch_latest_metadata,
126✔
161
    }
162

163
    local call_opts = {
126✔
164
        mode = 'write',
165
        timeout = opts.timeout,
126✔
166
    }
167

168
    local storage_result, err = call.single(vshard_router,
252✔
169
        bucket_id_data.bucket_id, CRUD_UPDATE_FUNC_NAME,
126✔
170
        {space_name, key, operations, opts.fields, update_on_storage_opts},
126✔
171
        call_opts
172
    )
126✔
173

174
    if err ~= nil then
126✔
175
        local err_wrapped = UpdateError:new("Failed to call update on storage-side: %s", err)
2✔
176

177
        if sharding.result_needs_sharding_reload(err) then
4✔
178
            return nil, err_wrapped, const.NEED_SHARDING_RELOAD
2✔
179
        end
180

181
        return nil, err_wrapped
×
182
    end
183

184
    if storage_result.err ~= nil then
124✔
185
        return nil, UpdateError:new("Failed to update: %s", storage_result.err)
72✔
186
    end
187

188
    if opts.noreturn == true then
88✔
189
        return nil
2✔
190
    end
191

192
    local tuple = storage_result.res
86✔
193

194
    if opts.fetch_latest_metadata == true then
86✔
195
        -- This option is temporary and is related to [1], [2].
196
        -- [1] https://github.com/tarantool/crud/issues/236
197
        -- [2] https://github.com/tarantool/crud/issues/361
198
        space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version,
8✔
199
                                                                vshard_router, opts, storage_result.storage_info)
8✔
200
    end
201

202
    return utils.format_result({tuple}, space, opts.fields)
86✔
203
end
204

205
--- Updates tuple in the specified space
206
--
207
-- @function call
208
--
209
-- @param string space_name
210
--  A space name
211
--
212
-- @param key
213
--  Primary key value
214
--
215
-- @param table user_operations
216
--  Operations to be performed.
217
--  See `space:update` operations in Tarantool doc
218
--
219
-- @tparam ?number opts.timeout
220
--  Function call timeout
221
--
222
-- @tparam ?number opts.bucket_id
223
--  Bucket ID
224
--  (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
225
--
226
-- @tparam ?string|table opts.vshard_router
227
--  Cartridge vshard group name or vshard router instance.
228
--  Set this parameter if your space is not a part of the
229
--  default vshard cluster.
230
--
231
-- @tparam ?boolean opts.noreturn
232
--  Suppress returning successfully processed tuple.
233
--
234
-- @return[1] object
235
-- @treturn[2] nil
236
-- @treturn[2] table Error description
237
--
238
function update.call(space_name, key, user_operations, opts)
593✔
239
    checks('string', '?', 'table', {
140✔
240
        timeout = '?number',
241
        bucket_id = '?',
242
        fields = '?table',
243
        vshard_router = '?string|table',
244
        noreturn = '?boolean',
245
        fetch_latest_metadata = '?boolean',
246
    })
247

248
    opts = opts or {}
140✔
249
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
140✔
250
    if err ~= nil then
140✔
251
        return nil, UpdateError:new(err)
8✔
252
    end
253

254
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_update_on_router,
136✔
255
                                   space_name, key, user_operations, opts)
136✔
256
end
257

258
return update
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc