• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 19738999447

27 Nov 2025 02:09PM UTC coverage: 88.331% (-0.1%) from 88.469%
19738999447

Pull #463

github

Satbek
fix: redirect WRONG_BUCKET only for single calls and reuse vshard retry helper
Pull Request #463: TNTP-2109: call bucket_ref/bucket_unref on crud operations

205 of 268 new or added lines in 16 files covered. (76.49%)

10 existing lines in 1 file now uncovered.

5193 of 5879 relevant lines covered (88.33%)

6459.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.38
/crud/insert_many.lua
1
local checks = require('checks')
258✔
2
local errors = require('errors')
258✔
3

4
local call = require('crud.common.call')
258✔
5
local const = require('crud.common.const')
258✔
6
local utils = require('crud.common.utils')
258✔
7
local batching_utils = require('crud.common.batching_utils')
258✔
8
local sharding = require('crud.common.sharding')
258✔
9
local dev_checks = require('crud.common.dev_checks')
258✔
10
local schema = require('crud.common.schema')
258✔
11
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
258✔
12

13
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
258✔
14
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
258✔
15

16
-- Module table returned at the end of the file.
local insert_many = {}

-- Error class used for the errors this module creates itself;
-- it is created with stack capture switched off.
local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false})

-- Key under which the storage-side function is exposed in storage_api,
-- and the name derived from it that the router passes to call.map().
local INSERT_MANY_FUNC_NAME = 'insert_many_on_storage'
local CRUD_INSERT_MANY_FUNC_NAME = utils.get_storage_call(INSERT_MANY_FUNC_NAME)
258✔
22

23
-- Drops the references taken on `bucket_ids` and then closes the current
-- transaction: box.rollback() when `rollback` is true, box.commit() otherwise.
-- The transaction is closed even when the unref call reports a failure.
-- Returns the two results of bucket_unrefrw_many unchanged.
--
-- NOTE(review): the buckets are unreferenced *before* the transaction is
-- committed or rolled back, so the commit runs without the reference held.
-- Confirm this ordering is intended.
local function unref_and_finish(bucket_ids, rollback)
    local unref_ok, unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
    if rollback then
        box.rollback()
    else
        box.commit()
    end
    return unref_ok, unref_err
end

-- Rolls the batch back. Returns `nil, unref_err` when the unref failed,
-- otherwise `nil, errs, replica_schema_version`, where `errs` is complemented
-- with a rollback error for the tuples collected in `inserted_tuples`.
local function rollback_batch(bucket_ids, inserted_tuples, errs, replica_schema_version)
    local unref_ok, unref_err = unref_and_finish(bucket_ids, true)
    if not unref_ok then
        return nil, unref_err
    end

    if next(inserted_tuples) then
        errs = batching_utils.complement_batching_errors(errs,
                batching_utils.rollback_on_error_msg, inserted_tuples)
    end

    return nil, errs, replica_schema_version
end

-- Commits the batch. Returns `nil, unref_err` when the unref failed,
-- otherwise `inserted_tuples, errs, replica_schema_version`.
local function commit_batch(bucket_ids, inserted_tuples, errs, replica_schema_version)
    local unref_ok, unref_err = unref_and_finish(bucket_ids, false)
    if not unref_ok then
        return nil, unref_err
    end

    return inserted_tuples, errs, replica_schema_version
end

-- Storage-side handler of insert_many: inserts `tuples` into the space
-- `space_name` one by one inside a single box transaction. Before the
-- transaction starts, bucket_ref_unref.bucket_refrw_many() is called for the
-- set of bucket ids found in the tuples; the matching unref is issued on every
-- path that finishes the transaction.
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuples
--  Tuples to insert
--
-- @tparam ?table opts
--  Options: add_space_schema_hash, fields, stop_on_error, rollback_on_error,
--  sharding_key_hash, sharding_func_hash, skip_sharding_hash_check, noreturn,
--  fetch_latest_metadata
--
-- Returns inserted_tuples, errs, replica_schema_version:
--  * nil, errs                        - the space does not exist, the sharding
--                                       hash check failed, or a bucket ref/unref
--                                       call failed
--  * nil, errs, version               - an insert failed and rollback_on_error
--                                       is set: the transaction is rolled back
--  * inserted_tuples, errs, version   - some inserts failed, the rest is committed
--  * inserted_tuples, nil, version    - every insert succeeded
-- `version` is only filled in when opts.fetch_latest_metadata is set.
local function insert_many_on_storage(space_name, tuples, opts)
    dev_checks('string', 'table', {
        add_space_schema_hash = '?boolean',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        sharding_key_hash = '?number',
        sharding_func_hash = '?number',
        skip_sharding_hash_check = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    opts = opts or {}

    local space = box.space[space_name]
    if space == nil then
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}
    end

    local _, err = sharding.check_sharding_hash(space_name,
                                                opts.sharding_func_hash,
                                                opts.sharding_key_hash,
                                                opts.skip_sharding_hash_check)

    if err ~= nil then
        return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
    end

    -- The field number depends only on the space, so look it up once
    -- instead of once per tuple.
    local bucket_id_fieldno = utils.get_bucket_id_fieldno(space)

    -- Set of distinct bucket ids touched by this batch.
    local bucket_ids = {}
    for _, tuple in ipairs(tuples) do
        bucket_ids[tuple[bucket_id_fieldno]] = true
    end

    local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw_many(bucket_ids)
    if not ref_ok then
        -- NOTE(review): returned as-is, while the other failure paths return
        -- a list of errors; confirm callers accept this shape.
        return nil, bucket_ref_err
    end

    local inserted_tuples = {}
    local errs = {}
    local replica_schema_version = nil

    box.begin()
    for i, tuple in ipairs(tuples) do
        -- add_space_schema_hash is true only in case of insert_object_many
        -- the only one case when reloading schema can avoid insert error
        -- is flattening object on router
        local insert_result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
            add_space_schema_hash = opts.add_space_schema_hash,
            field_names = opts.fields,
            noreturn = opts.noreturn,
            fetch_latest_metadata = opts.fetch_latest_metadata,
        })
        if opts.fetch_latest_metadata then
            replica_schema_version = insert_result.storage_info.replica_schema_version
        end

        if insert_result.err ~= nil then
            table.insert(errs, {
                err = insert_result.err,
                space_schema_hash = insert_result.space_schema_hash,
                operation_data = tuple,
            })

            if opts.stop_on_error == true then
                -- Report every tuple after the failed one as not processed.
                local left_tuples = utils.list_slice(tuples, i + 1)
                if next(left_tuples) then
                    errs = batching_utils.complement_batching_errors(errs,
                            batching_utils.stop_on_error_msg, left_tuples)
                end

                if opts.rollback_on_error == true then
                    return rollback_batch(bucket_ids, inserted_tuples, errs, replica_schema_version)
                end

                return commit_batch(bucket_ids, inserted_tuples, errs, replica_schema_version)
            end
        end

        -- Also reached after a failed insert when stop_on_error is not set;
        -- presumably insert_result.res is nil then and nothing is appended
        -- (verify against schema.wrap_box_space_func_result).
        table.insert(inserted_tuples, insert_result.res)
    end

    if next(errs) ~= nil then
        if opts.rollback_on_error == true then
            return rollback_batch(bucket_ids, inserted_tuples, errs, replica_schema_version)
        end

        return commit_batch(bucket_ids, inserted_tuples, errs, replica_schema_version)
    end

    return commit_batch(bucket_ids, inserted_tuples, nil, replica_schema_version)
end

insert_many.storage_api = {[INSERT_MANY_FUNC_NAME] = insert_many_on_storage}
258✔
158

159
-- Router-side implementation behind insert_many.tuples and
-- insert_many.objects: maps the batch over the storages with call.map()
-- and formats the collected rows.
--
-- returns result, errs, need_reload
-- need_reload indicates if reloading schema could help
-- see crud.common.schema.wrap_func_reload()
local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts)
    dev_checks('table', 'string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
    if space_err ~= nil then
        local wrapped_err = InsertManyError:new("An error occurred during the operation: %s", space_err)
        return nil, {wrapped_err}, const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
    end

    -- The iterator receives a deep copy, not the caller's tuples.
    local tuples = table.deepcopy(original_tuples)

    local iter, iter_err = BatchInsertIterator:new({
        tuples = tuples,
        space = space,
        -- Subset of the router options that is forwarded to the storages.
        execute_on_storage_opts = {
            add_space_schema_hash = opts.add_space_schema_hash,
            fields = opts.fields,
            stop_on_error = opts.stop_on_error,
            rollback_on_error = opts.rollback_on_error,
            noreturn = opts.noreturn,
            fetch_latest_metadata = opts.fetch_latest_metadata,
        },
        vshard_router = vshard_router,
    })
    if iter_err ~= nil then
        return nil, {iter_err}, const.NEED_SCHEMA_RELOAD
    end

    local rows, errs, storages_info = call.map(vshard_router, CRUD_INSERT_MANY_FUNC_NAME, nil, {
        timeout = opts.timeout,
        mode = 'write',
        iter = iter,
        postprocessor = BatchPostprocessor:new(vshard_router),
    })

    if errs ~= nil then
        local tuples_count = table.maxn(tuples)

        -- The sharding reload check takes precedence over the schema one.
        if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then
            return nil, errs, const.NEED_SHARDING_RELOAD
        end

        if schema.batching_result_needs_reload(space, errs, tuples_count) then
            return nil, errs, const.NEED_SCHEMA_RELOAD
        end
    end

    -- No rows came back: there is nothing to format.
    if next(rows) == nil then
        return nil, errs
    end

    if opts.fetch_latest_metadata == true then
        -- This option is temporary and is related to [1], [2].
        -- [1] https://github.com/tarantool/crud/issues/236
        -- [2] https://github.com/tarantool/crud/issues/361
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
                                                              storages_info, netbox_schema_version)
    end

    local formatted, format_err = utils.format_result(rows, space, opts.fields)
    if format_err == nil then
        return formatted, errs
    end

    -- A formatting failure is appended to the errors collected so far.
    errs = errs or {}
    table.insert(errs, format_err)
    return nil, errs
end
247

248
--- Inserts a batch of tuples into the specified space
--
-- @function tuples
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuples
--  Tuples to insert, at least one is expected
--
-- @tparam ?table opts
--  Options: timeout, fields, add_space_schema_hash, stop_on_error,
--  rollback_on_error, vshard_router, noreturn, fetch_latest_metadata
--
-- @return[1] tuples
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function insert_many.tuples(space_name, tuples, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    -- An empty batch is rejected before anything else is looked up.
    local batch_is_empty = next(tuples) == nil
    if batch_is_empty then
        return nil, {InsertManyError:new("At least one tuple expected")}
    end

    opts = opts or {}

    local router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, {InsertManyError:new(router_err)}
    end

    -- The router call goes through schema.wrap_func_reload.
    return schema.wrap_func_reload(router, sharding.wrap_method, call_insert_many_on_router,
                                   space_name, tuples, opts)
end
291

292
--- Inserts a batch of objects into the specified space
--
-- Every object is flattened into a tuple on the router first; the tuples are
-- then inserted the same way insert_many.tuples does it.
--
-- @function objects
--
-- @tparam string space_name
--  A space name
--
-- @tparam table objs
--  Objects to insert, at least one is expected
--
-- @tparam ?table opts
--  Options: timeout, fields, stop_on_error, rollback_on_error, vshard_router,
--  skip_nullability_check_on_flatten, noreturn, fetch_latest_metadata
--
-- @return[1] objects
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function insert_many.objects(space_name, objs, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local batch_is_empty = next(objs) == nil
    if batch_is_empty then
        return nil, {InsertManyError:new("At least one object expected")}
    end

    opts = opts or {}

    local router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, {InsertManyError:new(router_err)}
    end

    -- insert can fail if router uses outdated schema to flatten object
    opts = utils.merge_options(opts, {add_space_schema_hash = true})

    local tuples = {}
    local format_errs = {}

    for _, obj in ipairs(objs) do
        local tuple, flatten_err = utils.flatten_obj_reload(router, space_name, obj,
                                                            opts.skip_nullability_check_on_flatten)
        if flatten_err ~= nil then
            local err_obj = InsertManyError:new("Failed to flatten object: %s", flatten_err)
            err_obj.operation_data = obj

            -- With stop_on_error the first flatten failure ends the call
            -- before anything is sent to the storages.
            if opts.stop_on_error == true then
                return nil, {err_obj}
            end

            table.insert(format_errs, err_obj)
        end

        -- Runs after a flatten failure as well; presumably `tuple` is nil
        -- then and nothing is appended (verify against utils.flatten_obj_reload).
        table.insert(tuples, tuple)
    end

    -- Nothing could be flattened: only the flatten errors are returned.
    if next(tuples) == nil then
        return nil, format_errs
    end

    local res, errs = schema.wrap_func_reload(router, sharding.wrap_method, call_insert_many_on_router,
                                              space_name, tuples, opts)

    -- Flatten errors, if any, are reported together with the insert errors.
    if next(format_errs) == nil then
        return res, errs
    end

    if errs == nil then
        return res, format_errs
    end

    return res, utils.list_extend(errs, format_errs)
end
373

374
return insert_many
258✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc