• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 21588931143

02 Feb 2026 11:50AM UTC coverage: 88.194% (-0.3%) from 88.463%
21588931143

push

github

ita-sammann
safe/fast mode: move bucket_unref out of transaction

Unref inside transaction did not cause any problems.
This change is made to make bucket_unref in `*_many` operations
to be uniform with bucket_ref/unref in single operations.

15 of 15 new or added lines in 3 files covered. (100.0%)

48 existing lines in 3 files now uncovered.

5214 of 5912 relevant lines covered (88.19%)

12606.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.49
/crud/insert_many.lua
1
local checks = require('checks')
623✔
2
local errors = require('errors')
623✔
3

4
local call = require('crud.common.call')
623✔
5
local const = require('crud.common.const')
623✔
6
local utils = require('crud.common.utils')
623✔
7
local batching_utils = require('crud.common.batching_utils')
623✔
8
local sharding = require('crud.common.sharding')
623✔
9
local dev_checks = require('crud.common.dev_checks')
623✔
10
local schema = require('crud.common.schema')
623✔
11
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
623✔
12

13
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
623✔
14
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
623✔
15

16
-- Error class used for every error object created in this module;
-- stack capturing is switched off via capture_stack = false.
local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false})
623✔
17

18
-- Module table; it is filled below and returned at the end of the file.
local insert_many = {}
623✔
19

20
-- Key under which the storage-side handler is published in insert_many.storage_api.
local INSERT_MANY_FUNC_NAME = 'insert_many_on_storage'
623✔
21
-- Name passed to call.map by the router; derived from the handler name
-- by utils.get_storage_call.
local CRUD_INSERT_MANY_FUNC_NAME = utils.get_storage_call(INSERT_MANY_FUNC_NAME)
623✔
22

23
--- Inserts a batch of tuples into a space on a storage within one transaction.
--
-- The buckets the tuples belong to are referenced before the transaction
-- is opened and released right after it is committed or rolled back,
-- i.e. outside of the transaction.
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuples
--  Tuples to insert
--
-- @tparam ?table opts
--  Options; accepted keys are listed in the dev_checks call below
--
-- @return[1] table results of the successful inserts
-- @return[1] ?table errors, nil when no insert has failed
-- @return[1] replica schema version, set only if opts.fetch_latest_metadata is truthy
-- @treturn[2] nil
-- @treturn[2] table errors
-- @return[2] replica schema version (not returned when the space is missing
--  or the sharding hash check fails)
local function insert_many_on_storage(space_name, tuples, opts)
    dev_checks('string', 'table', {
        add_space_schema_hash = '?boolean',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        sharding_key_hash = '?number',
        sharding_func_hash = '?number',
        skip_sharding_hash_check = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    opts = opts or {}

    local space = box.space[space_name]
    if space == nil then
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}
    end

    local _, err = sharding.check_sharding_hash(space_name,
                                                opts.sharding_func_hash,
                                                opts.sharding_key_hash,
                                                opts.skip_sharding_hash_check)

    if err ~= nil then
        return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
    end

    -- Build the set of bucket ids touched by the batch. The field number
    -- depends only on the space, so it is looked up once for all tuples.
    local bucket_id_fieldno = utils.get_bucket_id_fieldno(space)
    local bucket_ids = {}
    for _, tuple in ipairs(tuples) do
        bucket_ids[tuple[bucket_id_fieldno]] = true
    end

    local inserted_tuples = {}
    local errs = {}
    local replica_schema_version = nil

    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids, space.engine)
    if not ref_ok then
        table.insert(errs, bucket_ref_err)
        return nil, errs, replica_schema_version
    end

    -- Closes the transaction, releases the bucket references and builds
    -- the values to return. It is the single place where the transaction
    -- ends, so the unref step is never skipped or duplicated.
    local function finish(rollback)
        if rollback then
            box.rollback()
        else
            box.commit()
        end

        local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
        if not unref_ok then
            table.insert(errs, bucket_unref_err)
            return nil, errs, replica_schema_version
        end

        if rollback then
            -- Tuples inserted before the failure are gone after rollback:
            -- report each of them as an error too.
            if next(inserted_tuples) then
                errs = batching_utils.complement_batching_errors(errs,
                        batching_utils.rollback_on_error_msg, inserted_tuples)
            end

            return nil, errs, replica_schema_version
        end

        if next(errs) == nil then
            return inserted_tuples, nil, replica_schema_version
        end

        return inserted_tuples, errs, replica_schema_version
    end

    local insert_opts = {
        add_space_schema_hash = opts.add_space_schema_hash,
        field_names = opts.fields,
        noreturn = opts.noreturn,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }

    box.begin()
    for i, tuple in ipairs(tuples) do
        -- add_space_schema_hash is true only in case of insert_object_many
        -- the only one case when reloading schema can avoid insert error
        -- is flattening object on router
        local insert_result = schema.wrap_func_result(
            space, space.insert, insert_opts,
            space, tuple
        )
        if opts.fetch_latest_metadata then
            replica_schema_version = insert_result.storage_info.replica_schema_version
        end

        if insert_result.err ~= nil then
            table.insert(errs, {
                err = insert_result.err,
                space_schema_hash = insert_result.space_schema_hash,
                operation_data = tuple,
            })

            if opts.stop_on_error == true then
                -- The tuples after the failed one are not attempted;
                -- each of them gets its own error entry.
                local left_tuples = utils.list_slice(tuples, i + 1)
                if next(left_tuples) then
                    errs = batching_utils.complement_batching_errors(errs,
                            batching_utils.stop_on_error_msg, left_tuples)
                end

                return finish(opts.rollback_on_error == true)
            end
        end

        table.insert(inserted_tuples, insert_result.res)
    end

    -- Roll back only when something has failed and the caller asked for it.
    return finish(next(errs) ~= nil and opts.rollback_on_error == true)
end
167

168
-- Storage-side functions of this module, keyed by their registration name.
insert_many.storage_api = {[INSERT_MANY_FUNC_NAME] = insert_many_on_storage}
623✔
169

170
-- Router-side part of insert_many: splits the batch between storages,
-- runs the storage call on each of them and formats the collected rows.
--
-- Returns result, errs, need_reload.
-- need_reload tells the caller whether reloading the schema (or the
-- sharding info) could help, see crud.common.schema.wrap_func_reload()
local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts)
    dev_checks('table', 'string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
    if space_err ~= nil then
        local wrapped = InsertManyError:new("An error occurred during the operation: %s", space_err)
        return nil, {wrapped}, const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
    end

    -- Work on a copy so that the caller's tuples are left untouched.
    local tuples = table.deepcopy(original_tuples)

    local iter, iter_err = BatchInsertIterator:new({
        tuples = tuples,
        space = space,
        -- Only these options are forwarded to the storage call.
        execute_on_storage_opts = {
            add_space_schema_hash = opts.add_space_schema_hash,
            fields = opts.fields,
            stop_on_error = opts.stop_on_error,
            rollback_on_error = opts.rollback_on_error,
            noreturn = opts.noreturn,
            fetch_latest_metadata = opts.fetch_latest_metadata,
        },
        vshard_router = vshard_router,
    })
    if iter_err ~= nil then
        return nil, {iter_err}, const.NEED_SCHEMA_RELOAD
    end

    local rows, errs, storages_info = call.map(vshard_router, CRUD_INSERT_MANY_FUNC_NAME, nil, {
        timeout = opts.timeout,
        mode = 'write',
        iter = iter,
        postprocessor = BatchPostprocessor:new(vshard_router),
    })

    if errs ~= nil then
        local tuples_count = table.maxn(tuples)

        -- Sharding reload is checked first, schema reload second.
        if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then
            return nil, errs, const.NEED_SHARDING_RELOAD
        end

        if schema.batching_result_needs_reload(space, errs, tuples_count) then
            return nil, errs, const.NEED_SCHEMA_RELOAD
        end
    end

    -- Nothing was returned by the storages: only errors (if any) are left.
    if next(rows) == nil then
        return nil, errs
    end

    if opts.fetch_latest_metadata == true then
        -- This option is temporary and is related to [1], [2].
        -- [1] https://github.com/tarantool/crud/issues/236
        -- [2] https://github.com/tarantool/crud/issues/361
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
                                                              storages_info, netbox_schema_version)
    end

    local formatted, format_err = utils.format_result(rows, space, opts.fields)
    if format_err == nil then
        return formatted, errs
    end

    -- Formatting failed: report it together with the storage errors.
    errs = errs or {}
    table.insert(errs, format_err)
    return nil, errs
end
258

259
--- Inserts a batch of tuples into the specified space
--
-- @function tuples
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuples
--  Tuples, at least one is required
--
-- @tparam ?table opts
--  Options; accepted keys are timeout, fields, add_space_schema_hash,
--  stop_on_error, rollback_on_error, vshard_router, noreturn and
--  fetch_latest_metadata
--
-- @return[1] tuples
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function insert_many.tuples(space_name, tuples, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    -- An empty batch is rejected before anything else is done.
    local has_tuples = next(tuples) ~= nil
    if not has_tuples then
        return nil, {InsertManyError:new("At least one tuple expected")}
    end

    opts = opts or {}

    local router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, {InsertManyError:new(router_err)}
    end

    -- The router call is wrapped so that it can be retried after a reload,
    -- see crud.common.schema.wrap_func_reload()
    return schema.wrap_func_reload(
        router, sharding.wrap_method, call_insert_many_on_router,
        space_name, tuples, opts
    )
end
302

303
--- Inserts a batch of objects into the specified space
--
-- @function objects
--
-- @tparam string space_name
--  A space name
--
-- @tparam table objs
--  Objects, at least one is required
--
-- @tparam ?table opts
--  Options; accepted keys are timeout, fields, stop_on_error,
--  rollback_on_error, vshard_router, skip_nullability_check_on_flatten,
--  noreturn and fetch_latest_metadata
--
-- @return[1] objects
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function insert_many.objects(space_name, objs, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    if next(objs) == nil then
        return nil, {InsertManyError:new("At least one object expected")}
    end

    opts = opts or {}

    local router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, {InsertManyError:new(router_err)}
    end

    -- insert can fail if router uses outdated schema to flatten object
    opts = utils.merge_options(opts, {add_space_schema_hash = true})

    local skip_nullability_check = opts.skip_nullability_check_on_flatten
    local flattened = {}
    local flatten_errs = {}

    for _, obj in ipairs(objs) do
        local tuple, flatten_err = utils.flatten_obj_reload(router, space_name, obj,
                                                            skip_nullability_check)
        if flatten_err ~= nil then
            local err_obj = InsertManyError:new("Failed to flatten object: %s", flatten_err)
            err_obj.operation_data = obj

            -- With stop_on_error the first flatten failure ends the call.
            if opts.stop_on_error == true then
                return nil, {err_obj}
            end

            table.insert(flatten_errs, err_obj)
        end

        -- Runs on the failure path as well; presumably tuple is nil there,
        -- which makes the insert a no-op -- TODO confirm against
        -- utils.flatten_obj_reload
        table.insert(flattened, tuple)
    end

    -- Not a single object could be flattened: nothing to send to storages.
    if next(flattened) == nil then
        return nil, flatten_errs
    end

    local res, errs = schema.wrap_func_reload(
        router, sharding.wrap_method, call_insert_many_on_router,
        space_name, flattened, opts
    )

    -- Flatten errors are reported after the errors of the insert itself.
    if next(flatten_errs) == nil then
        return res, errs
    end

    if errs == nil then
        return res, flatten_errs
    end

    return res, utils.list_extend(errs, flatten_errs)
end
384

385
return insert_many
623✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc