• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20596439889

30 Dec 2025 12:21PM UTC coverage: 88.326% (-0.08%) from 88.405%
20596439889

Pull #475

github

Satbek
safe/fast mode: remove fiber kill

In fast mode every fiber were named as "fast" in order
to kill it on safe enabling.

But there are no yields in crud's storage methods.
And rebalancing trigger is insert/replace in `_bucket` space
which is yield. So when fiber start in fast mode it will
be in fast mode until first `box.space` operation, "write" for
memtx, "read/write" for vinyl.

Therefor there is no need to mark and kill iproto fibers, which handle
fast requests, because there is no situation, when fiber is in fast mode
and rebalancing is in progress.
Pull Request #475: safe/fast mode: remove fiber kill

79 of 96 new or added lines in 11 files covered. (82.29%)

17 existing lines in 9 files now uncovered.

5266 of 5962 relevant lines covered (88.33%)

13541.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.16
/crud/update.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3
local yield_checks = require('crud.common.yield_checks')
593✔
4

5
local call = require('crud.common.call')
593✔
6
local const = require('crud.common.const')
593✔
7
local utils = require('crud.common.utils')
593✔
8
local sharding = require('crud.common.sharding')
593✔
9
local sharding_key_module = require('crud.common.sharding.sharding_key')
593✔
10
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
593✔
11
local dev_checks = require('crud.common.dev_checks')
593✔
12
local schema = require('crud.common.schema')
593✔
13
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
14

15
local UpdateError = errors.new_class('UpdateError', {capture_stack = false})
593✔
16

17
local update = {}
593✔
18

19
local UPDATE_FUNC_NAME = 'update_on_storage'
593✔
20
local CRUD_UPDATE_FUNC_NAME = utils.get_storage_call(UPDATE_FUNC_NAME)
593✔
21

22
local function update_on_storage(space_name, key, operations, field_names, opts)
23
    dev_checks('string', '?', 'table', '?table', {
128✔
24
        bucket_id = 'number|cdata',
25
        sharding_key_hash = '?number',
26
        sharding_func_hash = '?number',
27
        skip_sharding_hash_check = '?boolean',
28
        noreturn = '?boolean',
29
        fetch_latest_metadata = '?boolean',
30
    })
31
    yield_checks.register_fiber()
128✔
32

33
    opts = opts or {}
128✔
34

35
    local space = box.space[space_name]
128✔
36
    if space == nil then
128✔
NEW
37
        yield_checks.unregister_fiber()
×
UNCOV
38
        return nil, UpdateError:new("Space %q doesn't exist", space_name)
×
39
    end
40

41
    local _, err = sharding.check_sharding_hash(space_name,
256✔
42
                                                opts.sharding_func_hash,
128✔
43
                                                opts.sharding_key_hash,
128✔
44
                                                opts.skip_sharding_hash_check)
128✔
45

46
    if err ~= nil then
128✔
47
        yield_checks.unregister_fiber()
4✔
48
        return nil, err
4✔
49
    end
50

51
    local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(opts.bucket_id)
124✔
52
    if not ref_ok then
124✔
NEW
53
        yield_checks.unregister_fiber()
×
UNCOV
54
        return nil, bucket_ref_err
×
55
    end
56

57
    yield_checks.check_no_yields()
124✔
58
    -- add_space_schema_hash is false because
59
    -- reloading space format on router can't avoid update error on storage
60
    local res, err = schema.wrap_func_result(space, space.update, {
248✔
61
        add_space_schema_hash = false,
62
        field_names = field_names,
124✔
63
        noreturn = opts.noreturn,
124✔
64
        fetch_latest_metadata = opts.fetch_latest_metadata,
124✔
65
    }, space, key, operations)
124✔
66

67
    if err == nil and res.err ~= nil and utils.is_field_not_found(res.err.code) then
160✔
68
        -- Relevant for Tarantool older than 2.8.1.
69
        -- We can only add fields to end of the tuple.
70
        -- If schema is updated and nullable fields are added, then we will get error.
71
        -- Therefore, we need to add filling of intermediate nullable fields.
72
        -- More details: https://github.com/tarantool/tarantool/issues/3378
73
        operations = utils.add_intermediate_nullable_fields(operations, space:format(), space:get(key))
8✔
74
        res, err = schema.wrap_func_result(space, space.update, {
4✔
75
            add_space_schema_hash = false,
76
            field_names = field_names,
2✔
77
            noreturn = opts.noreturn,
2✔
78
            fetch_latest_metadata = opts.fetch_latest_metadata,
2✔
79
        }, space, key, operations)
4✔
80
    end
81

82
    yield_checks.unregister_fiber()
124✔
83

84
    local unref_ok, err_unref = unref(opts.bucket_id)
124✔
85
    if not unref_ok then
124✔
86
        return nil, err_unref
×
87
    end
88

89
    return res, err
124✔
90
end
91

92
update.storage_api = {[UPDATE_FUNC_NAME] = update_on_storage}
593✔
93

94
-- returns result, err, need_reload
95
-- need_reload indicates if reloading schema could help
96
-- see crud.common.schema.wrap_func_reload()
97
local function call_update_on_router(vshard_router, space_name, key, user_operations, opts)
98
    dev_checks('table', 'string', '?', 'table', {
144✔
99
        timeout = '?number',
100
        bucket_id = '?',
101
        fields = '?table',
102
        vshard_router = '?string|table',
103
        noreturn = '?boolean',
104
        fetch_latest_metadata = '?boolean',
105
    })
106

107
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
144✔
108
    if err ~= nil then
144✔
109
        return nil, UpdateError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
×
110
    end
111
    if space == nil then
144✔
112
        return nil, UpdateError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
24✔
113
    end
114

115
    local space_format = space:format()
132✔
116

117
    if box.tuple.is(key) then
264✔
118
        key = key:totable()
×
119
    end
120

121
    local sharding_key = key
132✔
122
    local sharding_key_hash = nil
132✔
123
    local skip_sharding_hash_check = nil
124

125
    if opts.bucket_id == nil then
132✔
126
        local primary_index_parts = space.index[0].parts
120✔
127

128
        local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
120✔
129
        if err ~= nil then
120✔
130
            return nil, err
×
131
        end
132

133
        sharding_key, err = sharding_key_module.extract_from_pk(vshard_router,
240✔
134
                                                                space_name,
120✔
135
                                                                sharding_key_data.value,
120✔
136
                                                                primary_index_parts, key)
240✔
137
        if err ~= nil then
120✔
138
            return nil, err
4✔
139
        end
140

141
        sharding_key_hash = sharding_key_data.hash
116✔
142
    else
143
        skip_sharding_hash_check = true
12✔
144
    end
145

146
    local operations = user_operations
128✔
147
    if not utils.tarantool_supports_fieldpaths() then
256✔
148
        operations, err = utils.convert_operations(user_operations, space_format)
×
149
        if err ~= nil then
×
150
            return nil, UpdateError:new("Wrong operations are specified: %s", err), const.NEED_SCHEMA_RELOAD
×
151
        end
152
    end
153

154
    local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name, sharding_key, opts.bucket_id)
128✔
155
    if err ~= nil then
128✔
156
        return nil, err
2✔
157
    end
158

159
    -- When the sharding index (bucket_id) is the primary index, bucket_id can be passed as box.NULL.
160
    sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id)
126✔
161

162
    local update_on_storage_opts = {
126✔
163
        bucket_id = bucket_id_data.bucket_id,
126✔
164
        sharding_func_hash = bucket_id_data.sharding_func_hash,
126✔
165
        sharding_key_hash = sharding_key_hash,
126✔
166
        skip_sharding_hash_check = skip_sharding_hash_check,
126✔
167
        noreturn = opts.noreturn,
126✔
168
        fetch_latest_metadata = opts.fetch_latest_metadata,
126✔
169
    }
170

171
    local call_opts = {
126✔
172
        mode = 'write',
173
        timeout = opts.timeout,
126✔
174
    }
175

176
    local storage_result, err = call.single(vshard_router,
252✔
177
        bucket_id_data.bucket_id, CRUD_UPDATE_FUNC_NAME,
126✔
178
        {space_name, key, operations, opts.fields, update_on_storage_opts},
126✔
179
        call_opts
180
    )
126✔
181

182
    if err ~= nil then
126✔
183
        local err_wrapped = UpdateError:new("Failed to call update on storage-side: %s", err)
2✔
184

185
        if sharding.result_needs_sharding_reload(err) then
4✔
186
            return nil, err_wrapped, const.NEED_SHARDING_RELOAD
2✔
187
        end
188

189
        return nil, err_wrapped
×
190
    end
191

192
    if storage_result.err ~= nil then
124✔
193
        return nil, UpdateError:new("Failed to update: %s", storage_result.err)
72✔
194
    end
195

196
    if opts.noreturn == true then
88✔
197
        return nil
2✔
198
    end
199

200
    local tuple = storage_result.res
86✔
201

202
    if opts.fetch_latest_metadata == true then
86✔
203
        -- This option is temporary and is related to [1], [2].
204
        -- [1] https://github.com/tarantool/crud/issues/236
205
        -- [2] https://github.com/tarantool/crud/issues/361
206
        space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version,
8✔
207
                                                                vshard_router, opts, storage_result.storage_info)
8✔
208
    end
209

210
    return utils.format_result({tuple}, space, opts.fields)
86✔
211
end
212

213
--- Updates tuple in the specified space
214
--
215
-- @function call
216
--
217
-- @param string space_name
218
--  A space name
219
--
220
-- @param key
221
--  Primary key value
222
--
223
-- @param table user_operations
224
--  Operations to be performed.
225
--  See `space:update` operations in Tarantool doc
226
--
227
-- @tparam ?number opts.timeout
228
--  Function call timeout
229
--
230
-- @tparam ?number opts.bucket_id
231
--  Bucket ID
232
--  (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
233
--
234
-- @tparam ?string|table opts.vshard_router
235
--  Cartridge vshard group name or vshard router instance.
236
--  Set this parameter if your space is not a part of the
237
--  default vshard cluster.
238
--
239
-- @tparam ?boolean opts.noreturn
240
--  Suppress returning successfully processed tuple.
241
--
242
-- @return[1] object
243
-- @treturn[2] nil
244
-- @treturn[2] table Error description
245
--
246
function update.call(space_name, key, user_operations, opts)
593✔
247
    checks('string', '?', 'table', {
140✔
248
        timeout = '?number',
249
        bucket_id = '?',
250
        fields = '?table',
251
        vshard_router = '?string|table',
252
        noreturn = '?boolean',
253
        fetch_latest_metadata = '?boolean',
254
    })
255

256
    opts = opts or {}
140✔
257
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
140✔
258
    if err ~= nil then
140✔
259
        return nil, UpdateError:new(err)
8✔
260
    end
261

262
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_update_on_router,
136✔
263
                                   space_name, key, user_operations, opts)
136✔
264
end
265

266
return update
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc