• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 5199459215

pending completion
5199459215

push

github

DifferentialOrange
Release 1.2.0

Overview

  This release adds two new flags: `noreturn`, which skips returning values
  and so avoids their excessive transfer and encoding/decoding for
  insert/replace/etc (performance improvement of up to 10% for batch
  requests), and `fetch_latest_metadata`, which forces fetching the latest
  space format metadata right after a live migration (performance overhead
  may be up to 15%).

New features
  * Add `noreturn` option for operations:
    `insert`, `insert_object`, `insert_many`, `insert_object_many`,
    `replace`, `replace_object`, `replace_many`, `replace_object_many`,
    `upsert`, `upsert_object`, `upsert_many`, `upsert_object_many`,
    `update`, `delete` (#267).

Bugfixes
  * Crud DML operations could return a stale schema for metadata generation.
    Now you may use the `fetch_latest_metadata` flag to work with the latest
    schema (#236).

1 of 1 new or added line in 1 file covered. (100.0%)

4549 of 4888 relevant lines covered (93.06%)

18261.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.62
/crud/upsert.lua
1
local checks = require('checks')
404✔
2
local errors = require('errors')
404✔
3

4
local call = require('crud.common.call')
404✔
5
local const = require('crud.common.const')
404✔
6
local utils = require('crud.common.utils')
404✔
7
local sharding = require('crud.common.sharding')
404✔
8
local dev_checks = require('crud.common.dev_checks')
404✔
9
local schema = require('crud.common.schema')
404✔
10

11
-- Error class used for every error object this module creates
-- (created with capture_stack = false).
local UpsertError = errors.new_class('UpsertError', { capture_stack = false})
404✔
12

13
-- Module table; it is populated below and returned at the end of the file.
local upsert = {}
404✔
14

15
-- Function name handed to call.single() by the router-side code.
-- upsert.init() stores the storage-side handler at the matching
-- _G._crud.upsert_on_storage path.
local UPSERT_FUNC_NAME = '_crud.upsert_on_storage'
404✔
16

17
-- Storage-side handler of upsert (stored in _G._crud by upsert.init()).
--
-- Looks up the space in box.space, checks the sharding hashes and then
-- delegates the upsert itself to schema.wrap_box_space_func_result().
--
-- @param string space_name
--  Name of the space to look up in box.space
-- @param table tuple
--  Tuple passed to the 'upsert' space function
-- @param table operations
--  Operations passed to the 'upsert' space function
-- @tparam ?table opts
--  Options; the accepted keys and their types are listed in the
--  dev_checks() call below. `fields` is accepted but not read in this
--  function.
--
-- @return[1] whatever schema.wrap_box_space_func_result() returns
-- @treturn[2] nil
-- @treturn[2] table UpsertError if the space does not exist, or the error
--  returned by sharding.check_sharding_hash()
local function upsert_on_storage(space_name, tuple, operations, opts)
18
    dev_checks('string', 'table', 'table', {
238✔
19
        add_space_schema_hash = '?boolean',
20
        fields = '?table',
21
        sharding_key_hash = '?number',
22
        sharding_func_hash = '?number',
23
        skip_sharding_hash_check = '?boolean',
24
        fetch_latest_metadata = '?boolean',
25
    })
26

27
    opts = opts or {}
238✔
28

29
    local space = box.space[space_name]
238✔
30
    if space == nil then
238✔
31
        return nil, UpsertError:new("Space %q doesn't exist", space_name)
×
32
    end
33

34
    -- Only the error of the hash check is of interest here; the first
    -- return value is discarded.
    local _, err = sharding.check_sharding_hash(space_name,
476✔
35
                                                opts.sharding_func_hash,
238✔
36
                                                opts.sharding_key_hash,
238✔
37
                                                opts.skip_sharding_hash_check)
238✔
38

39
    if err ~= nil then
238✔
40
        -- The sharding check error is returned as is, not wrapped
        -- into UpsertError.
        return nil, err
8✔
41
    end
42

43
    -- add_space_schema_hash is true only in case of upsert_object
44
    -- the only one case when reloading schema can avoid insert error
45
    -- is flattening object on router
46
    return schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, {
230✔
47
        add_space_schema_hash = opts.add_space_schema_hash,
230✔
48
        fetch_latest_metadata = opts.fetch_latest_metadata,
230✔
49
    })
230✔
50
end
51

52
-- Publishes the storage-side handler as _G._crud.upsert_on_storage, the
-- path that UPSERT_FUNC_NAME ('_crud.upsert_on_storage') refers to.
-- NOTE(review): assumes the _G._crud table already exists when this is
-- called -- it is not created here; confirm against the caller.
function upsert.init()
404✔
53
   _G._crud.upsert_on_storage = upsert_on_storage
302✔
54
end
55

56
-- Router-side implementation of upsert: resolves the space, obtains the
-- sharding data (bucket id and sharding hashes), calls UPSERT_FUNC_NAME
-- on the storage through call.single() and formats the result.
--
-- returns result, err, need_reload
57
-- need_reload indicates if reloading schema could help
58
-- see crud.common.schema.wrap_func_reload()
--
-- In this function need_reload is const.NEED_SCHEMA_RELOAD,
-- const.NEED_SHARDING_RELOAD, or absent (nil) when no reload is requested.
-- Every error returned from here is wrapped into UpsertError.
59
local function call_upsert_on_router(vshard_router, space_name, original_tuple, user_operations, opts)
60
    dev_checks('table', 'string', '?', 'table', {
278✔
61
        timeout = '?number',
62
        bucket_id = '?number|cdata',
63
        add_space_schema_hash = '?boolean',
64
        fields = '?table',
65
        vshard_router = '?string|table',
66
        skip_nullability_check_on_flatten = '?boolean',
67
        noreturn = '?boolean',
68
        fetch_latest_metadata = '?boolean',
69
    })
70

71
    -- netbox_schema_version is only used by the fetch_latest_metadata
    -- branch near the end of the function.
    local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router, opts.timeout)
278✔
72
    if err ~= nil then
278✔
73
        return nil, UpsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
×
74
    end
75
    if space == nil then
278✔
76
        return nil, UpsertError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
56✔
77
    end
78

79
    local space_format = space:format()
250✔
80
    local operations = user_operations
250✔
81
    -- When utils.tarantool_supports_fieldpaths() is false, the user
    -- operations are passed through utils.convert_operations() together
    -- with the space format; otherwise they are sent unchanged.
    if not utils.tarantool_supports_fieldpaths() then
500✔
82
        operations, err = utils.convert_operations(user_operations, space_format)
×
83
        if err ~= nil then
×
84
            return nil, UpsertError:new("Wrong operations are specified: %s", err), const.NEED_SCHEMA_RELOAD
×
85
        end
86
    end
87

88
    -- The copy, not original_tuple, is what gets passed further on.
    -- NOTE(review): presumably tuple_set_and_return_bucket_id() writes the
    -- bucket id into the tuple and the copy protects the caller's table --
    -- confirm in crud.common.sharding.
    local tuple = table.deepcopy(original_tuple)
250✔
89

90
    local sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, opts.bucket_id)
250✔
91
    if err ~= nil then
250✔
92
        return nil, UpsertError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD
24✔
93
    end
94

95
    -- Options forwarded to the storage-side function: a subset of the
    -- router options plus the sharding hashes taken from sharding_data.
    local upsert_on_storage_opts = {
238✔
96
        add_space_schema_hash = opts.add_space_schema_hash,
238✔
97
        fields = opts.fields,
238✔
98
        sharding_func_hash = sharding_data.sharding_func_hash,
238✔
99
        sharding_key_hash = sharding_data.sharding_key_hash,
238✔
100
        skip_sharding_hash_check = sharding_data.skip_sharding_hash_check,
238✔
101
        fetch_latest_metadata = opts.fetch_latest_metadata,
238✔
102
    }
103

104
    local call_opts = {
238✔
105
        mode = 'write',
106
        timeout = opts.timeout,
238✔
107
    }
108

109
    local storage_result, err = call.single(vshard_router,
476✔
110
        sharding_data.bucket_id, UPSERT_FUNC_NAME,
238✔
111
        {space_name, tuple, operations, upsert_on_storage_opts},
238✔
112
        call_opts
113
    )
238✔
114

115
    -- err is the error returned by call.single() itself; an error carried
    -- inside the returned result (storage_result.err) is handled separately
    -- below.
    if err ~= nil then
238✔
116
        local err_wrapped = UpsertError:new("Failed to call upsert on storage-side: %s", err)
8✔
117

118
        if sharding.result_needs_sharding_reload(err) then
16✔
119
            return nil, err_wrapped, const.NEED_SHARDING_RELOAD
8✔
120
        end
121

122
        return nil, err_wrapped
×
123
    end
124

125
    if storage_result.err ~= nil then
230✔
126
        local err_wrapped = UpsertError:new("Failed to upsert: %s", storage_result.err)
44✔
127

128
        if schema.result_needs_reload(space, storage_result) then
88✔
129
            return nil, err_wrapped, const.NEED_SCHEMA_RELOAD
8✔
130
        end
131

132
        return nil, err_wrapped
36✔
133
    end
134

135
    -- With noreturn the result is not formatted at all: a single nil is
    -- returned, without an error.
    if opts.noreturn == true then
186✔
136
        return nil
4✔
137
    end
138

139
    if opts.fetch_latest_metadata == true then
182✔
140
        -- This option is temporary and is related to [1], [2].
141
        -- [1] https://github.com/tarantool/crud/issues/236
142
        -- [2] https://github.com/tarantool/crud/issues/361
143
        space = utils.fetch_latest_metadata_when_single_storage(space, space_name, netbox_schema_version,
8✔
144
                                                                vshard_router, opts, storage_result.storage_info)
8✔
145
    end
146

147
    -- upsert returns only metadata, without rows
148
    return utils.format_result({}, space, opts.fields)
182✔
149
end
150

151
--- Update or insert a tuple in the specified space
152
--
153
-- @function tuple
154
--
155
-- @param string space_name
156
--  A space name
157
--
158
-- @param table tuple
159
--  Tuple
160
--
161
-- @param table user_operations
162
--  user_operations to be performed.
163
--  See `space:upsert()` operations in Tarantool doc
164
--
165
-- @tparam ?number opts.timeout
166
--  Function call timeout
167
--
168
-- @tparam ?number|cdata opts.bucket_id
169
--  Bucket ID
170
--  (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
171
--
-- @tparam ?boolean opts.add_space_schema_hash
--  Forwarded unchanged to the storage-side call.
--
-- @tparam ?table opts.fields
--  Passed to utils.format_result() when the result is built.
--  NOTE(review): presumably the field names to keep in the result
--  metadata -- confirm against crud.common.utils.
--
172
-- @tparam ?string|table opts.vshard_router
173
--  Cartridge vshard group name or vshard router instance.
174
--  Set this parameter if your space is not a part of the
175
--  default vshard cluster.
176
--
177
-- @tparam ?boolean opts.noreturn
178
--  Suppress returning a result: on success the router-side call
--  returns a bare nil instead of a formatted result.
179
--
-- @tparam ?boolean opts.fetch_latest_metadata
--  Force fetching the latest space format metadata before the result
--  is formatted. The option is marked as temporary in the router-side
--  code.
--
180
-- @return[1] table Result formatted from an empty row list (upsert returns
--  only metadata, without rows); nil when opts.noreturn is true.
--  NOTE(review): assumes schema.wrap_func_reload() forwards the return
--  values of call_upsert_on_router -- confirm.
181
-- @treturn[2] nil
182
-- @treturn[2] table Error description
183
--
184
function upsert.tuple(space_name, tuple, user_operations, opts)
404✔
185
    checks('string', '?', 'table', {
146✔
186
        timeout = '?number',
187
        bucket_id = '?number|cdata',
188
        add_space_schema_hash = '?boolean',
189
        fields = '?table',
190
        vshard_router = '?string|table',
191
        noreturn = '?boolean',
192
        fetch_latest_metadata = '?boolean',
193
    })
194

195
    opts = opts or {}
146✔
196

197
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
146✔
198
    if err ~= nil then
146✔
199
        return nil, UpsertError:new(err)
16✔
200
    end
201

202
    -- The router-side implementation is run through
    -- schema.wrap_func_reload() with sharding.wrap_method; see the
    -- need_reload return value of call_upsert_on_router.
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_upsert_on_router,
138✔
203
                                   space_name, tuple, user_operations, opts)
138✔
204
end
205

206
--- Update or insert an object in the specified space
207
--
-- The object is flattened into a tuple on the router with
-- utils.flatten_obj_reload() and then upserted the same way as in
-- upsert.tuple, with add_space_schema_hash forced to true.
--
208
-- @function object
209
--
210
-- @param string space_name
211
--  A space name
212
--
213
-- @param table obj
214
--  Object
215
--
216
-- @param table user_operations
217
--  user_operations to be performed.
218
--  See `space:upsert()` operations in Tarantool doc
219
--
220
-- @tparam ?table opts
221
--  Options of upsert.tuple
222
--
223
-- @return[1] table Result formatted from an empty row list (upsert returns
--  only metadata, without rows); nil when opts.noreturn is true.
--  NOTE(review): assumes schema.wrap_func_reload() forwards the return
--  values of call_upsert_on_router -- confirm.
224
-- @treturn[2] nil
225
-- @treturn[2] table Error description
226
--
227
function upsert.object(space_name, obj, user_operations, opts)
404✔
228
    checks('string', 'table', 'table', {
148✔
229
        timeout = '?number',
230
        bucket_id = '?number|cdata',
231
        add_space_schema_hash = '?boolean',
232
        fields = '?table',
233
        vshard_router = '?string|table',
234
        noreturn = '?boolean',
235
        fetch_latest_metadata = '?boolean',
236
    })
237

238
    opts = opts or {}
148✔
239

240
    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
148✔
241
    if err ~= nil then
148✔
242
        return nil, UpsertError:new(err)
16✔
243
    end
244

245
    -- upsert can fail if router uses outdated schema to flatten object
246
    -- opts is rebound to the table returned by utils.merge_options(), so
    -- add_space_schema_hash is true regardless of what the caller passed.
    opts = utils.merge_options(opts, {add_space_schema_hash = true})
280✔
247

248
    local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj)
140✔
249
    if err ~= nil then
140✔
250
        return nil, UpsertError:new("Failed to flatten object: %s", err)
72✔
251
    end
252

253
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_upsert_on_router,
104✔
254
                                   space_name, tuple, user_operations, opts)
104✔
255
end
256

257
-- Export the module table (init, tuple, object).
return upsert
404✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc