• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20467770584

23 Dec 2025 05:47PM UTC coverage: 88.51%. First build
20467770584

Pull #467

github

ita-sammann
[TNTP-2109] Add safe implementation of crud methods

For most crud methods, when safe mode is enabled, bucket_ref()
is called for the appropriate bucket before the method itself is called.
This prevents tuples from being written to the wrong replicaset during or
after a vshard rebalance.
Closes #448.

Co-authored-by: Satbek Turganbayev <s.turganbayev@corp.mail.ru>
Pull Request #467: [TNTP-2109] Switch to safe mode on vshard rebalance

282 of 347 new or added lines in 13 files covered. (81.27%)

5215 of 5892 relevant lines covered (88.51%)

13459.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.06
/crud/insert.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3

4
local call = require('crud.common.call')
593✔
5
local const = require('crud.common.const')
593✔
6
local utils = require('crud.common.utils')
593✔
7
local sharding = require('crud.common.sharding')
593✔
8
local dev_checks = require('crud.common.dev_checks')
593✔
9
local schema = require('crud.common.schema')
593✔
10
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
11

12
-- Error class this module uses to build its own error objects
-- (created with stack capturing switched off).
local InsertError = errors.new_class('InsertError', {capture_stack = false})
593✔
13

14
-- Module table: receives storage_api, tuple() and object() below and is
-- returned at the end of the file.
local insert = {}
593✔
15

16
-- Key under which the storage-side handler is published in insert.storage_api.
local INSERT_FUNC_NAME = 'insert_on_storage'
593✔
17
-- Function name the router hands to call.single(); derived from
-- INSERT_FUNC_NAME by utils.get_storage_call().
local CRUD_INSERT_FUNC_NAME = utils.get_storage_call(INSERT_FUNC_NAME)
593✔
18

19
--- Storage-side handler: inserts a single tuple into a space.
--
-- The bucket taken from the tuple is referenced through
-- bucket_ref_unref.bucket_refrw() before the insert and released with the
-- returned unref function afterwards.
-- NOTE(review): presumably this is what keeps the write from landing on the
-- wrong replicaset during a vshard rebalance -- confirm against
-- crud.common.sharding.bucket_ref_unref.
--
-- @tparam string space_name
--  Name of the space to insert into
--
-- @tparam table tuple
--  Tuple to insert; its bucket id field must already be set
--
-- @tparam ?table opts
--  Options forwarded from the router (schema hash, fields, sharding hashes,
--  noreturn, fetch_latest_metadata)
--
-- @return[1] value produced by schema.wrap_func_result for space.insert
-- @treturn[2] nil
-- @treturn[2] table Error description
local function insert_on_storage(space_name, tuple, opts)
    dev_checks('string', 'table', {
        add_space_schema_hash = '?boolean',
        fields = '?table',
        sharding_key_hash = '?number',
        sharding_func_hash = '?number',
        skip_sharding_hash_check = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    if not opts then
        opts = {}
    end

    local space = box.space[space_name]
    if space == nil then
        return nil, InsertError:new("Space %q doesn't exist", space_name)
    end

    -- Sharding hash mismatch is reported to the caller unchanged.
    local _, hash_err = sharding.check_sharding_hash(
        space_name,
        opts.sharding_func_hash,
        opts.sharding_key_hash,
        opts.skip_sharding_hash_check
    )
    if hash_err ~= nil then
        return nil, hash_err
    end

    local bucket_fieldno = utils.get_bucket_id_fieldno(space)
    local bucket_id = tuple[bucket_fieldno]

    local referenced, ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id)
    if not referenced then
        return nil, ref_err
    end

    -- add_space_schema_hash is true only in case of insert_object
    -- the only one case when reloading schema can avoid insert error
    -- is flattening object on router
    local wrap_opts = {
        add_space_schema_hash = opts.add_space_schema_hash,
        field_names = opts.fields,
        noreturn = opts.noreturn,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }
    local result = schema.wrap_func_result(space, space.insert, wrap_opts, space, tuple)

    -- A failed unref is reported instead of the insert result.
    local released, unref_err = unref(bucket_id)
    if released then
        return result
    end

    return nil, unref_err
end
70

71
-- Storage-side functions exported by this module, keyed by function name.
insert.storage_api = {[INSERT_FUNC_NAME] = insert_on_storage}
593✔
72

73
-- Router-side part of an insert: resolves the space, computes the bucket id
-- on a copy of the tuple, calls the storage function and formats the reply.
--
-- returns result, err, need_reload
-- need_reload indicates if reloading schema could help
-- see crud.common.schema.wrap_func_reload()
local function call_insert_on_router(vshard_router, space_name, original_tuple, opts)
    dev_checks('table', 'string', 'table', {
        timeout = '?number',
        bucket_id = '?',
        add_space_schema_hash = '?boolean',
        fields = '?table',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
    if space_err ~= nil then
        return nil,
            InsertError:new("An error occurred during the operation: %s", space_err),
            const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil,
            InsertError:new("Space %q doesn't exist", space_name),
            const.NEED_SCHEMA_RELOAD
    end

    -- Work on a copy so the caller's tuple is not modified when the
    -- bucket id is written into it.
    local request_tuple = table.deepcopy(original_tuple)

    local sharding_data, bucket_err = sharding.tuple_set_and_return_bucket_id(
        vshard_router, request_tuple, space, opts.bucket_id
    )
    if bucket_err ~= nil then
        return nil,
            InsertError:new("Failed to get bucket ID: %s", bucket_err),
            const.NEED_SCHEMA_RELOAD
    end

    local storage_opts = {
        add_space_schema_hash = opts.add_space_schema_hash,
        fields = opts.fields,
        sharding_func_hash = sharding_data.sharding_func_hash,
        sharding_key_hash = sharding_data.sharding_key_hash,
        skip_sharding_hash_check = sharding_data.skip_sharding_hash_check,
        noreturn = opts.noreturn,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }

    local storage_result, call_err = call.single(
        vshard_router,
        sharding_data.bucket_id,
        CRUD_INSERT_FUNC_NAME,
        {space_name, request_tuple, storage_opts},
        {mode = 'write', timeout = opts.timeout}
    )

    -- The call itself failed (nothing usable came back from the storage).
    if call_err ~= nil then
        local wrapped = InsertError:new("Failed to call insert on storage-side: %s", call_err)

        if sharding.result_needs_sharding_reload(call_err) then
            return nil, wrapped, const.NEED_SHARDING_RELOAD
        end

        return nil, wrapped
    end

    -- The call succeeded but the storage reported an insert error.
    local insert_err = storage_result.err
    if insert_err ~= nil then
        local wrapped = InsertError:new("Failed to insert: %s", insert_err)

        if schema.result_needs_reload(space, storage_result) then
            return nil, wrapped, const.NEED_SCHEMA_RELOAD
        end

        return nil, wrapped
    end

    if opts.noreturn == true then
        return nil
    end

    local inserted = storage_result.res

    if opts.fetch_latest_metadata == true then
        -- This option is temporary and is related to [1], [2].
        -- [1] https://github.com/tarantool/crud/issues/236
        -- [2] https://github.com/tarantool/crud/issues/361
        space = utils.fetch_latest_metadata_when_single_storage(
            space, space_name, netbox_schema_version,
            vshard_router, opts, storage_result.storage_info
        )
    end

    return utils.format_result({inserted}, space, opts.fields)
end
160

161
--- Inserts a tuple into the specified space.
--
-- @function tuple
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuple
--  Tuple
--
-- @tparam ?number opts.timeout
--  Function call timeout
--
-- @tparam ?number opts.bucket_id
--  Bucket ID
--  (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
--
-- @tparam ?string|table opts.vshard_router
--  Cartridge vshard group name or vshard router instance.
--  Set this parameter if your space is not a part of the
--  default vshard cluster.
--
-- @tparam ?boolean opts.noreturn
--  Suppress returning successfully processed tuple.
--
-- @tparam ?table opts.fields
--  Passed to the storage call and to result formatting.
--  NOTE(review): presumably the field names to keep in the result -- confirm.
--
-- @tparam ?boolean opts.fetch_latest_metadata
--  Temporary option, see tarantool/crud issues #236 and #361.
--
-- @return[1] tuple
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function insert.tuple(space_name, tuple, opts)
    checks('string', 'table', {
        timeout = '?number',
        bucket_id = '?',
        add_space_schema_hash = '?boolean',
        fields = '?table',
        vshard_router = '?string|table',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    if not opts then
        opts = {}
    end

    local router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, InsertError:new(router_err)
    end

    -- Run the router-side insert under the schema/sharding reload wrappers.
    return schema.wrap_func_reload(
        router, sharding.wrap_method, call_insert_on_router,
        space_name, tuple, opts
    )
end
211

212
--- Inserts an object into the specified space.
--
-- The object is flattened into a tuple on the router and then inserted
-- the same way insert.tuple does it.
--
-- @function object
--
-- @tparam string space_name
--  A space name
--
-- @tparam table obj
--  Object
--
-- @tparam ?table opts
--  Options of insert.tuple, plus skip_nullability_check_on_flatten
--  (forwarded to utils.flatten_obj_reload)
--
-- @return[1] object
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function insert.object(space_name, obj, opts)
    checks('string', 'table', {
        timeout = '?number',
        bucket_id = '?',
        add_space_schema_hash = '?boolean',
        fields = '?table',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    if not opts then
        opts = {}
    end

    local router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, InsertError:new(router_err)
    end

    -- insert can fail if router uses outdated schema to flatten object
    opts = utils.merge_options(opts, {add_space_schema_hash = true})

    local flat_tuple, flatten_err = utils.flatten_obj_reload(
        router, space_name, obj, opts.skip_nullability_check_on_flatten
    )
    if flatten_err ~= nil then
        return nil, InsertError:new("Failed to flatten object: %s", flatten_err)
    end

    return schema.wrap_func_reload(
        router, sharding.wrap_method, call_insert_on_router,
        space_name, flat_tuple, opts
    )
end
260

261
return insert
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc