• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20467770584

23 Dec 2025 05:47PM UTC coverage: 88.51%. First build
20467770584

Pull #467

github

ita-sammann
[TNTP-2109] Add safe implementation of crud methods

For most crud methods, when safe mode is enabled, bucket_ref() is called
for the appropriate bucket before the method itself is called.
This prevents tuples from being written to the wrong replicaset during or
after a vshard rebalance.
Closes #448.

Co-authored-by: Satbek Turganbayev <s.turganbayev@corp.mail.ru>
Pull Request #467: [TNTP-2109] Switch to safe mode on vshard rebalance

282 of 347 new or added lines in 13 files covered. (81.27%)

5215 of 5892 relevant lines covered (88.51%)

13459.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.51
/crud/insert_many.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3

4
local call = require('crud.common.call')
593✔
5
local const = require('crud.common.const')
593✔
6
local utils = require('crud.common.utils')
593✔
7
local batching_utils = require('crud.common.batching_utils')
593✔
8
local sharding = require('crud.common.sharding')
593✔
9
local dev_checks = require('crud.common.dev_checks')
593✔
10
local schema = require('crud.common.schema')
593✔
11
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
12

13
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
593✔
14
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
593✔
15

16
-- Error class used for every error object this module creates itself;
-- it is declared with stack capturing switched off.
local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false})
593✔
17

18
-- Module table; it is what this file returns.
local insert_many = {}
593✔
19

20
-- Key under which the storage-side function is published in
-- insert_many.storage_api.
local INSERT_MANY_FUNC_NAME = 'insert_many_on_storage'
593✔
21
-- Function name handed to call.map() by the router-side code; derived from
-- INSERT_MANY_FUNC_NAME by utils.get_storage_call().
local CRUD_INSERT_MANY_FUNC_NAME = utils.get_storage_call(INSERT_MANY_FUNC_NAME)
593✔
22

23
-- Releases the bucket references, finishes the open transaction and builds
-- the return values of insert_many_on_storage().
--
-- The references are released BEFORE box.commit() / box.rollback(); this
-- order is kept exactly as it was in every exit path of the original code.
-- NOTE(review): confirm that releasing the refs before the commit is
-- intentional.
--
-- @param unref              function returned by bucket_refrw_many()
-- @param bucket_ids         set of bucket ids (bucket_id -> true)
-- @param need_rollback      true to roll the transaction back, otherwise
--                           it is committed
-- @param inserted_tuples    results collected so far
-- @param errs               list of per-tuple errors or nil
-- @param replica_schema_version  value to pass through as the third result
--
-- @return inserted_tuples (nil after a rollback), errs, replica_schema_version;
--         or nil, bucket_unref_err when unref() reports a failure. In that
--         case the transaction has already been committed / rolled back.
local function finish_insert_many(unref, bucket_ids, need_rollback,
                                  inserted_tuples, errs, replica_schema_version)
    local unref_ok, bucket_unref_err = unref(bucket_ids)

    if need_rollback then
        box.rollback()
    else
        box.commit()
    end

    if not unref_ok then
        return nil, bucket_unref_err
    end

    if not need_rollback then
        return inserted_tuples, errs, replica_schema_version
    end

    -- Everything inserted before the rollback is reported as rolled back.
    if next(inserted_tuples) then
        errs = batching_utils.complement_batching_errors(errs,
                batching_utils.rollback_on_error_msg, inserted_tuples)
    end

    return nil, errs, replica_schema_version
end

-- Storage-side part of insert_many: inserts a batch of tuples into a space
-- inside a single box transaction while holding references on all buckets
-- the tuples belong to.
--
-- @param space_name  name of the space (string)
-- @param tuples      list of tuples to insert
-- @param opts        optional table, see the dev_checks() spec below
--
-- @return[1] list of insert results, list of errors or nil,
--            replica_schema_version
-- @return[2] nil, errors (and replica_schema_version on the rollback paths)
local function insert_many_on_storage(space_name, tuples, opts)
    dev_checks('string', 'table', {
        add_space_schema_hash = '?boolean',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        sharding_key_hash = '?number',
        sharding_func_hash = '?number',
        skip_sharding_hash_check = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    opts = opts or {}

    local space = box.space[space_name]
    if space == nil then
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}
    end

    local _, err = sharding.check_sharding_hash(space_name,
                                                opts.sharding_func_hash,
                                                opts.sharding_key_hash,
                                                opts.skip_sharding_hash_check)

    if err ~= nil then
        return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
    end

    -- Collect the set of buckets touched by the batch. The field number
    -- depends only on the space, so it is looked up once instead of once
    -- per tuple.
    local bucket_id_fieldno = utils.get_bucket_id_fieldno(space)
    local bucket_ids = {}
    for _, tuple in ipairs(tuples) do
        bucket_ids[tuple[bucket_id_fieldno]] = true
    end

    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids)
    if not ref_ok then
        -- NOTE(review): returned as is; whether bucket_ref_err is a list like
        -- the other error results of this function is not visible from here.
        return nil, bucket_ref_err
    end

    local inserted_tuples = {}
    local errs = {}
    local replica_schema_version = nil

    local insert_opts = {
        add_space_schema_hash = opts.add_space_schema_hash,
        field_names = opts.fields,
        noreturn = opts.noreturn,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }

    local need_rollback_on_error = opts.rollback_on_error == true

    box.begin()
    for i, tuple in ipairs(tuples) do
        -- add_space_schema_hash is true only in case of insert_object_many
        -- the only one case when reloading schema can avoid insert error
        -- is flattening object on router
        local insert_result = schema.wrap_func_result(
            space, space.insert, insert_opts,
            space, tuple
        )
        if opts.fetch_latest_metadata then
            replica_schema_version = insert_result.storage_info.replica_schema_version
        end

        if insert_result.err ~= nil then
            table.insert(errs, {
                err = insert_result.err,
                space_schema_hash = insert_result.space_schema_hash,
                operation_data = tuple,
            })

            if opts.stop_on_error == true then
                -- Tuples after the failed one are not attempted; each of
                -- them gets a "stopped" error entry.
                local left_tuples = utils.list_slice(tuples, i + 1)
                if next(left_tuples) then
                    errs = batching_utils.complement_batching_errors(errs,
                            batching_utils.stop_on_error_msg, left_tuples)
                end

                return finish_insert_many(unref, bucket_ids, need_rollback_on_error,
                                          inserted_tuples, errs, replica_schema_version)
            end
        end

        -- Also reached for a failed insert when stop_on_error is off.
        table.insert(inserted_tuples, insert_result.res)
    end

    if next(errs) == nil then
        -- No errors: always commit and report nil as the error list.
        return finish_insert_many(unref, bucket_ids, false,
                                  inserted_tuples, nil, replica_schema_version)
    end

    return finish_insert_many(unref, bucket_ids, need_rollback_on_error,
                              inserted_tuples, errs, replica_schema_version)
end
161

162
-- Table of storage-side functions exposed by this module, keyed by the
-- function name.
insert_many.storage_api = {[INSERT_MANY_FUNC_NAME] = insert_many_on_storage}
593✔
163

164
-- Router-side part of insert_many: maps the batch over the storages and
-- formats the collected rows.
--
-- Returns result, err, need_reload.
-- need_reload is one of the const.NEED_*_RELOAD values and indicates that
-- a reload could help, see crud.common.schema.wrap_func_reload().
local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts)
    dev_checks('table', 'string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
    if space_err ~= nil then
        return nil, {
            InsertManyError:new("An error occurred during the operation: %s", space_err)
        }, const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
    end

    -- Work on a deep copy; the caller's tuples are not handed to the iterator.
    local tuples = table.deepcopy(original_tuples)

    local iter, iter_err = BatchInsertIterator:new({
        tuples = tuples,
        space = space,
        execute_on_storage_opts = {
            add_space_schema_hash = opts.add_space_schema_hash,
            fields = opts.fields,
            stop_on_error = opts.stop_on_error,
            rollback_on_error = opts.rollback_on_error,
            noreturn = opts.noreturn,
            fetch_latest_metadata = opts.fetch_latest_metadata,
        },
        vshard_router = vshard_router,
    })
    if iter_err ~= nil then
        return nil, {iter_err}, const.NEED_SCHEMA_RELOAD
    end

    local postprocessor = BatchPostprocessor:new(vshard_router)
    local map_opts = {
        timeout = opts.timeout,
        mode = 'write',
        iter = iter,
        postprocessor = postprocessor,
    }

    local rows, errs, storages_info = call.map(vshard_router, CRUD_INSERT_MANY_FUNC_NAME, nil, map_opts)

    if errs ~= nil then
        local tuples_count = table.maxn(tuples)

        -- The sharding check goes first, the schema check second.
        if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then
            return nil, errs, const.NEED_SHARDING_RELOAD
        elseif schema.batching_result_needs_reload(space, errs, tuples_count) then
            return nil, errs, const.NEED_SCHEMA_RELOAD
        end
    end

    if next(rows) == nil then
        return nil, errs
    end

    if opts.fetch_latest_metadata == true then
        -- This option is temporary and is related to [1], [2].
        -- [1] https://github.com/tarantool/crud/issues/236
        -- [2] https://github.com/tarantool/crud/issues/361
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
                                                              storages_info, netbox_schema_version)
    end

    local formatted, format_err = utils.format_result(rows, space, opts.fields)
    if format_err == nil then
        return formatted, errs
    end

    -- A formatting failure is appended to whatever the storages reported.
    errs = errs or {}
    table.insert(errs, format_err)
    return nil, errs
end
252

253
--- Inserts a batch of tuples into the specified space.
--
-- @function tuples
--
-- @param string space_name
--  A space name
--
-- @param table tuples
--  Non-empty list of tuples
--
-- @tparam ?table opts
--  Options; the accepted keys and their types are the ones listed in the
--  checks() call below
--
-- @return[1] result of schema.wrap_func_reload() for the router call
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function insert_many.tuples(space_name, tuples, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local batch_is_empty = next(tuples) == nil
    if batch_is_empty then
        return nil, {InsertManyError:new("At least one tuple expected")}
    end

    opts = opts or {}

    local vshard_router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, {InsertManyError:new(router_err)}
    end

    return schema.wrap_func_reload(
        vshard_router, sharding.wrap_method, call_insert_many_on_router,
        space_name, tuples, opts
    )
end
296

297
--- Inserts a batch of objects into the specified space.
--
-- Every object is flattened to a tuple first; objects that fail to flatten
-- are reported in the error list.
--
-- @function objects
--
-- @param string space_name
--  A space name
--
-- @param table objs
--  Non-empty list of objects
--
-- @tparam ?table opts
--  Options; the accepted keys and their types are the ones listed in the
--  checks() call below
--
-- @return[1] objects
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function insert_many.objects(space_name, objs, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    if next(objs) == nil then
        return nil, {InsertManyError:new("At least one object expected")}
    end

    opts = opts or {}

    local vshard_router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, {InsertManyError:new(router_err)}
    end

    -- insert can fail if router uses outdated schema to flatten object
    opts = utils.merge_options(opts, {add_space_schema_hash = true})

    local skip_nullability_check = opts.skip_nullability_check_on_flatten
    local stop_on_first_error = opts.stop_on_error == true

    local flattened = {}
    local flatten_errs = {}

    for _, obj in ipairs(objs) do
        local tuple, flatten_err = utils.flatten_obj_reload(vshard_router, space_name, obj,
                                                            skip_nullability_check)
        if flatten_err ~= nil then
            local err_obj = InsertManyError:new("Failed to flatten object: %s", flatten_err)
            err_obj.operation_data = obj

            if stop_on_first_error then
                return nil, {err_obj}
            end

            table.insert(flatten_errs, err_obj)
        end

        -- Runs for failed objects as well; appending a nil tuple adds
        -- nothing to the list.
        table.insert(flattened, tuple)
    end

    -- Nothing could be flattened: only the flatten errors are left to report.
    if next(flattened) == nil then
        return nil, flatten_errs
    end

    local res, errs = schema.wrap_func_reload(
        vshard_router, sharding.wrap_method, call_insert_many_on_router,
        space_name, flattened, opts
    )

    if next(flatten_errs) == nil then
        return res, errs
    end

    if errs == nil then
        return res, flatten_errs
    end

    return res, utils.list_extend(errs, flatten_errs)
end
378

379
return insert_many
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc