• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20598520595

30 Dec 2025 02:14PM UTC coverage: 88.535% (+0.1%) from 88.405%
20598520595

Pull #475

github

Satbek
safe/fast mode: remove fiber kill

In fast mode, every fiber was named “fast” so it could be
killed when safe mode was enabled. However, crud-storage
methods do not yield until a space operation is performed.

The rebalancing trigger is executed on insert/replace into
the _bucket space; this operation does yield and switches
the implementation of the bucket_ref/bucket_unref functions.

Therefore, once a fiber starts in fast mode, it remains
in fast mode until the first box.space operation: a write
for memtx or a read/write for vinyl.

As a result, there is no need to mark and kill iproto fibers
that handle fast requests, because there is no situation in
which a fiber remains in fast mode while rebalancing is in progress.

Yield checks were added to the tests (yield_checks) to
ensure that no yields occur during a request before the first box operation.
Pull Request #475: safe/fast mode: remove fiber kill

82 of 101 new or added lines in 10 files covered. (81.19%)

92 existing lines in 11 files now uncovered.

5259 of 5940 relevant lines covered (88.54%)

13682.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.17
/crud/insert.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3
local yield_checks = require('crud.common.yield_checks')
593✔
4

5
local call = require('crud.common.call')
593✔
6
local const = require('crud.common.const')
593✔
7
local utils = require('crud.common.utils')
593✔
8
local sharding = require('crud.common.sharding')
593✔
9
local dev_checks = require('crud.common.dev_checks')
593✔
10
local schema = require('crud.common.schema')
593✔
11
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
12

13
local InsertError = errors.new_class('InsertError', {capture_stack = false})
593✔
14

15
local insert = {}
593✔
16

17
local INSERT_FUNC_NAME = 'insert_on_storage'
593✔
18
local CRUD_INSERT_FUNC_NAME = utils.get_storage_call(INSERT_FUNC_NAME)
593✔
19

20
local function insert_on_storage(space_name, tuple, opts)
21
    dev_checks('string', 'table', {
128,172✔
22
        add_space_schema_hash = '?boolean',
23
        fields = '?table',
24
        sharding_key_hash = '?number',
25
        sharding_func_hash = '?number',
26
        skip_sharding_hash_check = '?boolean',
27
        noreturn = '?boolean',
28
        fetch_latest_metadata = '?boolean',
29
    })
30
    -- no-yield guard for tests (TARANTOOL_CRUD_ENABLE_INTERNAL_CHECKS)
31
    local finish = yield_checks.start()
128,172✔
32

33
    opts = opts or {}
128,172✔
34

35
    local space = box.space[space_name]
128,172✔
36
    if space == nil then
128,172✔
37
        return finish(nil, InsertError:new("Space %q doesn't exist", space_name))
96✔
38
    end
39

40
    local _, err = sharding.check_sharding_hash(space_name,
256,248✔
41
                                                opts.sharding_func_hash,
128,124✔
42
                                                opts.sharding_key_hash,
128,124✔
43
                                                opts.skip_sharding_hash_check)
128,124✔
44

45
    if err ~= nil then
128,124✔
46
        return finish(nil, err)
144✔
47
    end
48

49
    local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
255,960✔
50
    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id)
127,980✔
51

52
    if not ref_ok then
127,980✔
NEW
UNCOV
53
        return finish(nil, bucket_ref_err)
×
54
    end
55

56
    yield_checks.check_no_yields()
127,980✔
57
    -- add_space_schema_hash is true only for insert_object:
58
    -- the only case where reloading the schema can avoid an insert error
59
    -- is when the object is flattened on the router
60
    local result = schema.wrap_func_result(space, space.insert, {
255,960✔
61
        add_space_schema_hash = opts.add_space_schema_hash,
127,980✔
62
        field_names = opts.fields,
127,980✔
63
        noreturn = opts.noreturn,
127,980✔
64
        fetch_latest_metadata = opts.fetch_latest_metadata,
127,980✔
65
    }, space, tuple)
127,980✔
66

67
    finish()
127,980✔
68

69
    local unref_ok, err_unref = unref(bucket_id)
127,980✔
70
    if not unref_ok then
127,980✔
UNCOV
71
        return nil, err_unref
×
72
    end
73

74
    return result
127,980✔
75
end
76

77
insert.storage_api = {[INSERT_FUNC_NAME] = insert_on_storage}
593✔
78

79
-- returns result, err, need_reload
80
-- need_reload indicates if reloading schema could help
81
-- see crud.common.schema.wrap_func_reload()
82
local function call_insert_on_router(vshard_router, space_name, original_tuple, opts)
83
    dev_checks('table', 'string', 'table', {
128,160✔
84
        timeout = '?number',
85
        bucket_id = '?',
86
        add_space_schema_hash = '?boolean',
87
        fields = '?table',
88
        vshard_router = '?string|table',
89
        skip_nullability_check_on_flatten = '?boolean',
90
        noreturn = '?boolean',
91
        fetch_latest_metadata = '?boolean',
92
    })
93

94
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
128,160✔
95
    if err ~= nil then
128,160✔
UNCOV
96
        return nil, InsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
×
97
    end
98
    if space == nil then
128,160✔
99
        return nil, InsertError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
104✔
100
    end
101

102
    local tuple = table.deepcopy(original_tuple)
128,108✔
103

104
    local sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, opts.bucket_id)
128,108✔
105
    if err ~= nil then
128,108✔
106
        return nil, InsertError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD
60✔
107
    end
108

109
    local insert_on_storage_opts = {
128,078✔
110
        add_space_schema_hash = opts.add_space_schema_hash,
128,078✔
111
        fields = opts.fields,
128,078✔
112
        sharding_func_hash = sharding_data.sharding_func_hash,
128,078✔
113
        sharding_key_hash = sharding_data.sharding_key_hash,
128,078✔
114
        skip_sharding_hash_check = sharding_data.skip_sharding_hash_check,
128,078✔
115
        noreturn = opts.noreturn,
128,078✔
116
        fetch_latest_metadata = opts.fetch_latest_metadata,
128,078✔
117
    }
118

119
    local call_opts = {
128,078✔
120
        mode = 'write',
121
        timeout = opts.timeout,
128,078✔
122
    }
123

124
    local storage_result, err = call.single(vshard_router,
256,156✔
125
        sharding_data.bucket_id, CRUD_INSERT_FUNC_NAME,
128,078✔
126
        {space_name, tuple, insert_on_storage_opts},
128,078✔
127
        call_opts
128
    )
128,078✔
129

130
    if err ~= nil then
128,078✔
131
        local err_wrapped = InsertError:new("Failed to call insert on storage-side: %s", err)
98✔
132

133
        if sharding.result_needs_sharding_reload(err) then
196✔
134
            return nil, err_wrapped, const.NEED_SHARDING_RELOAD
72✔
135
        end
136

137
        return nil, err_wrapped
26✔
138
    end
139

140
    if storage_result.err ~= nil then
127,980✔
141
        local err_wrapped = InsertError:new("Failed to insert: %s", storage_result.err)
1,248✔
142

143
        if schema.result_needs_reload(space, storage_result) then
2,496✔
144
            return nil, err_wrapped, const.NEED_SCHEMA_RELOAD
12✔
145
        end
146

147
        return nil, err_wrapped
1,236✔
148
    end
149

150
    if opts.noreturn == true then
126,732✔
151
        return nil
4✔
152
    end
153

154
    local tuple = storage_result.res
126,728✔
155

156
    if opts.fetch_latest_metadata == true then
126,728✔
157
        -- This option is temporary and is related to [1], [2].
158
        -- [1] https://github.com/tarantool/crud/issues/236
159
        -- [2] https://github.com/tarantool/crud/issues/361
160
        space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version,
8✔
161
                                                                vshard_router, opts, storage_result.storage_info)
8✔
162
    end
163

164
    return utils.format_result({tuple}, space, opts.fields)
126,728✔
165
end
166

167
--- Inserts a tuple into the specified space
168
--
169
-- @function tuple
170
--
171
-- @param string space_name
172
--  A space name
173
--
174
-- @param table tuple
175
--  Tuple
176
--
177
-- @tparam ?number opts.timeout
178
--  Function call timeout
179
--
180
-- @tparam ?number opts.bucket_id
181
--  Bucket ID
182
--  (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
183
--
184
-- @tparam ?string|table opts.vshard_router
185
--  Cartridge vshard group name or vshard router instance.
186
--  Set this parameter if your space is not a part of the
187
--  default vshard cluster.
188
--
189
-- @tparam ?boolean opts.noreturn
190
--  Suppress returning the successfully processed tuple.
191
--
192
-- @return[1] tuple
193
-- @treturn[2] nil
194
-- @treturn[2] table Error description
195
--
196
function insert.tuple(space_name, tuple, opts)
593✔
197
    checks('string', 'table', {
120,189✔
198
        timeout = '?number',
199
        bucket_id = '?',
200
        add_space_schema_hash = '?boolean',
201
        fields = '?table',
202
        vshard_router = '?string|table',
203
        noreturn = '?boolean',
204
        fetch_latest_metadata = '?boolean',
205
    })
206

207
    opts = opts or {}
120,189✔
208

209
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
120,189✔
210
    if err ~= nil then
120,189✔
211
        return nil, InsertError:new(err)
8✔
212
    end
213

214
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_on_router,
120,185✔
215
                                   space_name, tuple, opts)
120,185✔
216
end
217

218
--- Inserts an object into the specified space
219
--
220
-- @function object
221
--
222
-- @param string space_name
223
--  A space name
224
--
225
-- @param table obj
226
--  Object
227
--
228
-- @tparam ?table opts
229
--  Options of insert.tuple
230
--
231
-- @return[1] object
232
-- @treturn[2] nil
233
-- @treturn[2] table Error description
234
--
235
function insert.object(space_name, obj, opts)
593✔
236
    checks('string', 'table', {
7,886✔
237
        timeout = '?number',
238
        bucket_id = '?',
239
        add_space_schema_hash = '?boolean',
240
        fields = '?table',
241
        vshard_router = '?string|table',
242
        skip_nullability_check_on_flatten = '?boolean',
243
        noreturn = '?boolean',
244
        fetch_latest_metadata = '?boolean',
245
    })
246

247
    opts = opts or {}
7,886✔
248

249
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
7,886✔
250
    if err ~= nil then
7,886✔
251
        return nil, InsertError:new(err)
8✔
252
    end
253

254
    -- insert can fail if router uses outdated schema to flatten object
255
    opts = utils.merge_options(opts, {add_space_schema_hash = true})
15,764✔
256

257
    local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj,
15,764✔
258
                                                opts.skip_nullability_check_on_flatten)
7,882✔
259
    if err ~= nil then
7,882✔
260
        return nil, InsertError:new("Failed to flatten object: %s", err)
80✔
261
    end
262

263
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_on_router,
7,842✔
264
                                   space_name, tuple, opts)
7,842✔
265
end
266

267
return insert
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc