• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 5199459215

pending completion
5199459215

push

github

DifferentialOrange
Release 1.2.0

Overview

  This release adds two new flags: `noreturn`, which skips the excessive
  transfer and encoding/decoding of return values for insert/replace/etc.
  (performance improvement of up to 10% for batch requests), and
  `fetch_latest_metadata`, which forces fetching the latest space format
  metadata right after a live migration (performance overhead may be up
  to 15%).

New features
  * Add `noreturn` option for operations:
    `insert`, `insert_object`, `insert_many`, `insert_object_many`,
    `replace`, `replace_object`, `replace_many`, `replace_object_many`,
    `upsert`, `upsert_object`, `upsert_many`, `upsert_object_many`,
    `update`, `delete` (#267).

Bugfixes
  * Fix crud DML operations returning a stale schema for metadata
    generation. Now you may use the `fetch_latest_metadata` flag to work
    with the latest schema (#236).

1 of 1 new or added line in 1 file covered. (100.0%)

4549 of 4888 relevant lines covered (93.06%)

18261.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.41
/crud/select/compat/select.lua
1
local checks = require('checks')
404✔
2
local errors = require('errors')
404✔
3

4
local fiber = require('fiber')
404✔
5
local const = require('crud.common.const')
404✔
6
local utils = require('crud.common.utils')
404✔
7
local sharding = require('crud.common.sharding')
404✔
8
local dev_checks = require('crud.common.dev_checks')
404✔
9
local common = require('crud.select.compat.common')
404✔
10
local schema = require('crud.common.schema')
404✔
11
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
404✔
12
local stats = require('crud.stats')
404✔
13

14
local compare_conditions = require('crud.compare.conditions')
404✔
15
local select_plan = require('crud.compare.plan')
404✔
16

17
local Merger = require('crud.select.merger')
404✔
18

19
local SelectError = errors.new_class('SelectError')
404✔
20

21
local select_module = {}
404✔
22

23
-- Picks the `limit` sent to storages: an explicitly requested batch size
-- wins; otherwise the smaller of tuples_limit and common.DEFAULT_BATCH_SIZE.
local function choose_batch_size(requested_batch_size, tuples_limit)
    if requested_batch_size ~= nil then
        return requested_batch_size
    end

    if tuples_limit ~= nil and tuples_limit < common.DEFAULT_BATCH_SIZE then
        return tuples_limit
    end

    return common.DEFAULT_BATCH_SIZE
end

-- Validates select options, plans the select and prepares a merger over
-- the replicasets that have to be queried.
--
-- On success returns a table with `tuples_limit`, `merger`, `plan`, `space`
-- and `netbox_schema_version`. On failure returns `nil, err`; some failures
-- additionally return const.NEED_SCHEMA_RELOAD as a third value.
local function build_select_iterator(vshard_router, space_name, user_conditions, opts)
    dev_checks('table', 'string', '?table', {
        after = '?table|cdata',
        first = '?number',
        batch_size = '?number',
        bucket_id = '?number|cdata',
        force_map_call = '?boolean',
        field_names = '?table',
        yield_every = '?number',
        call_opts = 'table',
    })

    opts = opts or {}

    local requested_batch_size = opts.batch_size
    if requested_batch_size ~= nil and requested_batch_size < 1 then
        return nil, SelectError:new("batch_size should be > 0")
    end

    local yield_every = opts.yield_every
    if yield_every ~= nil and yield_every < 1 then
        return nil, SelectError:new("yield_every should be > 0")
    end
    yield_every = yield_every or const.DEFAULT_YIELD_EVERY

    -- Parse user conditions.
    local conditions, parse_err = compare_conditions.parse(user_conditions)
    if parse_err ~= nil then
        return nil, SelectError:new("Failed to parse conditions: %s", parse_err)
    end

    -- Resolve the space on the router side.
    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router)
    if space_err ~= nil then
        return nil, SelectError:new("An error occurred during the operation: %s", space_err), const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, SelectError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
    end

    local sharding_key_data = {}
    local sharding_func_hash = nil
    local skip_sharding_hash_check = nil

    -- Sharding info is only needed when the caller did not specify bucket_id.
    if opts.bucket_id ~= nil then
        skip_sharding_hash_check = true
    else
        local fetch_err
        sharding_key_data, fetch_err = sharding_metadata_module.fetch_sharding_key_on_router(
            vshard_router, space_name
        )
        if fetch_err ~= nil then
            return nil, fetch_err
        end
    end

    -- Plan the select.
    local plan_opts = {
        first = opts.first,
        after_tuple = opts.after,
        field_names = opts.field_names,
        sharding_key_as_index_obj = sharding_key_data.value,
        bucket_id = opts.bucket_id,
    }
    local plan, plan_err = select_plan.new(space, conditions, plan_opts)
    if plan_err ~= nil then
        return nil, SelectError:new("Failed to plan select: %s", plan_err), const.NEED_SCHEMA_RELOAD
    end

    -- Start from all replicasets; the list is narrowed below when possible.
    local replicasets_to_select, route_err = vshard_router:routeall()
    if route_err ~= nil then
        return nil, SelectError:new("Failed to get router replicasets: %s", route_err)
    end

    -- Single-replicaset call or map-reduce?
    --
    -- An explicit force_map_call always wins: the provided bucket_id is
    -- ignored and data is fetched from every storage replicaset.
    --
    -- Otherwise:
    --
    -- 1. A bucket_id passed by the caller pins the request to one storage
    --    replicaset; crud MUST fetch data only from it, even if other
    --    storages hold tuples matching the conditions. (For now bucket_id
    --    points at a storage replicaset rather than a virtual bucket; the
    --    meaning may change in the future, see gh-190.)
    --
    -- 2. If the replicaset can be deduced from the conditions
    --    (conditions -> sharding key -> bucket id -> replicaset), only that
    --    replicaset is queried. Resharding aside, this does not change the
    --    result, but it significantly reduces network pressure.
    --
    -- 3. In every other case fall back to map-reduce.
    local perform_map_reduce = opts.force_map_call == true or
        (opts.bucket_id == nil and plan.sharding_key == nil)

    if perform_map_reduce then
        stats.update_map_reduces(space_name)
        skip_sharding_hash_check = true
    else
        local bucket_id_data, bucket_err = sharding.key_get_bucket_id(
            vshard_router, space_name, plan.sharding_key, opts.bucket_id
        )
        if bucket_err ~= nil then
            return nil, bucket_err
        end

        assert(bucket_id_data.bucket_id ~= nil)

        local replicasets_err
        replicasets_to_select, replicasets_err = sharding.get_replicasets_by_bucket_id(
            vshard_router, bucket_id_data.bucket_id
        )
        if replicasets_err ~= nil then
            return nil, replicasets_err, const.NEED_SCHEMA_RELOAD
        end

        sharding_func_hash = bucket_id_data.sharding_func_hash
    end

    -- `first` may be negative; the limit itself is its absolute value.
    -- math.abs never yields nil/false, so the and/or form is safe here.
    local tuples_limit = opts.first ~= nil and math.abs(opts.first) or nil

    -- When batch_size is not given it becomes min(tuples_limit, DEFAULT_BATCH_SIZE).
    local batch_size = choose_batch_size(opts.batch_size, tuples_limit)

    local select_opts = {
        scan_value = plan.scan_value,
        after_tuple = plan.after_tuple,
        tarantool_iter = plan.tarantool_iter,
        limit = batch_size,
        scan_condition_num = plan.scan_condition_num,
        field_names = plan.field_names,
        sharding_func_hash = sharding_func_hash,
        sharding_key_hash = sharding_key_data.hash,
        skip_sharding_hash_check = skip_sharding_hash_check,
        yield_every = yield_every,
        fetch_latest_metadata = opts.call_opts.fetch_latest_metadata,
    }

    local storage_call_args = {space_name, plan.index_id, plan.conditions, select_opts}
    local merger_opts = {
        tarantool_iter = plan.tarantool_iter,
        field_names = plan.field_names,
        call_opts = opts.call_opts,
    }
    local merger = Merger.new(
        vshard_router, replicasets_to_select, space, plan.index_id,
        common.SELECT_FUNC_NAME, storage_call_args, merger_opts
    )

    return {
        tuples_limit = tuples_limit,
        merger = merger,
        plan = plan,
        space = space,
        netbox_schema_version = netbox_schema_version,
    }
end
191

192
-- Builds an iterator triplet (gen, param, state) over the tuples of a
-- sharded space that match user_conditions.
--
-- Every failure in this function is raised with error(): the result is
-- meant to be consumed by a generic `for`, which has no way to receive a
-- `nil, err` pair.
--
-- With opts.use_tomap == true each tuple is converted with utils.unflatten
-- using the space format filtered by plan.field_names. A non-nil opts.first
-- limits the number of yielded items; a negative opts.first is rejected.
function select_module.pairs(space_name, user_conditions, opts)
    checks('string', '?table', {
        after = '?table|cdata',
        first = '?number',
        batch_size = '?number',
        use_tomap = '?boolean',
        bucket_id = '?number|cdata',
        force_map_call = '?boolean',
        fields = '?table',
        fetch_latest_metadata = '?boolean',

        mode = '?vshard_call_mode',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',

        vshard_router = '?string|table',

        yield_every = '?number',
    })

    opts = opts or {}

    if opts.first ~= nil and opts.first < 0 then
        error("Negative first isn't allowed for pairs")
    end

    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
    if err ~= nil then
        error(err)
    end

    local iterator_opts = {
        after = opts.after,
        first = opts.first,
        batch_size = opts.batch_size,
        bucket_id = opts.bucket_id,
        force_map_call = opts.force_map_call,
        field_names = opts.fields,
        yield_every = opts.yield_every,
        call_opts = {
            mode = opts.mode,
            prefer_replica = opts.prefer_replica,
            balance = opts.balance,
            timeout = opts.timeout,
            fetch_latest_metadata = opts.fetch_latest_metadata,
        },
    }

    local iter, err = schema.wrap_func_reload(
            vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts
    )

    if err ~= nil then
        error(string.format("Failed to generate iterator: %s", err))
    end

    -- filter space format by plan.field_names (user defined fields + primary key + scan key)
    -- to pass it user as metadata
    local filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names)
    if err ~= nil then
        -- Raise instead of `return nil, err`: a returned nil generator would
        -- make the caller's `for` loop fail with an unrelated message.
        error(string.format("Failed to get fields format: %s", err))
    end

    local gen, param, state = iter.merger:pairs()
    if opts.use_tomap == true then
        gen, param, state = gen:map(function(tuple)
            if opts.fetch_latest_metadata then
                -- This option is temporary and is related to [1], [2].
                -- [1] https://github.com/tarantool/crud/issues/236
                -- [2] https://github.com/tarantool/crud/issues/361
                local storages_info = fiber.self().storage.storages_info_on_select
                iter = utils.fetch_latest_metadata_for_select(space_name, vshard_router, opts,
                                                              storages_info, iter)

                local format_err
                filtered_space_format, format_err = utils.get_fields_format(iter.space:format(),
                                                                            iter.plan.field_names)
                if format_err ~= nil then
                    -- Raise here as well: a `nil, err` return from the map
                    -- callback would be handed on as the mapped object.
                    error(string.format("Failed to get fields format: %s", format_err))
                end
            end

            local result, unflatten_err = utils.unflatten(tuple, filtered_space_format)
            if unflatten_err ~= nil then
                error(string.format("Failed to unflatten next object: %s", unflatten_err))
            end
            return result
        end)
    end

    if iter.tuples_limit ~= nil then
        gen, param, state = gen:take_n(iter.tuples_limit)
    end

    return gen, param, state
end
286

287
-- Performs a select and collects the result eagerly.
--
-- Returns `{metadata = <filtered space format>, rows = <tuples>}` on
-- success and `nil, err` on failure. A negative opts.first is accepted only
-- together with opts.after; in that case the collected rows are reversed
-- in place before being returned.
local function select_module_call_xc(vshard_router, space_name, user_conditions, opts)
    checks('table', 'string', '?table', {
        after = '?table|cdata',
        first = '?number',
        batch_size = '?number',
        bucket_id = '?number|cdata',
        force_map_call = '?boolean',
        fields = '?table',
        fullscan = '?boolean',
        fetch_latest_metadata = '?boolean',

        mode = '?vshard_call_mode',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',

        vshard_router = '?string|table',

        yield_every = '?number',
    })

    if opts.first ~= nil and opts.first < 0 and opts.after == nil then
        return nil, SelectError:new("Negative first should be specified only with after option")
    end

    local call_opts = {
        mode = opts.mode,
        prefer_replica = opts.prefer_replica,
        balance = opts.balance,
        timeout = opts.timeout,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }
    local iterator_opts = {
        after = opts.after,
        first = opts.first,
        batch_size = opts.batch_size,
        bucket_id = opts.bucket_id,
        force_map_call = opts.force_map_call,
        field_names = opts.fields,
        yield_every = opts.yield_every,
        call_opts = call_opts,
    }

    local iter, build_err = schema.wrap_func_reload(
        vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts
    )
    if build_err ~= nil then
        return nil, build_err
    end

    common.check_select_safety(space_name, iter.plan, opts)

    -- Collect at most |first| tuples. The limit is checked after a tuple has
    -- been taken from the merger, exactly as before.
    local rows = {}
    local collected = 0
    local limit = opts.first and math.abs(opts.first)
    for _, tuple in iter.merger:pairs() do
        if limit ~= nil and collected >= limit then
            break
        end

        table.insert(rows, tuple)
        collected = collected + 1
    end

    -- Rows requested with a negative `first` are reversed in place before
    -- they are returned.
    local is_reversed = opts.first ~= nil and opts.first < 0
    if is_reversed then
        utils.reverse_inplace(rows)
    end

    if opts.fetch_latest_metadata then
        -- Temporary option, see
        -- https://github.com/tarantool/crud/issues/236 and
        -- https://github.com/tarantool/crud/issues/361
        local storages_info = fiber.self().storage.storages_info_on_select
        iter = utils.fetch_latest_metadata_for_select(
            space_name, vshard_router, opts, storages_info, iter
        )
    end

    -- The metadata handed to the user is the space format restricted to
    -- plan.field_names (user defined fields + primary key + scan key).
    local filtered_space_format, format_err = utils.get_fields_format(
        iter.space:format(), iter.plan.field_names
    )
    if format_err ~= nil then
        return nil, format_err
    end

    return {
        metadata = table.copy(filtered_space_format),
        rows = rows,
    }
end
378

379
-- Public select entry point.
--
-- Resolves the vshard router named by opts.vshard_router and then runs
-- select_module_call_xc through sharding.wrap_select_method under
-- SelectError:pcall. A router resolution failure is returned as
-- `nil, SelectError`.
function select_module.call(space_name, user_conditions, opts)
    opts = opts or {}

    local vshard_router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, SelectError:new(router_err)
    end

    return SelectError:pcall(
        sharding.wrap_select_method, vshard_router, select_module_call_xc,
        space_name, user_conditions, opts
    )
end
390

391
return select_module
404✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc