• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20598520595

30 Dec 2025 02:14PM UTC coverage: 88.535% (+0.1%) from 88.405%
20598520595

Pull #475

github

Satbek
safe/fast mode: remove fiber kill

In fast mode, every fiber was named “fast” so it could be
killed when safe mode was enabled. However, crud-storage
methods do not yield until a space operation is performed.

The rebalancing trigger is executed on insert/replace into
the _bucket space; this operation does yield and switches
the implementation of the bucket_ref/bucket_unref functions.

Therefore, once a fiber starts in fast mode, it remains
in fast mode until the first box.space operation: a write
for memtx or a read/write for vinyl.

As a result, there is no need to mark and kill iproto fibers
that handle fast requests, because there is no situation in
which a fiber remains in fast mode while rebalancing is in progress.

Yield checks were added to the tests (yield_checks) to
ensure that a request does not yield before its first box operation.
Pull Request #475: safe/fast mode: remove fiber kill

82 of 101 new or added lines in 10 files covered. (81.19%)

92 existing lines in 11 files now uncovered.

5259 of 5940 relevant lines covered (88.54%)

13682.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.74
/crud/insert_many.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3
local yield_checks = require('crud.common.yield_checks')
593✔
4

5
local call = require('crud.common.call')
593✔
6
local const = require('crud.common.const')
593✔
7
local utils = require('crud.common.utils')
593✔
8
local batching_utils = require('crud.common.batching_utils')
593✔
9
local sharding = require('crud.common.sharding')
593✔
10
local dev_checks = require('crud.common.dev_checks')
593✔
11
local schema = require('crud.common.schema')
593✔
12
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
13

14
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
593✔
15
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
593✔
16

17
-- Error class used for every error object created by this module.
local InsertManyError = errors.new_class('InsertManyError', {capture_stack = false})
593✔
18

19
-- Module table; it is returned at the end of the file.
local insert_many = {}
593✔
20

21
-- Key under which the storage-side implementation is placed in insert_many.storage_api.
local INSERT_MANY_FUNC_NAME = 'insert_many_on_storage'
593✔
22
-- Name passed to call.map by the router; derived from INSERT_MANY_FUNC_NAME via utils.get_storage_call.
local CRUD_INSERT_MANY_FUNC_NAME = utils.get_storage_call(INSERT_MANY_FUNC_NAME)
593✔
23

24
--- Inserts a batch of tuples into a space inside one box transaction.
--
-- Storage-side implementation; it is exposed through
-- `insert_many.storage_api` and invoked by the router via `call.map`.
--
-- The buckets of all tuples are referenced before the transaction starts
-- and released right before it is committed or rolled back.
--
-- @function insert_many_on_storage
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuples
--  Tuples to insert
--
-- @tparam ?table opts
--  Options: `add_space_schema_hash`, `fields`, `stop_on_error`,
--  `rollback_on_error`, `sharding_key_hash`, `sharding_func_hash`,
--  `skip_sharding_hash_check`, `noreturn`, `fetch_latest_metadata`
--
-- @return[1] table inserted tuples (transaction committed)
-- @return[1] ?table per-tuple errors, nil when every insert succeeded
-- @return[1] replica schema version (set only with `fetch_latest_metadata`)
-- @treturn[2] nil
-- @return[2] errors: the space is missing, sharding hash mismatch,
--  bucket ref/unref failure, or the batch was rolled back
-- @return[2] replica schema version (rollback case only)
local function insert_many_on_storage(space_name, tuples, opts)
    dev_checks('string', 'table', {
        add_space_schema_hash = '?boolean',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        sharding_key_hash = '?number',
        sharding_func_hash = '?number',
        skip_sharding_hash_check = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })
    -- no-yield guard for tests (TARANTOOL_CRUD_ENABLE_INTERNAL_CHECKS)
    local finish = yield_checks.start()

    opts = opts or {}

    local space = box.space[space_name]
    if space == nil then
        return finish(nil, {InsertManyError:new("Space %q doesn't exist", space_name)})
    end

    local _, err = sharding.check_sharding_hash(space_name,
                                                opts.sharding_func_hash,
                                                opts.sharding_key_hash,
                                                opts.skip_sharding_hash_check)

    if err ~= nil then
        return finish(nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples))
    end

    -- The lookup takes only the space as input, so do it once
    -- instead of once per tuple.
    -- NOTE(review): assumes utils.get_bucket_id_fieldno has no side effects.
    local bucket_id_fieldno = utils.get_bucket_id_fieldno(space)

    -- Set of bucket ids touched by this batch.
    local bucket_ids = {}
    for _, tuple in ipairs(tuples) do
        bucket_ids[tuple[bucket_id_fieldno]] = true
    end

    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids)
    if not ref_ok then
        return finish(nil, bucket_ref_err)
    end

    local inserted_tuples = {}
    local errs = {}
    local replica_schema_version = nil
    -- Becomes true when stop_on_error interrupts the batch.
    local stopped = false

    local insert_opts = {
        add_space_schema_hash = opts.add_space_schema_hash,
        field_names = opts.fields,
        noreturn = opts.noreturn,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }

    yield_checks.check_no_yields()
    box.begin()
    for i, tuple in ipairs(tuples) do
        -- add_space_schema_hash is true only in case of insert_object_many
        -- the only one case when reloading schema can avoid insert error
        -- is flattening object on router
        local insert_result = schema.wrap_func_result(
            space, space.insert, insert_opts,
            space, tuple
        )
        if opts.fetch_latest_metadata then
            replica_schema_version = insert_result.storage_info.replica_schema_version
        end

        if insert_result.err ~= nil then
            table.insert(errs, {
                err = insert_result.err,
                space_schema_hash = insert_result.space_schema_hash,
                operation_data = tuple,
            })

            if opts.stop_on_error == true then
                -- Report every tuple that was never attempted.
                local left_tuples = utils.list_slice(tuples, i + 1)
                if next(left_tuples) then
                    errs = batching_utils.complement_batching_errors(errs,
                            batching_utils.stop_on_error_msg, left_tuples)
                end

                stopped = true
                break
            end
        end

        table.insert(inserted_tuples, insert_result.res)
    end

    -- Single finalization path for every outcome of the loop above.
    local failed = stopped or next(errs) ~= nil
    local rollback = failed and opts.rollback_on_error == true

    -- Order matters: close the no-yield guard, release the buckets,
    -- and only then end the transaction.
    finish()
    local unref_ok, bucket_unref_err = unref(bucket_ids)
    if rollback then
        box.rollback()
    else
        box.commit()
    end
    if not unref_ok then
        return nil, bucket_unref_err
    end

    if rollback then
        -- Tuples inserted before the failure were rolled back too.
        if next(inserted_tuples) then
            errs = batching_utils.complement_batching_errors(errs,
                    batching_utils.rollback_on_error_msg, inserted_tuples)
        end

        return nil, errs, replica_schema_version
    end

    if failed then
        return inserted_tuples, errs, replica_schema_version
    end

    return inserted_tuples, nil, replica_schema_version
end
170

171
-- Storage-side functions of this module, keyed by function name.
-- NOTE(review): presumably consumed by the crud storage initialization code — confirm.
insert_many.storage_api = {[INSERT_MANY_FUNC_NAME] = insert_many_on_storage}
593✔
172

173
-- Router-side implementation shared by insert_many.tuples and
-- insert_many.objects.
--
-- returns result, errs, need_reload
-- need_reload indicates if reloading schema could help
-- see crud.common.schema.wrap_func_reload()
local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts)
    dev_checks('table', 'string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
    if space_err ~= nil then
        return nil, {
            InsertManyError:new("An error occurred during the operation: %s", space_err)
        }, const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD
    end

    -- Work on a copy so the caller's tuples stay untouched.
    local tuples = table.deepcopy(original_tuples)

    -- Only this subset of the router options is sent to the storages.
    local storage_opts = {
        add_space_schema_hash = opts.add_space_schema_hash,
        fields = opts.fields,
        stop_on_error = opts.stop_on_error,
        rollback_on_error = opts.rollback_on_error,
        noreturn = opts.noreturn,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }

    local batch_iter, iter_err = BatchInsertIterator:new({
        tuples = tuples,
        space = space,
        execute_on_storage_opts = storage_opts,
        vshard_router = vshard_router,
    })
    if iter_err ~= nil then
        return nil, {iter_err}, const.NEED_SCHEMA_RELOAD
    end

    local map_opts = {
        timeout = opts.timeout,
        mode = 'write',
        iter = batch_iter,
        postprocessor = BatchPostprocessor:new(vshard_router),
    }
    local rows, errs, storages_info = call.map(vshard_router, CRUD_INSERT_MANY_FUNC_NAME, nil, map_opts)

    if errs ~= nil then
        local total = table.maxn(tuples)

        if sharding.batching_result_needs_sharding_reload(errs, total) then
            return nil, errs, const.NEED_SHARDING_RELOAD
        end

        if schema.batching_result_needs_reload(space, errs, total) then
            return nil, errs, const.NEED_SCHEMA_RELOAD
        end
    end

    -- Nothing was returned by the storages: only errors (if any) are reported.
    if next(rows) == nil then
        return nil, errs
    end

    if opts.fetch_latest_metadata == true then
        -- This option is temporary and is related to [1], [2].
        -- [1] https://github.com/tarantool/crud/issues/236
        -- [2] https://github.com/tarantool/crud/issues/361
        space = utils.fetch_latest_metadata_when_map_storages(space, space_name, vshard_router, opts,
                                                              storages_info, netbox_schema_version)
    end

    local formatted, format_err = utils.format_result(rows, space, opts.fields)
    if format_err == nil then
        return formatted, errs
    end

    -- A formatting failure is appended to the errors collected so far.
    errs = errs or {}
    table.insert(errs, format_err)
    return nil, errs
end
261

262
--- Inserts a batch of tuples into the specified space
--
-- @function tuples
--
-- @tparam string space_name
--  A space name
--
-- @tparam table tuples
--  Tuples, at least one is required
--
-- @tparam ?table opts
--  Options: `timeout`, `fields`, `add_space_schema_hash`,
--  `stop_on_error`, `rollback_on_error`, `vshard_router`,
--  `noreturn`, `fetch_latest_metadata`
--
-- @return[1] tuples
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function insert_many.tuples(space_name, tuples, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        add_space_schema_hash = '?boolean',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local batch_is_empty = next(tuples) == nil
    if batch_is_empty then
        return nil, {InsertManyError:new("At least one tuple expected")}
    end

    local options = opts or {}

    local vshard_router, router_err = utils.get_vshard_router_instance(options.vshard_router)
    if router_err == nil then
        return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
                                       space_name, tuples, options)
    end

    return nil, {InsertManyError:new(router_err)}
end
305

306
--- Inserts a batch of objects into the specified space
--
-- Every object is flattened into a tuple on the router first; objects
-- that fail to flatten are reported as errors and are not sent further.
--
-- @function objects
--
-- @tparam string space_name
--  A space name
--
-- @tparam table objs
--  Objects, at least one is required
--
-- @tparam ?table opts
--  Options: `timeout`, `fields`, `stop_on_error`, `rollback_on_error`,
--  `vshard_router`, `skip_nullability_check_on_flatten`, `noreturn`,
--  `fetch_latest_metadata`
--
-- @return[1] objects
-- @treturn[2] nil
-- @treturn[2] table of tables Error description

function insert_many.objects(space_name, objs, opts)
    checks('string', 'table', {
        timeout = '?number',
        fields = '?table',
        stop_on_error = '?boolean',
        rollback_on_error = '?boolean',
        vshard_router = '?string|table',
        skip_nullability_check_on_flatten = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    if next(objs) == nil then
        return nil, {InsertManyError:new("At least one object expected")}
    end

    opts = opts or {}

    local vshard_router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, {InsertManyError:new(router_err)}
    end

    -- insert can fail if router uses outdated schema to flatten object
    opts = utils.merge_options(opts, {add_space_schema_hash = true})

    local skip_nullability_check = opts.skip_nullability_check_on_flatten
    local stop_on_first_error = opts.stop_on_error == true

    local flattened = {}
    local flatten_errs = {}

    for _, obj in ipairs(objs) do
        local tuple, flatten_err = utils.flatten_obj_reload(vshard_router, space_name, obj,
                                                            skip_nullability_check)
        if flatten_err ~= nil then
            local err_obj = InsertManyError:new("Failed to flatten object: %s", flatten_err)
            err_obj.operation_data = obj

            if stop_on_first_error then
                return nil, {err_obj}
            end

            table.insert(flatten_errs, err_obj)
        end

        table.insert(flattened, tuple)
    end

    -- No object could be flattened: there is nothing to send to storages.
    if next(flattened) == nil then
        return nil, flatten_errs
    end

    local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router,
                                              space_name, flattened, opts)

    if next(flatten_errs) == nil then
        return res, errs
    end

    -- Flatten errors are reported together with the storage errors.
    if errs == nil then
        return res, flatten_errs
    end

    local merged_errs = utils.list_extend(errs, flatten_errs)
    return res, merged_errs
end
387

388
return insert_many
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc