• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 20467770584

23 Dec 2025 05:47PM UTC coverage: 88.51%. First build
20467770584

Pull #467

github

ita-sammann
[TNTP-2109] Add safe implementation of crud methods

For most crud methods, when safe mode is enabled, bucket_ref()
is called for the appropriate bucket before the method itself is called.
This prevents tuples from being written to the wrong replicaset during or
after a vshard rebalance.
Closes #448.

Co-authored-by: Satbek Turganbayev <s.turganbayev@corp.mail.ru>
Pull Request #467: [TNTP-2109] Switch to safe mode on vshard rebalance

282 of 347 new or added lines in 13 files covered. (81.27%)

5215 of 5892 relevant lines covered (88.51%)

13459.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.31
/crud/delete.lua
1
local checks = require('checks')
593✔
2
local errors = require('errors')
593✔
3

4
local call = require('crud.common.call')
593✔
5
local const = require('crud.common.const')
593✔
6
local utils = require('crud.common.utils')
593✔
7
local sharding = require('crud.common.sharding')
593✔
8
local sharding_key_module = require('crud.common.sharding.sharding_key')
593✔
9
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
593✔
10
local dev_checks = require('crud.common.dev_checks')
593✔
11
local schema = require('crud.common.schema')
593✔
12
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
593✔
13

14
-- Error class used for every error this module creates itself;
-- created with stack capturing turned off (capture_stack = false).
local DeleteError = errors.new_class('DeleteError', {capture_stack = false})
593✔
15

16
-- Module table; returned at the end of the file.
local delete = {}
593✔
17

18
-- Key under which the storage-side handler is registered in delete.storage_api.
local DELETE_FUNC_NAME = 'delete_on_storage'
593✔
19
-- Function name handed to call.single() by the router-side code,
-- derived from DELETE_FUNC_NAME by utils.get_storage_call().
local CRUD_DELETE_FUNC_NAME = utils.get_storage_call(DELETE_FUNC_NAME)
593✔
20

21
-- Storage-side handler: deletes a tuple by key from a local space.
--
-- Steps, in order:
--   1. resolve the space in box.space;
--   2. validate the sharding hashes sent by the router;
--   3. take a reference on the bucket via bucket_ref_unref.bucket_refrw();
--   4. run space.delete through schema.wrap_func_result();
--   5. release the bucket reference with the unref function returned by step 3.
--
-- @tparam string space_name  name of the space to delete from
-- @param key                 key passed to space.delete as is
-- @tparam ?table field_names forwarded to schema.wrap_func_result()
-- @tparam table opts         bucket_id (required), sharding_key_hash,
--   sharding_func_hash, skip_sharding_hash_check, noreturn,
--   fetch_latest_metadata
--
-- @return the value produced by schema.wrap_func_result(), or
--   nil plus an error when any step other than the delete itself fails
local function delete_on_storage(space_name, key, field_names, opts)
    dev_checks('string', '?', '?table', {
        bucket_id = 'number|cdata',
        sharding_key_hash = '?number',
        sharding_func_hash = '?number',
        skip_sharding_hash_check = '?boolean',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    opts = opts or {}

    local space = box.space[space_name]
    if space == nil then
        return nil, DeleteError:new("Space %q doesn't exist", space_name)
    end

    local _, hash_err = sharding.check_sharding_hash(
        space_name,
        opts.sharding_func_hash,
        opts.sharding_key_hash,
        opts.skip_sharding_hash_check
    )
    if hash_err ~= nil then
        return nil, hash_err
    end

    -- The bucket stays referenced for the whole duration of the delete.
    local referenced, ref_err, unref = bucket_ref_unref.bucket_refrw(opts.bucket_id)
    if not referenced then
        return nil, ref_err
    end

    -- add_space_schema_hash is false because
    -- reloading space format on router can't avoid delete error on storage
    local wrap_opts = {
        add_space_schema_hash = false,
        field_names = field_names,
        noreturn = opts.noreturn,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }
    local result = schema.wrap_func_result(space, space.delete, wrap_opts, space, key)

    -- A failed unref is reported instead of the delete result.
    local released, unref_err = unref(opts.bucket_id)
    if not released then
        return nil, unref_err
    end

    return result
end
68

69
-- Storage-side functions provided by this module, keyed by function name.
-- NOTE(review): presumably collected by the crud storage initialization code; confirm against the consumer.
delete.storage_api = {[DELETE_FUNC_NAME] = delete_on_storage}
593✔
70

71
-- Router-side implementation of a single delete.
--
-- Resolves the space, works out the bucket the key belongs to, invokes
-- the storage-side delete through call.single() in 'write' mode and
-- formats the returned tuple.
--
-- Returns result, err, need_reload.
-- need_reload (one of const.NEED_SCHEMA_RELOAD / const.NEED_SHARDING_RELOAD)
-- indicates if reloading could help,
-- see crud.common.schema.wrap_func_reload()
local function call_delete_on_router(vshard_router, space_name, key, opts)
    dev_checks('table', 'string', '?', {
        timeout = '?number',
        bucket_id = '?',
        fields = '?table',
        vshard_router = '?string|table',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
    if space_err ~= nil then
        return nil,
            DeleteError:new("An error occurred during the operation: %s", space_err),
            const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, DeleteError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
    end

    -- Work with a plain table from here on.
    if box.tuple.is(key) then
        key = key:totable()
    end

    local sharding_key = key
    local sharding_key_hash
    local skip_sharding_hash_check

    if opts.bucket_id ~= nil then
        -- An explicit bucket_id was given: no sharding key is extracted,
        -- so the storage is told to skip the sharding hash check.
        skip_sharding_hash_check = true
    else
        local primary_index = space.index[0]
        if primary_index == nil then
            return nil, DeleteError:new("Cannot fetch primary index parts"), const.NEED_SCHEMA_RELOAD
        end

        local sharding_key_data, fetch_err =
            sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
        if fetch_err ~= nil then
            return nil, fetch_err
        end

        local extracted_key, extract_err = sharding_key_module.extract_from_pk(
            vshard_router,
            space_name,
            sharding_key_data.value,
            primary_index.parts,
            key
        )
        if extract_err ~= nil then
            return nil, extract_err
        end

        sharding_key = extracted_key
        sharding_key_hash = sharding_key_data.hash
    end

    local bucket_id_data, bucket_err =
        sharding.key_get_bucket_id(vshard_router, space_name, sharding_key, opts.bucket_id)
    if bucket_err ~= nil then
        return nil, bucket_err
    end

    local bucket_id = bucket_id_data.bucket_id

    -- When the sharding index (bucket_id) is the primary index, bucket_id can be passed as box.NULL.
    sharding.fill_bucket_id_pk(space, key, bucket_id)

    local storage_opts = {
        bucket_id = bucket_id,
        sharding_func_hash = bucket_id_data.sharding_func_hash,
        sharding_key_hash = sharding_key_hash,
        skip_sharding_hash_check = skip_sharding_hash_check,
        noreturn = opts.noreturn,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }
    local storage_args = {space_name, key, opts.fields, storage_opts}
    local single_call_opts = {
        mode = 'write',
        timeout = opts.timeout,
    }

    local storage_result, call_err = call.single(
        vshard_router,
        bucket_id,
        CRUD_DELETE_FUNC_NAME,
        storage_args,
        single_call_opts
    )

    if call_err ~= nil then
        local wrapped_err = DeleteError:new("Failed to call delete on storage-side: %s", call_err)

        if sharding.result_needs_sharding_reload(call_err) then
            return nil, wrapped_err, const.NEED_SHARDING_RELOAD
        end

        return nil, wrapped_err
    end

    if storage_result.err ~= nil then
        return nil, DeleteError:new("Failed to delete: %s", storage_result.err)
    end

    if opts.noreturn == true then
        return nil
    end

    local deleted_tuple = storage_result.res

    if opts.fetch_latest_metadata == true then
        -- This option is temporary and is related to [1], [2].
        -- [1] https://github.com/tarantool/crud/issues/236
        -- [2] https://github.com/tarantool/crud/issues/361
        space = utils.fetch_latest_metadata_when_single_storage(
            space,
            space_name,
            netbox_schema_version,
            vshard_router,
            opts,
            storage_result.storage_info
        )
    end

    return utils.format_result({deleted_tuple}, space, opts.fields)
end
182

183
--- Deletes tuple from the specified space by key
--
-- @function call
--
-- @tparam string space_name
--  A space name
--
-- @param key
--  Primary key value
--
-- @tparam ?number opts.timeout
--  Function call timeout
--
-- @tparam ?number opts.bucket_id
--  Bucket ID
--  (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
--
-- @tparam ?table opts.fields
--  Field names; forwarded to the storage call and to the
--  formatting of the result.
--
-- @tparam ?string|table opts.vshard_router
--  Cartridge vshard group name or vshard router instance.
--  Set this parameter if your space is not a part of the
--  default vshard cluster.
--
-- @tparam ?boolean opts.noreturn
--  Suppress returning successfully processed tuple.
--
-- @tparam ?boolean opts.fetch_latest_metadata
--  Temporary option, see
--  https://github.com/tarantool/crud/issues/236 and
--  https://github.com/tarantool/crud/issues/361
--
-- @return[1] object
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function delete.call(space_name, key, opts)
    checks('string', '?', {
        timeout = '?number',
        bucket_id = '?',
        fields = '?table',
        vshard_router = '?string|table',
        noreturn = '?boolean',
        fetch_latest_metadata = '?boolean',
    })

    opts = opts or {}

    local router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, DeleteError:new(router_err)
    end

    -- The router-side implementation is run through
    -- schema.wrap_func_reload() together with sharding.wrap_method.
    return schema.wrap_func_reload(
        router,
        sharding.wrap_method,
        call_delete_on_router,
        space_name,
        key,
        opts
    )
end
232

233
-- Export the module table (call, storage_api).
return delete
593✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc