• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20596439889

30 Dec 2025 12:21PM UTC coverage: 88.326% (-0.08%) from 88.405%
20596439889

Pull #475

github

Satbek
safe/fast mode: remove fiber kill

In fast mode, every fiber was named "fast" so that it could
be killed when safe mode was enabled.

But there are no yields in crud's storage methods.
And the rebalancing trigger is an insert/replace in the `_bucket` space,
which yields. So when a fiber starts in fast mode, it will
stay in fast mode until the first `box.space` operation: "write" for
memtx, "read/write" for vinyl.

Therefore, there is no need to mark and kill the iproto fibers that handle
fast requests, because there is no situation in which a fiber is in fast mode
while rebalancing is in progress.
Pull Request #475: safe/fast mode: remove fiber kill

79 of 96 new or added lines in 11 files covered. (82.29%)

17 existing lines in 9 files now uncovered.

5266 of 5962 relevant lines covered (88.33%)

13541.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.33
/crud/insert.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3
local yield_checks = require('crud.common.yield_checks')
593✔
4

5
local call = require('crud.common.call')
593✔
6
local const = require('crud.common.const')
593✔
7
local utils = require('crud.common.utils')
593✔
8
local sharding = require('crud.common.sharding')
593✔
9
local dev_checks = require('crud.common.dev_checks')
593✔
10
local schema = require('crud.common.schema')
593✔
11
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
12

13
-- Error class for the errors created in this module.
local InsertError = errors.new_class('InsertError', {capture_stack = false})
593✔
14

15
-- Module table; returned at the end of the file.
local insert = {}
593✔
16

17
-- Key under which the storage-side handler is exported in insert.storage_api.
local INSERT_FUNC_NAME = 'insert_on_storage'
593✔
18
-- Function name the router passes to call.single; derived from
-- INSERT_FUNC_NAME by utils.get_storage_call.
local CRUD_INSERT_FUNC_NAME = utils.get_storage_call(INSERT_FUNC_NAME)
593✔
19

20
--- Storage-side insert handler, exported through insert.storage_api.
--
-- Registers the current fiber with yield_checks on entry and unregisters
-- it on every return path.
--
-- @param string space_name
--  Name looked up in box.space
--
-- @param table tuple
--  Tuple to insert; its bucket id field is used to ref the bucket
--
-- @tparam ?table opts
--  Optional; accepted keys are listed in the dev_checks spec below
--
-- @return[1] result of schema.wrap_func_result around space.insert
-- @treturn[2] nil
-- @treturn[2] table Error description
--
local function insert_on_storage(space_name, tuple, opts)
21
    dev_checks('string', 'table', {
128,002✔
22
        add_space_schema_hash = '?boolean',
23
        fields = '?table',
24
        sharding_key_hash = '?number',
25
        sharding_func_hash = '?number',
26
        skip_sharding_hash_check = '?boolean',
27
        noreturn = '?boolean',
28
        fetch_latest_metadata = '?boolean',
29
    })
30
    -- Every return below is preceded by a matching unregister_fiber() call.
    yield_checks.register_fiber()
128,002✔
31

32
    opts = opts or {}
128,002✔
33

34
    local space = box.space[space_name]
128,002✔
35
    if space == nil then
128,002✔
36
        yield_checks.unregister_fiber()
52✔
37
        return nil, InsertError:new("Space %q doesn't exist", space_name)
104✔
38
    end
39

40
    -- The hashes come from the router (see insert_on_storage_opts in
    -- call_insert_on_router); a non-nil error is returned to the caller as is.
    local _, err = sharding.check_sharding_hash(space_name,
255,900✔
41
                                                opts.sharding_func_hash,
127,950✔
42
                                                opts.sharding_key_hash,
127,950✔
43
                                                opts.skip_sharding_hash_check)
127,950✔
44

45
    if err ~= nil then
127,950✔
46
        yield_checks.unregister_fiber()
144✔
47
        return nil, err
144✔
48
    end
49

50
    -- The bucket id is read from the tuple at the field number reported by
    -- utils.get_bucket_id_fieldno for this space.
    local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
255,612✔
51
    -- bucket_refrw also returns the function used further down to release the ref.
    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id)
127,806✔
52

53
    if not ref_ok then
127,806✔
NEW
54
        yield_checks.unregister_fiber()
×
UNCOV
55
        return nil, bucket_ref_err
×
56
    end
57

58
    -- NOTE(review): presumably verifies that the fiber has not yielded since
    -- register_fiber() -- confirm in crud.common.yield_checks.
    yield_checks.check_no_yields()
127,806✔
59
    -- add_space_schema_hash is true only in case of insert_object
60
    -- the only one case when reloading schema can avoid insert error
61
    -- is flattening object on router
62
    local result = schema.wrap_func_result(space, space.insert, {
255,612✔
63
        add_space_schema_hash = opts.add_space_schema_hash,
127,806✔
64
        field_names = opts.fields,
127,806✔
65
        noreturn = opts.noreturn,
127,806✔
66
        fetch_latest_metadata = opts.fetch_latest_metadata,
127,806✔
67
    }, space, tuple)
127,806✔
68

69
    yield_checks.unregister_fiber()
127,806✔
70

71
    -- If releasing the bucket ref fails, that error is returned instead of
    -- the insert result.
    local unref_ok, err_unref = unref(bucket_id)
127,806✔
72
    if not unref_ok then
127,806✔
73
        return nil, err_unref
×
74
    end
75

76
    return result
127,806✔
77
end
78

79
-- Storage-side functions exported by this module, keyed by function name.
insert.storage_api = {[INSERT_FUNC_NAME] = insert_on_storage}
593✔
80

81
--- Router-side implementation of one insert attempt.
--
-- Resolves the space, deep-copies the tuple, obtains the sharding data
-- (bucket id and sharding hashes) and calls the storage handler through
-- call.single in 'write' mode.
--
-- opts is indexed without a nil guard, so callers must pass a table;
-- insert.tuple and insert.object both do `opts = opts or {}` first.
--
-- returns result, err, need_reload
82
-- need_reload indicates if reloading schema could help
83
-- (const.NEED_SCHEMA_RELOAD, const.NEED_SHARDING_RELOAD, or absent)
-- see crud.common.schema.wrap_func_reload()
84
local function call_insert_on_router(vshard_router, space_name, original_tuple, opts)
85
    dev_checks('table', 'string', 'table', {
127,983✔
86
        timeout = '?number',
87
        bucket_id = '?',
88
        add_space_schema_hash = '?boolean',
89
        fields = '?table',
90
        vshard_router = '?string|table',
91
        skip_nullability_check_on_flatten = '?boolean',
92
        noreturn = '?boolean',
93
        fetch_latest_metadata = '?boolean',
94
    })
95

96
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
127,983✔
97
    if err ~= nil then
127,983✔
98
        return nil, InsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
×
99
    end
100
    if space == nil then
127,983✔
101
        return nil, InsertError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
92✔
102
    end
103

104
    -- Only this copy is passed on; original_tuple is not used after this line.
    local tuple = table.deepcopy(original_tuple)
127,937✔
105

106
    local sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, opts.bucket_id)
127,937✔
107
    if err ~= nil then
127,937✔
108
        return nil, InsertError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD
62✔
109
    end
110

111
    -- Options forwarded to insert_on_storage: caller options plus the
    -- sharding hashes taken from sharding_data.
    local insert_on_storage_opts = {
127,906✔
112
        add_space_schema_hash = opts.add_space_schema_hash,
127,906✔
113
        fields = opts.fields,
127,906✔
114
        sharding_func_hash = sharding_data.sharding_func_hash,
127,906✔
115
        sharding_key_hash = sharding_data.sharding_key_hash,
127,906✔
116
        skip_sharding_hash_check = sharding_data.skip_sharding_hash_check,
127,906✔
117
        noreturn = opts.noreturn,
127,906✔
118
        fetch_latest_metadata = opts.fetch_latest_metadata,
127,906✔
119
    }
120

121
    local call_opts = {
127,906✔
122
        mode = 'write',
123
        timeout = opts.timeout,
127,906✔
124
    }
125

126
    local storage_result, err = call.single(vshard_router,
255,812✔
127
        sharding_data.bucket_id, CRUD_INSERT_FUNC_NAME,
127,906✔
128
        {space_name, tuple, insert_on_storage_opts},
127,906✔
129
        call_opts
130
    )
127,906✔
131

132
    -- Failure of the call itself (as opposed to an error reported inside
    -- storage_result, which is handled in the next block).
    if err ~= nil then
127,906✔
133
        local err_wrapped = InsertError:new("Failed to call insert on storage-side: %s", err)
100✔
134

135
        if sharding.result_needs_sharding_reload(err) then
200✔
136
            return nil, err_wrapped, const.NEED_SHARDING_RELOAD
72✔
137
        end
138

139
        return nil, err_wrapped
28✔
140
    end
141

142
    if storage_result.err ~= nil then
127,806✔
143
        local err_wrapped = InsertError:new("Failed to insert: %s", storage_result.err)
1,248✔
144

145
        if schema.result_needs_reload(space, storage_result) then
2,496✔
146
            return nil, err_wrapped, const.NEED_SCHEMA_RELOAD
12✔
147
        end
148

149
        return nil, err_wrapped
1,236✔
150
    end
151

152
    -- With noreturn the successful call yields no result and no error.
    if opts.noreturn == true then
126,558✔
153
        return nil
4✔
154
    end
155

156
    -- NOTE: shadows the request tuple declared above; from here on `tuple`
    -- is the value returned by the storage.
    local tuple = storage_result.res
126,554✔
157

158
    if opts.fetch_latest_metadata == true then
126,554✔
159
        -- This option is temporary and is related to [1], [2].
160
        -- [1] https://github.com/tarantool/crud/issues/236
161
        -- [2] https://github.com/tarantool/crud/issues/361
162
        space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version,
8✔
163
                                                                vshard_router, opts, storage_result.storage_info)
8✔
164
    end
165

166
    return utils.format_result({tuple}, space, opts.fields)
126,554✔
167
end
168

169
--- Inserts a tuple into the specified space
170
--
171
-- @function tuple
172
--
173
-- @param string space_name
174
--  A space name
175
--
176
-- @param table tuple
177
--  Tuple
178
--
179
-- @tparam ?number opts.timeout
180
--  Function call timeout
181
--
182
-- @tparam ?number opts.bucket_id
183
--  Bucket ID
184
--  (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
185
--
186
-- @tparam ?string|table opts.vshard_router
187
--  Cartridge vshard group name or vshard router instance.
188
--  Set this parameter if your space is not a part of the
189
--  default vshard cluster.
190
--
191
-- @tparam ?boolean opts.noreturn
192
--  Suppress returning successfully processed tuple.
193
--
-- @tparam ?table opts.fields
--  Forwarded to the storage call and to utils.format_result
--  (presumably limits the fields of the returned tuple -- confirm).
--
-- @tparam ?boolean opts.add_space_schema_hash
--  Forwarded to the storage call.
--
-- @tparam ?boolean opts.fetch_latest_metadata
--  Temporary option, see tarantool/crud issues #236 and #361.
--
194
-- @return[1] tuple
195
-- @treturn[2] nil
196
-- @treturn[2] table Error description
197
--
198
function insert.tuple(space_name, tuple, opts)
593✔
199
    checks('string', 'table', {
120,015✔
200
        timeout = '?number',
201
        bucket_id = '?',
202
        add_space_schema_hash = '?boolean',
203
        fields = '?table',
204
        vshard_router = '?string|table',
205
        noreturn = '?boolean',
206
        fetch_latest_metadata = '?boolean',
207
    })
208

209
    opts = opts or {}
120,015✔
210

211
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
120,015✔
212
    if err ~= nil then
120,015✔
213
        return nil, InsertError:new(err)
8✔
214
    end
215

216
    -- call_insert_on_router is run through sharding.wrap_method under
    -- schema.wrap_func_reload; see the need_reload note on call_insert_on_router.
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_on_router,
120,011✔
217
                                   space_name, tuple, opts)
120,011✔
218
end
219

220
--- Inserts an object into the specified space
221
--
222
-- @function object
223
--
224
-- @param string space_name
225
--  A space name
226
--
227
-- @param table obj
228
--  Object
229
--
230
-- @tparam ?table opts
231
--  Options of insert.tuple
232
--
-- @tparam ?boolean opts.skip_nullability_check_on_flatten
--  Passed to utils.flatten_obj_reload when the object is flattened.
--
233
-- @return[1] object
234
-- @treturn[2] nil
235
-- @treturn[2] table Error description
236
--
237
function insert.object(space_name, obj, opts)
593✔
238
    checks('string', 'table', {
7,886✔
239
        timeout = '?number',
240
        bucket_id = '?',
241
        add_space_schema_hash = '?boolean',
242
        fields = '?table',
243
        vshard_router = '?string|table',
244
        skip_nullability_check_on_flatten = '?boolean',
245
        noreturn = '?boolean',
246
        fetch_latest_metadata = '?boolean',
247
    })
248

249
    opts = opts or {}
7,886✔
250

251
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
7,886✔
252
    if err ~= nil then
7,886✔
253
        return nil, InsertError:new(err)
8✔
254
    end
255

256
    -- insert can fail if router uses outdated schema to flatten object
257
    -- NOTE(review): presumably merge_options overrides any caller-supplied
    -- add_space_schema_hash with true -- confirm in crud.common.utils.
    opts = utils.merge_options(opts, {add_space_schema_hash = true})
15,764✔
258

259
    -- The object is flattened into a tuple on the router before the call.
    local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj,
15,764✔
260
                                                opts.skip_nullability_check_on_flatten)
7,882✔
261
    if err ~= nil then
7,882✔
262
        return nil, InsertError:new("Failed to flatten object: %s", err)
80✔
263
    end
264

265
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_on_router,
7,842✔
266
                                   space_name, tuple, opts)
7,842✔
267
end
268

269
return insert
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc