• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 6493431653

12 Oct 2023 08:50AM UTC coverage: 89.162% (+0.02%) from 89.14%
6493431653

push

github

DifferentialOrange
schema: support cached schema

6 of 6 new or added lines in 1 file covered. (100.0%)

4615 of 5176 relevant lines covered (89.16%)

17856.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.52
/crud/common/schema.lua
1
local fiber = require('fiber')
457✔
2
local msgpack = require('msgpack')
457✔
3
local digest = require('digest')
457✔
4
local errors = require('errors')
457✔
5
local log = require('log')
457✔
6

7
local ReloadSchemaError = errors.new_class('ReloadSchemaError', {capture_stack = false})
457✔
8

9
local const = require('crud.common.const')
457✔
10
local dev_checks = require('crud.common.dev_checks')
457✔
11

12
local schema = {}
457✔
13

14
local function table_len(t)
15
    local len = 0
1,500✔
16
    for _ in pairs(t) do
5,998✔
17
        len = len + 1
2,998✔
18
    end
19
    return len
1,500✔
20
end
21

22
local function call_reload_schema_on_replicaset(replicaset, channel)
23
    replicaset.master.conn:reload_schema()
2,998✔
24
    channel:put(true)
2,998✔
25
end
26

27
local function call_reload_schema(replicasets)
28
    local replicasets_num = table_len(replicasets)
1,500✔
29
    local channel = fiber.channel(replicasets_num)
1,500✔
30

31
    local fibers = {}
1,500✔
32
    for _, replicaset in pairs(replicasets) do
5,998✔
33
        local f = fiber.new(call_reload_schema_on_replicaset, replicaset, channel)
2,998✔
34
        table.insert(fibers, f)
2,998✔
35
    end
36

37
    for _ = 1,replicasets_num do
4,498✔
38
        if channel:get(const.RELOAD_SCHEMA_TIMEOUT) == nil then
2,998✔
39
            for _, f in ipairs(fibers) do
×
40
                if f:status() ~= 'dead' then
×
41
                    f:cancel()
×
42
                end
43
            end
44
            return nil, ReloadSchemaError:new("Reloading schema timed out")
×
45
        end
46
    end
47

48
    return true
1,500✔
49
end
50

51
local reload_in_progress = {}
457✔
52
local reload_schema_cond = {}
457✔
53

54
function schema.reload_schema(vshard_router)
457✔
55
    local replicasets = vshard_router:routeall()
1,500✔
56
    local vshard_router_name = vshard_router.name
1,500✔
57

58
    if reload_in_progress[vshard_router_name] == true then
1,500✔
59
        if not reload_schema_cond[vshard_router_name]:wait(const.RELOAD_SCHEMA_TIMEOUT) then
×
60
            return nil, ReloadSchemaError:new('Waiting for schema to be reloaded is timed out')
×
61
        end
62
    else
63
        reload_in_progress[vshard_router_name] = true
1,500✔
64
        if reload_schema_cond[vshard_router_name] == nil then
1,500✔
65
            reload_schema_cond[vshard_router_name] = fiber.cond()
53✔
66
        end
67

68
        local ok, err = call_reload_schema(replicasets)
1,500✔
69
        if not ok then
1,500✔
70
            return nil, err
×
71
        end
72

73
        reload_schema_cond[vshard_router_name]:broadcast()
1,500✔
74
        reload_in_progress[vshard_router_name] = false
1,500✔
75
    end
76

77
    return true
1,500✔
78
end
79

80
-- schema.wrap_func_reload calls func with specified arguments.
81
-- func should return `res, err, need_reload`
82
-- If function returned error and `need_reload` is true,
83
-- then schema is reloaded and one more attempt is performed
84
-- (but no more than RELOAD_RETRIES_NUM).
85
-- This wrapper is used for functions that can fail if router uses outdated
86
-- space schema. In case of such errors these functions returns `need_reload`
87
-- for schema-dependent errors.
88
function schema.wrap_func_reload(vshard_router, func, ...)
457✔
89
    local i = 0
171,708✔
90

91
    local res, err, need_reload
92
    while true do
93
        res, err, need_reload = func(vshard_router, ...)
344,930✔
94

95
        if err == nil or need_reload ~= const.NEED_SCHEMA_RELOAD then
172,465✔
96
            break
1,321✔
97
        end
98

99
        local ok, reload_schema_err = schema.reload_schema(vshard_router)
1,434✔
100
        if not ok then
1,434✔
101
            log.warn("Failed to reload schema: %s", reload_schema_err)
×
102
            break
103
        end
104

105
        i = i + 1
1,434✔
106
        if i > const.RELOAD_RETRIES_NUM then
1,434✔
107
            local warn_msg = "Number of attempts to reload schema has been ended: %s"
677✔
108
            log.warn(warn_msg, const.RELOAD_RETRIES_NUM)
677✔
109
            break
677✔
110
        end
111
    end
112

113
    return res, err
171,708✔
114
end
115

116
schema.get_normalized_space_schema = function(space)
117
    local indexes_info = {}
491✔
118
    for i = 0, table.maxn(space.index) do
1,720✔
119
        local index = space.index[i]
1,229✔
120
        if index ~= nil then
1,229✔
121
            indexes_info[i] = {
1,229✔
122
                unique = index.unique,
1,229✔
123
                parts = index.parts,
1,229✔
124
                id = index.id,
1,229✔
125
                type = index.type,
1,229✔
126
                name = index.name,
1,229✔
127
                path = index.path,
1,229✔
128
            }
1,229✔
129
        end
130
    end
131

132
    return {
491✔
133
        format = space:format(),
982✔
134
        indexes = indexes_info,
491✔
135
    }
491✔
136
end
137

138
local function get_space_schema_hash(space)
139
    if space == nil then
445✔
140
        return ''
×
141
    end
142

143
    local sch = schema.get_normalized_space_schema(space)
445✔
144
    return digest.murmur(msgpack.encode(sch))
445✔
145
end
146

147
function schema.filter_obj_fields(obj, field_names)
457✔
148
    if field_names == nil or obj == nil then
15✔
149
        return obj
×
150
    end
151

152
    local result = {}
15✔
153

154
    for _, field_name in ipairs(field_names) do
51✔
155
        result[field_name] = obj[field_name]
36✔
156
    end
157

158
    return result
15✔
159
end
160

161
local function filter_tuple_fields(tuple, field_names)
162
    if field_names == nil or tuple == nil then
198,432✔
163
        return tuple
198,155✔
164
    end
165

166
    local result = {}
277✔
167

168
    for i, field_name in ipairs(field_names) do
1,078✔
169
        result[i] = tuple[field_name]
1,602✔
170
    end
171

172
    return result
277✔
173
end
174

175
function schema.filter_tuples_fields(tuples, field_names)
457✔
176
    dev_checks('?table', '?table')
85,512✔
177

178
    if field_names == nil then
85,512✔
179
        return tuples
85,388✔
180
    end
181

182
    local result = {}
124✔
183

184
    for _, tuple in ipairs(tuples) do
278✔
185
        local filtered_tuple = filter_tuple_fields(tuple, field_names)
154✔
186
        table.insert(result, filtered_tuple)
154✔
187
    end
188

189
    return result
124✔
190
end
191

192
function schema.truncate_row_trailing_fields(tuple, field_names)
457✔
193
    dev_checks('table|tuple', 'table')
43✔
194

195
    local count_names = #field_names
43✔
196
    local index = count_names + 1
43✔
197
    local len_tuple = #tuple
43✔
198

199
    if box.tuple.is(tuple) then
86✔
200
        return tuple:transform(index, len_tuple - count_names)
18✔
201
    end
202

203
    for i = index, len_tuple do
90✔
204
        tuple[i] = nil
56✔
205
    end
206

207
    return tuple
34✔
208
end
209

210
function schema.wrap_func_result(space, func, args, opts)
457✔
211
    dev_checks('table', 'function', 'table', 'table')
305,261✔
212

213
    local result = {}
305,261✔
214

215
    opts = opts or {}
305,261✔
216

217
    local ok, func_res = pcall(func, unpack(args))
305,261✔
218
    if not ok then
305,261✔
219
        result.err = func_res
1,319✔
220
        if opts.add_space_schema_hash then
1,319✔
221
            result.space_schema_hash = get_space_schema_hash(space)
316✔
222
        end
223
    else
224
        if opts.noreturn ~= true then
303,942✔
225
            result.res = filter_tuple_fields(func_res, opts.field_names)
396,556✔
226
        end
227
    end
228

229
    if opts.fetch_latest_metadata == true then
305,261✔
230
        local replica_schema_version
231
        if box.info.schema_version ~= nil then
38✔
232
            replica_schema_version = box.info.schema_version
38✔
233
        else
234
            replica_schema_version = box.internal.schema_version()
×
235
        end
236
        result.storage_info = {
38✔
237
            replica_uuid = box.info().uuid,
38✔
238
            replica_schema_version = replica_schema_version,
38✔
239
        }
38✔
240
    end
241

242
    return result
305,261✔
243
end
244

245
-- schema.wrap_box_space_func_result pcalls some box.space function
246
-- and returns its result as a table
247
-- `{res = ..., err = ..., space_schema_hash = ...}`
248
-- space_schema_hash is computed if function failed and
249
-- `add_space_schema_hash` is true
250
function schema.wrap_box_space_func_result(space, box_space_func_name, box_space_func_args, opts)
457✔
251
    dev_checks('table', 'string', 'table', 'table')
304,937✔
252
    local function func(space, box_space_func_name, box_space_func_args)
253
        return space[box_space_func_name](space, unpack(box_space_func_args))
304,937✔
254
    end
255

256
    return schema.wrap_func_result(space, func, {space, box_space_func_name, box_space_func_args}, opts)
304,937✔
257
end
258

259
-- schema.result_needs_reload checks that schema reload can
260
-- be helpful to avoid storage error.
261
-- It checks if space_schema_hash returned by storage
262
-- is the same as hash of space used on router.
263
-- Note, that storage returns `space_schema_hash = nil`
264
-- if reloading space format can't avoid the error.
265
function schema.result_needs_reload(space, result)
457✔
266
    if result.space_schema_hash == nil then
775✔
267
        return false
751✔
268
    end
269
    return result.space_schema_hash ~= get_space_schema_hash(space)
48✔
270
end
271

272
function schema.batching_result_needs_reload(space, results, tuples_count)
457✔
273
    local storage_errs_count = 0
263✔
274
    local space_schema_hash = get_space_schema_hash(space)
263✔
275
    for _, result in ipairs(results) do
786✔
276
        if result.space_schema_hash ~= nil and result.space_schema_hash ~= space_schema_hash then
523✔
277
            storage_errs_count = storage_errs_count + 1
134✔
278
        end
279
    end
280

281
    return storage_errs_count == tuples_count
263✔
282
end
283

284
return schema
457✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc