• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 5199459215

pending completion
5199459215

push

github

DifferentialOrange
Release 1.2.0

Overview

  This release add two new flags: `noreturn` to ignore return values
  excessive transfer and encoding/decoding for insert/replace/etc
  (performance improvement up to 10% for batch requests) and
  `fetch_latest_metadata` to force fetching latest space format metadata
  right after a live migration (performance overhead may be up to 15%).

New features
  * Add `noreturn` option for operations:
    `insert`, `insert_object`, `insert_many`, `insert_object_many`,
    `replace`, `replace_object`, `replace_many`, `insert_object_many`,
    `upsert`, `upsert_object`, `upsert_many`, `upsert_object_many`,
    `update`, `delete` (#267).

Bugfixes
  * Crud DML operations returning stale schema for metadata generation.
    Now you may use `fetch_latest_metadata` flag to work with latest
    schema (#236).

1 of 1 new or added line in 1 file covered. (100.0%)

4549 of 4888 relevant lines covered (93.06%)

18261.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.71
/crud/insert_many.lua
1
local checks = require('checks')
404✔
2
local errors = require('errors')
404✔
3

4
local call = require('crud.common.call')
404✔
5
local const = require('crud.common.const')
404✔
6
local utils = require('crud.common.utils')
404✔
7
local batching_utils = require('crud.common.batching_utils')
404✔
8
local sharding = require('crud.common.sharding')
404✔
9
local dev_checks = require('crud.common.dev_checks')
404✔
10
local schema = require('crud.common.schema')
404✔
11

12
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
404✔
13
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
404✔
14

15
local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false})
404✔
16

17
local insert_many = {}
404✔
18

19
local INSERT_MANY_FUNC_NAME = '_crud.insert_many_on_storage'
404✔
20

21
local function insert_many_on_storage(space_name, tuples, opts)
22
    dev_checks('string', 'table', {
35,063✔
23
        add_space_schema_hash = '?boolean',
24
        fields = '?table',
25
        stop_on_error = '?boolean',
26
        rollback_on_error = '?boolean',
27
        sharding_key_hash = '?number',
28
        sharding_func_hash = '?number',
29
        skip_sharding_hash_check = '?boolean',
30
        noreturn = '?boolean',
31
        fetch_latest_metadata = '?boolean',
32
    })
33

34
    opts = opts or {}
35,063✔
35

36
    local space = box.space[space_name]
35,063✔
37
    if space == nil then
35,063✔
38
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}
×
39
    end
40

41
    local _, err = sharding.check_sharding_hash(space_name,
70,126✔
42
                                                opts.sharding_func_hash,
35,063✔
43
                                                opts.sharding_key_hash,
35,063✔
44
                                                opts.skip_sharding_hash_check)
35,063✔
45

46
    if err ~= nil then
35,063✔
47
        return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
24✔
48
    end
49

50
    local inserted_tuples = {}
35,051✔
51
    local errs = {}
35,051✔
52
    local replica_schema_version = nil
35,051✔
53

54
    box.begin()
35,051✔
55
    for i, tuple in ipairs(tuples) do
243,241✔
56
        -- add_space_schema_hash is true only in case of insert_object_many
57
        -- the only one case when reloading schema can avoid insert error
58
        -- is flattening object on router
59
        local insert_result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
416,412✔
60
            add_space_schema_hash = opts.add_space_schema_hash,
208,206✔
61
            field_names = opts.fields,
208,206✔
62
            noreturn = opts.noreturn,
208,206✔
63
            fetch_latest_metadata = opts.fetch_latest_metadata,
208,206✔
64
        })
65
        if opts.fetch_latest_metadata then
208,206✔
66
            replica_schema_version = insert_result.storage_info.replica_schema_version
4✔
67
        end
68

69
        if insert_result.err ~= nil then
208,206✔
70
            local err = {
118✔
71
                err = insert_result.err,
118✔
72
                space_schema_hash = insert_result.space_schema_hash,
118✔
73
                operation_data = tuple,
118✔
74
            }
75

76
            table.insert(errs, err)
118✔
77

78
            if opts.stop_on_error == true then
118✔
79
                local left_tuples = utils.list_slice(tuples, i + 1)
16✔
80
                if next(left_tuples) then
16✔
81
                    errs = batching_utils.complement_batching_errors(errs,
32✔
82
                            batching_utils.stop_on_error_msg, left_tuples)
32✔
83
                end
84

85
                if opts.rollback_on_error == true then
16✔
86
                    box.rollback()
4✔
87
                    if next(inserted_tuples) then
4✔
88
                        errs = batching_utils.complement_batching_errors(errs,
8✔
89
                                batching_utils.rollback_on_error_msg, inserted_tuples)
8✔
90
                    end
91

92
                    return nil, errs, replica_schema_version
4✔
93
                end
94

95
                box.commit()
12✔
96

97
                return inserted_tuples, errs, replica_schema_version
12✔
98
            end
99
        end
100

101
        table.insert(inserted_tuples, insert_result.res)
208,190✔
102
    end
103

104
    if next(errs) ~= nil then
35,035✔
105
        if opts.rollback_on_error == true then
76✔
106
            box.rollback()
12✔
107
            if next(inserted_tuples) then
12✔
108
                errs = batching_utils.complement_batching_errors(errs,
24✔
109
                        batching_utils.rollback_on_error_msg, inserted_tuples)
24✔
110
            end
111

112
            return nil, errs, replica_schema_version
12✔
113
        end
114

115
        box.commit()
64✔
116

117
        return inserted_tuples, errs, replica_schema_version
64✔
118
    end
119

120
    box.commit()
34,959✔
121

122
    return inserted_tuples, nil, replica_schema_version
34,959✔
123
end
124

125
function insert_many.init()
404✔
126
    _G._crud.insert_many_on_storage = insert_many_on_storage
302✔
127
end
128

129
-- returns result, err, need_reload
130
-- need_reload indicates if reloading schema could help
131
-- see crud.common.schema.wrap_func_reload()
132
local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts)
133
    dev_checks('table', 'string', 'table', {
31,147✔
134
        timeout = '?number',
135
        fields = '?table',
136
        add_space_schema_hash = '?boolean',
137
        stop_on_error = '?boolean',
138
        rollback_on_error = '?boolean',
139
        vshard_router = '?string|table',
140
        skip_nullability_check_on_flatten = '?boolean',
141
        noreturn = '?boolean',
142
        fetch_latest_metadata = '?boolean',
143
    })
144

145
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
31,147✔
146
    if err ~= nil then
31,147✔
147
        return nil, {
×
148
            InsertManyError:new("An error occurred during the operation: %s", err)
×
149
        }, const.NEED_SCHEMA_RELOAD
×
150
    end
151
    if space == nil then
31,147✔
152
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
26✔
153
    end
154

155
    local tuples = table.deepcopy(original_tuples)
31,134✔
156

157
    local batch_insert_on_storage_opts = {
31,134✔
158
        add_space_schema_hash = opts.add_space_schema_hash,
31,134✔
159
        fields = opts.fields,
31,134✔
160
        stop_on_error = opts.stop_on_error,
31,134✔
161
        rollback_on_error = opts.rollback_on_error,
31,134✔
162
        noreturn = opts.noreturn,
31,134✔
163
        fetch_latest_metadata = opts.fetch_latest_metadata,
31,134✔
164
    }
165

166
    local iter, err = BatchInsertIterator:new({
62,268✔
167
        tuples = tuples,
31,134✔
168
        space = space,
31,134✔
169
        execute_on_storage_opts = batch_insert_on_storage_opts,
31,134✔
170
        vshard_router = vshard_router,
31,134✔
171
    })
172
    if err ~= nil then
31,134✔
173
        return nil, {err}, const.NEED_SCHEMA_RELOAD
×
174
    end
175

176
    local postprocessor = BatchPostprocessor:new(vshard_router)
31,134✔
177

178
    local rows, errs, storages_info = call.map(vshard_router, INSERT_MANY_FUNC_NAME, nil, {
62,268✔
179
        timeout = opts.timeout,
31,134✔
180
        mode = 'write',
181
        iter = iter,
31,134✔
182
        postprocessor = postprocessor,
31,134✔
183
    })
184

185
    if errs ~= nil then
31,134✔
186
        local tuples_count = table.maxn(tuples)
82✔
187
        if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then
164✔
188
            return nil, errs, const.NEED_SHARDING_RELOAD
8✔
189
        end
190

191
        if schema.batching_result_needs_reload(space, errs, tuples_count) then
148✔
192
            return nil, errs, const.NEED_SCHEMA_RELOAD
8✔
193
        end
194
    end
195

196
    if next(rows) == nil then
31,118✔
197
        return nil, errs
15,590✔
198
    end
199

200
    if opts.fetch_latest_metadata == true then
15,528✔
201
        -- This option is temporary and is related to [1], [2].
202
        -- [1] https://github.com/tarantool/crud/issues/236
203
        -- [2] https://github.com/tarantool/crud/issues/361
204
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
8✔
205
                                                              storages_info, netbox_schema_version)
8✔
206
    end
207

208
    local res, err = utils.format_result(rows, space, opts.fields)
15,528✔
209
    if err ~= nil then
15,528✔
210
        errs = errs or {}
4✔
211
        table.insert(errs, err)
4✔
212
        return nil, errs
4✔
213
    end
214

215
    return res, errs
15,524✔
216
end
217

218
--- Inserts batch of tuples to the specified space
219
--
220
-- @function tuples
221
--
222
-- @param string space_name
223
--  A space name
224
--
225
-- @param table tuples
226
--  Tuples
227
--
228
-- @tparam ?table opts
229
--  Options of batch_insert.tuples_batch
230
--
231
-- @return[1] tuples
232
-- @treturn[2] nil
233
-- @treturn[2] table of tables Error description
234

235
function insert_many.tuples(space_name, tuples, opts)
404✔
236
    checks('string', 'table', {
31,044✔
237
        timeout = '?number',
238
        fields = '?table',
239
        add_space_schema_hash = '?boolean',
240
        stop_on_error = '?boolean',
241
        rollback_on_error = '?boolean',
242
        vshard_router = '?string|table',
243
        noreturn = '?boolean',
244
        fetch_latest_metadata = '?boolean',
245
    })
246

247
    opts = opts or {}
31,044✔
248

249
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
31,044✔
250
    if err ~= nil then
31,044✔
251
        return nil, {InsertManyError:new(err)}
16✔
252
    end
253

254
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
31,036✔
255
                                   space_name, tuples, opts)
31,036✔
256
end
257

258
--- Inserts batch of objects to the specified space
259
--
260
-- @function objects
261
--
262
-- @param string space_name
263
--  A space name
264
--
265
-- @param table objs
266
--  Objects
267
--
268
-- @tparam ?table opts
269
--  Options of batch_insert.tuples_batch
270
--
271
-- @return[1] objects
272
-- @treturn[2] nil
273
-- @treturn[2] table of tables Error description
274

275
function insert_many.objects(space_name, objs, opts)
404✔
276
    checks('string', 'table', {
146✔
277
        timeout = '?number',
278
        fields = '?table',
279
        stop_on_error = '?boolean',
280
        rollback_on_error = '?boolean',
281
        vshard_router = '?string|table',
282
        skip_nullability_check_on_flatten = '?boolean',
283
        noreturn = '?boolean',
284
        fetch_latest_metadata = '?boolean',
285
    })
286

287
    opts = opts or {}
146✔
288

289
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
146✔
290
    if err ~= nil then
146✔
291
        return nil, {InsertManyError:new(err)}
16✔
292
    end
293

294
    -- insert can fail if router uses outdated schema to flatten object
295
    opts = utils.merge_options(opts, {add_space_schema_hash = true})
276✔
296

297
    local tuples = {}
138✔
298
    local format_errs = {}
138✔
299

300
    for _, obj in ipairs(objs) do
432✔
301

302
        local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj,
596✔
303
                                                    opts.skip_nullability_check_on_flatten)
298✔
304
        if err ~= nil then
298✔
305
            local err_obj = InsertManyError:new("Failed to flatten object: %s", err)
68✔
306
            err_obj.operation_data = obj
68✔
307

308
            if opts.stop_on_error == true then
68✔
309
                return nil, {err_obj}
4✔
310
            end
311

312
            table.insert(format_errs, err_obj)
64✔
313
        end
314

315
        table.insert(tuples, tuple)
294✔
316
    end
317

318
    if next(tuples) == nil then
134✔
319
        return nil, format_errs
42✔
320
    end
321

322
    local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
184✔
323
                                              space_name, tuples, opts)
92✔
324

325
    if next(format_errs) ~= nil then
92✔
326
        if errs == nil then
6✔
327
            errs = format_errs
4✔
328
        else
329
            errs = utils.list_extend(errs, format_errs)
4✔
330
        end
331
    end
332

333
    return res, errs
92✔
334
end
335

336
return insert_many
404✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc