• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 19920822787

04 Dec 2025 07:14AM UTC coverage: 88.424%. First build
19920822787

Pull #463

github

Satbek
enbale double buckets test
Pull Request #463: TNTP-2109: call bucket_ref/bucket_unref on crud operations

205 of 269 new or added lines in 16 files covered. (76.21%)

5217 of 5900 relevant lines covered (88.42%)

12857.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.38
/crud/insert_many.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3

4
local call = require('crud.common.call')
593✔
5
local const = require('crud.common.const')
593✔
6
local utils = require('crud.common.utils')
593✔
7
local batching_utils = require('crud.common.batching_utils')
593✔
8
local sharding = require('crud.common.sharding')
593✔
9
local dev_checks = require('crud.common.dev_checks')
593✔
10
local schema = require('crud.common.schema')
593✔
11
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
12

13
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
593✔
14
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
593✔
15

16
local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false})
593✔
17

18
local insert_many = {}
593✔
19

20
local INSERT_MANY_FUNC_NAME = 'insert_many_on_storage'
593✔
21
local CRUD_INSERT_MANY_FUNC_NAME = utils.get_storage_call(INSERT_MANY_FUNC_NAME)
593✔
22

23
local function insert_many_on_storage(space_name, tuples, opts)
24
    dev_checks('string', 'table', {
4,732✔
25
        add_space_schema_hash = '?boolean',
26
        fields = '?table',
27
        stop_on_error = '?boolean',
28
        rollback_on_error = '?boolean',
29
        sharding_key_hash = '?number',
30
        sharding_func_hash = '?number',
31
        skip_sharding_hash_check = '?boolean',
32
        noreturn = '?boolean',
33
        fetch_latest_metadata = '?boolean',
34
    })
35

36
    opts = opts or {}
4,732✔
37

38
    local space = box.space[space_name]
4,732✔
39
    if space == nil then
4,732✔
40
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}
×
41
    end
42

43
    local _, err = sharding.check_sharding_hash(space_name,
9,464✔
44
                                                opts.sharding_func_hash,
4,732✔
45
                                                opts.sharding_key_hash,
4,732✔
46
                                                opts.skip_sharding_hash_check)
4,732✔
47

48
    if err ~= nil then
4,732✔
49
        return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
24✔
50
    end
51

52
    local bucket_ids = {}
4,720✔
53
    for _, tuple in ipairs(tuples) do
234,172✔
54
        bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true
458,904✔
55
    end
56

57
    local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw_many(bucket_ids)
4,720✔
58
    if not ref_ok then
4,720✔
NEW
59
        return nil, bucket_ref_err
×
60
    end
61

62
    local inserted_tuples = {}
4,720✔
63
    local errs = {}
4,720✔
64
    local replica_schema_version = nil
4,720✔
65

66
    box.begin()
4,720✔
67
    for i, tuple in ipairs(tuples) do
234,132✔
68
        -- add_space_schema_hash is true only in case of insert_object_many
69
        -- the only one case when reloading schema can avoid insert error
70
        -- is flattening object on router
71
        local insert_result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
458,856✔
72
            add_space_schema_hash = opts.add_space_schema_hash,
229,428✔
73
            field_names = opts.fields,
229,428✔
74
            noreturn = opts.noreturn,
229,428✔
75
            fetch_latest_metadata = opts.fetch_latest_metadata,
229,428✔
76
        })
77
        if opts.fetch_latest_metadata then
229,428✔
78
            replica_schema_version = insert_result.storage_info.replica_schema_version
4✔
79
        end
80

81
        if insert_result.err ~= nil then
229,428✔
82
            local err = {
118✔
83
                err = insert_result.err,
118✔
84
                space_schema_hash = insert_result.space_schema_hash,
118✔
85
                operation_data = tuple,
118✔
86
            }
87

88
            table.insert(errs, err)
118✔
89

90
            if opts.stop_on_error == true then
118✔
91
                local left_tuples = utils.list_slice(tuples, i + 1)
16✔
92
                if next(left_tuples) then
16✔
93
                    errs = batching_utils.complement_batching_errors(errs,
32✔
94
                            batching_utils.stop_on_error_msg, left_tuples)
32✔
95
                end
96

97
                if opts.rollback_on_error == true then
16✔
98
                    local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
4✔
99
                    box.rollback()
4✔
100
                    if not unref_ok then
4✔
NEW
101
                        return nil, bucket_unref_err
×
102
                    end
103
                    if next(inserted_tuples) then
4✔
104
                        errs = batching_utils.complement_batching_errors(errs,
8✔
105
                                batching_utils.rollback_on_error_msg, inserted_tuples)
8✔
106
                    end
107

108
                    return nil, errs, replica_schema_version
4✔
109
                end
110

111
                local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
12✔
112
                box.commit()
12✔
113
                if not unref_ok then
12✔
NEW
114
                    return nil, bucket_unref_err
×
115
                end
116

117
                return inserted_tuples, errs, replica_schema_version
12✔
118
            end
119
        end
120

121
        table.insert(inserted_tuples, insert_result.res)
229,412✔
122
    end
123

124
    if next(errs) ~= nil then
4,704✔
125
        if opts.rollback_on_error == true then
76✔
126
            local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
12✔
127
            box.rollback()
12✔
128
            if not unref_ok then
12✔
NEW
129
                return nil, bucket_unref_err
×
130
            end
131
            if next(inserted_tuples) then
12✔
132
                errs = batching_utils.complement_batching_errors(errs,
24✔
133
                        batching_utils.rollback_on_error_msg, inserted_tuples)
24✔
134
            end
135

136
            return nil, errs, replica_schema_version
12✔
137
        end
138

139
        local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
64✔
140
        box.commit()
64✔
141
        if not unref_ok then
64✔
NEW
142
            return nil, bucket_unref_err
×
143
        end
144

145
        return inserted_tuples, errs, replica_schema_version
64✔
146
    end
147

148
    local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
4,628✔
149
    box.commit()
4,628✔
150
    if not unref_ok then
4,628✔
NEW
151
        return nil, bucket_unref_err
×
152
    end
153

154
    return inserted_tuples, nil, replica_schema_version
4,628✔
155
end
156

157
insert_many.storage_api = {[INSERT_MANY_FUNC_NAME] = insert_many_on_storage}
593✔
158

159
-- returns result, err, need_reload
160
-- need_reload indicates if reloading schema could help
161
-- see crud.common.schema.wrap_func_reload()
162
local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts)
163
    dev_checks('table', 'string', 'table', {
4,178✔
164
        timeout = '?number',
165
        fields = '?table',
166
        add_space_schema_hash = '?boolean',
167
        stop_on_error = '?boolean',
168
        rollback_on_error = '?boolean',
169
        vshard_router = '?string|table',
170
        skip_nullability_check_on_flatten = '?boolean',
171
        noreturn = '?boolean',
172
        fetch_latest_metadata = '?boolean',
173
    })
174

175
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
4,178✔
176
    if err ~= nil then
4,178✔
177
        return nil, {
×
178
            InsertManyError:new("An error occurred during the operation: %s", err)
×
179
        }, const.NEED_SCHEMA_RELOAD
×
180
    end
181
    if space == nil then
4,178✔
182
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
16✔
183
    end
184

185
    local tuples = table.deepcopy(original_tuples)
4,170✔
186

187
    local batch_insert_on_storage_opts = {
4,170✔
188
        add_space_schema_hash = opts.add_space_schema_hash,
4,170✔
189
        fields = opts.fields,
4,170✔
190
        stop_on_error = opts.stop_on_error,
4,170✔
191
        rollback_on_error = opts.rollback_on_error,
4,170✔
192
        noreturn = opts.noreturn,
4,170✔
193
        fetch_latest_metadata = opts.fetch_latest_metadata,
4,170✔
194
    }
195

196
    local iter, err = BatchInsertIterator:new({
8,340✔
197
        tuples = tuples,
4,170✔
198
        space = space,
4,170✔
199
        execute_on_storage_opts = batch_insert_on_storage_opts,
4,170✔
200
        vshard_router = vshard_router,
4,170✔
201
    })
202
    if err ~= nil then
4,170✔
203
        return nil, {err}, const.NEED_SCHEMA_RELOAD
8✔
204
    end
205

206
    local postprocessor = BatchPostprocessor:new(vshard_router)
4,162✔
207

208
    local rows, errs, storages_info = call.map(vshard_router, CRUD_INSERT_MANY_FUNC_NAME, nil, {
8,324✔
209
        timeout = opts.timeout,
4,162✔
210
        mode = 'write',
211
        iter = iter,
4,162✔
212
        postprocessor = postprocessor,
4,162✔
213
    })
214

215
    if errs ~= nil then
4,162✔
216
        local tuples_count = table.maxn(tuples)
96✔
217
        if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then
192✔
218
            return nil, errs, const.NEED_SHARDING_RELOAD
8✔
219
        end
220

221
        if schema.batching_result_needs_reload(space, errs, tuples_count) then
176✔
222
            return nil, errs, const.NEED_SCHEMA_RELOAD
8✔
223
        end
224
    end
225

226
    if next(rows) == nil then
4,146✔
227
        return nil, errs
2,015✔
228
    end
229

230
    if opts.fetch_latest_metadata == true then
2,131✔
231
        -- This option is temporary and is related to [1], [2].
232
        -- [1] https://github.com/tarantool/crud/issues/236
233
        -- [2] https://github.com/tarantool/crud/issues/361
234
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
8✔
235
                                                              storages_info, netbox_schema_version)
8✔
236
    end
237

238
    local res, err = utils.format_result(rows, space, opts.fields)
2,131✔
239
    if err ~= nil then
2,131✔
240
        errs = errs or {}
4✔
241
        table.insert(errs, err)
4✔
242
        return nil, errs
4✔
243
    end
244

245
    return res, errs
2,127✔
246
end
247

248
--- Inserts batch of tuples to the specified space
249
--
250
-- @function tuples
251
--
252
-- @param string space_name
253
--  A space name
254
--
255
-- @param table tuples
256
--  Tuples
257
--
258
-- @tparam ?table opts
259
--  Options of batch_insert.tuples_batch
260
--
261
-- @return[1] tuples
262
-- @treturn[2] nil
263
-- @treturn[2] table of tables Error description
264

265
function insert_many.tuples(space_name, tuples, opts)
593✔
266
    checks('string', 'table', {
4,070✔
267
        timeout = '?number',
268
        fields = '?table',
269
        add_space_schema_hash = '?boolean',
270
        stop_on_error = '?boolean',
271
        rollback_on_error = '?boolean',
272
        vshard_router = '?string|table',
273
        noreturn = '?boolean',
274
        fetch_latest_metadata = '?boolean',
275
    })
276

277
    if next(tuples) == nil then
4,070✔
278
        return nil, {InsertManyError:new("At least one tuple expected")}
4✔
279
    end
280

281
    opts = opts or {}
4,068✔
282

283
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
4,068✔
284
    if err ~= nil then
4,068✔
285
        return nil, {InsertManyError:new(err)}
8✔
286
    end
287

288
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
4,064✔
289
                                   space_name, tuples, opts)
4,064✔
290
end
291

292
--- Inserts batch of objects to the specified space
293
--
294
-- @function objects
295
--
296
-- @param string space_name
297
--  A space name
298
--
299
-- @param table objs
300
--  Objects
301
--
302
-- @tparam ?table opts
303
--  Options of batch_insert.tuples_batch
304
--
305
-- @return[1] objects
306
-- @treturn[2] nil
307
-- @treturn[2] table of tables Error description
308

309
function insert_many.objects(space_name, objs, opts)
593✔
310
    checks('string', 'table', {
144✔
311
        timeout = '?number',
312
        fields = '?table',
313
        stop_on_error = '?boolean',
314
        rollback_on_error = '?boolean',
315
        vshard_router = '?string|table',
316
        skip_nullability_check_on_flatten = '?boolean',
317
        noreturn = '?boolean',
318
        fetch_latest_metadata = '?boolean',
319
    })
320

321
    if next(objs) == nil then
144✔
322
        return nil, {InsertManyError:new("At least one object expected")}
4✔
323
    end
324

325
    opts = opts or {}
142✔
326

327
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
142✔
328
    if err ~= nil then
142✔
329
        return nil, {InsertManyError:new(err)}
8✔
330
    end
331

332
    -- insert can fail if router uses outdated schema to flatten object
333
    opts = utils.merge_options(opts, {add_space_schema_hash = true})
276✔
334

335
    local tuples = {}
138✔
336
    local format_errs = {}
138✔
337

338
    for _, obj in ipairs(objs) do
434✔
339

340
        local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj,
600✔
341
                                                    opts.skip_nullability_check_on_flatten)
300✔
342
        if err ~= nil then
300✔
343
            local err_obj = InsertManyError:new("Failed to flatten object: %s", err)
64✔
344
            err_obj.operation_data = obj
64✔
345

346
            if opts.stop_on_error == true then
64✔
347
                return nil, {err_obj}
4✔
348
            end
349

350
            table.insert(format_errs, err_obj)
60✔
351
        end
352

353
        table.insert(tuples, tuple)
296✔
354
    end
355

356
    if next(tuples) == nil then
134✔
357
        return nil, format_errs
40✔
358
    end
359

360
    local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
188✔
361
                                              space_name, tuples, opts)
94✔
362

363
    if next(format_errs) ~= nil then
94✔
364
        if errs == nil then
6✔
365
            errs = format_errs
4✔
366
        else
367
            errs = utils.list_extend(errs, format_errs)
4✔
368
        end
369
    end
370

371
    return res, errs
94✔
372
end
373

374
return insert_many
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc