• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / crud / 21364862659

26 Jan 2026 04:13PM UTC coverage: 73.492% (-15.0%) from 88.463%
21364862659

push

github

web-flow
Merge f981517ee into a84e19f3e

4253 of 5787 relevant lines covered (73.49%)

55.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.57
/crud/count.lua
1
local checks = require('checks')
19✔
2
local errors = require('errors')
19✔
3
local fiber = require('fiber')
19✔
4

5
local call = require('crud.common.call')
19✔
6
local const = require('crud.common.const')
19✔
7
local utils = require('crud.common.utils')
19✔
8
local sharding = require('crud.common.sharding')
19✔
9
local filters = require('crud.compare.filters')
19✔
10
local count_plan = require('crud.compare.plan')
19✔
11
local dev_checks = require('crud.common.dev_checks')
19✔
12
local ratelimit = require('crud.ratelimit')
19✔
13
local schema = require('crud.common.schema')
19✔
14
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
19✔
15

16
local compare_conditions = require('crud.compare.conditions')
19✔
17

18
-- Error class for every error produced by this module; created with
-- stack capture disabled (`capture_stack = false`).
local CountError = errors.new_class('CountError', {
    capture_stack = false,
})

-- Name of the storage-side count function, and the fully-qualified name
-- the router passes to call.map() to invoke it.
local COUNT_FUNC_NAME = 'count_on_storage'
local CRUD_COUNT_FUNC_NAME = utils.get_storage_call(COUNT_FUNC_NAME)

-- Public module table.
local count = {}
19✔
24

25
--- Count tuples of a local space that match conditions (storage side).
--
-- Iterates `space.index[index_id]` from `opts.scan_value` using the
-- `opts.tarantool_iter` iterator, runs every looked-up tuple through a
-- filter generated from `conditions`, and counts the accepted ones.
--
-- @tparam string space_name Name of the local space.
-- @tparam number index_id ID of the index to iterate.
-- @tparam ?table conditions Conditions used to build the tuples filter.
-- @tparam table opts
--  * scan_value: key to start the index iteration from;
--  * tarantool_iter: box.index iterator type;
--  * yield_every: yield the fiber after every N looked-up tuples (optional);
--  * scan_condition_num: passed through to filters.gen_func (optional);
--  * sharding_func_hash, sharding_key_hash, skip_sharding_hash_check:
--    passed to sharding.check_sharding_hash (optional).
--
-- @treturn[1] number Number of matching tuples.
-- @treturn[2] nil
-- @treturn[2] table Error object.
local function count_on_storage(space_name, index_id, conditions, opts)
    dev_checks('string', 'number', '?table', {
        scan_value = 'table|cdata',
        tarantool_iter = 'number',
        yield_every = '?number',
        scan_condition_num = '?number',
        sharding_func_hash = '?number',
        sharding_key_hash = '?number',
        skip_sharding_hash_check = '?boolean',
    })

    opts = opts or {}

    -- Report a missing space as an error value instead of failing with
    -- "attempt to index a nil value" on the next line.
    local space = box.space[space_name]
    if space == nil then
        return nil, CountError:new("Space %q doesn't exist", space_name)
    end

    local index = space.index[index_id]
    if index == nil then
        return nil, CountError:new("Index with ID %s doesn't exist", index_id)
    end

    local _, err = sharding.check_sharding_hash(space_name,
                                                opts.sharding_func_hash,
                                                opts.sharding_key_hash,
                                                opts.skip_sharding_hash_check)
    if err ~= nil then
        return nil, err
    end

    local yield_every = opts.yield_every
    local filter_func
    local tuples_count = 0
    local looked_up_tuples = 0

    for _, tuple in index:pairs(opts.scan_value, {iterator = opts.tarantool_iter}) do
        if tuple == nil then
            break
        end

        -- The filter is generated lazily on the first looked-up tuple, so an
        -- empty scan never builds it (and never reports a generation error).
        if filter_func == nil then
            filter_func, err = filters.gen_func(space, index, conditions, {
                tarantool_iter = opts.tarantool_iter,
                scan_condition_num = opts.scan_condition_num,
            })
            if err ~= nil then
                return nil, CountError:new("Failed to generate tuples filter: %s", err)
            end
        end

        looked_up_tuples = looked_up_tuples + 1

        local matched, early_exit = filter_func(tuple)
        if matched then
            tuples_count = tuples_count + 1
        elseif early_exit then
            break
        end

        -- Yield based on the number of tuples looked up, not matched, so a
        -- long scan over mostly non-matching tuples still yields regularly.
        if yield_every ~= nil and looked_up_tuples % yield_every == 0 then
            fiber.yield()
        end
    end

    return tuples_count
end
88

89
-- Storage-side functions provided by this module, keyed by function name.
count.storage_api = {
    [COUNT_FUNC_NAME] = count_on_storage,
}
19✔
90

91
-- Rate limiter used for the warning emitted by check_count_safety().
local check_count_safety_rl = ratelimit.new()

--- Warn about a count that may scan many tuples.
--
-- Nothing is logged when the caller passed `fullscan = true`, or when the
-- plan's iterator is EQ or REQ. Otherwise a critical message naming the
-- space and carrying a traceback is logged through the rate limiter.
local function check_count_safety(space_name, plan, opts)
    if opts.fullscan ~= true then
        local iter = plan.tarantool_iter
        local is_eq_scan = iter == box.index.EQ or iter == box.index.REQ
        if not is_eq_scan then
            check_count_safety_rl:log_crit(
                "Potentially long count from space '%s'\n %s",
                space_name, debug.traceback())
        end
    end
end
106

107
-- Counts tuples of `space_name` matching `user_conditions`, querying a
-- single storage replicaset when one can be determined, otherwise all of
-- them (map-reduce), and sums the per-replicaset results.
--
-- returns result, err, need_reload
-- need_reload indicates if reloading schema could help
-- see crud.common.schema.wrap_func_reload()
local function call_count_on_router(vshard_router, space_name, user_conditions, opts)
    checks('table', 'string', '?table', {
        timeout = '?number',
        request_timeout = '?number',
        bucket_id = '?',
        force_map_call = '?boolean',
        fullscan = '?boolean',
        yield_every = '?number',
        prefer_replica = '?boolean',
        balance = '?boolean',
        mode = '?string',
        vshard_router = '?string|table',
    })

    if opts.yield_every ~= nil and opts.yield_every < 1 then
        return nil, CountError:new("yield_every should be > 0")
    end

    -- check conditions
    local conditions, err = compare_conditions.parse(user_conditions)
    if err ~= nil then
        return nil, CountError:new("Failed to parse conditions: %s", err)
    end

    local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
    if err ~= nil then
        return nil, CountError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
    end
    if space == nil then
        return nil, CountError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
    end

    local sharding_key_data = {}
    local sharding_func_hash = nil
    local skip_sharding_hash_check = nil

    -- We don't need sharding info if bucket_id specified.
    if opts.bucket_id == nil then
        sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
        if err ~= nil then
            return nil, err
        end
    else
        skip_sharding_hash_check = true
    end

    -- plan count
    local plan, err = count_plan.new(space, conditions, {
        sharding_key_as_index_obj = sharding_key_data.value,
    })
    if err ~= nil then
        return nil, CountError:new("Failed to plan count: %s", err), const.NEED_SCHEMA_RELOAD
    end
    check_count_safety(space_name, plan, opts)

    -- set replicasets to count from
    local replicasets_to_count, err = vshard_router:routeall()
    if err ~= nil then
        return nil, CountError:new("Failed to get router replicasets: %s", err)
    end

    -- Whether to call one storage replicaset or perform
    -- map-reduce?
    --
    -- If map-reduce is requested explicitly, ignore provided
    -- bucket_id and fetch data from all storage replicasets.
    --
    -- Otherwise:
    --
    -- 1. If particular replicaset is pointed by a caller (using
    --    the bucket_id option[^1]), crud MUST fetch data only
    --    from this storage replicaset: disregarding whether other
    --    storages have tuples that fit given condition.
    --
    -- 2. If a replicaset may be deduced from conditions
    --    (conditions -> sharding key -> bucket id -> replicaset),
    --    fetch data only from the replicaset. It does not change
    --    the result[^2], but significantly reduces network
    --    pressure.
    --
    -- 3. Fallback to map-reduce otherwise.
    --
    -- [^1]: We can change meaning of this option in a future,
    --       see gh-190. But now bucket_id points a storage
    --       replicaset, not a virtual bucket.
    --
    -- [^2]: It is correct statement only if we'll turn a blind
    --       eye to resharding. However, AFAIU, the optimization
    --       does not make the result less consistent (sounds
    --       weird, huh?).

    local perform_map_reduce = opts.force_map_call == true or
        (opts.bucket_id == nil and plan.sharding_key == nil)
    if not perform_map_reduce then
        local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name,
                                                               plan.sharding_key, opts.bucket_id)
        if err ~= nil then
            return nil, err
        end

        assert(bucket_id_data.bucket_id ~= nil)

        sharding_func_hash = bucket_id_data.sharding_func_hash

        replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id_data.bucket_id)
        if err ~= nil then
            return nil, err, const.NEED_SCHEMA_RELOAD
        end
    else
        skip_sharding_hash_check = true
    end

    local yield_every = opts.yield_every or const.DEFAULT_YIELD_EVERY

    local mode = opts.mode or 'read'

    local call_opts = {
        mode = mode,
        prefer_replica = opts.prefer_replica,
        balance = opts.balance,
        timeout = opts.timeout,
        -- request_timeout is forwarded only for read calls.
        request_timeout = mode == 'read' and opts.request_timeout or nil,
        replicasets = replicasets_to_count,
    }

    local count_opts = {
        scan_value = plan.scan_value,
        tarantool_iter = plan.tarantool_iter,
        yield_every = yield_every,
        scan_condition_num = plan.scan_condition_num,
        sharding_func_hash = sharding_func_hash,
        sharding_key_hash = sharding_key_data.hash,
        skip_sharding_hash_check = skip_sharding_hash_check,
    }

    local results, err = call.map(vshard_router, CRUD_COUNT_FUNC_NAME, {
        space_name, plan.index_id, plan.conditions, count_opts
    }, call_opts)

    if err ~= nil then
        local err_wrapped = CountError:new("Failed to call count on storage-side: %s", err)

        if sharding.result_needs_sharding_reload(err) then
            return nil, err_wrapped, const.NEED_SHARDING_RELOAD
        end

        return nil, err_wrapped
    end

    -- Report the error carried in the results; the outer `err` is always
    -- nil here because a non-nil one returned above.
    if results.err ~= nil then
        return nil, CountError:new("Failed to call count: %s", results.err)
    end

    -- Sum the first returned value of every replicaset that produced one.
    local total_count = 0
    for _, replicaset_results in pairs(results) do
        if replicaset_results[1] ~= nil then
            total_count = total_count + replicaset_results[1]
        end
    end

    return total_count
end
273

274
--- Calculates the number of tuples by conditions
--
-- @function call
--
-- @tparam string space_name
--  A space name
--
-- @tparam ?table user_conditions
--  Conditions by which tuples are counted,
--  default value is nil
--
-- @tparam ?number opts.timeout
--  Function call timeout in seconds,
--  default value is 2 seconds
--
-- @tparam ?number opts.request_timeout
--  vshard call request_timeout, forwarded only when mode is `read`,
--  default is the same as opts.timeout
--
-- @tparam ?number opts.bucket_id
--  Bucket ID. When set, tuples are counted only on the storage
--  replicaset that owns this bucket (unless opts.force_map_call is
--  `true`). When not set, the replicaset is deduced from the sharding
--  key in conditions if possible; otherwise all storage replicasets
--  are queried (map-reduce)
--
-- @tparam ?boolean opts.force_map_call
--  Call is performed without any optimizations: all storage
--  replicasets are queried and opts.bucket_id is ignored,
--  default is `false`
--
-- @tparam ?boolean opts.fullscan
--  Set to `true` to acknowledge a potentially long count: suppresses
--  the rate-limited critical log written when the planned index
--  iterator is neither EQ nor REQ,
--  default value is `nil`, which works as with `false`
--
-- @tparam ?number opts.yield_every
--  Number of tuples processed to yield after, must be > 0,
--  default value is 1000
--
-- @tparam ?boolean opts.prefer_replica
--  Call on replica if it's possible,
--  default value is `nil`, which works as with `false`
--
-- @tparam ?boolean opts.balance
--  Use replica according to round-robin load balancing
--  default value is `nil`, which works as with `false`
--
-- @tparam ?string opts.mode
--  vshard call mode, default value is `read`
--
-- @tparam ?string|table opts.vshard_router
--  Cartridge vshard group name or vshard router instance.
--  Set this parameter if your space is not a part of the
--  default vshard cluster.
--
-- @treturn[1] number
-- @treturn[2] nil
-- @treturn[2] table Error description
--
function count.call(space_name, user_conditions, opts)
    checks('string', '?table', {
        timeout = '?number',
        request_timeout = '?number',
        bucket_id = '?',
        force_map_call = '?boolean',
        fullscan = '?boolean',
        yield_every = '?number',
        prefer_replica = '?boolean',
        balance = '?boolean',
        mode = '?string',
        vshard_router = '?string|table',
    })

    opts = opts or {}

    local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
    if err ~= nil then
        return nil, CountError:new(err)
    end

    -- Run the router-side count through the schema/sharding reload wrappers.
    return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_count_on_router,
                                   space_name, user_conditions, opts)
end
349

350
-- Module export.
return count
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc