• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20596439889

30 Dec 2025 12:21PM UTC coverage: 88.326% (-0.08%) from 88.405%
20596439889

Pull #475

github

Satbek
safe/fast mode: remove fiber kill

In fast mode every fiber were named as "fast" in order
to kill it on safe enabling.

But there are no yields in crud's storage methods.
And rebalancing trigger is insert/replace in `_bucket` space
which is yield. So when fiber start in fast mode it will
be in fast mode until first `box.space` operation, "write" for
memtx, "read/write" for vinyl.

Therefor there is no need to mark and kill iproto fibers, which handle
fast requests, because there is no situation, when fiber is in fast mode
and rebalancing is in progress.
Pull Request #475: safe/fast mode: remove fiber kill

79 of 96 new or added lines in 11 files covered. (82.29%)

17 existing lines in 9 files now uncovered.

5266 of 5962 relevant lines covered (88.33%)

13541.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.78
/crud/insert_many.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3
local yield_checks = require('crud.common.yield_checks')
593✔
4

5
local call = require('crud.common.call')
593✔
6
local const = require('crud.common.const')
593✔
7
local utils = require('crud.common.utils')
593✔
8
local batching_utils = require('crud.common.batching_utils')
593✔
9
local sharding = require('crud.common.sharding')
593✔
10
local dev_checks = require('crud.common.dev_checks')
593✔
11
local schema = require('crud.common.schema')
593✔
12
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
13

14
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
593✔
15
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
593✔
16

17
local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false})
593✔
18

19
local insert_many = {}
593✔
20

21
local INSERT_MANY_FUNC_NAME = 'insert_many_on_storage'
593✔
22
local CRUD_INSERT_MANY_FUNC_NAME = utils.get_storage_call(INSERT_MANY_FUNC_NAME)
593✔
23

24
local function insert_many_on_storage(space_name, tuples, opts)
25
    dev_checks('string', 'table', {
3,917✔
26
        add_space_schema_hash = '?boolean',
27
        fields = '?table',
28
        stop_on_error = '?boolean',
29
        rollback_on_error = '?boolean',
30
        sharding_key_hash = '?number',
31
        sharding_func_hash = '?number',
32
        skip_sharding_hash_check = '?boolean',
33
        noreturn = '?boolean',
34
        fetch_latest_metadata = '?boolean',
35
    })
36
    yield_checks.register_fiber()
3,917✔
37

38
    opts = opts or {}
3,917✔
39

40
    local space = box.space[space_name]
3,917✔
41
    if space == nil then
3,917✔
NEW
42
        yield_checks.unregister_fiber()
×
UNCOV
43
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}
×
44
    end
45

46
    local _, err = sharding.check_sharding_hash(space_name,
7,834✔
47
                                                opts.sharding_func_hash,
3,917✔
48
                                                opts.sharding_key_hash,
3,917✔
49
                                                opts.skip_sharding_hash_check)
3,917✔
50

51
    if err ~= nil then
3,917✔
52
        yield_checks.unregister_fiber()
12✔
53
        return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
24✔
54
    end
55

56
    local bucket_ids = {}
3,905✔
57
    for _, tuple in ipairs(tuples) do
232,644✔
58
        bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true
457,478✔
59
    end
60

61
    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids)
3,905✔
62
    if not ref_ok then
3,905✔
NEW
63
        yield_checks.unregister_fiber()
×
UNCOV
64
        return nil, bucket_ref_err
×
65
    end
66

67
    local inserted_tuples = {}
3,905✔
68
    local errs = {}
3,905✔
69
    local replica_schema_version = nil
3,905✔
70

71
    local insert_opts = {
3,905✔
72
        add_space_schema_hash = opts.add_space_schema_hash,
3,905✔
73
        field_names = opts.fields,
3,905✔
74
        noreturn = opts.noreturn,
3,905✔
75
        fetch_latest_metadata = opts.fetch_latest_metadata,
3,905✔
76
    }
77

78
    yield_checks.check_no_yields()
3,905✔
79
    box.begin()
3,905✔
80
    for i, tuple in ipairs(tuples) do
232,604✔
81
        -- add_space_schema_hash is true only in case of insert_object_many
82
        -- the only one case when reloading schema can avoid insert error
83
        -- is flattening object on router
84
        local insert_result = schema.wrap_func_result(
457,430✔
85
            space, space.insert, insert_opts,
228,715✔
86
            space, tuple
228,715✔
87
        )
228,715✔
88
        if opts.fetch_latest_metadata then
228,715✔
89
            replica_schema_version = insert_result.storage_info.replica_schema_version
4✔
90
        end
91

92
        if insert_result.err ~= nil then
228,715✔
93
            local err = {
118✔
94
                err = insert_result.err,
118✔
95
                space_schema_hash = insert_result.space_schema_hash,
118✔
96
                operation_data = tuple,
118✔
97
            }
98

99
            table.insert(errs, err)
118✔
100

101
            if opts.stop_on_error == true then
118✔
102
                local left_tuples = utils.list_slice(tuples, i + 1)
16✔
103
                if next(left_tuples) then
16✔
104
                    errs = batching_utils.complement_batching_errors(errs,
32✔
105
                            batching_utils.stop_on_error_msg, left_tuples)
32✔
106
                end
107

108
                if opts.rollback_on_error == true then
16✔
109
                    yield_checks.unregister_fiber()
4✔
110
                    local unref_ok, bucket_unref_err = unref(bucket_ids)
4✔
111
                    box.rollback()
4✔
112
                    if not unref_ok then
4✔
113
                        return nil, bucket_unref_err
×
114
                    end
115
                    if next(inserted_tuples) then
4✔
116
                        errs = batching_utils.complement_batching_errors(errs,
8✔
117
                                batching_utils.rollback_on_error_msg, inserted_tuples)
8✔
118
                    end
119

120
                    return nil, errs, replica_schema_version
4✔
121
                end
122

123
                yield_checks.unregister_fiber()
12✔
124
                local unref_ok, bucket_unref_err = unref(bucket_ids)
12✔
125
                box.commit()
12✔
126
                if not unref_ok then
12✔
127
                    return nil, bucket_unref_err
×
128
                end
129

130
                return inserted_tuples, errs, replica_schema_version
12✔
131
            end
132
        end
133

134
        table.insert(inserted_tuples, insert_result.res)
228,699✔
135
    end
136

137
    if next(errs) ~= nil then
3,889✔
138
        if opts.rollback_on_error == true then
76✔
139
            yield_checks.unregister_fiber()
12✔
140
            local unref_ok, bucket_unref_err = unref(bucket_ids)
12✔
141
            box.rollback()
12✔
142
            if not unref_ok then
12✔
143
                return nil, bucket_unref_err
×
144
            end
145
            if next(inserted_tuples) then
12✔
146
                errs = batching_utils.complement_batching_errors(errs,
24✔
147
                        batching_utils.rollback_on_error_msg, inserted_tuples)
24✔
148
            end
149

150
            return nil, errs, replica_schema_version
12✔
151
        end
152

153
        yield_checks.unregister_fiber()
64✔
154
        local unref_ok, bucket_unref_err = unref(bucket_ids)
64✔
155
        box.commit()
64✔
156
        if not unref_ok then
64✔
157
            return nil, bucket_unref_err
×
158
        end
159

160
        return inserted_tuples, errs, replica_schema_version
64✔
161
    end
162

163
    yield_checks.unregister_fiber()
3,813✔
164
    local unref_ok, bucket_unref_err = unref(bucket_ids)
3,813✔
165
    box.commit()
3,813✔
166
    if not unref_ok then
3,813✔
167
        return nil, bucket_unref_err
×
168
    end
169

170
    return inserted_tuples, nil, replica_schema_version
3,813✔
171
end
172

173
insert_many.storage_api = {[INSERT_MANY_FUNC_NAME] = insert_many_on_storage}
593✔
174

175
-- returns result, err, need_reload
176
-- need_reload indicates if reloading schema could help
177
-- see crud.common.schema.wrap_func_reload()
178
local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts)
179
    dev_checks('table', 'string', 'table', {
3,384✔
180
        timeout = '?number',
181
        fields = '?table',
182
        add_space_schema_hash = '?boolean',
183
        stop_on_error = '?boolean',
184
        rollback_on_error = '?boolean',
185
        vshard_router = '?string|table',
186
        skip_nullability_check_on_flatten = '?boolean',
187
        noreturn = '?boolean',
188
        fetch_latest_metadata = '?boolean',
189
    })
190

191
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
3,384✔
192
    if err ~= nil then
3,384✔
193
        return nil, {
×
194
            InsertManyError:new("An error occurred during the operation: %s", err)
×
195
        }, const.NEED_SCHEMA_RELOAD
×
196
    end
197
    if space == nil then
3,384✔
198
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
16✔
199
    end
200

201
    local tuples = table.deepcopy(original_tuples)
3,376✔
202

203
    local batch_insert_on_storage_opts = {
3,376✔
204
        add_space_schema_hash = opts.add_space_schema_hash,
3,376✔
205
        fields = opts.fields,
3,376✔
206
        stop_on_error = opts.stop_on_error,
3,376✔
207
        rollback_on_error = opts.rollback_on_error,
3,376✔
208
        noreturn = opts.noreturn,
3,376✔
209
        fetch_latest_metadata = opts.fetch_latest_metadata,
3,376✔
210
    }
211

212
    local iter, err = BatchInsertIterator:new({
6,752✔
213
        tuples = tuples,
3,376✔
214
        space = space,
3,376✔
215
        execute_on_storage_opts = batch_insert_on_storage_opts,
3,376✔
216
        vshard_router = vshard_router,
3,376✔
217
    })
218
    if err ~= nil then
3,376✔
219
        return nil, {err}, const.NEED_SCHEMA_RELOAD
8✔
220
    end
221

222
    local postprocessor = BatchPostprocessor:new(vshard_router)
3,368✔
223

224
    local rows, errs, storages_info = call.map(vshard_router, CRUD_INSERT_MANY_FUNC_NAME, nil, {
6,736✔
225
        timeout = opts.timeout,
3,368✔
226
        mode = 'write',
227
        iter = iter,
3,368✔
228
        postprocessor = postprocessor,
3,368✔
229
    })
230

231
    if errs ~= nil then
3,368✔
232
        local tuples_count = table.maxn(tuples)
84✔
233
        if sharding.batching_result_needs_sharding_reload(errs, tuples_count) then
168✔
234
            return nil, errs, const.NEED_SHARDING_RELOAD
8✔
235
        end
236

237
        if schema.batching_result_needs_reload(space, errs, tuples_count) then
152✔
238
            return nil, errs, const.NEED_SCHEMA_RELOAD
8✔
239
        end
240
    end
241

242
    if next(rows) == nil then
3,352✔
243
        return nil, errs
1,665✔
244
    end
245

246
    if opts.fetch_latest_metadata == true then
1,687✔
247
        -- This option is temporary and is related to [1], [2].
248
        -- [1] https://github.com/tarantool/crud/issues/236
249
        -- [2] https://github.com/tarantool/crud/issues/361
250
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
8✔
251
                                                              storages_info, netbox_schema_version)
8✔
252
    end
253

254
    local res, err = utils.format_result(rows, space, opts.fields)
1,687✔
255
    if err ~= nil then
1,687✔
256
        errs = errs or {}
4✔
257
        table.insert(errs, err)
4✔
258
        return nil, errs
4✔
259
    end
260

261
    return res, errs
1,683✔
262
end
263

264
--- Inserts batch of tuples to the specified space
265
--
266
-- @function tuples
267
--
268
-- @param string space_name
269
--  A space name
270
--
271
-- @param table tuples
272
--  Tuples
273
--
274
-- @tparam ?table opts
275
--  Options of batch_insert.tuples_batch
276
--
277
-- @return[1] tuples
278
-- @treturn[2] nil
279
-- @treturn[2] table of tables Error description
280

281
function insert_many.tuples(space_name, tuples, opts)
593✔
282
    checks('string', 'table', {
3,276✔
283
        timeout = '?number',
284
        fields = '?table',
285
        add_space_schema_hash = '?boolean',
286
        stop_on_error = '?boolean',
287
        rollback_on_error = '?boolean',
288
        vshard_router = '?string|table',
289
        noreturn = '?boolean',
290
        fetch_latest_metadata = '?boolean',
291
    })
292

293
    if next(tuples) == nil then
3,276✔
294
        return nil, {InsertManyError:new("At least one tuple expected")}
4✔
295
    end
296

297
    opts = opts or {}
3,274✔
298

299
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
3,274✔
300
    if err ~= nil then
3,274✔
301
        return nil, {InsertManyError:new(err)}
8✔
302
    end
303

304
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
3,270✔
305
                                   space_name, tuples, opts)
3,270✔
306
end
307

308
--- Inserts batch of objects to the specified space
309
--
310
-- @function objects
311
--
312
-- @param string space_name
313
--  A space name
314
--
315
-- @param table objs
316
--  Objects
317
--
318
-- @tparam ?table opts
319
--  Options of batch_insert.tuples_batch
320
--
321
-- @return[1] objects
322
-- @treturn[2] nil
323
-- @treturn[2] table of tables Error description
324

325
function insert_many.objects(space_name, objs, opts)
593✔
326
    checks('string', 'table', {
144✔
327
        timeout = '?number',
328
        fields = '?table',
329
        stop_on_error = '?boolean',
330
        rollback_on_error = '?boolean',
331
        vshard_router = '?string|table',
332
        skip_nullability_check_on_flatten = '?boolean',
333
        noreturn = '?boolean',
334
        fetch_latest_metadata = '?boolean',
335
    })
336

337
    if next(objs) == nil then
144✔
338
        return nil, {InsertManyError:new("At least one object expected")}
4✔
339
    end
340

341
    opts = opts or {}
142✔
342

343
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
142✔
344
    if err ~= nil then
142✔
345
        return nil, {InsertManyError:new(err)}
8✔
346
    end
347

348
    -- insert can fail if router uses outdated schema to flatten object
349
    opts = utils.merge_options(opts, {add_space_schema_hash = true})
276✔
350

351
    local tuples = {}
138✔
352
    local format_errs = {}
138✔
353

354
    for _, obj in ipairs(objs) do
434✔
355

356
        local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj,
600✔
357
                                                    opts.skip_nullability_check_on_flatten)
300✔
358
        if err ~= nil then
300✔
359
            local err_obj = InsertManyError:new("Failed to flatten object: %s", err)
64✔
360
            err_obj.operation_data = obj
64✔
361

362
            if opts.stop_on_error == true then
64✔
363
                return nil, {err_obj}
4✔
364
            end
365

366
            table.insert(format_errs, err_obj)
60✔
367
        end
368

369
        table.insert(tuples, tuple)
296✔
370
    end
371

372
    if next(tuples) == nil then
134✔
373
        return nil, format_errs
40✔
374
    end
375

376
    local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
188✔
377
                                              space_name, tuples, opts)
94✔
378

379
    if next(format_errs) ~= nil then
94✔
380
        if errs == nil then
6✔
381
            errs = format_errs
4✔
382
        else
383
            errs = utils.list_extend(errs, format_errs)
4✔
384
        end
385
    end
386

387
    return res, errs
94✔
388
end
389

390
return insert_many
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc