• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 21364862659

26 Jan 2026 04:13PM UTC coverage: 73.492% (-15.0%) from 88.463%
21364862659

push

github

web-flow
Merge f981517ee into a84e19f3e

4253 of 5787 relevant lines covered (73.49%)

55.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.43
/crud/insert_many.lua
1
local checks = require('checks')
19✔
2
local errors = require('errors')
19✔
3

4
local call = require('crud.common.call')
19✔
5
local const = require('crud.common.const')
19✔
6
local utils = require('crud.common.utils')
19✔
7
local batching_utils = require('crud.common.batching_utils')
19✔
8
local sharding = require('crud.common.sharding')
19✔
9
local dev_checks = require('crud.common.dev_checks')
19✔
10
local schema = require('crud.common.schema')
19✔
11
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
19✔
12

13
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
19✔
14
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
19✔
15

16
local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false})
19✔
17

18
local insert_many = {}
19✔
19

20
local INSERT_MANY_FUNC_NAME = 'insert_many_on_storage'
19✔
21
local CRUD_INSERT_MANY_FUNC_NAME = utils.get_storage_call(INSERT_MANY_FUNC_NAME)
19✔
22

23
local function insert_many_on_storage(space_name, tuples, opts)
24
    dev_checks('string', 'table', {
14✔
25
        add_space_schema_hash = '?boolean',
26
        fields = '?table',
27
        stop_on_error = '?boolean',
28
        rollback_on_error = '?boolean',
29
        sharding_key_hash = '?number',
30
        sharding_func_hash = '?number',
31
        skip_sharding_hash_check = '?boolean',
32
        noreturn = '?boolean',
33
        fetch_latest_metadata = '?boolean',
34
    })
35

36
    opts = opts or {}
14✔
37

38
    local space = box.space[space_name]
14✔
39
    if space == nil then
14✔
40
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}
×
41
    end
42

43
    local _, err = sharding.check_sharding_hash(space_name,
28✔
44
                                                opts.sharding_func_hash,
14✔
45
                                                opts.sharding_key_hash,
14✔
46
                                                opts.skip_sharding_hash_check)
14✔
47

48
    if err ~= nil then
14✔
49
        return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
×
50
    end
51

52
    local bucket_ids = {}
14✔
53
    for _, tuple in ipairs(tuples) do
30✔
54
        bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true
32✔
55
    end
56

57
    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids, space.engine)
14✔
58
    if not ref_ok then
14✔
59
        return nil, bucket_ref_err
×
60
    end
61

62
    local inserted_tuples = {}
14✔
63
    local errs = {}
14✔
64
    local replica_schema_version = nil
14✔
65

66
    local insert_opts = {
14✔
67
        add_space_schema_hash = opts.add_space_schema_hash,
14✔
68
        field_names = opts.fields,
14✔
69
        noreturn = opts.noreturn,
14✔
70
        fetch_latest_metadata = opts.fetch_latest_metadata,
14✔
71
    }
72

73
    box.begin()
14✔
74
    for i, tuple in ipairs(tuples) do
30✔
75
        -- add_space_schema_hash is true only in case of insert_object_many
76
        -- the only one case when reloading schema can avoid insert error
77
        -- is flattening object on router
78
        local insert_result = schema.wrap_func_result(
32✔
79
            space, space.insert, insert_opts,
16✔
80
            space, tuple
16✔
81
        )
16✔
82
        if opts.fetch_latest_metadata then
16✔
83
            replica_schema_version = insert_result.storage_info.replica_schema_version
×
84
        end
85

86
        if insert_result.err ~= nil then
16✔
87
            local err = {
×
88
                err = insert_result.err,
89
                space_schema_hash = insert_result.space_schema_hash,
90
                operation_data = tuple,
91
            }
92

93
            table.insert(errs, err)
×
94

95
            if opts.stop_on_error == true then
×
96
                local left_tuples = utils.list_slice(tuples, i + 1)
×
97
                if next(left_tuples) then
×
98
                    errs = batching_utils.complement_batching_errors(errs,
99
                            batching_utils.stop_on_error_msg, left_tuples)
×
100
                end
101

102
                if opts.rollback_on_error == true then
×
103
                    local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
×
104
                    box.rollback()
×
105
                    if not unref_ok then
×
106
                        return nil, bucket_unref_err
×
107
                    end
108
                    if next(inserted_tuples) then
×
109
                        errs = batching_utils.complement_batching_errors(errs,
110
                                batching_utils.rollback_on_error_msg, inserted_tuples)
×
111
                    end
112

113
                    return nil, errs, replica_schema_version
×
114
                end
115

116
                local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
×
117
                box.commit()
×
118
                if not unref_ok then
×
119
                    return nil, bucket_unref_err
×
120
                end
121

122
                return inserted_tuples, errs, replica_schema_version
×
123
            end
124
        end
125

126
        table.insert(inserted_tuples, insert_result.res)
16✔
127
    end
128

129
    if next(errs) ~= nil then
14✔
130
        if opts.rollback_on_error == true then
×
131
            local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
×
132
            box.rollback()
×
133
            if not unref_ok then
×
134
                return nil, bucket_unref_err
×
135
            end
136
            if next(inserted_tuples) then
×
137
                errs = batching_utils.complement_batching_errors(errs,
138
                        batching_utils.rollback_on_error_msg, inserted_tuples)
×
139
            end
140

141
            return nil, errs, replica_schema_version
×
142
        end
143

144
        local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
×
145
        box.commit()
×
146
        if not unref_ok then
×
147
            return nil, bucket_unref_err
×
148
        end
149

150
        return inserted_tuples, errs, replica_schema_version
×
151
    end
152

153
    local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
14✔
154
    box.commit()
14✔
155
    if not unref_ok then
14✔
156
        return nil, bucket_unref_err
×
157
    end
158

159
    return inserted_tuples, nil, replica_schema_version
14✔
160
end
161

162
insert_many.storage_api = {[INSERT_MANY_FUNC_NAME] = insert_many_on_storage}
19✔
163

164
-- returns result, err, need_reload
165
-- need_reload indicates if reloading schema could help
166
-- see crud.common.schema.wrap_func_reload()
167
local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts)
168
    dev_checks('table', 'string', 'table', {
12✔
169
        timeout = '?number',
170
        fields = '?table',
171
        add_space_schema_hash = '?boolean',
172
        stop_on_error = '?boolean',
173
        rollback_on_error = '?boolean',
174
        vshard_router = '?string|table',
175
        skip_nullability_check_on_flatten = '?boolean',
176
        noreturn = '?boolean',
177
        fetch_latest_metadata = '?boolean',
178
    })
179

180
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
12✔
181
    if err ~= nil then
12✔
182
        return nil, {
×
183
            InsertManyError:new("An error occurred during the operation: %s", err)
×
184
        }, const.NEED_SCHEMA_RELOAD
×
185
    end
186
    if space == nil then
12✔
187
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
8✔
188
    end
189

190
    local tuples = table.deepcopy(original_tuples)
8✔
191

192
    local batch_insert_on_storage_opts = {
8✔
193
        add_space_schema_hash = opts.add_space_schema_hash,
8✔
194
        fields = opts.fields,
8✔
195
        stop_on_error = opts.stop_on_error,
8✔
196
        rollback_on_error = opts.rollback_on_error,
8✔
197
        noreturn = opts.noreturn,
8✔
198
        fetch_latest_metadata = opts.fetch_latest_metadata,
8✔
199
    }
200

201
    local iter, err = BatchInsertIterator:new({
16✔
202
        tuples = tuples,
8✔
203
        space = space,
8✔
204
        execute_on_storage_opts = batch_insert_on_storage_opts,
8✔
205
        vshard_router = vshard_router,
8✔
206
    })
207
    if err ~= nil then
8✔
208
        return nil, {err}, const.NEED_SCHEMA_RELOAD
×
209
    end
210

211
    local postprocessor = BatchPostprocessor:new(vshard_router)
8✔
212

213
    local rows, errs, storages_info = call.map(vshard_router, CRUD_INSERT_MANY_FUNC_NAME, nil, {
16✔
214
        timeout = opts.timeout,
8✔
215
        mode = 'write',
216
        iter = iter,
8✔
217
        postprocessor = postprocessor,
8✔
218
    })
219

220
    if errs ~= nil then
8✔
221
        local tuples_count = table.maxn(tuples)
×
222
        if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then
×
223
            return nil, errs, const.NEED_SHARDING_RELOAD
×
224
        end
225

226
        if schema.batching_result_needs_reload(space, errs, tuples_count) then
×
227
            return nil, errs, const.NEED_SCHEMA_RELOAD
×
228
        end
229
    end
230

231
    if next(rows) == nil then
8✔
232
        return nil, errs
×
233
    end
234

235
    if opts.fetch_latest_metadata == true then
8✔
236
        -- This option is temporary and is related to [1], [2].
237
        -- [1] https://github.com/tarantool/crud/issues/236
238
        -- [2] https://github.com/tarantool/crud/issues/361
239
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
240
                                                              storages_info, netbox_schema_version)
×
241
    end
242

243
    local res, err = utils.format_result(rows, space, opts.fields)
8✔
244
    if err ~= nil then
8✔
245
        errs = errs or {}
×
246
        table.insert(errs, err)
×
247
        return nil, errs
×
248
    end
249

250
    return res, errs
8✔
251
end
252

253
--- Inserts batch of tuples to the specified space
254
--
255
-- @function tuples
256
--
257
-- @param string space_name
258
--  A space name
259
--
260
-- @param table tuples
261
--  Tuples
262
--
263
-- @tparam ?table opts
264
--  Options of batch_insert.tuples_batch
265
--
266
-- @return[1] tuples
267
-- @treturn[2] nil
268
-- @treturn[2] table of tables Error description
269

270
function insert_many.tuples(space_name, tuples, opts)
19✔
271
    checks('string', 'table', {
10✔
272
        timeout = '?number',
273
        fields = '?table',
274
        add_space_schema_hash = '?boolean',
275
        stop_on_error = '?boolean',
276
        rollback_on_error = '?boolean',
277
        vshard_router = '?string|table',
278
        noreturn = '?boolean',
279
        fetch_latest_metadata = '?boolean',
280
    })
281

282
    if next(tuples) == nil then
10✔
283
        return nil, {InsertManyError:new("At least one tuple expected")}
×
284
    end
285

286
    opts = opts or {}
10✔
287

288
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
10✔
289
    if err ~= nil then
10✔
290
        return nil, {InsertManyError:new(err)}
8✔
291
    end
292

293
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
6✔
294
                                   space_name, tuples, opts)
6✔
295
end
296

297
--- Inserts batch of objects to the specified space
298
--
299
-- @function objects
300
--
301
-- @param string space_name
302
--  A space name
303
--
304
-- @param table objs
305
--  Objects
306
--
307
-- @tparam ?table opts
308
--  Options of batch_insert.tuples_batch
309
--
310
-- @return[1] objects
311
-- @treturn[2] nil
312
-- @treturn[2] table of tables Error description
313

314
function insert_many.objects(space_name, objs, opts)
19✔
315
    checks('string', 'table', {
10✔
316
        timeout = '?number',
317
        fields = '?table',
318
        stop_on_error = '?boolean',
319
        rollback_on_error = '?boolean',
320
        vshard_router = '?string|table',
321
        skip_nullability_check_on_flatten = '?boolean',
322
        noreturn = '?boolean',
323
        fetch_latest_metadata = '?boolean',
324
    })
325

326
    if next(objs) == nil then
10✔
327
        return nil, {InsertManyError:new("At least one object expected")}
×
328
    end
329

330
    opts = opts or {}
10✔
331

332
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
10✔
333
    if err ~= nil then
10✔
334
        return nil, {InsertManyError:new(err)}
8✔
335
    end
336

337
    -- insert can fail if router uses outdated schema to flatten object
338
    opts = utils.merge_options(opts, {add_space_schema_hash = true})
12✔
339

340
    local tuples = {}
6✔
341
    local format_errs = {}
6✔
342

343
    for _, obj in ipairs(objs) do
18✔
344

345
        local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj,
24✔
346
                                                    opts.skip_nullability_check_on_flatten)
12✔
347
        if err ~= nil then
12✔
348
            local err_obj = InsertManyError:new("Failed to flatten object: %s", err)
4✔
349
            err_obj.operation_data = obj
4✔
350

351
            if opts.stop_on_error == true then
4✔
352
                return nil, {err_obj}
×
353
            end
354

355
            table.insert(format_errs, err_obj)
4✔
356
        end
357

358
        table.insert(tuples, tuple)
12✔
359
    end
360

361
    if next(tuples) == nil then
6✔
362
        return nil, format_errs
2✔
363
    end
364

365
    local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
8✔
366
                                              space_name, tuples, opts)
4✔
367

368
    if next(format_errs) ~= nil then
4✔
369
        if errs == nil then
×
370
            errs = format_errs
×
371
        else
372
            errs = utils.list_extend(errs, format_errs)
×
373
        end
374
    end
375

376
    return res, errs
4✔
377
end
378

379
return insert_many
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc