• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 5199459215

pending completion
5199459215

push

github

DifferentialOrange
Release 1.2.0

Overview

  This release adds two new flags: `noreturn`, which skips returning
  values and so avoids excessive transfer and encoding/decoding for
  insert/replace/etc. (performance improvement of up to 10% for batch
  requests), and `fetch_latest_metadata`, which forces fetching the latest
  space format metadata right after a live migration (performance
  overhead may be up to 15%).

New features
  * Add `noreturn` option for operations:
    `insert`, `insert_object`, `insert_many`, `insert_object_many`,
    `replace`, `replace_object`, `replace_many`, `replace_object_many`,
    `upsert`, `upsert_object`, `upsert_many`, `upsert_object_many`,
    `update`, `delete` (#267).

Bugfixes
  * Crud DML operations returning stale schema for metadata generation.
    Now you may use `fetch_latest_metadata` flag to work with latest
    schema (#236).

1 of 1 new or added line in 1 file covered. (100.0%)

4549 of 4888 relevant lines covered (93.06%)

18261.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.41
/crud/common/schema.lua
1
local fiber = require('fiber')
383✔
2
local msgpack = require('msgpack')
383✔
3
local digest = require('digest')
383✔
4
local errors = require('errors')
383✔
5
local log = require('log')
383✔
6

7
local ReloadSchemaError = errors.new_class('ReloadSchemaError', {capture_stack = false})
383✔
8

9
local const = require('crud.common.const')
383✔
10
local dev_checks = require('crud.common.dev_checks')
383✔
11

12
local schema = {}
383✔
13

14
-- Counts every key/value pair of a table (array part and hash part alike).
-- Unlike the `#` operator, this also works for tables that are not sequences.
--
-- @param t table to inspect
-- @return number of entries visited by pairs(t)
local function table_len(t)
    local count = 0
    for _ in pairs(t) do
        count = count + 1
    end
    return count
end
21

22
-- Fiber body: reloads the schema on the master connection of one replicaset
-- and then reports completion by putting `true` into the channel.
--
-- @param replicaset table with a `master.conn` connection object
-- @param channel    channel that receives `true` once the reload call returns
local function call_reload_schema_on_replicaset(replicaset, channel)
    local master_conn = replicaset.master.conn
    master_conn:reload_schema()
    channel:put(true)
end
26

27
-- Reloads the schema on all given replicasets in parallel.
--
-- One fiber is started per replicaset; each of them puts a value into a
-- shared channel when it is done. This function then collects one value per
-- replicaset, waiting at most const.RELOAD_SCHEMA_TIMEOUT for each of them.
-- If a wait returns nil, every fiber that is not yet dead is cancelled.
--
-- @param replicasets table of replicasets (iterated with pairs)
-- @return true on success, or `nil, ReloadSchemaError` on timeout
local function call_reload_schema(replicasets)
    local expected = table_len(replicasets)
    -- Channel capacity equals the number of workers.
    local done = fiber.channel(expected)

    local workers = {}
    for _, replicaset in pairs(replicasets) do
        workers[#workers + 1] = fiber.new(call_reload_schema_on_replicaset, replicaset, done)
    end

    local received = 0
    while received < expected do
        if done:get(const.RELOAD_SCHEMA_TIMEOUT) == nil then
            -- Timed out: stop the workers that are still running.
            for _, worker in ipairs(workers) do
                if worker:status() ~= 'dead' then
                    worker:cancel()
                end
            end
            return nil, ReloadSchemaError:new("Reloading schema timed out")
        end
        received = received + 1
    end

    return true
end
50

51
-- Per-router reload state, keyed by vshard router name:
--   reload_in_progress[name] is true while some fiber performs the reload;
--   reload_schema_cond[name] is the fiber.cond that other fibers wait on.
local reload_in_progress = {}
local reload_schema_cond = {}

-- Reloads the space schema on the master of every replicaset known to the
-- given router.
--
-- Only one reload per router runs at a time. A caller that arrives while a
-- reload is in progress does not start another one; it waits (up to
-- const.RELOAD_SCHEMA_TIMEOUT) for the running reload to finish.
--
-- @param vshard_router router object providing `:routeall()` and `.name`
-- @return true on success (or after a concurrent reload has finished),
--         or `nil, err` if the reload or the wait failed
function schema.reload_schema(vshard_router)
    local replicasets = vshard_router:routeall()
    local vshard_router_name = vshard_router.name

    if reload_in_progress[vshard_router_name] == true then
        if not reload_schema_cond[vshard_router_name]:wait(const.RELOAD_SCHEMA_TIMEOUT) then
            return nil, ReloadSchemaError:new('Waiting for schema to be reloaded is timed out')
        end
        return true
    end

    reload_in_progress[vshard_router_name] = true
    if reload_schema_cond[vshard_router_name] == nil then
        reload_schema_cond[vshard_router_name] = fiber.cond()
    end

    -- pcall so that the in-progress flag is released even if the reload
    -- raises (for example when this fiber is cancelled while waiting).
    local call_ok, ok, err = pcall(call_reload_schema, replicasets)

    -- Release the flag and wake up waiters on every outcome. Otherwise a
    -- single failed reload would leave the flag set forever and every later
    -- call for this router would only wait and time out.
    reload_in_progress[vshard_router_name] = false
    reload_schema_cond[vshard_router_name]:broadcast()

    if not call_ok then
        -- `ok` holds the raised error here; re-raise it unchanged.
        error(ok, 0)
    end

    if not ok then
        return nil, err
    end

    return true
end
79

80
-- schema.wrap_func_reload calls `func(vshard_router, ...)`.
-- `func` should return `res, err, need_reload`.
-- If `func` returned an error and `need_reload` equals
-- const.NEED_SCHEMA_RELOAD, the schema is reloaded and `func` is called
-- again. The loop stops when:
--   * `func` succeeds or reports an error that a reload cannot help with;
--   * reloading the schema fails (a warning is logged);
--   * the number of reloads exceeds const.RELOAD_RETRIES_NUM
--     (a warning is logged).
-- This wrapper is used for functions that can fail if the router uses an
-- outdated space schema. Such functions return `need_reload` for
-- schema-dependent errors.
-- Returns the `res, err` pair of the last `func` call.
function schema.wrap_func_reload(vshard_router, func, ...)
    local reloads_done = 0
    local res, err, need_reload

    repeat
        res, err, need_reload = func(vshard_router, ...)

        local retry = err ~= nil and need_reload == const.NEED_SCHEMA_RELOAD
        if retry then
            local ok, reload_schema_err = schema.reload_schema(vshard_router)
            if not ok then
                log.warn("Failed to reload schema: %s", reload_schema_err)
                retry = false
            else
                reloads_done = reloads_done + 1
                if reloads_done > const.RELOAD_RETRIES_NUM then
                    local warn_msg = "Number of attempts to reload schema has been ended: %s"
                    log.warn(warn_msg, const.RELOAD_RETRIES_NUM)
                    retry = false
                end
            end
        end
    until not retry

    return res, err
end
115

116
-- Builds a hash that identifies the schema of a space: its format plus a
-- description of each of its indexes.
--
-- The hash is digest.murmur over the msgpack encoding of the collected
-- info. NOTE(review): the encoded bytes presumably depend on how the tables
-- below are built, so the field order in the constructors is kept as is.
--
-- @param space space object, or nil
-- @return '' if space is nil, otherwise the murmur hash of the schema info
local function get_space_schema_hash(space)
    if space == nil then
        return ''
    end

    local space_indexes = space.index
    local indexes_info = {}
    -- Index ids start at 0; table.maxn gives the largest numeric key.
    for index_id = 0, table.maxn(space_indexes) do
        local index = space_indexes[index_id]
        if index ~= nil then
            indexes_info[index_id] = {
                unique = index.unique,
                parts = index.parts,
                id = index.id,
                type = index.type,
                name = index.name,
                path = index.path,
            }
        end
    end

    local space_info = {
        format = space:format(),
        indexes = indexes_info,
    }

    local encoded = msgpack.encode(space_info)
    return digest.murmur(encoded)
end
143

144
-- Returns a new table holding only the listed fields of an object.
--
-- @param obj         table keyed by field name, or nil
-- @param field_names array of field names to keep, or nil
-- @return `obj` itself if either argument is nil; otherwise a new table
--         where result[name] = obj[name] for every listed name
function schema.filter_obj_fields(obj, field_names)
    if obj == nil or field_names == nil then
        return obj
    end

    local filtered = {}
    for _, name in ipairs(field_names) do
        filtered[name] = obj[name]
    end

    return filtered
end
157

158
-- Projects a tuple onto the listed fields, in the order they are listed.
--
-- @param tuple       value indexable by field name, or nil
-- @param field_names array of field names, or nil
-- @return `tuple` itself if either argument is nil; otherwise a new array
--         where result[i] = tuple[field_names[i]]
local function filter_tuple_fields(tuple, field_names)
    if tuple == nil then
        return tuple
    end
    if field_names == nil then
        return tuple
    end

    local projected = {}
    for position, name in ipairs(field_names) do
        projected[position] = tuple[name]
    end

    return projected
end
171

172
-- Applies the per-tuple field projection to every tuple of a list.
--
-- Both arguments are optional (see the dev_checks declaration below). When
-- either one is nil there is nothing to filter and `tuples` is returned as
-- is. Without the nil check on `tuples`, a nil list combined with non-nil
-- `field_names` would raise inside ipairs.
--
-- @param tuples      array of tuples, or nil
-- @param field_names array of field names, or nil
-- @return `tuples` itself if either argument is nil; otherwise a new array
--         with each tuple reduced to the listed fields
function schema.filter_tuples_fields(tuples, field_names)
    dev_checks('?table', '?table')

    if field_names == nil or tuples == nil then
        return tuples
    end

    local result = {}

    for _, tuple in ipairs(tuples) do
        local filtered_tuple = filter_tuple_fields(tuple, field_names)
        table.insert(result, filtered_tuple)
    end

    return result
end
188

189
-- Drops every field of a row that lies beyond the first #field_names
-- positions.
--
-- For a box tuple the result comes from tuple:transform(); for a plain
-- table the trailing entries are set to nil in place and the same table is
-- returned.
--
-- @param tuple       box tuple or array-like table
-- @param field_names array of field names; only its length is used
-- @return the truncated row
function schema.truncate_row_trailing_fields(tuple, field_names)
    dev_checks('table|tuple', 'table')

    local keep_count = #field_names
    local row_len = #tuple
    local first_extra = keep_count + 1

    if box.tuple.is(tuple) then
        local extra_count = row_len - keep_count
        return tuple:transform(first_extra, extra_count)
    end

    for position = first_extra, row_len do
        tuple[position] = nil
    end

    return tuple
end
206

207
-- Calls `func(unpack(args))` under pcall and packs the outcome into a table.
--
-- Fields of the returned table:
--   res               - call result projected onto `opts.field_names`;
--                       set only on success and when `opts.noreturn` is
--                       not true
--   err               - error caught by pcall; set only on failure
--   space_schema_hash - schema hash of `space`; set only on failure and
--                       when `opts.add_space_schema_hash` is truthy
--   storage_info      - `{replica_uuid, replica_schema_version}`; set only
--                       when `opts.fetch_latest_metadata` is true
--
-- @param space space object
-- @param func  function to call
-- @param args  array of arguments for `func`
-- @param opts  options table (see the fields above)
-- @return result table
function schema.wrap_func_result(space, func, args, opts)
    dev_checks('table', 'function', 'table', 'table')

    opts = opts or {}

    local result = {}

    local ok, func_res = pcall(func, unpack(args))
    if ok then
        if opts.noreturn ~= true then
            result.res = filter_tuple_fields(func_res, opts.field_names)
        end
    else
        result.err = func_res
        if opts.add_space_schema_hash then
            result.space_schema_hash = get_space_schema_hash(space)
        end
    end

    if opts.fetch_latest_metadata == true then
        -- Prefer box.info.schema_version; fall back to the internal getter
        -- when that field is absent.
        local replica_schema_version = box.info.schema_version
        if replica_schema_version == nil then
            replica_schema_version = box.internal.schema_version()
        end

        result.storage_info = {
            replica_uuid = box.info().uuid,
            replica_schema_version = replica_schema_version,
        }
    end

    return result
end
241

242
-- schema.wrap_box_space_func_result calls a method of a space by name
-- through schema.wrap_func_result and returns whatever that wrapper
-- returns: a table of the form
-- `{res = ..., err = ..., space_schema_hash = ...}`.
-- `space_schema_hash` is computed if the call failed and
-- `add_space_schema_hash` is true in `opts`.
function schema.wrap_box_space_func_result(space, box_space_func_name, box_space_func_args, opts)
    dev_checks('table', 'string', 'table', 'table')

    -- Looks the method up by name and calls it with the space as receiver.
    local function call_space_method(target_space, method_name, method_args)
        local method = target_space[method_name]
        return method(target_space, unpack(method_args))
    end

    local call_args = {space, box_space_func_name, box_space_func_args}
    return schema.wrap_func_result(space, call_space_method, call_args, opts)
end
255

256
-- schema.result_needs_reload tells whether reloading the schema may help
-- to avoid a storage error.
-- It compares the `space_schema_hash` returned by the storage with the
-- hash of the space as seen on the router and returns true if they differ.
-- Note that the storage returns `space_schema_hash = nil` if reloading the
-- space format can't avoid the error; in that case the result is false.
function schema.result_needs_reload(space, result)
    local storage_hash = result.space_schema_hash
    return storage_hash ~= nil and storage_hash ~= get_space_schema_hash(space)
end
268

269
-- Batch variant of the reload check.
--
-- Counts the results whose `space_schema_hash` is set and differs from the
-- hash of `space` computed here, and reports whether that count equals
-- `tuples_count`.
--
-- @param space        space object used to compute the local hash
-- @param results      array of per-tuple result tables
-- @param tuples_count number the mismatch count is compared with
-- @return true if the number of mismatching results equals `tuples_count`
function schema.batching_result_needs_reload(space, results, tuples_count)
    local local_hash = get_space_schema_hash(space)

    local mismatches = 0
    for _, result in ipairs(results) do
        local storage_hash = result.space_schema_hash
        if storage_hash ~= nil and storage_hash ~= local_hash then
            mismatches = mismatches + 1
        end
    end

    return mismatches == tuples_count
end
280

281
return schema
383✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc