• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20597468703

30 Dec 2025 01:20PM UTC coverage: 88.552% (+0.1%) from 88.405%
20597468703

Pull #475

github

Satbek
safe/fast mode: remove fiber kill

In fast mode every fiber were named as "fast" in order
to kill it on safe enabling.

But there are no yields in crud's storage methods.
And rebalancing trigger is insert/replace in `_bucket` space
which is yield. So when fiber start in fast mode it will
be in fast mode until first `box.space` operation, "write" for
memtx, "read/write" for vinyl.

Therefor there is no need to mark and kill iproto fibers, which handle
fast requests, because there is no situation, when fiber is in fast mode
and rebalancing is in progress.
Pull Request #475: safe/fast mode: remove fiber kill

82 of 101 new or added lines in 10 files covered. (81.19%)

92 existing lines in 11 files now uncovered.

5260 of 5940 relevant lines covered (88.55%)

13717.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.17
/crud/insert.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3
local yield_checks = require('crud.common.yield_checks')
593✔
4

5
local call = require('crud.common.call')
593✔
6
local const = require('crud.common.const')
593✔
7
local utils = require('crud.common.utils')
593✔
8
local sharding = require('crud.common.sharding')
593✔
9
local dev_checks = require('crud.common.dev_checks')
593✔
10
local schema = require('crud.common.schema')
593✔
11
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
12

13
local InsertError = errors.new_class('InsertError', {capture_stack = false})
593✔
14

15
local insert = {}
593✔
16

17
local INSERT_FUNC_NAME = 'insert_on_storage'
593✔
18
local CRUD_INSERT_FUNC_NAME = utils.get_storage_call(INSERT_FUNC_NAME)
593✔
19

20
local function insert_on_storage(space_name, tuple, opts)
21
    dev_checks('string', 'table', {
128,173✔
22
        add_space_schema_hash = '?boolean',
23
        fields = '?table',
24
        sharding_key_hash = '?number',
25
        sharding_func_hash = '?number',
26
        skip_sharding_hash_check = '?boolean',
27
        noreturn = '?boolean',
28
        fetch_latest_metadata = '?boolean',
29
    })
30
    -- no-yield guard for tests (TARANTOOL_CRUD_ENABLE_INTERNAL_CHECKS)
31
    local finish = yield_checks.start()
128,173✔
32

33
    opts = opts or {}
128,173✔
34

35
    local space = box.space[space_name]
128,173✔
36
    if space == nil then
128,173✔
37
        return finish(nil, InsertError:new("Space %q doesn't exist", space_name))
108✔
38
    end
39

40
    local _, err = sharding.check_sharding_hash(space_name,
256,238✔
41
                                                opts.sharding_func_hash,
128,119✔
42
                                                opts.sharding_key_hash,
128,119✔
43
                                                opts.skip_sharding_hash_check)
128,119✔
44

45
    if err ~= nil then
128,119✔
46
        return finish(nil, err)
144✔
47
    end
48

49
    local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
255,950✔
50
    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id)
127,975✔
51

52
    if not ref_ok then
127,975✔
NEW
UNCOV
53
        return finish(nil, bucket_ref_err)
×
54
    end
55

56
    yield_checks.check_no_yields()
127,975✔
57
    -- add_space_schema_hash is true only in case of insert_object
58
    -- the only one case when reloading schema can avoid insert error
59
    -- is flattening object on router
60
    local result = schema.wrap_func_result(space, space.insert, {
255,950✔
61
        add_space_schema_hash = opts.add_space_schema_hash,
127,975✔
62
        field_names = opts.fields,
127,975✔
63
        noreturn = opts.noreturn,
127,975✔
64
        fetch_latest_metadata = opts.fetch_latest_metadata,
127,975✔
65
    }, space, tuple)
127,975✔
66

67
    finish()
127,975✔
68

69
    local unref_ok, err_unref = unref(bucket_id)
127,975✔
70
    if not unref_ok then
127,975✔
UNCOV
71
        return nil, err_unref
×
72
    end
73

74
    return result
127,975✔
75
end
76

77
insert.storage_api = {[INSERT_FUNC_NAME] = insert_on_storage}
593✔
78

79
-- returns result, err, need_reload
80
-- need_reload indicates if reloading schema could help
81
-- see crud.common.schema.wrap_func_reload()
82
local function call_insert_on_router(vshard_router, space_name, original_tuple, opts)
83
    dev_checks('table', 'string', 'table', {
128,148✔
84
        timeout = '?number',
85
        bucket_id = '?',
86
        add_space_schema_hash = '?boolean',
87
        fields = '?table',
88
        vshard_router = '?string|table',
89
        skip_nullability_check_on_flatten = '?boolean',
90
        noreturn = '?boolean',
91
        fetch_latest_metadata = '?boolean',
92
    })
93

94
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
128,148✔
95
    if err ~= nil then
128,148✔
UNCOV
96
        return nil, InsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
×
97
    end
98
    if space == nil then
128,148✔
99
        return nil, InsertError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
84✔
100
    end
101

102
    local tuple = table.deepcopy(original_tuple)
128,106✔
103

104
    local sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, opts.bucket_id)
128,106✔
105
    if err ~= nil then
128,106✔
106
        return nil, InsertError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD
60✔
107
    end
108

109
    local insert_on_storage_opts = {
128,076✔
110
        add_space_schema_hash = opts.add_space_schema_hash,
128,076✔
111
        fields = opts.fields,
128,076✔
112
        sharding_func_hash = sharding_data.sharding_func_hash,
128,076✔
113
        sharding_key_hash = sharding_data.sharding_key_hash,
128,076✔
114
        skip_sharding_hash_check = sharding_data.skip_sharding_hash_check,
128,076✔
115
        noreturn = opts.noreturn,
128,076✔
116
        fetch_latest_metadata = opts.fetch_latest_metadata,
128,076✔
117
    }
118

119
    local call_opts = {
128,076✔
120
        mode = 'write',
121
        timeout = opts.timeout,
128,076✔
122
    }
123

124
    local storage_result, err = call.single(vshard_router,
256,152✔
125
        sharding_data.bucket_id, CRUD_INSERT_FUNC_NAME,
128,076✔
126
        {space_name, tuple, insert_on_storage_opts},
128,076✔
127
        call_opts
128
    )
128,076✔
129

130
    if err ~= nil then
128,076✔
131
        local err_wrapped = InsertError:new("Failed to call insert on storage-side: %s", err)
101✔
132

133
        if sharding.result_needs_sharding_reload(err) then
202✔
134
            return nil, err_wrapped, const.NEED_SHARDING_RELOAD
72✔
135
        end
136

137
        return nil, err_wrapped
29✔
138
    end
139

140
    if storage_result.err ~= nil then
127,975✔
141
        local err_wrapped = InsertError:new("Failed to insert: %s", storage_result.err)
1,248✔
142

143
        if schema.result_needs_reload(space, storage_result) then
2,496✔
144
            return nil, err_wrapped, const.NEED_SCHEMA_RELOAD
12✔
145
        end
146

147
        return nil, err_wrapped
1,236✔
148
    end
149

150
    if opts.noreturn == true then
126,727✔
151
        return nil
4✔
152
    end
153

154
    local tuple = storage_result.res
126,723✔
155

156
    if opts.fetch_latest_metadata == true then
126,723✔
157
        -- This option is temporary and is related to [1], [2].
158
        -- [1] https://github.com/tarantool/crud/issues/236
159
        -- [2] https://github.com/tarantool/crud/issues/361
160
        space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version,
8✔
161
                                                                vshard_router, opts, storage_result.storage_info)
8✔
162
    end
163

164
    return utils.format_result({tuple}, space, opts.fields)
126,723✔
165
end
166

167
--- Inserts a tuple to the specified space
168
--
169
-- @function tuple
170
--
171
-- @param string space_name
172
--  A space name
173
--
174
-- @param table tuple
175
--  Tuple
176
--
177
-- @tparam ?number opts.timeout
178
--  Function call timeout
179
--
180
-- @tparam ?number opts.bucket_id
181
--  Bucket ID
182
--  (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
183
--
184
-- @tparam ?string|table opts.vshard_router
185
--  Cartridge vshard group name or vshard router instance.
186
--  Set this parameter if your space is not a part of the
187
--  default vshard cluster.
188
--
189
-- @tparam ?boolean opts.noreturn
190
--  Suppress returning successfully processed tuple.
191
--
192
-- @return[1] tuple
193
-- @treturn[2] nil
194
-- @treturn[2] table Error description
195
--
196
function insert.tuple(space_name, tuple, opts)
593✔
197
    checks('string', 'table', {
120,184✔
198
        timeout = '?number',
199
        bucket_id = '?',
200
        add_space_schema_hash = '?boolean',
201
        fields = '?table',
202
        vshard_router = '?string|table',
203
        noreturn = '?boolean',
204
        fetch_latest_metadata = '?boolean',
205
    })
206

207
    opts = opts or {}
120,184✔
208

209
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
120,184✔
210
    if err ~= nil then
120,184✔
211
        return nil, InsertError:new(err)
8✔
212
    end
213

214
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_on_router,
120,180✔
215
                                   space_name, tuple, opts)
120,180✔
216
end
217

218
--- Inserts an object to the specified space
219
--
220
-- @function object
221
--
222
-- @param string space_name
223
--  A space name
224
--
225
-- @param table obj
226
--  Object
227
--
228
-- @tparam ?table opts
229
--  Options of insert.tuple
230
--
231
-- @return[1] object
232
-- @treturn[2] nil
233
-- @treturn[2] table Error description
234
--
235
function insert.object(space_name, obj, opts)
593✔
236
    checks('string', 'table', {
7,886✔
237
        timeout = '?number',
238
        bucket_id = '?',
239
        add_space_schema_hash = '?boolean',
240
        fields = '?table',
241
        vshard_router = '?string|table',
242
        skip_nullability_check_on_flatten = '?boolean',
243
        noreturn = '?boolean',
244
        fetch_latest_metadata = '?boolean',
245
    })
246

247
    opts = opts or {}
7,886✔
248

249
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
7,886✔
250
    if err ~= nil then
7,886✔
251
        return nil, InsertError:new(err)
8✔
252
    end
253

254
    -- insert can fail if router uses outdated schema to flatten object
255
    opts = utils.merge_options(opts, {add_space_schema_hash = true})
15,764✔
256

257
    local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj,
15,764✔
258
                                                opts.skip_nullability_check_on_flatten)
7,882✔
259
    if err ~= nil then
7,882✔
260
        return nil, InsertError:new("Failed to flatten object: %s", err)
80✔
261
    end
262

263
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_on_router,
7,842✔
264
                                   space_name, tuple, opts)
7,842✔
265
end
266

267
return insert
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc