• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 5199459215

pending completion
5199459215

push

github

DifferentialOrange
Release 1.2.0

Overview

  This release add two new flags: `noreturn` to ignore return values
  excessive transfer and encoding/decoding for insert/replace/etc
  (performance improvement up to 10% for batch requests) and
  `fetch_latest_metadata` to force fetching latest space format metadata
  right after a live migration (performance overhead may be up to 15%).

New features
  * Add `noreturn` option for operations:
    `insert`, `insert_object`, `insert_many`, `insert_object_many`,
    `replace`, `replace_object`, `replace_many`, `insert_object_many`,
    `upsert`, `upsert_object`, `upsert_many`, `upsert_object_many`,
    `update`, `delete` (#267).

Bugfixes
  * Crud DML operations returning stale schema for metadata generation.
    Now you may use `fetch_latest_metadata` flag to work with latest
    schema (#236).

1 of 1 new or added line in 1 file covered. (100.0%)

4549 of 4888 relevant lines covered (93.06%)

18261.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.84
/crud/select/merger.lua
1
local buffer = require('buffer')
404✔
2
local errors = require('errors')
404✔
3
local msgpack = require('msgpack')
404✔
4
local ffi = require('ffi')
404✔
5
local call = require('crud.common.call')
404✔
6
local fiber = require('fiber')
404✔
7
local sharding = require('crud.common.sharding')
404✔
8
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
404✔
9

10
local compat = require('crud.common.compat')
404✔
11
local merger_lib = compat.require('tuple.merger', 'merger')
404✔
12

13
local Keydef = require('crud.compare.keydef')
404✔
14
local stats = require('crud.stats')
404✔
15
local utils = require("crud.common.utils")
404✔
16

17
local function bswap_u16(num)
18
    return bit.rshift(bit.bswap(tonumber(num)), 16)
×
19
end
20

21
-- See
22
-- https://github.com/tarantool/tarantool/blob/0ab21ac9eeaaae2aa0aef5e598d374669f96df9e/src/lua/msgpackffi.lua
23
-- to understand following hell
24
-- This code works for ALL Tarantool versions
25
local strict_alignment = (jit.arch == 'arm')
404✔
26
local uint16_ptr_t = ffi.typeof('uint16_t *')
404✔
27
local uint32_ptr_t = ffi.typeof('uint32_t *')
404✔
28
local char_ptr = ffi.typeof('char *')
404✔
29

30
local decode_u16
31
local decode_u32
32
if strict_alignment then
404✔
33
    local tmpint = ffi.new('union tmpint[1]')
×
34
    decode_u16 = function(data)
35
        ffi.copy(tmpint, data[0], 2)
×
36
        data[0] = data[0] + 2
×
37
        return tonumber(bswap_u16(tmpint[0].u16))
×
38
    end
39
    decode_u32 = function(data)
40
        ffi.copy(tmpint, data[0], 4)
×
41
        data[0] = data[0] + 4
×
42
        return tonumber(
×
43
            ffi.cast('uint32_t', bit.bswap(tonumber(tmpint[0].u32))))
×
44
    end
45
else
46
    decode_u16 = function(data)
47
        local num = bswap_u16(ffi.cast(uint16_ptr_t, data[0])[0])
×
48
        data[0] = data[0] + 2
×
49
        return tonumber(num)
×
50
    end
51
    decode_u32 = function(data)
52
        local num = ffi.cast('uint32_t',
145,772✔
53
            bit.bswap(tonumber(ffi.cast(uint32_ptr_t, data[0])[0])))
72,886✔
54
        data[0] = data[0] + 4
72,886✔
55
        return tonumber(num)
72,886✔
56
    end
57
end
58

59
local data = ffi.new('const unsigned char *[1]')
404✔
60

61
local function decode_response_headers(buf)
62
    -- {48: [cursor, [tuple_1, tuple_2, ...]]} (exactly 1 pair of key-value)
63
    data[0] = buf.rpos
72,886✔
64

65
    -- 48 (key)
66
    data[0] = data[0] + 1
72,886✔
67

68
    -- [cursor, [tuple_1, tuple_2, ...]] (value)
69
    data[0] = data[0] + 1
72,886✔
70

71
    -- Decode array header
72
    local c = data[0][0]
72,886✔
73
    data[0] = data[0] + 1
72,886✔
74
    if c == 0xdc then
72,886✔
75
        decode_u16(data)
×
76
    elseif c == 0xdd then
72,886✔
77
        decode_u32(data)
72,886✔
78
    end
79

80
    return ffi.cast(char_ptr, data[0])
72,886✔
81
end
82

83
local function decode_metainfo(buf)
84
    -- Skip an array around a call return values.
85
    buf.rpos = decode_response_headers(buf)
145,772✔
86

87
    -- Decode a first return value (metainfo).
88
    local res, err
89
    res, buf.rpos = msgpack.decode(buf.rpos, buf:size())
145,772✔
90

91
    -- If res is nil, decode second return value (error).
92
    if res == nil then
72,886✔
93
        err, buf.rpos = msgpack.decode(buf.rpos, buf:size())
20✔
94
    end
95
    return res, err
72,886✔
96
end
97

98
--- Wait for a data chunk and request for the next data chunk.
99
local function fetch_chunk(context, state)
100
    local net_box_opts = context.net_box_opts
131,723✔
101
    local buf = context.buffer
131,723✔
102
    local func_name = context.func_name
131,723✔
103
    local func_args = context.func_args
131,723✔
104
    local replicaset = context.replicaset
131,723✔
105
    local vshard_call_name = context.vshard_call_name
131,723✔
106
    local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
131,723✔
107
    local space_name = context.space_name
131,723✔
108
    local vshard_router = context.vshard_router
131,723✔
109
    local future = state.future
131,723✔
110

111
    -- The source was entirely drained.
112
    if future == nil then
131,723✔
113
        return nil
58,803✔
114
    end
115

116
    -- Wait for requested data.
117
    local res, err = future:wait_result(timeout)
72,920✔
118
    if res == nil then
72,920✔
119
        local wrapped_err = errors.wrap(utils.update_storage_call_error_description(err, func_name, replicaset.uuid))
68✔
120
        error(wrapped_err)
34✔
121
    end
122

123
    -- Decode metainfo, leave data to be processed by the merger.
124
    local cursor, err = decode_metainfo(buf)
72,886✔
125
    if cursor == nil then
72,886✔
126
        -- Wrap net.box errors error to restore metatable.
127
        local wrapped_err = errors.wrap(err)
10✔
128

129
        if sharding.result_needs_sharding_reload(err) then
20✔
130
            sharding_metadata_module.reload_sharding_cache(vshard_router, space_name)
8✔
131
        end
132

133
        error(wrapped_err)
10✔
134
    end
135

136
    if context.fetch_latest_metadata then
72,876✔
137
        if fiber.self().storage.storages_info_on_select == nil then
8✔
138
            fiber.self().storage.storages_info_on_select = {}
4✔
139
        end
140
        local replica_uuid = cursor.storage_info.replica_uuid
8✔
141
        local replica_schema_version = cursor.storage_info.replica_schema_version
8✔
142
        local fiber_storage = fiber.self().storage.storages_info_on_select
8✔
143
        fiber_storage[replica_uuid] = {}
8✔
144
        fiber_storage[replica_uuid].replica_schema_version = replica_schema_version
8✔
145
    end
146

147
    -- Extract stats info.
148
    -- Stats extracted with callback here and not passed
149
    -- outside to wrapper because fetch for pairs can be
150
    -- called even after pairs() return from generators.
151
    if cursor.stats ~= nil then
72,876✔
152
        stats.update_fetch_stats(cursor.stats, space_name)
72,876✔
153
    end
154

155
    -- Check whether we need the next call.
156
    if cursor.is_end then
72,876✔
157
        local next_state = {}
59,072✔
158
        return next_state, buf
59,072✔
159
    end
160

161
    -- Request the next data while we processing the current ones.
162
    -- Note: We reuse the same buffer for all request to a replicaset.
163
    local next_func_args = func_args
13,804✔
164

165
    -- change context.func_args too, but it does not matter
166
    next_func_args[4].after_tuple = cursor.after_tuple
13,804✔
167
    local next_future = replicaset[vshard_call_name](replicaset, func_name, next_func_args, net_box_opts)
13,804✔
168

169
    local next_state = {future = next_future}
13,804✔
170
    return next_state, buf
13,804✔
171
end
172

173
local reverse_tarantool_iters = {
404✔
174
    [box.index.LE] = true,
404✔
175
    [box.index.LT] = true,
404✔
176
    [box.index.REQ] = true,
404✔
177
}
178

179
local function new(vshard_router, replicasets, space, index_id, func_name, func_args, opts)
180
    opts = opts or {}
35,833✔
181
    local call_opts = opts.call_opts
35,833✔
182
    local mode = call_opts.mode or 'read'
35,833✔
183
    local vshard_call_name = call.get_vshard_call_name(mode, call_opts.prefer_replica, call_opts.balance)
35,833✔
184

185
    -- Request a first data chunk and create merger sources.
186
    local merger_sources = {}
35,833✔
187
    for _, replicaset in pairs(replicasets) do
141,525✔
188
        -- Perform a request.
189
        local buf = buffer.ibuf()
69,859✔
190
        local net_box_opts = {is_async = true, buffer = buf, skip_header = false}
69,859✔
191
        local future = replicaset[vshard_call_name](replicaset, func_name, func_args,
139,718✔
192
                net_box_opts)
69,859✔
193

194
        -- Create a source.
195
        local context = {
69,859✔
196
            net_box_opts = net_box_opts,
69,859✔
197
            buffer = buf,
69,859✔
198
            func_name = func_name,
69,859✔
199
            func_args = func_args,
69,859✔
200
            replicaset = replicaset,
69,859✔
201
            vshard_call_name = vshard_call_name,
69,859✔
202
            timeout = call_opts.timeout,
69,859✔
203
            fetch_latest_metadata = call_opts.fetch_latest_metadata,
69,859✔
204
            space_name = space.name,
69,859✔
205
            vshard_router = vshard_router,
69,859✔
206
        }
207

208
        local state = {future = future}
69,859✔
209
        local source = merger_lib.new_buffer_source(fetch_chunk, context, state)
69,859✔
210
        table.insert(merger_sources, source)
69,859✔
211
    end
212

213
    -- Trick for performance.
214
    --
215
    -- No need to create merger, key_def and pass tuples over the
216
    -- merger, when we have only one tuple source.
217
    if #merger_sources == 1 then
35,833✔
218
        return merger_sources[1]
18,606✔
219
    end
220

221
    local keydef = Keydef.new(space, opts.field_names, index_id)
17,227✔
222
    -- When built-in merger is used with external keydef, `merger_lib.new(keydef)`
223
    -- fails. It's simply fixed by casting `keydef` to 'struct key_def&'.
224
    keydef = ffi.cast('struct key_def&', keydef)
17,227✔
225

226
    local merger = merger_lib.new(keydef, merger_sources, {
34,454✔
227
        reverse = reverse_tarantool_iters[opts.tarantool_iter],
17,227✔
228
    })
229

230
    return merger
17,227✔
231
end
232

233
return {
404✔
234
    new = new,
404✔
235
}
404✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc