• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 5199459215

pending completion
5199459215

push

github

DifferentialOrange
Release 1.2.0

Overview

  This release adds two new flags: `noreturn`, which skips the excessive
  transfer and encoding/decoding of return values for insert/replace/etc
  (performance improvement up to 10% for batch requests), and
  `fetch_latest_metadata`, which forces fetching the latest space format
  metadata right after a live migration (performance overhead may be up to 15%).

New features
  * Add `noreturn` option for operations:
    `insert`, `insert_object`, `insert_many`, `insert_object_many`,
    `replace`, `replace_object`, `replace_many`, `replace_object_many`,
    `upsert`, `upsert_object`, `upsert_many`, `upsert_object_many`,
    `update`, `delete` (#267).

Bugfixes
  * Crud DML operations could return a stale schema for metadata generation.
    Now you may use the `fetch_latest_metadata` flag to work with the latest
    schema (#236).

1 of 1 new or added line in 1 file covered. (100.0%)

4549 of 4888 relevant lines covered (93.06%)

18261.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/crud/upsert_many.lua
1
local checks = require('checks')
404✔
2
local errors = require('errors')
404✔
3

4
local call = require('crud.common.call')
404✔
5
local const = require('crud.common.const')
404✔
6
local utils = require('crud.common.utils')
404✔
7
local batching_utils = require('crud.common.batching_utils')
404✔
8
local sharding = require('crud.common.sharding')
404✔
9
local dev_checks = require('crud.common.dev_checks')
404✔
10
local schema = require('crud.common.schema')
404✔
11

12
local BatchUpsertIterator = require('crud.common.map_call_cases.batch_upsert_iter')
404✔
13
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
404✔
14

15
local UpsertManyError = errors.new_class('UpsertManyError', {capture_stack = false})
404✔
16

17
local upsert_many = {}
404✔
18

19
local UPSERT_MANY_FUNC_NAME = '_crud.upsert_many_on_storage'
404✔
20

21
local function upsert_many_on_storage(space_name, tuples, operations, opts)
22
    dev_checks('string', 'table', 'table', {
380✔
23
        add_space_schema_hash = '?boolean',
24
        stop_on_error = '?boolean',
25
        rollback_on_error = '?boolean',
26
        sharding_key_hash = '?number',
27
        sharding_func_hash = '?number',
28
        skip_sharding_hash_check = '?boolean',
29
        noreturn = '?boolean',
30
        fetch_latest_metadata = '?boolean',
31
    })
32

33
    opts = opts or {}
380✔
34

35
    local space = box.space[space_name]
380✔
36
    if space == nil then
380✔
37
        return nil, UpsertManyError:new("Space %q doesn't exist", space_name)
×
38
    end
39

40
    local _, err = sharding.check_sharding_hash(space_name,
760✔
41
                                                opts.sharding_func_hash,
380✔
42
                                                opts.sharding_key_hash,
380✔
43
                                                opts.skip_sharding_hash_check)
380✔
44

45
    if err ~= nil then
380✔
46
        return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
24✔
47
    end
48

49
    local processed_tuples = {}
368✔
50
    local errs = {}
368✔
51
    local replica_schema_version = nil
368✔
52

53
    box.begin()
368✔
54
    for i, tuple in ipairs(tuples) do
828✔
55
        -- add_space_schema_hash is true only in case of upsert_object_many
56
        -- the only one case when reloading schema can avoid upsert error
57
        -- is flattening object on router
58
        local insert_result = schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations[i]}, {
952✔
59
            add_space_schema_hash = opts.add_space_schema_hash,
476✔
60
            fetch_latest_metadata = opts.fetch_latest_metadata,
476✔
61
        })
62
        if opts.fetch_latest_metadata then
476✔
63
            replica_schema_version = insert_result.storage_info.replica_schema_version
4✔
64
        end
65

66
        if insert_result.err ~= nil then
476✔
67
            local err = {
114✔
68
                err = insert_result.err,
114✔
69
                space_schema_hash = insert_result.space_schema_hash,
114✔
70
                operation_data = tuple,
114✔
71
            }
72

73
            table.insert(errs, err)
114✔
74

75
            if opts.stop_on_error == true then
114✔
76
                local left_tuples = utils.list_slice(tuples, i + 1)
16✔
77
                if next(left_tuples) then
16✔
78
                    errs = batching_utils.complement_batching_errors(errs,
32✔
79
                            batching_utils.stop_on_error_msg, left_tuples)
32✔
80
                end
81

82
                if opts.rollback_on_error == true then
16✔
83
                    box.rollback()
4✔
84
                    if next(processed_tuples) then
4✔
85
                        errs = batching_utils.complement_batching_errors(errs,
8✔
86
                                batching_utils.rollback_on_error_msg, processed_tuples)
8✔
87
                    end
88

89
                    return nil, errs, replica_schema_version
4✔
90
                end
91

92
                box.commit()
12✔
93

94
                return nil, errs, replica_schema_version
12✔
95
            end
96
        else
97
            table.insert(processed_tuples, tuple)
362✔
98
        end
99
    end
100

101
    if next(errs) ~= nil then
352✔
102
        if opts.rollback_on_error == true then
74✔
103
            box.rollback()
12✔
104
            if next(processed_tuples) then
12✔
105
                errs = batching_utils.complement_batching_errors(errs,
24✔
106
                        batching_utils.rollback_on_error_msg, processed_tuples)
24✔
107
            end
108

109
            return nil, errs, replica_schema_version
12✔
110
        end
111

112
        box.commit()
62✔
113

114
        return nil, errs, replica_schema_version
62✔
115
    end
116

117
    box.commit()
278✔
118

119
    return nil, nil, replica_schema_version
278✔
120
end
121

122
function upsert_many.init()
404✔
123
    _G._crud.upsert_many_on_storage = upsert_many_on_storage
302✔
124
end
125

126
-- returns result, err, need_reload
127
-- need_reload indicates if reloading schema could help
128
-- see crud.common.schema.wrap_func_reload()
129
local function call_upsert_many_on_router(vshard_router, space_name, original_tuples_operation_data, opts)
130
    dev_checks('table', 'string', 'table', {
244✔
131
        timeout = '?number',
132
        fields = '?table',
133
        add_space_schema_hash = '?boolean',
134
        stop_on_error = '?boolean',
135
        rollback_on_error = '?boolean',
136
        vshard_router = '?string|table',
137
        skip_nullability_check_on_flatten = '?boolean',
138
        noreturn = '?boolean',
139
        fetch_latest_metadata = '?boolean',
140
    })
141

142
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
244✔
143
    if err ~= nil then
244✔
144
        return nil, {
×
145
            UpsertManyError:new("An error occurred during the operation: %s", err)
×
146
        }, const.NEED_SCHEMA_RELOAD
×
147
    end
148
    if space == nil then
244✔
149
        return nil, {UpsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
24✔
150
    end
151

152
    local space_format = space:format()
232✔
153
    local tuples = {}
232✔
154
    local operations = {}
232✔
155
    for _, tuple_operation_data in ipairs(original_tuples_operation_data) do
756✔
156
        local tuple = table.deepcopy(tuple_operation_data[1])
524✔
157
        local operations_by_tuple = tuple_operation_data[2]
524✔
158

159
        if not utils.tarantool_supports_fieldpaths() then
1,048✔
160
            local converted_operations, err = utils.convert_operations(operations_by_tuple, space_format)
×
161
            if err ~= nil then
×
162
                return nil, {UpsertManyError:new("Wrong operations are specified: %s", err)}, const.NEED_SCHEMA_RELOAD
×
163
            end
164

165
            operations_by_tuple = converted_operations
×
166
        end
167

168
        table.insert(tuples, tuple)
524✔
169
        table.insert(operations, operations_by_tuple)
524✔
170
    end
171

172
    local upsert_many_on_storage_opts = {
232✔
173
        add_space_schema_hash = opts.add_space_schema_hash,
232✔
174
        stop_on_error = opts.stop_on_error,
232✔
175
        rollback_on_error = opts.rollback_on_error,
232✔
176
        fetch_latest_metadata = opts.fetch_latest_metadata,
232✔
177
    }
178

179
    local iter, err = BatchUpsertIterator:new({
464✔
180
        tuples = tuples,
232✔
181
        space = space,
232✔
182
        operations = operations,
232✔
183
        execute_on_storage_opts = upsert_many_on_storage_opts,
232✔
184
        vshard_router = vshard_router,
232✔
185
    })
186
    if err ~= nil then
232✔
187
        return nil, {err}, const.NEED_SCHEMA_RELOAD
×
188
    end
189

190
    local postprocessor = BatchPostprocessor:new(vshard_router)
232✔
191

192
    local _, errs, storages_info = call.map(vshard_router, UPSERT_MANY_FUNC_NAME, nil, {
464✔
193
        timeout = opts.timeout,
232✔
194
        mode = 'write',
195
        iter = iter,
232✔
196
        postprocessor = postprocessor,
232✔
197
    })
198

199
    if errs ~= nil then
232✔
200
        local tuples_count = table.maxn(tuples)
80✔
201
        if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then
160✔
202
            return nil, errs, const.NEED_SHARDING_RELOAD
8✔
203
        end
204

205
        if schema.batching_result_needs_reload(space, errs, tuples_count) then
144✔
206
            return nil, errs, const.NEED_SCHEMA_RELOAD
8✔
207
        end
208

209
        if table.maxn(tuples) == table.maxn(errs) then
64✔
210
            return nil, errs
42✔
211
        end
212
    end
213

214
    if opts.noreturn == true then
174✔
215
        return nil, errs
8✔
216
    end
217

218
    if opts.fetch_latest_metadata == true then
166✔
219
        -- This option is temporary and is related to [1], [2].
220
        -- [1] https://github.com/tarantool/crud/issues/236
221
        -- [2] https://github.com/tarantool/crud/issues/361
222
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
8✔
223
                                                              storages_info, netbox_schema_version)
8✔
224
    end
225

226
    local res, err = utils.format_result(nil, space, opts.fields)
166✔
227
    if err ~= nil then
166✔
228
        errs = errs or {}
4✔
229
        table.insert(errs, err)
4✔
230
        return nil, errs
4✔
231
    end
232

233
    return res, errs
162✔
234
end
235

236
--- Update or insert batch of tuples to the specified space
237
--
238
-- @function tuples
239
--
240
-- @param string space_name
241
--  A space name
242
--
243
-- @param table tuples_operation_data
244
--  Tuples and operations in format
245
--  {{tuple_1, operation_1}, ..., {tuple_n, operation_n}}
246
--
247
-- @tparam ?table opts
248
--  Options of upsert_many.tuples
249
--
250
-- @return[1] tuples
251
-- @treturn[2] nil
252
-- @treturn[2] table of tables Error description
253

254
function upsert_many.tuples(space_name, tuples_operation_data, opts)
404✔
255
    checks('string', 'table', {
138✔
256
        timeout = '?number',
257
        fields = '?table',
258
        add_space_schema_hash = '?boolean',
259
        stop_on_error = '?boolean',
260
        rollback_on_error = '?boolean',
261
        vshard_router = '?string|table',
262
        noreturn = '?boolean',
263
        fetch_latest_metadata = '?boolean',
264
    })
265

266
    opts = opts or {}
138✔
267

268
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
138✔
269
    if err ~= nil then
138✔
270
        return nil, {UpsertManyError:new(err)}
16✔
271
    end
272

273
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_upsert_many_on_router,
130✔
274
                                   space_name, tuples_operation_data, opts)
130✔
275
end
276

277
--- Update or insert batch of objects to the specified space
278
--
279
-- @function objects
280
--
281
-- @param string space_name
282
--  A space name
283
--
284
-- @param table objs_operation_data
285
--  Objects and operations in format
286
--  {{obj_1, operation_1}, ..., {obj_n, operation_n}}
287
--
288
-- @tparam ?table opts
289
--  Options of upsert_many.objects
290
--
291
-- @return[1] objects
292
-- @treturn[2] nil
293
-- @treturn[2] table of tables Error description
294

295
function upsert_many.objects(space_name, objs_operation_data, opts)
404✔
296
    checks('string', 'table', {
148✔
297
        timeout = '?number',
298
        fields = '?table',
299
        stop_on_error = '?boolean',
300
        rollback_on_error = '?boolean',
301
        vshard_router = '?string|table',
302
        noreturn = '?boolean',
303
        fetch_latest_metadata = '?boolean',
304
    })
305

306
    opts = opts or {}
148✔
307

308
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
148✔
309
    if err ~= nil then
148✔
310
        return nil, {UpsertManyError:new(err)}
16✔
311
    end
312

313
    -- upsert can fail if router uses outdated schema to flatten object
314
    opts = utils.merge_options(opts, {add_space_schema_hash = true})
280✔
315

316
    local tuples_operation_data = {}
140✔
317
    local format_errs = {}
140✔
318

319
    for _, obj_operation_data in ipairs(objs_operation_data) do
430✔
320
        local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj_operation_data[1])
294✔
321
        if err ~= nil then
294✔
322
            local err_obj = UpsertManyError:new("Failed to flatten object: %s", err)
66✔
323
            err_obj.operation_data = obj_operation_data[1]
66✔
324

325
            if opts.stop_on_error == true then
66✔
326
                return nil, {err_obj}
4✔
327
            end
328

329
            table.insert(format_errs, err_obj)
62✔
330
        else
331
            table.insert(tuples_operation_data, {tuple, obj_operation_data[2]})
228✔
332
        end
333
    end
334

335
    if next(tuples_operation_data) == nil then
136✔
336
        return nil, format_errs
40✔
337
    end
338

339
    local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_upsert_many_on_router,
192✔
340
                                              space_name, tuples_operation_data, opts)
96✔
341

342
    if next(format_errs) ~= nil then
96✔
343
        if errs == nil then
6✔
344
            errs = format_errs
4✔
345
        else
346
            errs = utils.list_extend(errs, format_errs)
4✔
347
        end
348
    end
349

350
    return res, errs
96✔
351
end
352

353
return upsert_many
404✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc