• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 6309942558

26 Sep 2023 08:16AM UTC coverage: 89.654% (-3.4%) from 93.065%
6309942558

Pull #372

github

better0fdead
test
Pull Request #372: crud: add readview support

233 of 233 new or added lines in 5 files covered. (100.0%)

4584 of 5113 relevant lines covered (89.65%)

16580.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.44
/crud/select/compat/select.lua
1
local checks = require('checks')
409✔
2
local errors = require('errors')
409✔
3

4
local fiber = require('fiber')
409✔
5
local const = require('crud.common.const')
409✔
6
local utils = require('crud.common.utils')
409✔
7
local sharding = require('crud.common.sharding')
409✔
8
local dev_checks = require('crud.common.dev_checks')
409✔
9
local common = require('crud.select.compat.common')
409✔
10
local schema = require('crud.common.schema')
409✔
11
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
409✔
12
local stats = require('crud.stats')
409✔
13

14
local compare_conditions = require('crud.compare.conditions')
409✔
15
local select_plan = require('crud.compare.plan')
409✔
16

17
local Merger = require('crud.select.merger')
409✔
18

19
local SelectError = errors.new_class('SelectError')
409✔
20

21
local select_module = {}
409✔
22

23
--- Prepare everything required to iterate over a select result.
--
-- Steps, in order: validate numeric options, parse user conditions,
-- resolve the space, fetch sharding key info (unless bucket_id is given),
-- build a select plan, choose the replicasets to query and create a merger.
--
-- @param vshard_router router object; `routeall` is called on it
-- @param space_name name of the space to select from
-- @param user_conditions optional conditions, handed to compare_conditions.parse
-- @param opts options table, see the dev_checks declaration below
-- @return on success a table with `tuples_limit`, `merger`, `plan`, `space`
--   and `netbox_schema_version`; on failure `nil, err` and, for some
--   failures, const.NEED_SCHEMA_RELOAD as a third value
local function build_select_iterator(vshard_router, space_name, user_conditions, opts)
    dev_checks('table', 'string', '?table', {
        after = '?table|cdata',
        first = '?number',
        batch_size = '?number',
        bucket_id = '?number|cdata',
        force_map_call = '?boolean',
        field_names = '?table',
        yield_every = '?number',
        call_opts = 'table',
        readview = '?boolean',
        readview_uuid = '?table',
    })

    opts = opts or {}

    local requested_batch_size = opts.batch_size
    if requested_batch_size ~= nil and requested_batch_size < 1 then
        return nil, SelectError:new("batch_size should be > 0")
    end

    local yield_every = opts.yield_every
    if yield_every ~= nil and yield_every < 1 then
        return nil, SelectError:new("yield_every should be > 0")
    end
    yield_every = yield_every or const.DEFAULT_YIELD_EVERY

    -- Parse and validate user conditions.
    local conditions, parse_err = compare_conditions.parse(user_conditions)
    if parse_err ~= nil then
        return nil, SelectError:new("Failed to parse conditions: %s", parse_err)
    end

    -- Resolve the space; both failure kinds ask the caller to reload the schema.
    local space, space_err, netbox_schema_version = utils.get_space(space_name, vshard_router)
    if space_err ~= nil then
        return nil, SelectError:new("An error occurred during the operation: %s", space_err), const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, SelectError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
    end

    local sharding_key_data = {}
    local sharding_func_hash = nil
    local skip_sharding_hash_check = nil

    -- Sharding info is not needed when bucket_id is specified explicitly.
    if opts.bucket_id ~= nil then
        skip_sharding_hash_check = true
    else
        local fetch_err
        sharding_key_data, fetch_err = sharding_metadata_module.fetch_sharding_key_on_router(
            vshard_router, space_name
        )
        if fetch_err ~= nil then
            return nil, fetch_err
        end
    end

    -- Plan the select.
    local plan_opts = {
        first = opts.first,
        after_tuple = opts.after,
        field_names = opts.field_names,
        sharding_key_as_index_obj = sharding_key_data.value,
        bucket_id = opts.bucket_id,
    }
    local plan, plan_err = select_plan.new(space, conditions, plan_opts)
    if plan_err ~= nil then
        return nil, SelectError:new("Failed to plan select: %s", plan_err), const.NEED_SCHEMA_RELOAD
    end

    -- By default every replicaset known to the router is queried.
    local replicasets_to_select, route_err = vshard_router:routeall()
    if route_err ~= nil then
        return nil, SelectError:new("Failed to get router replicasets: %s", route_err)
    end

    -- Call a single storage replicaset or perform map-reduce?
    --
    -- An explicit force_map_call wins: the provided bucket_id is ignored
    -- and data is fetched from all storage replicasets.
    --
    -- Otherwise:
    --
    -- 1. If the caller points at a particular replicaset (using the
    --    bucket_id option[^1]), crud MUST fetch data only from that
    --    replicaset, regardless of whether other storages hold tuples
    --    that fit the conditions.
    --
    -- 2. If a replicaset may be deduced from the conditions
    --    (conditions -> sharding key -> bucket id -> replicaset),
    --    fetch data only from it. This does not change the result[^2],
    --    but significantly reduces network pressure.
    --
    -- 3. Fall back to map-reduce.
    --
    -- [^1]: The meaning of this option may change in the future, see
    --       gh-190. For now bucket_id points at a storage replicaset,
    --       not at a virtual bucket.
    --
    -- [^2]: Strictly true only if resharding is ignored. However, the
    --       optimization is not believed to make the result any less
    --       consistent.
    local perform_map_reduce = opts.force_map_call == true or
        (opts.bucket_id == nil and plan.sharding_key == nil)

    if perform_map_reduce then
        stats.update_map_reduces(space_name)
        skip_sharding_hash_check = true
    else
        local bucket_id_data, bucket_err = sharding.key_get_bucket_id(
            vshard_router, space_name, plan.sharding_key, opts.bucket_id
        )
        if bucket_err ~= nil then
            return nil, bucket_err
        end

        assert(bucket_id_data.bucket_id ~= nil)

        local replicaset_err
        replicasets_to_select, replicaset_err = sharding.get_replicasets_by_bucket_id(
            vshard_router, bucket_id_data.bucket_id
        )
        if replicaset_err ~= nil then
            return nil, replicaset_err, const.NEED_SCHEMA_RELOAD
        end

        sharding_func_hash = bucket_id_data.sharding_func_hash
    end

    -- `first` may be negative; only its magnitude limits the tuple count.
    local tuples_limit = opts.first
    if tuples_limit ~= nil then
        tuples_limit = math.abs(tuples_limit)
    end

    -- Without an explicit batch_size use min(tuples_limit, DEFAULT_BATCH_SIZE).
    local batch_size = requested_batch_size
    if batch_size == nil then
        batch_size = common.DEFAULT_BATCH_SIZE
        if tuples_limit ~= nil and tuples_limit < batch_size then
            batch_size = tuples_limit
        end
    end

    local select_opts = {
        scan_value = plan.scan_value,
        after_tuple = plan.after_tuple,
        tarantool_iter = plan.tarantool_iter,
        limit = batch_size,
        scan_condition_num = plan.scan_condition_num,
        field_names = plan.field_names,
        sharding_func_hash = sharding_func_hash,
        sharding_key_hash = sharding_key_data.hash,
        skip_sharding_hash_check = skip_sharding_hash_check,
        yield_every = yield_every,
        fetch_latest_metadata = opts.call_opts.fetch_latest_metadata,
    }

    -- Both merger flavours receive the same storage call arguments and options.
    local storage_call_args = {space_name, plan.index_id, plan.conditions, select_opts}
    local merger_opts = {
        tarantool_iter = plan.tarantool_iter,
        field_names = plan.field_names,
        call_opts = opts.call_opts,
    }

    local merger
    if opts.readview then
        merger = Merger.new_readview(
            vshard_router, replicasets_to_select, opts.readview_uuid,
            space, plan.index_id, '_crud.select_readview_on_storage',
            storage_call_args, merger_opts
        )
    else
        merger = Merger.new(
            vshard_router, replicasets_to_select, space, plan.index_id,
            common.SELECT_FUNC_NAME,
            storage_call_args, merger_opts
        )
    end

    return {
        tuples_limit = tuples_limit,
        merger = merger,
        plan = plan,
        space = space,
        netbox_schema_version = netbox_schema_version,
    }
end
201

202
--- Build a lazy iterator over the tuples (or objects) matching the conditions.
--
-- The iterator is produced by `build_select_iterator` (run through
-- `schema.wrap_func_reload`) and is optionally wrapped so that every tuple
-- is unflattened into an object (`use_tomap`) and so that at most `first`
-- elements are produced.
--
-- Error reporting: a negative `first`, a router lookup failure, an iterator
-- construction failure and any failure while converting a tuple are raised
-- with `error`. The readview option checks and a failure to build the
-- initial metadata format return `nil, err` instead; these return paths are
-- kept as they were so existing callers are not affected.
--
-- @param space_name name of the space to select from
-- @param user_conditions optional table of conditions
-- @param opts optional options table, see the checks declaration below
-- @return gen, param, state iterator triplet
function select_module.pairs(space_name, user_conditions, opts)
    checks('string', '?table', {
        after = '?table|cdata',
        first = '?number',
        batch_size = '?number',
        use_tomap = '?boolean',
        bucket_id = '?number|cdata',
        force_map_call = '?boolean',
        fields = '?table',
        fetch_latest_metadata = '?boolean',

        mode = '?vshard_call_mode',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        readview = '?boolean',
        readview_uuid = '?table',

        vshard_router = '?string|table',

        yield_every = '?number',
    })

    opts = opts or {}

    -- NOTE(review): these checks return `nil, err` while the rest of the
    -- function raises; left unchanged to keep the interface stable.
    if opts.readview == true then
        if opts.mode ~= nil then
            return nil, SelectError:new("Readview does not support 'mode' option")
        end

        if opts.prefer_replica ~= nil then
            return nil, SelectError:new("Readview does not support 'prefer_replica' option")
        end

        if opts.balance ~= nil then
            return nil, SelectError:new("Readview does not support 'balance' option")
        end

        if opts.vshard_router ~= nil then
            return nil, SelectError:new("Readview does not support 'vshard_router' option")
        end
    end

    if opts.first ~= nil and opts.first < 0 then
        error("Negative first isn't allowed for pairs")
    end

    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
    if err ~= nil then
        error(err)
    end

    local iterator_opts = {
        after = opts.after,
        first = opts.first,
        batch_size = opts.batch_size,
        bucket_id = opts.bucket_id,
        force_map_call = opts.force_map_call,
        field_names = opts.fields,
        yield_every = opts.yield_every,
        call_opts = {
            mode = opts.mode,
            prefer_replica = opts.prefer_replica,
            balance = opts.balance,
            timeout = opts.timeout,
            fetch_latest_metadata = opts.fetch_latest_metadata,
        },
        readview = opts.readview,
        readview_uuid = opts.readview_uuid,
    }

    local iter, err = schema.wrap_func_reload(
            vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts
    )

    if err ~= nil then
        error(string.format("Failed to generate iterator: %s", err))
    end

    -- filter space format by plan.field_names (user defined fields + primary key + scan key)
    -- to pass it user as metadata
    local filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names)
    if err ~= nil then
        return nil, err
    end

    local gen, param, state = iter.merger:pairs()
    if opts.use_tomap == true then
        gen, param, state = gen:map(function(tuple)
            if opts.fetch_latest_metadata then
                -- This option is temporary and is related to [1], [2].
                -- [1] https://github.com/tarantool/crud/issues/236
                -- [2] https://github.com/tarantool/crud/issues/361
                local storages_info = fiber.self().storage.storages_info_on_select
                iter = utils.fetch_latest_metadata_for_select(space_name, vshard_router, opts,
                                                              storages_info, iter)
                filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names)
                if err ~= nil then
                    -- A map callback has no error return channel: returning
                    -- `nil, err` here would hand `nil` to the consumer as the
                    -- next element. Raise, like the unflatten failure below.
                    error(err)
                end
            end
            local result
            result, err = utils.unflatten(tuple, filtered_space_format)
            if err ~= nil then
                error(string.format("Failed to unflatten next object: %s", err))
            end
            return result
        end)
    end

    -- Stop after `first` elements when a limit was requested.
    if iter.tuples_limit ~= nil then
        gen, param, state = gen:take_n(iter.tuples_limit)
    end

    return gen, param, state
end
318

319
--- Perform a select and collect the whole result into a table.
--
-- Builds an iterator through `schema.wrap_func_reload` and
-- `build_select_iterator`, runs `common.check_select_safety` on the plan,
-- drains the merger up to `first` tuples and returns them with the space
-- format filtered by the plan's field names.
--
-- @param vshard_router router object
-- @param space_name name of the space to select from
-- @param user_conditions optional table of conditions
-- @param opts options table, see the checks declaration below
-- @return `{metadata = ..., rows = ...}` on success, `nil, err` on failure
local function select_module_call_xc(vshard_router, space_name, user_conditions, opts)
    checks('table', 'string', '?table', {
        after = '?table|cdata',
        first = '?number',
        batch_size = '?number',
        bucket_id = '?number|cdata',
        force_map_call = '?boolean',
        fields = '?table',
        fullscan = '?boolean',
        fetch_latest_metadata = '?boolean',

        mode = '?vshard_call_mode',
        prefer_replica = '?boolean',
        balance = '?boolean',
        timeout = '?number',
        readview = '?boolean',
        readview_uuid = '?table',

        vshard_router = '?string|table',

        yield_every = '?number',
    })

    -- Options rejected in readview mode, checked in this order.
    if opts.readview == true then
        local rejected_options = {
            {'mode', "Readview does not support 'mode' option"},
            {'prefer_replica', "Readview does not support 'prefer_replica' option"},
            {'balance', "Readview does not support 'balance' option"},
            {'vshard_router', "Readview does not support 'vshard_router' option"},
        }
        for _, entry in ipairs(rejected_options) do
            if opts[entry[1]] ~= nil then
                return nil, SelectError:new(entry[2])
            end
        end
    end

    -- A negative `first` is only accepted together with `after`.
    local negative_first = opts.first ~= nil and opts.first < 0
    if negative_first and opts.after == nil then
        return nil, SelectError:new("Negative first should be specified only with after option")
    end

    local call_opts = {
        mode = opts.mode,
        prefer_replica = opts.prefer_replica,
        balance = opts.balance,
        timeout = opts.timeout,
        fetch_latest_metadata = opts.fetch_latest_metadata,
    }
    local iterator_opts = {
        after = opts.after,
        first = opts.first,
        batch_size = opts.batch_size,
        bucket_id = opts.bucket_id,
        force_map_call = opts.force_map_call,
        field_names = opts.fields,
        yield_every = opts.yield_every,
        call_opts = call_opts,
        readview = opts.readview,
        readview_uuid = opts.readview_uuid,
    }

    local iter, build_err = schema.wrap_func_reload(
        vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts
    )
    if build_err ~= nil then
        return nil, build_err
    end

    common.check_select_safety(space_name, iter.plan, opts)

    local rows = {}
    local collected = 0
    local limit = opts.first and math.abs(opts.first)

    -- The limit is tested before a tuple is stored, so the loop pulls one
    -- tuple past `limit` from the merger before it stops.
    for _, tuple in iter.merger:pairs() do
        if limit ~= nil and collected >= limit then
            break
        end

        collected = collected + 1
        rows[collected] = tuple
    end

    -- Tuples of a negative-`first` select are reversed before being returned.
    if negative_first then
        utils.reverse_inplace(rows)
    end

    if opts.fetch_latest_metadata then
        -- This option is temporary and is related to [1], [2].
        -- [1] https://github.com/tarantool/crud/issues/236
        -- [2] https://github.com/tarantool/crud/issues/361
        local storages_info = fiber.self().storage.storages_info_on_select
        iter = utils.fetch_latest_metadata_for_select(
            space_name, vshard_router, opts, storages_info, iter
        )
    end

    -- Metadata handed to the user: the space format filtered by
    -- plan.field_names (user defined fields + primary key + scan key).
    local metadata_format, format_err = utils.get_fields_format(iter.space:format(), iter.plan.field_names)
    if format_err ~= nil then
        return nil, format_err
    end

    return {
        metadata = table.copy(metadata_format),
        rows = rows,
    }
end
432

433
--- Public select entry point.
--
-- Resolves the vshard router named by `opts.vshard_router` and runs
-- `select_module_call_xc` through `sharding.wrap_select_method` under
-- `SelectError:pcall`.
--
-- @param space_name name of the space to select from
-- @param user_conditions optional table of conditions
-- @param opts optional options table; forwarded to select_module_call_xc
-- @return whatever the wrapped call returns, or `nil, err` if the router
--   cannot be resolved
function select_module.call(space_name, user_conditions, opts)
    opts = opts or {}

    local router, router_err = utils.get_vshard_router_instance(opts.vshard_router)
    if router_err ~= nil then
        return nil, SelectError:new(router_err)
    end

    return SelectError:pcall(
        sharding.wrap_select_method, router, select_module_call_xc,
        space_name, user_conditions, opts
    )
end
444

445
return select_module
409✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc