• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 8518810120

02 Apr 2024 07:09AM UTC coverage: 88.753% (+0.06%) from 88.694%
8518810120

push

github

DifferentialOrange
storage: use async bootstrap by default for tarantool 3

Closes #412
Part of #415

8 of 9 new or added lines in 2 files covered. (88.89%)

209 existing lines in 16 files now uncovered.

4806 of 5415 relevant lines covered (88.75%)

6165.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.79
/crud/insert_many.lua
1
local checks = require('checks')
248✔
2
local errors = require('errors')
248✔
3

4
local call = require('crud.common.call')
248✔
5
local const = require('crud.common.const')
248✔
6
local utils = require('crud.common.utils')
248✔
7
local batching_utils = require('crud.common.batching_utils')
248✔
8
local sharding = require('crud.common.sharding')
248✔
9
local dev_checks = require('crud.common.dev_checks')
248✔
10
local schema = require('crud.common.schema')
248✔
11

12
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
248✔
13
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
248✔
14

15
-- Error class used for every failure this module reports itself; stack
-- capture is switched off through the `capture_stack = false` option.
local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false})

-- Module table: public functions are attached to it below and it is the
-- value returned at the end of the file.
local insert_many = {}

-- Name under which the storage-side function is registered, and the call
-- name derived from it by `utils.get_storage_call` for use on the router.
local INSERT_MANY_FUNC_NAME = 'insert_many_on_storage'
local CRUD_INSERT_MANY_FUNC_NAME = utils.get_storage_call(INSERT_MANY_FUNC_NAME)
248✔
21

22
--- Storage-side handler: inserts a batch of tuples into a local space
-- inside one transaction.
--
-- @tparam string space_name
--  Name of the space, looked up in `box.space`
--
-- @tparam table tuples
--  Tuples to insert; traversed with `ipairs`
--
-- @tparam ?table opts
--  Options (see the `dev_checks` call for the accepted keys and types).
--  `stop_on_error` ends the batch at the first failed insert and reports
--  the remaining tuples as not performed; `rollback_on_error` rolls the
--  transaction back when at least one insert failed. The sharding hash
--  options go to `sharding.check_sharding_hash`, the remaining ones to
--  `schema.wrap_box_space_func_result`.
--
-- @return[1] table|nil results of successful inserts; nil when the space
--  is missing, the sharding hash check fails or the batch is rolled back
-- @return[2] table|nil list of errors, nil when every insert succeeded
-- @return[3] replica schema version taken from the last insert result;
--  assigned only when `fetch_latest_metadata` is set
local function insert_many_on_storage(space_name, tuples, opts)
    dev_checks('string', 'table', {
        add_space_schema_hash = '?boolean',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        sharding_key_hash = '?number',
        sharding_func_hash = '?number',
        skip_sharding_hash_check = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    opts = opts or {}

    local space = box.space[space_name]
    if space == nil then
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}
    end

    local _, hash_err = sharding.check_sharding_hash(
        space_name,
        opts.sharding_func_hash,
        opts.sharding_key_hash,
        opts.skip_sharding_hash_check
    )
    if hash_err ~= nil then
        return nil, batching_utils.construct_sharding_hash_mismatch_errors(hash_err.err, tuples)
    end

    local inserted_tuples = {}
    local errs = {}
    local replica_schema_version = nil
    -- Set when stop_on_error interrupts the batch; the error path below must
    -- then be taken without re-inspecting `errs`.
    local stopped_early = false

    box.begin()
    for idx, tuple in ipairs(tuples) do
        -- add_space_schema_hash is true only in case of insert_object_many
        -- the only one case when reloading schema can avoid insert error
        -- is flattening object on router
        local insert_result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
            add_space_schema_hash = opts.add_space_schema_hash,
            field_names = opts.fields,
            noreturn = opts.noreturn,
            fetch_latest_metadata = opts.fetch_latest_metadata,
        })
        if opts.fetch_latest_metadata then
            replica_schema_version = insert_result.storage_info.replica_schema_version
        end

        if insert_result.err ~= nil then
            table.insert(errs, {
                err = insert_result.err,
                space_schema_hash = insert_result.space_schema_hash,
                operation_data = tuple,
            })

            if opts.stop_on_error == true then
                -- Everything after the failed tuple is reported as skipped.
                local left_tuples = utils.list_slice(tuples, idx + 1)
                if next(left_tuples) then
                    errs = batching_utils.complement_batching_errors(errs,
                            batching_utils.stop_on_error_msg, left_tuples)
                end

                stopped_early = true
                break
            end
        end

        -- Also reached after a failed insert when stop_on_error is off.
        table.insert(inserted_tuples, insert_result.res)
    end

    -- Clean run: nothing failed and the batch was not interrupted.
    if not stopped_early and next(errs) == nil then
        box.commit()
        return inserted_tuples, nil, replica_schema_version
    end

    if opts.rollback_on_error == true then
        box.rollback()
        -- Tuples inserted before the rollback are reported as rolled back.
        if next(inserted_tuples) then
            errs = batching_utils.complement_batching_errors(errs,
                    batching_utils.rollback_on_error_msg, inserted_tuples)
        end

        return nil, errs, replica_schema_version
    end

    -- Partial success: keep what was inserted and report the failures.
    box.commit()
    return inserted_tuples, errs, replica_schema_version
end

-- Expose the storage-side function under its registration name.
insert_many.storage_api = {[INSERT_MANY_FUNC_NAME] = insert_many_on_storage}
248✔
127

128
-- Router-side implementation shared by insert_many.tuples and
-- insert_many.objects.
--
-- Returns result, err, need_reload.
-- need_reload indicates if reloading schema could help,
-- see crud.common.schema.wrap_func_reload()
local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts)
    dev_checks('table', 'string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
    if space_err ~= nil then
        return nil, {
            InsertManyError:new("An error occurred during the operation: %s", space_err)
        }, const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
    end

    -- Work on a deep copy so the caller's tuples are never handed to the
    -- iterator directly.
    local tuples = table.deepcopy(original_tuples)

    local iter, iter_err = BatchInsertIterator:new({
        tuples = tuples,
        space = space,
        -- Only the options the storage function accepts are forwarded.
        execute_on_storage_opts = {
            add_space_schema_hash = opts.add_space_schema_hash,
            fields = opts.fields,
            stop_on_error = opts.stop_on_error,
            rollback_on_error = opts.rollback_on_error,
            noreturn = opts.noreturn,
            fetch_latest_metadata = opts.fetch_latest_metadata,
        },
        vshard_router = vshard_router,
    })
    if iter_err ~= nil then
        return nil, {iter_err}, const.NEED_SCHEMA_RELOAD
    end

    local postprocessor = BatchPostprocessor:new(vshard_router)

    local rows, errs, storages_info = call.map(vshard_router, CRUD_INSERT_MANY_FUNC_NAME, nil, {
        timeout = opts.timeout,
        mode = 'write',
        iter = iter,
        postprocessor = postprocessor,
    })

    if errs ~= nil then
        local tuples_count = table.maxn(tuples)

        -- The sharding reload check comes first, then the schema one.
        if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then
            return nil, errs, const.NEED_SHARDING_RELOAD
        end

        if schema.batching_result_needs_reload(space, errs, tuples_count) then
            return nil, errs, const.NEED_SCHEMA_RELOAD
        end
    end

    -- Nothing to format: hand back only the errors (possibly nil).
    if next(rows) == nil then
        return nil, errs
    end

    if opts.fetch_latest_metadata == true then
        -- This option is temporary and is related to [1], [2].
        -- [1] https://github.com/tarantool/crud/issues/236
        -- [2] https://github.com/tarantool/crud/issues/361
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
                                                              storages_info, netbox_schema_version)
    end

    local res, format_err = utils.format_result(rows, space, opts.fields)
    if format_err == nil then
        return res, errs
    end

    -- A formatting failure is appended to whatever the storages reported.
    errs = errs or {}
    table.insert(errs, format_err)
    return nil, errs
end
216

217
--- Inserts a batch of tuples into the specified space.
--
-- @function tuples
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuples
--  Tuples to insert; must contain at least one element
--
-- @tparam ?table opts
--  Options; accepted keys and their types are listed in the `checks`
--  call below. `vshard_router` selects the router instance, the other
--  options are passed on to the router-side implementation.
--
-- @return[1] result of the router-side call made through
--  `schema.wrap_func_reload`
-- @treturn[2] nil
-- @treturn[2] table List of errors
function insert_many.tuples(space_name, tuples, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    -- An empty batch is rejected before any router work is done.
    if next(tuples) == nil then
        return nil, {InsertManyError:new("At least one tuple expected")}
    end

    opts = opts or {}

    local vshard_router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, {InsertManyError:new(router_err)}
    end

    return schema.wrap_func_reload(
        vshard_router, sharding.wrap_method, call_insert_many_on_router,
        space_name, tuples, opts
    )
end
260

261
--- Inserts a batch of objects into the specified space.
--
-- Every object is flattened into a tuple on the router first; the
-- resulting tuples are then inserted the same way as in
-- `insert_many.tuples`.
--
-- @function objects
--
-- @tparam string space_name
--  A space name
--
-- @tparam table objs
--  Objects to insert; must contain at least one element
--
-- @tparam ?table opts
--  Options; accepted keys and their types are listed in the `checks`
--  call below. With `stop_on_error` the first flatten failure ends the
--  call; otherwise flatten failures are collected and returned together
--  with the insert errors.
--
-- @return[1] result of the router-side call, nil when no object could
--  be flattened
-- @treturn[2] ?table List of errors (flatten and insert errors combined)
function insert_many.objects(space_name, objs, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    if next(objs) == nil then
        return nil, {InsertManyError:new("At least one object expected")}
    end

    opts = opts or {}

    local vshard_router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, {InsertManyError:new(router_err)}
    end

    -- insert can fail if router uses outdated schema to flatten object
    opts = utils.merge_options(opts, {add_space_schema_hash = true})

    local tuples = {}
    local format_errs = {}

    for _, obj in ipairs(objs) do
        local tuple, flatten_err = utils.flatten_obj_reload(vshard_router, space_name, obj,
                                                            opts.skip_nullability_check_on_flatten)
        if flatten_err ~= nil then
            local err_obj = InsertManyError:new("Failed to flatten object: %s", flatten_err)
            err_obj.operation_data = obj

            if opts.stop_on_error == true then
                return nil, {err_obj}
            end

            table.insert(format_errs, err_obj)
        end

        -- Also reached after a flatten failure when stop_on_error is off.
        table.insert(tuples, tuple)
    end

    -- Nothing could be flattened: report the flatten errors only.
    if next(tuples) == nil then
        return nil, format_errs
    end

    local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
                                              space_name, tuples, opts)

    if next(format_errs) == nil then
        return res, errs
    end

    if errs == nil then
        return res, format_errs
    end

    -- Both kinds of errors are present: flatten errors go after insert errors.
    local merged_errs = utils.list_extend(errs, format_errs)
    return res, merged_errs
end
342

343
return insert_many
248✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc