• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 21364862659

26 Jan 2026 04:13PM UTC coverage: 73.492% (-15.0%) from 88.463%
21364862659

push

github

web-flow
Merge f981517ee into a84e19f3e

4253 of 5787 relevant lines covered (73.49%)

55.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.57
/crud/select/compat/select.lua
1
local checks = require('checks')
19✔
2
local errors = require('errors')
19✔
3

4
local fiber = require('fiber')
19✔
5
local const = require('crud.common.const')
19✔
6
local utils = require('crud.common.utils')
19✔
7
local sharding = require('crud.common.sharding')
19✔
8
local dev_checks = require('crud.common.dev_checks')
19✔
9
local common = require('crud.select.compat.common')
19✔
10
local schema = require('crud.common.schema')
19✔
11
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
19✔
12
local stats = require('crud.stats')
19✔
13

14
local compare_conditions = require('crud.compare.conditions')
19✔
15
local select_plan = require('crud.compare.plan')
19✔
16

17
local Merger = require('crud.select.merger')
19✔
18

19
local SelectError = errors.new_class('SelectError')
19✔
20

21
local select_module = {}
19✔
22

23
--- Prepare everything needed to iterate over a select result.
--
-- Validates the numeric options, parses the user conditions, resolves the
-- space, builds a select plan, decides which replicasets to query and
-- creates a merger over them.
--
-- @param vshard_router router object (must provide `routeall`)
-- @string space_name name of the space to select from
-- @tparam ?table user_conditions conditions passed to `compare_conditions.parse`
-- @tparam table opts iterator options (see the `dev_checks` spec below)
--
-- @return[1] table with `tuples_limit`, `merger`, `plan`, `space` and
--   `netbox_schema_version` fields
-- @return[2] nil
-- @return[2] error
-- @return[2] optional `const.NEED_SCHEMA_RELOAD` marker, returned by the
--   branches where the failure may be caused by outdated schema
local function build_select_iterator(vshard_router, space_name, user_conditions, opts)
    dev_checks('table', 'string', '?table', {
        after = '?table|cdata',
        first = '?number',
        batch_size = '?number',
        bucket_id = '?',
        force_map_call = '?boolean',
        field_names = '?table',
        yield_every = '?number',
        call_opts = 'table',
        readview = '?boolean',
        readview_info = '?table',
    })

    opts = opts or {}

    -- Cheap option validation goes first, before any schema access.
    local requested_batch_size = opts.batch_size
    if requested_batch_size ~= nil and requested_batch_size < 1 then
        return nil, SelectError:new("batch_size should be > 0")
    end

    local requested_yield_every = opts.yield_every
    if requested_yield_every ~= nil and requested_yield_every < 1 then
        return nil, SelectError:new("yield_every should be > 0")
    end

    local yield_every = requested_yield_every or const.DEFAULT_YIELD_EVERY

    -- Parse user conditions.
    local conditions, parse_err = compare_conditions.parse(user_conditions)
    if parse_err ~= nil then
        return nil, SelectError:new("Failed to parse conditions: %s", parse_err)
    end

    -- Resolve the space object.
    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router)
    if space_err ~= nil then
        local wrapped = SelectError:new("An error occurred during the operation: %s", space_err)
        return nil, wrapped, const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, SelectError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
    end

    local sharding_key_data = {}
    local sharding_func_hash
    local skip_sharding_hash_check

    -- Sharding info is not needed when bucket_id is specified explicitly.
    if opts.bucket_id ~= nil then
        skip_sharding_hash_check = true
    else
        local fetch_err
        sharding_key_data, fetch_err = sharding_metadata_module.fetch_sharding_key_on_router(
            vshard_router, space_name
        )
        if fetch_err ~= nil then
            return nil, fetch_err
        end
    end

    -- Build the select plan.
    local plan_opts = {
        first = opts.first,
        after_tuple = opts.after,
        field_names = opts.field_names,
        sharding_key_as_index_obj = sharding_key_data.value,
        bucket_id = opts.bucket_id,
    }
    local plan, plan_err = select_plan.new(space, conditions, plan_opts)
    if plan_err ~= nil then
        return nil, SelectError:new("Failed to plan select: %s", plan_err), const.NEED_SCHEMA_RELOAD
    end

    -- By default select from every replicaset known to the router.
    local replicasets_to_select, route_err = vshard_router:routeall()
    if route_err ~= nil then
        return nil, SelectError:new("Failed to get router replicasets: %s", route_err)
    end

    -- Single storage replicaset or map-reduce?
    --
    -- An explicit map-reduce request (force_map_call) wins: the provided
    -- bucket_id is ignored and all storage replicasets are queried.
    --
    -- Otherwise:
    --
    -- 1. If the caller points to a particular replicaset (using the
    --    bucket_id option[^1]), data MUST be fetched only from that
    --    replicaset, no matter whether other storages hold matching tuples.
    --
    -- 2. If the replicaset may be deduced from the conditions
    --    (conditions -> sharding key -> bucket id -> replicaset), data is
    --    fetched only from it. The result stays the same[^2], while the
    --    network pressure is significantly lower.
    --
    -- 3. Map-reduce is the fallback.
    --
    -- [^1]: The meaning of this option may change in the future, see
    --       gh-190. For now bucket_id points to a storage replicaset,
    --       not to a virtual bucket.
    --
    -- [^2]: Strictly speaking, this holds only if resharding is left out
    --       of the picture. Still, the optimization is not expected to
    --       make the result less consistent.
    local single_replicaset = opts.force_map_call ~= true and
        (opts.bucket_id ~= nil or plan.sharding_key ~= nil)

    if single_replicaset then
        local bucket_id_data, bucket_err = sharding.key_get_bucket_id(
            vshard_router, space_name, plan.sharding_key, opts.bucket_id
        )
        if bucket_err ~= nil then
            return nil, bucket_err
        end

        assert(bucket_id_data.bucket_id ~= nil)

        local replicasets_err
        replicasets_to_select, replicasets_err = sharding.get_replicasets_by_bucket_id(
            vshard_router, bucket_id_data.bucket_id
        )
        if replicasets_err ~= nil then
            return nil, replicasets_err, const.NEED_SCHEMA_RELOAD
        end

        sharding_func_hash = bucket_id_data.sharding_func_hash
    else
        stats.update_map_reduces(space_name)
        skip_sharding_hash_check = true
    end

    -- The limit is the absolute value of `first`: the sign of `first` is
    -- handled by the plan and by the callers.
    local tuples_limit
    if opts.first ~= nil then
        tuples_limit = math.abs(opts.first)
    end

    -- When batch_size is not given, use min(tuples_limit, DEFAULT_BATCH_SIZE).
    local batch_size = opts.batch_size
    if batch_size == nil then
        batch_size = common.DEFAULT_BATCH_SIZE
        if tuples_limit ~= nil and tuples_limit < batch_size then
            batch_size = tuples_limit
        end
    end

    local select_opts = {
        scan_value = plan.scan_value,
        after_tuple = plan.after_tuple,
        tarantool_iter = plan.tarantool_iter,
        limit = batch_size,
        scan_condition_num = plan.scan_condition_num,
        field_names = plan.field_names,
        sharding_func_hash = sharding_func_hash,
        sharding_key_hash = sharding_key_data.hash,
        skip_sharding_hash_check = skip_sharding_hash_check,
        yield_every = yield_every,
        fetch_latest_metadata = opts.call_opts.fetch_latest_metadata,
    }

    -- Both merger constructors take the same storage-call arguments and
    -- the same merger options, so build them once.
    local storage_call_args = {space_name, plan.index_id, plan.conditions, select_opts}
    local merger_opts = {
        tarantool_iter = plan.tarantool_iter,
        field_names = plan.field_names,
        call_opts = opts.call_opts,
    }

    local merger
    if opts.readview then
        merger = Merger.new_readview(
            vshard_router, replicasets_to_select, opts.readview_info,
            space, plan.index_id, common.READVIEW_SELECT_FUNC_NAME,
            storage_call_args, merger_opts
        )
    else
        merger = Merger.new(
            vshard_router, replicasets_to_select, space, plan.index_id,
            common.SELECT_FUNC_NAME,
            storage_call_args, merger_opts
        )
    end

    return {
        tuples_limit = tuples_limit,
        merger = merger,
        plan = plan,
        space = space,
        netbox_schema_version = netbox_schema_version,
    }
end
201

202
--- Build a lazy iterator over the tuples matching the conditions.
--
-- Error reporting of this function is mixed and kept as is for backward
-- compatibility:
--
-- * an invalid `bucket_id`, a negative `first`, a failure to obtain the
--   router and a failure to build the iterator are raised with `error`;
-- * a readview-incompatible option and a failure to build the fields
--   format before iteration starts are returned as `nil, err`.
--
-- Failures that happen while iterating (inside the `use_tomap` mapper)
-- are always raised.
--
-- @string space_name name of the space to select from
-- @tparam ?table user_conditions select conditions
-- @tparam ?table opts options (see the `checks` spec below)
--
-- @return[1] gen
-- @return[1] param
-- @return[1] state
-- @return[2] nil
-- @return[2] error
function select_module.pairs(space_name, user_conditions, opts)
    checks('string', '?table', {
        after = '?table|cdata',
        first = '?number',
        batch_size = '?number',
        use_tomap = '?boolean',
        bucket_id = '?',
        force_map_call = '?boolean',
        fields = '?table',
        fetch_latest_metadata = '?boolean',

        mode = '?vshard_call_mode',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        request_timeout = '?number',
        readview = '?boolean',
        readview_info = '?table',

        vshard_router = '?string|table',

        yield_every = '?number',
    })

    opts = opts or {}

    if opts.bucket_id ~= nil then
        local err = sharding.validate_bucket_id(opts.bucket_id)
        if err ~= nil then
            return error(err)
        end
    end

    -- Options that are rejected when selecting from a readview.
    if opts.readview == true then
        if opts.mode ~= nil then
            return nil, SelectError:new("Readview does not support 'mode' option")
        end

        if opts.prefer_replica ~= nil then
            return nil, SelectError:new("Readview does not support 'prefer_replica' option")
        end

        if opts.balance ~= nil then
            return nil, SelectError:new("Readview does not support 'balance' option")
        end

        if opts.vshard_router ~= nil then
            return nil, SelectError:new("Readview does not support 'vshard_router' option")
        end

        if opts.request_timeout ~= nil then
            return nil, SelectError:new("Readview does not support 'request_timeout' option")
        end
    end

    if opts.first ~= nil and opts.first < 0 then
        error("Negative first isn't allowed for pairs")
    end

    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
    if err ~= nil then
        error(err)
    end

    local iterator_opts = {
        after = opts.after,
        first = opts.first,
        batch_size = opts.batch_size,
        bucket_id = opts.bucket_id,
        force_map_call = opts.force_map_call,
        field_names = opts.fields,
        yield_every = opts.yield_every,
        call_opts = {
            mode = opts.mode,
            prefer_replica = opts.prefer_replica,
            balance = opts.balance,
            timeout = opts.timeout,
            -- request_timeout is forwarded only for the 'read' mode
            -- (or when the mode is not specified).
            request_timeout = (opts.mode == 'read' or opts.mode == nil) and
                              opts.request_timeout or nil,
            fetch_latest_metadata = opts.fetch_latest_metadata,
        },
        readview = opts.readview,
        readview_info = opts.readview_info,
    }

    local iter, err = schema.wrap_func_reload(
            vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts
    )

    if err ~= nil then
        error(string.format("Failed to generate iterator: %s", err))
    end

    -- filter space format by plan.field_names (user defined fields + primary key + scan key)
    -- to pass it user as metadata
    local filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names)
    if err ~= nil then
        return nil, err
    end

    local gen, param, state = iter.merger:pairs()
    if opts.use_tomap == true then
        gen, param, state = gen:map(function(tuple)
            if opts.fetch_latest_metadata then
                -- This option is temporary and is related to [1], [2].
                -- [1] https://github.com/tarantool/crud/issues/236
                -- [2] https://github.com/tarantool/crud/issues/361
                local storages_info = fiber.self().storage.storages_info_on_select
                iter = utils.fetch_latest_metadata_for_select(space_name, vshard_router, opts,
                                                              storages_info, iter)
                filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names)
                if err ~= nil then
                    -- The values returned from a mapper become the values
                    -- yielded by the iterator, so returning `nil, err` here
                    -- would hand a nil object to the consumer instead of
                    -- reporting the failure. Raise, as the unflatten
                    -- failure below does.
                    error(string.format("Failed to get fields format: %s", err))
                end
            end
            local result
            result, err = utils.unflatten(tuple, filtered_space_format)
            if err ~= nil then
                error(string.format("Failed to unflatten next object: %s", err))
            end
            return result
        end)
    end

    if iter.tuples_limit ~= nil then
        gen, param, state = gen:take_n(iter.tuples_limit)
    end

    return gen, param, state
end
332

333
--- Perform a select and collect the whole result into a table.
--
-- @param vshard_router router object
-- @string space_name name of the space to select from
-- @tparam ?table user_conditions select conditions
-- @tparam table opts options (see the `checks` spec below)
--
-- @return[1] table with `metadata` (filtered space format) and `rows`
-- @return[2] nil
-- @return[2] error
local function select_module_call_xc(vshard_router, space_name, user_conditions, opts)
    checks('table', 'string', '?table', {
        after = '?table|cdata',
        first = '?number',
        batch_size = '?number',
        bucket_id = '?',
        force_map_call = '?boolean',
        fields = '?table',
        fullscan = '?boolean',
        fetch_latest_metadata = '?boolean',

        mode = '?vshard_call_mode',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        request_timeout = '?number',
        readview = '?boolean',
        readview_info = '?table',

        vshard_router = '?string|table',

        yield_every = '?number',
    })

    if opts.readview == true then
        -- Options rejected for readview selects, in the order they are
        -- reported.
        local unsupported_with_readview = {
            {'mode', "Readview does not support 'mode' option"},
            {'prefer_replica', "Readview does not support 'prefer_replica' option"},
            {'balance', "Readview does not support 'balance' option"},
            {'vshard_router', "Readview does not support 'vshard_router' option"},
            {'request_timeout', "Readview does not support 'request_timeout' option"},
        }

        for _, entry in ipairs(unsupported_with_readview) do
            local opt_name, message = entry[1], entry[2]
            if opts[opt_name] ~= nil then
                return nil, SelectError:new(message)
            end
        end
    end

    if opts.first ~= nil and opts.first < 0 and opts.after == nil then
        return nil, SelectError:new("Negative first should be specified only with after option")
    end

    -- request_timeout is forwarded only for the 'read' mode (or when the
    -- mode is not specified).
    local request_timeout
    if opts.mode == nil or opts.mode == 'read' then
        request_timeout = opts.request_timeout
    end

    local call_opts = {
        mode = opts.mode,
        prefer_replica = opts.prefer_replica,
        balance = opts.balance,
        timeout = opts.timeout,
        request_timeout = request_timeout,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }

    local iterator_opts = {
        after = opts.after,
        first = opts.first,
        batch_size = opts.batch_size,
        bucket_id = opts.bucket_id,
        force_map_call = opts.force_map_call,
        field_names = opts.fields,
        yield_every = opts.yield_every,
        call_opts = call_opts,
        readview = opts.readview,
        readview_info = opts.readview_info,
    }

    local iter, build_err = schema.wrap_func_reload(
        vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts
    )
    if build_err ~= nil then
        return nil, build_err
    end

    common.check_select_safety(space_name, iter.plan, opts)

    -- Collect at most |first| tuples from the merger.
    local rows = {}
    local collected = 0
    local limit = opts.first and math.abs(opts.first)
    for _, tuple in iter.merger:pairs() do
        if limit ~= nil and collected >= limit then
            break
        end

        table.insert(rows, tuple)
        collected = collected + 1
    end

    -- With a negative `first` the collected rows are reversed before
    -- being returned.
    if opts.first ~= nil and opts.first < 0 then
        utils.reverse_inplace(rows)
    end

    if opts.fetch_latest_metadata then
        -- This option is temporary and is related to [1], [2].
        -- [1] https://github.com/tarantool/crud/issues/236
        -- [2] https://github.com/tarantool/crud/issues/361
        local storages_info = fiber.self().storage.storages_info_on_select
        iter = utils.fetch_latest_metadata_for_select(
            space_name, vshard_router, opts, storages_info, iter
        )
    end

    -- The metadata handed to the user is the space format restricted to
    -- plan.field_names (user defined fields + primary key + scan key).
    local metadata, format_err = utils.get_fields_format(iter.space:format(), iter.plan.field_names)
    if format_err ~= nil then
        return nil, format_err
    end

    return {
        metadata = table.copy(metadata),
        rows = rows,
    }
end
453

454
--- Select tuples and return them all at once.
--
-- Resolves the router instance and runs `select_module_call_xc` through
-- `sharding.wrap_select_method` under `SelectError:pcall`.
--
-- @string space_name name of the space to select from
-- @tparam ?table user_conditions select conditions
-- @tparam ?table opts select options
--
-- @return whatever the wrapped call returns, or `nil, SelectError` when
--   the router instance cannot be obtained
function select_module.call(space_name, user_conditions, opts)
    opts = opts or {}

    local vshard_router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err == nil then
        return SelectError:pcall(
            sharding.wrap_select_method, vshard_router, select_module_call_xc,
            space_name, user_conditions, opts
        )
    end

    return nil, SelectError:new(router_err)
end
465

466
return select_module
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc