• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moonlibs / xqueue / 10058436550

23 Jul 2024 11:57AM UTC coverage: 73.154%. First build
10058436550

Pull #19

github

ochaton
test: adds tests and ci
Pull Request #19: Implements per tube statistics

63 of 113 new or added lines in 1 file covered. (55.75%)

654 of 894 relevant lines covered (73.15%)

5362.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.15
/xqueue.lua
1
local M = {}
25✔
2

3
local ffi = require 'ffi'
25✔
4
local log = require 'log'
25✔
5
local fiber = require 'fiber'
25✔
6
local clock = require 'clock'
25✔
7

8
local tuple_ctype = ffi.typeof(box.tuple.new())
25✔
9

10
local monotonic_max_age = 10*365*86400;
25✔
11

12
---Removes every entry from `t` in place so the same table object can be reused.
---All keys are dropped: the sequence part, index 0 and any hash-part keys.
---@param t table table to empty
---Raises an error (reported at the caller, level 2) when `t` is not a table.
local function table_clear(t)
        if type(t) ~= 'table' then
                error("bad argument #1 to 'clear' (table expected, got "..(t ~= nil and type(t) or 'no value')..")",2)
        end
        -- Setting existing keys to nil while traversing with `next` is permitted;
        -- only adding new keys during traversal is not.
        for k in next, t do
                t[k] = nil
        end
end
20

21
---Tells whether `t` has a sequence part, i.e. whether `ipairs(t)` would yield
---at least one element. An empty table is therefore reported as not an array.
---@param t table
---@return boolean
local function is_array(t)
        local iter, subject, start = ipairs(t)
        -- The first value produced by the ipairs iterator is the index (nil when exhausted).
        local first_index = iter(subject, start)
        return first_index ~= nil
end
28

29
-- Peer string of every connected session, keyed by session id.
-- Filled by the on_connect trigger and emptied by the on_disconnect trigger.
local peers = {}

-- Install the on_connect trigger. Whatever a previous registration stored in _G
-- under the same key (the key starts with a NUL byte) is passed as the second
-- argument, and the value returned by box.session.on_connect is stored back
-- under that key.
do
        local slot = "\0xq.on_connect"
        local function remember_peer()
                local sid = box.session.id()
                local peer = box.session.peer()
                box.session.storage.peer = box.session.peer()
                peers[ sid ] = peer
                log.info("connected %s, sid=%s, fid=%s", peer, sid, fiber.id() )
        end
        rawset(_G, slot, box.session.on_connect(remember_peer, rawget(_G, slot)))
end
33✔
38

39
-- Install the on_disconnect trigger. Whatever a previous registration stored in
-- _G under the same key is passed as the second argument, and the value returned
-- by box.session.on_disconnect is stored back under that key.
do
        local slot = "\0xq.on_disconnect"
        local function forget_peer()
                local sid = box.session.id()
                -- Read the remembered peer before dropping it so it can still be logged.
                local peer = peers[ sid ]
                peers[ sid ] = nil
                log.info("disconnected %s, sid=%s, fid=%s", peer, sid, fiber.id() )
        end
        rawset(_G, slot, box.session.on_disconnect(forget_peer, rawget(_G, slot)))
end
54✔
45

46

47
--[[
48

49
Field sets:
50
1. id, status (minimal required configuration)
51
2. id, status, priority
52
3. id, status, runat
53
4. id, status, priority, runat
54

55
Primary index variants:
56
1. index(status, [ id ])
57
2. index(status, priority, [ id ])
58

59

60
Format:
61
local format = box.space._space.index.name:get(space_name)[ 7 ]
62

63
Status:
64
        R - ready - task is ready to be taken
65
                created by put without delay
66
                turned on by release/kick without delay
67
                turned on from W when delay passed
68
                deleted after ttl if ttl is enabled
69

70
        T - taken - task is taken by consumer. may not be taken by other.
71
                turned into R after ttr if ttr is enabled
72

73
        W - waiting - task is not ready to be taken. waiting for its `delay`
74
                requires `runat`
75
                `delay` may be set during put, release, kick
76
                turned into R after delay
77

78
        B - buried - task was temporarily discarded from queue by consumer
79
                may be revived using kick by administrator
80
                use it in unexpected conditions, when manual intervention is required
81
                Without mandatory monitoring (stats.buried) usage of buried is useless and awry
82

83
        Z - zombie - task was processed and ack'ed and *temporarily* kept for delay
84

85
        D - done - task was processed and ack'ed and permanently left in database
86
                enabled when keep feature is set
87

88
        X - reserved for statistics
89

90
(TODO: reload/upgrade and feature switch)
91

92
Interface:
93

94
# Creator methods:
95

96
        M.upgrade(space, {
97
                format = {
98
                        -- space format. applied to space.format() if passed
99
                },
100
                fields = {
101
                        -- id is always taken from pk
102
                        status   = 'status_field_name'    | status_field_no,
103
                        runat    = 'runat_field_name'     | runat_field_no,
104
                        priority = 'priority_field_name'  | priority_field_no,
105
                },
106
                features = {
107
                        id = 'auto_increment' | 'uuid' | 'required' | function
108
                                -- auto_increment - if pk is number, then use it for auto_increment
109
                                -- uuid - if pk is string, then use uuid for id
110
                                -- required - primary key MUST be present in tuple during put
111
                                -- function - function will be called to acquire id for task
112

113
                        buried = true,           -- if true, support bury/kick
114
                        delayed = true,          -- if true, support delayed tasks, requires `runat`
115

116
                        keep = true,             -- if true, keep ack'ed tasks in [D]one state, instead of deleting
117
                                -- mutually exclusive with zombie
118

119
                        zombie = true|number,    -- requires `runat` field
120
                                -- if number, then with default zombie delay, otherwise only if set delay during ack
121
                                -- mutually exclusive with keep
122

123
                        ttl    = true|number,    -- requires `runat` field
124
                                -- if number, then with default ttl, otherwise only if set during put/release
125
                        ttr    = true|number,    -- requires `runat` field
126
                                -- if number, then with default ttr, otherwise only if set
127
                },
128
        })
129

130
Producer methods:
131
        sp:put({...}, [ attr ]) (array or table (if have format) or tuple)
132

133
Consumer methods:
134

135
* id:
136
        - array of keyfields of pk
137
        - table of named keyfields
138
        - tuple
139

140
sp:take(timeout) -> tuple
141
sp:ack(id, [ attr ])
142
        attr.update (only if support zombie)
143
sp:release(id, [ attr ])
144
        attr.ttr (only if support ttr)
145
        attr.ttl (only if support ttl)
146
        attr.update
147
sp:bury(id, [ attr ])
148
        attr.ttl (only if support ttl)
149
        attr.update
150

151
Admin methods:
152

153
sp:queue_stats()
154
sp:kick(N | id, [attr]) -- put buried task id or N oldest buried tasks to [R]eady
155

156

157
]]
158

159
local json = require 'json'
25✔
160
json.cfg{ encode_invalid_as_nil = true }
25✔
161

162

163
---Checks a field type name against an expected type class.
---For `ref` equal to 'str' or 'num' several spellings of `src` are accepted;
---for any other `ref` the two values must be equal.
---@param src any type name to test
---@param ref any expected type class ('str', 'num') or an exact type name
---@return boolean
local function typeeq(src, ref)
        if ref ~= 'str' and ref ~= 'num' then
                return src == ref
        end
        if ref == 'num' then
                return src == 'unsigned' or src == 'number' or src == 'num' or src == 'NUM'
        end
        return src == 'string' or src == 'str' or src == 'STR'
end
172

173
---Deep structural equality for plain values.
---Tables are compared recursively key by key; strings, numbers and booleans are
---compared with `==`; cdata values must have the same ctype and compare equal.
---@param a any
---@param b any
---@param depth? number error level used when an unsupported type is met (default 2, i.e. the caller)
---@return boolean
---Raises "Wrong types for equality" for any other type (function, userdata, thread, nil).
local function is_eq(a, b, depth)
        local t = type(a)
        if t ~= type(b) then return false end
        if t == 'table' then
                -- One more stack frame per nesting level, so a raised error still
                -- points at the original caller rather than at is_eq itself.
                local level = (depth or 2) + 1
                for k, v in pairs(a) do
                        local other = b[k]
                        if other == nil or not is_eq(v, other, level) then return false end
                end
                -- Every key of `a` is already known to match; only look for keys
                -- that exist in `b` alone.
                for k in pairs(b) do
                        if a[k] == nil then return false end
                end
                return true
        elseif t == 'string' or t == 'number' or t == 'boolean' then
                return a == b
        elseif t == 'cdata' then
                return ffi.typeof(a) == ffi.typeof(b) and a == b
        else
                error("Wrong types for equality", depth or 2)
        end
end
194

195

196
---Builds a converter from a positional tuple to a table keyed by field name.
---The converter is generated as Lua source and evaluated with `dostring`
---(presumably a runtime-provided helper that runs the chunk and returns its result; confirm).
---The generated function returns nil for a falsy argument and raises
---'excess args' when called with more than one argument.
---@param qformat table array of field descriptors; `name` of each entry is used as the key
---@return fun(t: any): table|nil
local function _tuple2table ( qformat )
        local rows = {}
        for k,v in ipairs(qformat) do
                -- %q emits a safely escaped Lua string literal, so names containing
                -- quotes or backslashes cannot break or alter the generated code.
                rows[#rows + 1] = string.format("\t[%q] = t[%d];\n", v.name, k)
        end
        local fun = "return function(t,...) "..
                "if select('#',...) > 0 then error('excess args',2) end "..
                "return t and {\n"..table.concat(rows, "").."} or nil end\n"
        return dostring(fun)
end
206

207
---Builds a converter from a table keyed by field name to a positional tuple.
---The converter is generated as Lua source and evaluated with `dostring`
---(presumably a runtime-provided helper that runs the chunk and returns its result; confirm).
---In the generated function, a field that is nil in the input becomes msgpack NULL
---so that later fields keep their positions; the list is passed to `box.tuple.new`.
---A falsy argument yields nil.
---@param qformat table array of field descriptors; `name` of each entry selects the source key
---@return fun(t: any): any
local function _table2tuple ( qformat )
        local rows = {}
        for _,v in ipairs(qformat) do
                -- %q emits a safely escaped Lua string literal, so names containing
                -- quotes or backslashes cannot break or alter the generated code.
                local key = string.format("%q", v.name)
                rows[#rows + 1] = "\tt["..key.."] == nil and NULL or t["..key.."];\n"
        end
        local fun = "local NULL = require'msgpack'.NULL return function(t) return "..
                "t and box.tuple.new({\n"..table.concat(rows, "").."}) or nil end\n"
        return dostring(fun)
end
217

218
---@class xqueue.space
219
local methods = {}
25✔
220

221
---@class PrimaryKeyField:table
222
---@field no number position in tuple
223
---@field name string name of the field
224
---@field type "uuid"|"string"|"number"|"unsigned"|"integer"|"boolean" type of the field
225

226
---@class xqFeatures: table
227
---@field id "auto_increment"|"time64"|"uuid"| fun(): scalar (Default: uuid)
228
---@field retval "tuple" | "table"
229
---@field buried boolean
230
---@field delayed boolean
231
---@field keep boolean
232
---@field tube boolean
233
---@field zombie boolean|number
234
---@field zombie_delay number
235
---@field ttl boolean|number
236
---@field ttr boolean|number
237
---@field ttl_default number?
238
---@field ttr_default number?
239

240
---@class xq:table
241
---@field NEVER integer (Default: 0)
242
---@field atomic fun(self: xq, key: scalar, fun: fun(...:any): ...?): ...
243
---@field bysid table<number,table<string,string>> mapping sid => {key => key}
244
---@field taken table<string,number> mapping key => sid
245
---@field _lock table<string,boolean> locks key => boolean (in atomic)
246
---@field put_wait table<string,{cond:fiber.cond,task:box.tuple?,processed: boolean?}> mapping key => fiber.cond for producer
247
---@field take_wait fiber.channel
248
---@field take_chans table<string,fiber.channel> mapping tube => fiber.channel
249
---@field debug boolean
250
---@field have_runat boolean
251
---@field gen_id? fun(): scalar
252
---@field getkey fun(self: xq, arg: table|scalar|box.tuple): scalar
253
---@field packkey fun(self: xq, key: any): string
254
---@field tube_index? boxIndex
255
---@field index boxIndex
256
---@field key PrimaryKeyField
257
---@field fieldmap table<string,number>
258
---@field timeoffset fun(delta: number): number
259
---@field features xqFeatures
260
---@field fields table<string, string|number>
261
---@field tuple fun(tbl: table): box.tuple
262
---@field table fun(tuple: box.tuple): table
263
---@field retwrap fun(t: box.tuple|table): table|box.tuple
264
---@field wakeup fun(self: xq, t: box.tuple|table)
265
---@field runat_chan fiber.channel
266
---@field check_owner fun(self: xq, key: tuple_type|box.tuple): box.tuple
267
---@field put_back fun(key: table|box.tuple)
268
---@field _stat { counts: table, transition: table }
269
---@field putback fun(self: xq, task: table|box.tuple)
270
---@field _default_truncate fun(space: boxSpaceObject)
271
---@field _on_repl replaceTrigger
272
---@field _on_dis fun()
273
---@field ready? fiber.channel channel appears when xq is not ready
274

275

276
---@class xqueue.space: boxSpaceObject
277
---@field xq xq xqueue specific storage
278

279
---@class xqueue.fields
280
---@field status? string|number Xqueue name for the Status field
281
---@field runat? string|number Xqueue name for the RunAt field
282
---@field priority? string|number Xqueue name for Task priority field
283
---@field tube? string|number Xqueue name for Tube field
284

285
---@class xqueue.features
286
---@field id 'uuid' | 'auto_increment' | 'required' | 'time64' | (fun(): number|string) Mandatory Field for TaskID generation
287
---@field retval? 'table'|'tuple' Type of return Value of the task in xqueue methods.
288
---(Default: table when space with format, tuple when space is without format)
289
---@field buried? boolean should xqueue allow burying of the tasks (by default: true)
290
---@field keep? boolean should xqueue keep :ack'ed tasks in the Space or not
291
---@field delayed? boolean should xqueue allow delay tasks (requires runat field and index) (default: false)
292
---@field zombie? boolean|number should xqueue temporarily keep :ack'ed tasks in the Space (default: false). Mutually exclusive with keep
293
---when zombie is configured with number, then this value is treated as zombie_delay (requires runat to be present)
294
---@field ttl? boolean|number should xqueue allow Time-To-Live on tasks. When specified with number, this value used for ttl_default.
295
---Requires runat field and index.
296
---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttr_default.
297
---Requires runat field and index.
298

299
---@class xqueue.upgrade.options
300
---@field format? boxSpaceFormat
301
---@field fields xqueue.fields
302
---@field debug? boolean
303
---@field tube_stats? string[] List of tube names for per-tube statistics
304
---@field features xqueue.features
305
---@field worker? fun(task: box.tuple|table) simple ad-hoc worker callback
306
---@field workers? number (number of workers to spawn)
307

308
---Upgrades given space to xqueue instance
309
---@param space xqueue.space
310
---@param opts xqueue.upgrade.options
311
---@param depth? number
312
function M.upgrade(space,opts,depth)
25✔
313
        depth = depth or 0
45✔
314
        log.info("xqueue upgrade(%s,%s)", space.name, json.encode(opts))
45✔
315
        if not opts.fields then error("opts.fields required",2) end
45✔
316
        if opts.format then
45✔
317
                -- todo: check if already have such format
318
                local format_av = box.space._space.index.name:get(space.name)[ 7 ]
58✔
319
                if not is_eq(format_av, opts.format) then
58✔
320
                        space:format(opts.format)
×
321
                else
322
                        print("formats are equal")
29✔
323
                end
324
        end
325

326
        -- This variable will be defined later
327
        local taken_mt
328

329
        local self = {}
45✔
330
        if space.xq then
45✔
331
                self.taken = space.xq.taken
×
332
                self._stat = space.xq._stat
×
333
                self.bysid = space.xq.bysid
×
334
                self._lock = space.xq._lock
×
335
                self.take_wait = space.xq.take_wait
×
336
                self.take_chans = space.xq.take_chans or setmetatable({}, { __mode = 'v' })
×
337
                self.put_wait = space.xq.put_wait or setmetatable({}, { __mode = 'v' })
×
338
                self._on_repl  = space.xq._on_repl
×
339
                self._on_dis = space.xq._on_dis
×
340
        else
341
                self.taken = {}
45✔
342
                self.bysid = {}
45✔
343
                -- byfid = {};
344
                self._lock = {}
45✔
345
                self.put_wait = setmetatable({}, { __mode = 'v' })
45✔
346
                self.take_wait = fiber.channel(0)
45✔
347
                self.take_chans = setmetatable({}, { __mode = 'v' })
45✔
348
        end
349
        setmetatable(self.bysid, {
90✔
350
                __serialize='map',
351
                __newindex = function(t, key, val)
352
                        if type(val) == 'table' then
43✔
353
                                rawset(t, key, setmetatable(val, taken_mt))
43✔
354
                        else
NEW
355
                                rawset(t, key, val)
×
356
                        end
357
                end
358
        })
359
        self.debug = not not opts.debug
45✔
360

361
        if not self._default_truncate then
45✔
362
                self._default_truncate = space.truncate
45✔
363
        end
364

365
        local format_av = box.space._space.index.name:get(space.name)[ 7 ]
90✔
366
        local format = {}
45✔
367
        local have_format = false
45✔
368
        local have_runat = false
45✔
369
        for no,f in pairs(format_av) do
261✔
370
                format[ f.name ] = {
192✔
371
                        name = f.name;
192✔
372
                        type = f.type;
192✔
373
                        no   = no;
192✔
374
                }
192✔
375
                format[ no ] = format[ f.name ];
192✔
376
                have_format = true
192✔
377
                self.have_format = true
192✔
378
        end
379
        for _,idx in pairs(space.index) do
331✔
380
                for _,part in pairs(idx.parts) do
852✔
381
                        format[ part.fieldno ] = format[ part.fieldno ] or { no = part.fieldno }
450✔
382
                        format[ part.fieldno ].type = part.type
450✔
383
                end
384
        end
385

386
        -- dd(format)
387

388
        -- 1. fields check
389
        local fields = {}
45✔
390
        local fieldmap = {}
45✔
391
        for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"}}) do
249✔
392
                local fname,ftype = f[1], f[2]
180✔
393
                local num = opts.fields[fname]
180✔
394
                if num then
180✔
395
                        if type(num) == 'string' then
90✔
396
                                if format[num] then
90✔
397
                                        fields[fname] = format[num].no;
90✔
398
                                else
399
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
400
                                end
401
                        elseif type(num) == 'number' then
×
402
                                if format[num] then
×
403
                                        fields[fname] = num
×
404
                                else
405
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
406
                                end
407
                        else
408
                                error(string.format("wrong type %s for field %s, number or string required",type(num),fname),2 + depth)
×
409
                        end
410
                        -- check type
411
                        if format[num] then
90✔
412
                                if not typeeq(format[num].type, ftype) then
180✔
413
                                        error(string.format("type mismatch for field %s, required %s, got %s",fname, ftype, format[fname].type),2+depth)
×
414
                                end
415
                        end
416
                        fieldmap[fname] = format[num].name
90✔
417
                end
418
        end
419

420
        -- dd(fields)
421

422
        -- 2. index check
423

424
        local pk = space.index[0]
45✔
425
        if #pk.parts ~= 1 then
45✔
426
                error("Composite primary keys are not supported yet")
×
427
        end
428
        local pktype
429
        if typeeq( format[pk.parts[1].fieldno].type, 'str' ) then
90✔
430
                pktype = 'string'
29✔
431
                taken_mt = { __serialize = 'map' }
29✔
432
        elseif typeeq( format[pk.parts[1].fieldno].type, 'num' ) then
32✔
433
                pktype = 'number'
16✔
434
                taken_mt = {
16✔
435
                        __serialize = 'map',
436
                        __newindex = function(t, key, val)
437
                                return rawset(t, tostring(ffi.cast("uint64_t", key)), val)
32,160✔
438
                        end,
439
                        __index = function(t, key)
440
                                return rawget(t, tostring(ffi.cast("uint64_t", key)))
40,022✔
441
                        end
442
                }
16✔
443
        else
444
                error("Unknown key type "..format[pk.parts[1].fieldno].type)
×
445
        end
446
        setmetatable(self.taken, taken_mt)
45✔
447

448
        local pkf = {
45✔
449
                no   = pk.parts[1].fieldno;
45✔
450
                name = format[pk.parts[1].fieldno].name;
45✔
451
                ['type'] = pktype;
45✔
452
        }
453
        self.key = pkf
45✔
454
        self.fields = fields
45✔
455
        self.fieldmap = fieldmap
45✔
456

457

458
        function self:getkey(arg)
45✔
459
                local _type = type(arg)
133,335✔
460
                if _type == 'table' then
133,335✔
461
                        if is_array(arg) then
266,670✔
462
                                return arg[ self.key.no ]
×
463
                        else
464
                                return arg[ self.key.name ]
133,335✔
465
                                -- pass
466
                        end
467
                elseif _type == 'cdata' and ffi.typeof(arg) == tuple_ctype then
×
468
                        return arg[ self.key.no ]
×
469
                elseif _type == self.key.type then
×
470
                        return arg
×
471
                else
472
                        error("Wrong key/task argument. Expected table or tuple or key", 3)
×
473
                end
474
        end
475

476
        function self.packkey(_, key)
45✔
477
                if type(key) == 'cdata' then
125,412✔
478
                        return tostring(ffi.cast("uint64_t", key))
125,348✔
479
                else
480
                        return key
64✔
481
                end
482
        end
483

484
        do
485
                local filter
486
                if fields.priority then
45✔
487
                        filter = function(index)
488
                                return #index.parts >= 3
12✔
489
                                        and index.parts[1].fieldno == fields.status
4✔
490
                                        and index.parts[2].fieldno == fields.priority
4✔
491
                                        and index.parts[3].fieldno == self.key.no
12✔
492
                        end
493
                else
494
                        filter = function(index)
495
                                return #index.parts >= 2
107✔
496
                                        and index.parts[1].fieldno == fields.status
66✔
497
                                        and index.parts[2].fieldno == self.key.no
107✔
498
                        end
499
                end
500
                for i,index in pairs(space.index) do
143✔
501
                        if type(i) == 'number' and filter(index) then
238✔
502
                                self.index = index
45✔
503
                                break
45✔
504
                        end
505
                end
506
                if not self.index then
45✔
507
                        if fields.priority then
×
508
                                error("not found index by status + priority + id",2+depth)
×
509
                        else
510
                                error("not found index by status + id",2+depth)
×
511
                        end
512
                end
513

514
                if fields.tube then
45✔
515
                        for n,index in pairs(space.index) do
10✔
516
                                if type(n) == 'number' and index.parts[1].fieldno == fields.tube then
8✔
517
                                        local not_match = false
4✔
518
                                        for i = 2, #index.parts do
12✔
519
                                                if index.parts[i].fieldno ~= self.index.parts[i-1].fieldno then
8✔
520
                                                        not_match = true
×
521
                                                        break
522
                                                end
523
                                        end
524
                                        if not not_match then
4✔
525
                                                self.tube_index = index
4✔
526
                                                break
4✔
527
                                        end
528
                                end
529
                        end
530
                        if not self.tube_index then
4✔
531
                                if fields.priority then
×
532
                                        error("not found index by tube + status + priority + id",2+depth)
×
533
                                else
534
                                        error("not found index by tube + status + id",2+depth)
×
535
                                end
536
                        end
537
                end
538
        end
539

540
        ---@type boxIndex
541
        local runat_index
542
        if fields.runat then
45✔
543
                for _,index in pairs(space.index) do
110✔
544
                        if type(_) == 'number' then
90✔
545
                                if index.parts[1].fieldno == fields.runat then
90✔
546
                                        -- print("found",index.name)
547
                                        runat_index = index
37✔
548
                                        break
37✔
549
                                end
550
                        end
551
                end
552
                if not runat_index then
37✔
553
                        error(string.format("fields.runat requires tree index with this first field in it"),2+depth)
×
554
                else
555
                        local t = runat_index:pairs({0},{iterator = box.index.GT}):nth(1)
74✔
556
                        if t then
37✔
557
                                if t[ self.fields.runat ] < monotonic_max_age then
×
558
                                        error("!!! Queue contains monotonic runat. Consider updating tasks (https://github.com/moonlibs/xqueue/issues/2)")
×
559
                                end
560
                        end
561
                        have_runat = true
37✔
562
                end
563
        end
564
        self.have_runat = have_runat
45✔
565

566
        ---@type table<string, { counts: {}, transition: {} }>
567
        local stat_tube = {}
45✔
568
        if self.fields.tube and type(opts.tube_stats) == 'table' then
45✔
NEW
569
                for _, tube_name in ipairs(opts.tube_stats) do
×
NEW
570
                        if type(tube_name) == 'string' then
×
NEW
571
                                stat_tube[tube_name] = {
×
572
                                        counts = {},
573
                                        transition = {},
574
                                }
575
                        end
576
                end
577
        end
578

579
        if not self._stat then
45✔
580
                self._stat = {
45✔
581
                        counts = {};
45✔
582
                        transition = {};
45✔
583
                        tube = stat_tube;
45✔
584
                }
45✔
585
                if self.fields.tube then
45✔
586
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
12✔
NEW
587
                                local s = t[self.fields.status]
×
NEW
588
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
589

NEW
590
                                local tube = t[self.fields.tube]
×
NEW
591
                                if stat_tube[tube] then
×
NEW
592
                                        stat_tube[tube].counts[s] = (stat_tube[tube].counts[s] or 0LL) + 1
×
593
                                end
594
                        end
595
                else
596
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
123✔
NEW
597
                                local s = t[self.fields.status]
×
NEW
598
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
599
                        end
600
                end
601
        else
NEW
602
                self._stat = { counts = {}, transition = {}, tube = stat_tube }
×
603
        end
604

605
        -- 3. features check
606

607
        local features = {}
45✔
608

609
        local gen_id
610
        opts.features = opts.features or {}
45✔
611
        if not opts.features.id then
45✔
612
                opts.features.id = 'uuid'
×
613
        end
614
        -- 'auto_increment' | 'time64' | 'uuid' | 'required' | function
615
        if opts.features.id == 'auto_increment' then
45✔
616
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
×
617
                        error("For auto_increment numeric pk is mandatory",2+depth)
×
618
                end
619
                local pk_fno = pk.parts[1].fieldno
×
620
                gen_id = function()
621
                        local max = pk:max()
×
622
                        if max then
×
623
                                return max[pk_fno] + 1
×
624
                        else
625
                                return 1
×
626
                        end
627
                end
628
        elseif opts.features.id == 'time64' then
45✔
629
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
32✔
630
                        error("For time64 numeric pk is mandatory",2+depth)
×
631
                end
632
                gen_id = function()
633
                        local key = clock.realtime64()
96✔
634
                        while true do
635
                                local exists = pk:get(key)
96✔
636
                                if not exists then
96✔
637
                                        return key
96✔
638
                                end
639
                                key = key + 1
×
640
                        end
641
                end
642
        elseif opts.features.id == 'uuid' then
29✔
643
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'str')) then
58✔
644
                        error("For uuid string pk is mandatory",2+depth)
×
645
                end
646
                local uuid = require'uuid'
29✔
647
                gen_id = function()
648
                        while true do
649
                                local key = uuid.str()
59✔
650
                                local exists = pk:get(key)
59✔
651
                                if not exists then
59✔
652
                                        return key
59✔
653
                                end
654
                        end
655
                end
656
        elseif opts.features.id == 'required' then
×
657
                -- gen_id = function()
658
                --         -- ???
659
                --         error("TODO")
660
                -- end
661
        elseif type(opts.features.id) == 'function'
×
662
                or (debug.getmetatable(opts.features.id)
×
663
                and debug.getmetatable(opts.features.id).__call)
×
664
        then
665
                gen_id = opts.features.id
×
666
        else
667
                error(string.format(
×
668
                        "Wrong type for features.id %s, may be 'auto_increment' | 'time64' | 'uuid' | 'required' | function",
669
                        opts.features.id
×
670
                ), 2+depth)
×
671
        end
672

673
        if not opts.features.retval then
45✔
674
                opts.features.retval = have_format and 'table' or 'tuple'
37✔
675
        end
676

677
        if format then
45✔
678
                self.tuple = _table2tuple(format)
90✔
679
                self.table = _tuple2table(format)
90✔
680
        end
681

682
        if opts.features.retval == 'table' then
45✔
683
                self.retwrap = self.table
45✔
684
        else
685
                self.retwrap = function(t) return t end
×
686
        end
687

688
        features.buried = not not opts.features.buried
45✔
689
        features.keep   = not not opts.features.keep
45✔
690
        if opts.features.delayed then
45✔
691
                if not have_runat then
37✔
692
                        error(string.format("Delayed feature requires runat field and index" ),2+depth)
×
693
                end
694
                features.delayed = true
37✔
695
        else
696
                features.delayed = false
8✔
697
        end
698

699
        if opts.features.zombie then
45✔
700
                if features.keep then
8✔
701
                        error(string.format("features keep and zombie are mutually exclusive" ),2+depth)
×
702
                end
703
                if not have_runat then
8✔
704
                        error(string.format("feature zombie requires runat field and index" ),2+depth)
×
705
                end
706

707
                features.zombie = true
8✔
708
                if type(opts.features.zombie) == 'number' then
8✔
709
                        features.zombie_delay = opts.features.zombie
4✔
710
                end
711
        else
712
                features.zombie = false
37✔
713
        end
714

715
        if opts.features.ttl then
45✔
716
                if not have_runat then
×
717
                        error(string.format("feature ttl requires runat field and index" ),2+depth)
×
718
                end
719

720
                features.ttl = true
×
721
                if type(opts.features.ttl) == 'number' then
×
722
                        features.ttl_default = opts.features.ttl
×
723
                end
724
        else
725
                features.ttl = false
45✔
726
        end
727

728
        if opts.features.ttr then
45✔
729
                if not have_runat then
×
730
                        error(string.format("feature ttr requires runat field and index" ),2+depth)
×
731
                end
732

733
                features.ttr = true
×
734
                if type(opts.features.ttr) == 'number' then
×
735
                        features.ttr_default = opts.features.ttr
×
736
                end
737
        else
738
                features.ttr = false
45✔
739
        end
740

741
        if fields.tube then
45✔
742
                features.tube = true
4✔
743
        end
744

745
        self.gen_id = gen_id
45✔
746
        self.features = features
45✔
747
        self.space = space.id
45✔
748

749
        --- Wall-clock timestamp lying `delta` seconds after the current moment.
        -- A `delta` that cannot be converted to a number counts as 0.
        function self.timeoffset(delta)
                local shift = tonumber(delta) or 0
                return clock.realtime() + shift
        end

        --- Tells whether timestamp `time` is strictly earlier than the current moment.
        function self.timeready(time)
                local now = clock.realtime()
                return time < now
        end

        --- Seconds left until timestamp `time` (negative once it has passed).
        function self.timeremaining(time)
                local now = clock.realtime()
                return time - now
        end
759
        -- self.NEVER = -1ULL
760
        self.NEVER = 0
45✔
761

762
        --- Reads the primary-key field of task `t`.
        -- The first argument is the (unused) receiver, so both `xq:keyfield(t)`
        -- and `xq.keyfield(nil, t)` work.
        function self.keyfield(_, t)
                local key_no = pkf.no
                return t[key_no]
        end

        --- Same lookup as `keyfield`: the value stored in the primary-key field of `t`.
        function self.keypack(_, t)
                local key_no = pkf.no
                return t[key_no]
        end
768

769
        -- Wakes up the producer that may still be waiting for `task`.
        -- A producer is notified only when the outcome is final:
        --   * the task was processed successfully, or
        --   * the task will never be processed (buried, or removed by TTL).
        -- Whether a task is given up on is the consumer's decision.
        -- Nothing is signalled while the queue can still process the task later.
        local function notify_producer(key, task)
                local waiter = self.put_wait[self:packkey(key)]
                if not waiter then
                        -- nobody is waiting for this task
                        return
                end

                -- Stores the outcome on the waiter and wakes everyone blocked on it.
                local function settle(processed)
                        waiter.task = task
                        waiter.processed = processed
                        waiter.cond:broadcast()
                end

                local status = task.status
                if status == 'Z' or status == 'D' then
                        -- the task has been processed
                        settle(true)
                        return
                end

                local stored = space:get{key}
                if stored then
                        -- the task is still in the space: only burying is final
                        if status == 'B' then
                                settle(false)
                        end
                        return
                end

                -- the task is gone from the space: either acked or expired
                if status == 'T' then
                        -- removed by :ack
                        settle(true)
                elseif status == 'R' then
                        -- removed by TTL
                        settle(false)
                end
        end
814

815
        self.ready = fiber.channel(0)
45✔
816

817
        -- Fiber body wrapper: keeps calling `func(...)` while this xqueue object is
        -- still the one attached to the space, and calls it only once the
        -- instance is writable. Errors raised by `func` are logged, not propagated.
        local function rw_fiber_f(func, ...)
                local owner = self

                -- Blocks until box.info.ro becomes false.
                local function await_rw()
                        log.verbose("awaiting rw")
                        repeat
                                local wait_rw = box.ctl.wait_rw
                                if wait_rw then
                                        pcall(wait_rw, 1)
                                else
                                        -- box.ctl.wait_rw is not available: poll instead
                                        fiber.sleep(0.001)
                                end
                        until not box.info.ro
                end

                while true do
                        if box.info.ro then
                                await_rw()
                        end

                        local ok, err = pcall(func, ...)
                        if not ok then
                                local suffix = owner.ready and ': (xq is not ready yet)' or ''
                                log.error("%s%s", err, suffix)
                        end
                        fiber.testcancel()

                        -- stop once the space is gone or another xq object took it over
                        local dropped = not box.space[space.name]
                        if dropped or space.xq ~= owner then
                                break
                        end
                end
        end
839

840
        if opts.worker then
                local handler = opts.worker
                local total = opts.workers or 1

                -- Runs the user handler on one taken task and makes sure the task
                -- does not remain taken afterwards.
                local function process(space, xq, task)
                        local key = xq:getkey(task)
                        local ok, err = pcall(handler, task)
                        if ok then
                                -- handler succeeded but left the task taken: ack it
                                if xq.taken[ key ] then
                                        space:ack(task)
                                end
                        else
                                log.error("Worker for {%s} has error: %s", key, err)
                        end
                        -- still taken at this point: give it back to the queue
                        if xq.taken[ key ] then
                                log.error("Worker for {%s} not released task", key)
                                space:release(task)
                        end
                end

                -- Builds the fiber body for worker number `no`.
                local function make_worker(no)
                        return function(space, xq)
                                local fname = space.name .. '.xq.wrk' .. tostring(no)
                                ---@diagnostic disable-next-line: undefined-field
                                local reload = package.reload
                                if reload then
                                        fname = fname .. '.' .. reload.count
                                end
                                fiber.name(string.sub(fname, 1, 32))

                                -- wait for the space to be upgraded and the queue to be ready
                                repeat
                                        fiber.sleep(0.001)
                                until space.xq
                                if xq.ready then xq.ready:get() end
                                log.info("I am worker %s", no)

                                while box.space[space.name] and space.xq == xq and not box.info.ro do
                                        if xq.ready then xq.ready:get() end
                                        local task = space:take(1)
                                        if task then
                                                process(space, xq, task)
                                        end
                                        fiber.yield()
                                end

                                if box.info.ro then
                                        log.info("Shutting down on ro instance")
                                        return
                                end
                                log.info("worker %s ended", no)
                        end
                end

                for i = 1, total do
                        fiber.create(rw_fiber_f, make_worker(i), space, self)
                end
        end
880

881
        if have_runat then
                -- Background fiber that acts on tasks whose `runat` time has passed:
                --   W -> R    delayed task becomes ready
                --   R         deleted when the ttl feature is on (producer is notified)
                --   Z         zombie deleted when the zombie feature is on
                --   T -> R    taken task auto-released when the ttr feature is on
                -- Any other status only gets its runat reset to xq.NEVER.
                -- `runat_chan` lets other code interrupt the wait between passes.
                self.runat_chan = fiber.channel(0)
                self.runat = fiber.create(rw_fiber_f, function(space,xq,runat_index)
                        local fname = space.name .. '.xq'
                        if package.reload then fname = fname .. '.' .. package.reload.count end
                        fiber.name(string.sub(fname,1,32))
                        repeat fiber.sleep(0.001) until space.xq
                        if xq.ready then xq.ready:get() end
                        local chan = xq.runat_chan
                        log.info("Runat started")
                        -- upper bound on tuples handled in one pass
                        local maxrun = 1000
                        local curwait
                        local collect = {}
                        while box.space[space.name] and space.xq == xq and not box.info.ro do
                                -- One pass; returns the number of seconds to wait before the next one.
                                local r,e = pcall(function()
                                        local remaining
                                        -- Collect due tuples in index order and stop at the first
                                        -- one that is not due yet.
                                        for _,t in runat_index:pairs({0},{iterator = box.index.GT}) do
                                                if xq.timeready( t[ xq.fields.runat ] ) then
                                                        table.insert(collect,t)
                                                else
                                                        remaining = xq.timeremaining(t[ xq.fields.runat ])
                                                        break
                                                end
                                                -- batch is full: run the next pass without waiting
                                                if #collect >= maxrun then remaining = 0 break end
                                        end

                                        for _,t in ipairs(collect) do
                                                if t[xq.fields.status] == 'W' then
                                                        log.info("Runat: W->R %s",xq:keyfield(t))
                                                        -- TODO: default ttl?
                                                        local u = space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.status,'R' },
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                        xq:wakeup(u)
                                                elseif t[xq.fields.status] == 'R' and xq.features.ttl then
                                                        local key = xq:keyfield(t)
                                                        log.info("Runat: Kill R by ttl %s (%+0.2fs)", key, fiber.time() - t[ xq.fields.runat ])
                                                        t = space:delete{key}
                                                        notify_producer(key, t)
                                                elseif t[xq.fields.status] == 'Z' and xq.features.zombie then
                                                        log.info("Runat: Kill Zombie %s",xq:keyfield(t))
                                                        space:delete{ xq:keyfield(t) }
                                                elseif t[xq.fields.status] == 'T' and xq.features.ttr then
                                                        local key = xq:keypack(t)
                                                        local sid = xq.taken[ key ]
                                                        local peer = peers[sid] or sid

                                                        log.info("Runat: autorelease T->R by ttr %s taken by %s",xq:keyfield(t), peer)
                                                        local u = space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.status,'R' },
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                        xq.taken[ key ] = nil
                                                        if sid then
                                                                -- Same guard as in putback: a missing per-session
                                                                -- table must not abort the pass before the
                                                                -- released task is announced to consumers.
                                                                local owned = xq.bysid[ sid ]
                                                                if owned then
                                                                        owned[ key ] = nil
                                                                else
                                                                        log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, sid)
                                                                end
                                                        end
                                                        xq:wakeup(u)
                                                else
                                                        log.error("Runat: unsupported status %s for %s",t[xq.fields.status], tostring(t))
                                                        space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                end
                                        end

                                        -- Wait at most one second; less when the next tuple is due sooner.
                                        if remaining then
                                                if remaining >= 0 and remaining < 1 then
                                                        return remaining
                                                end
                                        end
                                        return 1
                                end)

                                table_clear(collect)

                                if r then
                                        curwait = e
                                else
                                        curwait = 1
                                        log.error("Runat/ERR: %s",e)
                                end
                                -- explicit zero-length sleep before the zero-timeout channel read
                                if curwait == 0 then fiber.sleep(0) end
                                chan:get(curwait)
                        end
                        if box.info.ro then
                                log.info("Shutting down on ro instance")
                                return
                        end
                        log.info("Runat ended")
                end,space,self,runat_index)
        end
978

979
        -- Second half of self:atomic. Receives the pcall results of the protected
        -- function, drops the per-key lock, then either forwards the results or
        -- re-raises the error (with level 3).
        local function atomic_tail(self, key, status, ...)
                local locks = self._lock
                locks[key] = nil
                if status then
                        return ...
                end
                error((...), 3)
        end

        --- Runs `fun(...)` while `key` is marked in self._lock.
        -- The mark is removed whether `fun` returns or raises.
        -- @return everything `fun` returned
        function self:atomic(key, fun, ...)
                local locks = self._lock
                locks[key] = true
                return atomic_tail(self, key, pcall(fun, ...))
        end
990

991
        --- Verifies that task `key` exists and is taken by the current session.
        -- Raises (level 2) when the task is missing, not taken at all, or taken
        -- by another session.
        -- @return the task tuple
        function self:check_owner(key)
                local task = space:get(key)
                local problem

                if not task then
                        problem = string.format( "Task {%s} was not found", key )
                else
                        local owner = self.taken[key]
                        if not owner then
                                problem = string.format( "Task %s not taken by any", key )
                        else
                                local me = box.session.id()
                                if owner ~= me then
                                        problem = string.format( "Task %s taken by %d. Not you (%d)", key, owner, me )
                                end
                        end
                end

                if problem then
                        error(problem, 2)
                end
                return task
        end
1004

1005
        --- Forgets that `task` is taken and lets a waiting producer know its fate.
        -- Inconsistencies in the taken/bysid bookkeeping are logged, not raised.
        function self:putback(task)
                local key = task[ self.key.no ]
                local owner_sid = self.taken[ key ]

                if not owner_sid then
                        log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() )
                else
                        self.taken[ key ] = nil
                        local owned = self.bysid[ owner_sid ]
                        if owned then
                                owned[ key ] = nil
                        else
                                log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, owner_sid)
                        end
                end

                notify_producer(key, task)
        end
1021

1022
        --- Signals one waiting consumer that task `t` is ready.
        -- Does nothing unless the task status is 'R'. A consumer waiting on the
        -- task's own tube is preferred; otherwise the common channel is used.
        function self:wakeup(t)
                local fieldmap = self.fieldmap
                if t[fieldmap.status] ~= 'R' then
                        return
                end

                local tube_field = fieldmap.tube
                if tube_field then
                        -- there may be consumers waiting on this particular tube
                        local chan = self.take_chans[t[tube_field]]
                        if chan and chan:has_readers() then
                                local delivered = chan:put(true, 0)
                                if delivered then
                                        -- a tube consumer has been notified
                                        return
                                end
                        end
                        -- otherwise fall through to the common channel
                end

                local common = self.take_wait
                if common:has_readers() then
                        common:put(true, 0)
                end
        end
1037

1038
        --- Releases every fiber blocked on self.ready and then drops the channel,
        -- so later `if xq.ready then ... end` checks are skipped.
        function self:make_ready()
                while true do
                        local gate = self.ready
                        if not (gate and gate:has_readers()) then
                                break
                        end
                        gate:put(true, 0)
                        fiber.sleep(0)
                end
                self.ready = nil
        end
1045

1046
        local meta = debug.getmetatable(space)
45✔
1047
        for k,v in pairs(methods) do meta[k] = v end
564✔
1048

1049
        -- Triggers must set right before updating space
1050
        -- because raising error earlier leads to trigger inconsistency
1051
        -- Statistics trigger: keeps per-status counters and "old-new" transition
        -- counters in self._stat, and the same pair per tube in self._stat.tube
        -- for tubes that already have a stat entry there.
        -- 'X' stands for "no tuple" (insert: X-<new>, delete: <old>-X).
        self._on_repl = space:on_replace(function(old, new)
                local old_st, new_st
                local old_tube, new_tube
                local old_tube_stat, new_tube_stat
                local counts = self._stat.counts
                local tube_ss = self._stat.tube

                if old then
                        old_st = old[self.fields.status] --[[@as string]]
                        if counts[old_st] and counts[old_st] > 0 then
                                counts[old_st] = counts[old_st] - 1
                        else
                                log.error(
                                        "Have not valid statistic by status: %s with value: %s",
                                        old_st, tostring(counts[old_st])
                                )
                        end
                        if self.fields.tube then
                                old_tube = old[self.fields.tube]
                                old_tube_stat = tube_ss[old_tube]
                                if type(old_tube_stat) == 'table' and type(old_tube_stat.counts) == 'table' then
                                        if (tonumber(old_tube_stat.counts[old_st]) or 0) > 0 then
                                                old_tube_stat.counts[old_st] = old_tube_stat.counts[old_st] - 1
                                        else
                                                -- one placeholder per argument: tube, status, counter value
                                                log.error(
                                                        "Have not valid statistic by tube/status: tube %s status %s with value: %s",
                                                        old_tube, old_st, tostring(old_tube_stat.counts[old_st])
                                                )
                                        end
                                else
                                        -- no usable stat entry for this tube
                                        old_tube_stat = nil
                                end
                        end
                else
                        old_st = 'X'
                end

                if new then
                        new_st = new[self.fields.status] --[[@as string]]
                        counts[new_st] = (counts[new_st] or 0LL) + 1
                        if counts[new_st] < 0 then
                                log.error("Statistic overflow by task type: %s", new_st)
                        end
                        if self.fields.tube then
                                new_tube = new[self.fields.tube]
                                new_tube_stat = tube_ss[new_tube]
                                if type(new_tube_stat) == 'table' and type(new_tube_stat.counts) == 'table' then
                                        if (tonumber(new_tube_stat.counts[new_st]) or 0) >= 0 then
                                                new_tube_stat.counts[new_st] = (new_tube_stat.counts[new_st] or 0LL) + 1
                                        else
                                                -- one placeholder per argument: tube, status, counter value
                                                log.error(
                                                        "Have not valid statistic tube/status: tube %q status %s with value: %s",
                                                        new_tube, new_st, tostring(new_tube_stat.counts[new_st])
                                                )
                                        end
                                else
                                        -- no usable stat entry for this tube
                                        new_tube_stat = nil
                                end
                        end
                else
                        new_st = 'X'
                end

                local field = old_st.."-"..new_st
                self._stat.transition[field] = (self._stat.transition[field] or 0LL) + 1
                if old_tube_stat then
                        if not new_tube_stat or old_tube_stat == new_tube_stat then
                                -- no new tube or new and old tubes are the same
                                old_tube_stat.transition[field] = (old_tube_stat.transition[field] or 0LL) + 1
                        else
                                -- nil != old_tube != new_tube != nil
                                -- cross tube transition ?
                                -- Can this be backoff with tube change?
                                -- The old tube records "<old>-S" and the new tube "S-<new>".
                                local old_field = old_st.."-S"
                                local new_field = "S-"..new_st
                                old_tube_stat.transition[old_field] = (old_tube_stat.transition[old_field] or 0LL) + 1
                                new_tube_stat.transition[new_field] = (new_tube_stat.transition[new_field] or 0LL) + 1
                        end
                elseif new_tube_stat then
                        -- old_tube_stat == nil
                        new_tube_stat.transition[field] = (new_tube_stat.transition[field] or 0LL) + 1
                end
        end, self._on_repl)
16,565✔
1134

1135
        -- Disconnect trigger: every task still taken by the closing session is
        -- switched back from 'T' to 'R' and announced to consumers.
        self._on_dis = box.session.on_disconnect(function()
                local sid = box.session.id()
                local peer = box.session.storage.peer

                log.info("%s: disconnected %s, sid=%s, fid=%s", space.name, peer, sid, fiber.id() )
                box.session.storage.destroyed = true

                local owned = self.bysid[sid]
                if not owned then
                        return
                end

                -- Returns one abandoned task to the ready state, or logs why it cannot.
                local function give_back(realkey)
                        local t = space:get(realkey)
                        if not t then
                                log.error( "Rst: {%s}: taken not found", realkey )
                                return
                        end

                        local status = t[ self.fields.status ]
                        if status ~= 'T' then
                                log.error( "Rst: %s->? {%s}: wrong status", status, realkey )
                                return
                        end

                        local ops = { { '=', self.fields.status, 'R' } }
                        if self.have_runat then
                                ops[2] = { '=', self.fields.runat, self.NEVER }
                        end
                        self:wakeup(space:update({ realkey }, ops))
                        log.info("Rst: T->R {%s}", realkey )
                end

                -- re-check the table after each sweep until it is empty
                while next(owned) do
                        for key, realkey in pairs(owned) do
                                self.taken[key] = nil
                                owned[key] = nil
                                give_back(realkey)
                        end
                end
                self.bysid[sid] = nil
        end, self._on_dis)
151✔
1166

1167
        rawset(space,'xq',self)
45✔
1168

1169
        log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status)
45✔
1170

1171
        --- Start-up recovery: on a writable instance, every task left in status
        -- 'T' that is neither taken nor locked in this process is switched back
        -- to 'R'. The queue is then marked ready.
        -- While the instance is an orphan, the whole procedure is postponed to a
        -- background fiber that retries once the status changes.
        function self:starttest()
                local xq = self
                if box.info.status == 'orphan' then
                        fiber.create(function()
                                local x = 0
                                -- poll with a slowly growing pause
                                repeat
                                        fiber.sleep(x/1e5)
                                        x = x + 1
                                until box.info.status ~= 'orphan'
                                self:starttest()
                        end)
                        return
                end
                if box.info.ro then
                        log.info("Server is ro, not resetting statuses")
                else
                        local space = box.space[self.space]
                        -- Compare major and minor as integers. Parsing "1.10" as the
                        -- float 1.1 would make 1.6 .. 1.9 look newer than 1.10.
                        local major, minor = tostring(rawget(_G, '_TARANTOOL')):match('^(%d+)%.(%d+)')
                        major, minor = tonumber(major), tonumber(minor)
                        local yield_limit = 2^32-1
                        local scanned = 0
                        if major and minor and (major > 1 or (major == 1 and minor >= 10)) then
                                yield_limit = 1000
                        end
                        box.begin()
                        for _,t in self.index:pairs({'T'},{ iterator = box.index.EQ }) do
                                scanned = scanned + 1
                                local key = t[ xq.key.no ]
                                if not self.taken[key] and not self._lock[key] then
                                        -- NOTE(review): the upgrade code visible in this file stores the
                                        -- default ttl as `features.ttl_default`; confirm that
                                        -- `self.ttl_default` is populated somewhere, otherwise runat
                                        -- is always reset to NEVER here.
                                        local update = {
                                                { '=', self.fields.status, 'R' },
                                                self.have_runat and {
                                                        '=',self.fields.runat,
                                                        self.ttl_default and self.timeoffset( self.ttl_default ) or self.NEVER
                                                } or nil
                                        }
                                        space:update({key}, update)
                                        log.info("Start: T->R (%s)", key)
                                end
                                -- commit in batches of `yield_limit` scanned tuples
                                if scanned == yield_limit then
                                        scanned = 0
                                        box.commit()
                                        box.begin()
                                end
                        end
                        box.commit()
                end
                self:make_ready()
        end
1219
        self:starttest()
45✔
1220
end
1221

1222

1223
--[[
1224
* `space:put`
1225
        - `task` - table or array or tuple
1226
                + `table`
1227
                        * **requires** space format
1228
                        * suitable for id generation
1229
                + `array`
1230
                        * ignores space format
1231
                        * for id generation use `NULL` (**not** `nil`)
1232
                + `tuple`
1233
                        * ignores space format
1234
                        * **can't** be used with id generation
1235
        - `attr` - table of attributes
1236
                + `delay` - number of seconds
1237
                        * if set, task will become `W` instead of `R` for `delay` seconds
1238
                + `ttl` - number of seconds
1239
                        * if set, the task will be discarded after `ttl` seconds unless it has been taken
1240
                + `wait` - number of seconds
1241
                        * if set, the calling fiber will be blocked for up to `wait` seconds, until the task
                        is processed or the timeout is reached.
1243

1244
```lua
1245
box.space.myqueue:put{ name="xxx"; data="yyy"; }
1246
box.space.myqueue:put{ "xxx","yyy" }
1247
box.space.myqueue:put(box.tuple.new{ 1,"xxx","yyy" })
1248

1249
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5 })
1250
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 })
1251
```
1252
]]
1253

1254
---Inserts a new task into the queue space.
---
---Only a key/value table is currently usable as `t`: a tuple always raises
---"not implemented yet", and an array raises the same when id generation
---(`xq.gen_id`) is configured. Note that `t` is modified in place: the primary
---key (when generated), the status and, where applicable, the runat field are
---written into it before the tuple is built.
---
---When `opts.wait` is set, the calling fiber blocks on a condition variable
---registered in `xq.put_wait` for up to `opts.wait` seconds.
---@param t table|box.tuple
---@param opts? { delay: number?, ttl: number?, wait: number? }
---@return table|box.tuple, boolean? has_been_processed
function methods:put(t, opts)
	local xq = self.xq
	opts = opts or {}
	if type(t) == 'table' then
		if xq.gen_id and is_array(t) then
			error("Task from array creation is not implemented yet", 2)
		end
	elseif type(t) == 'cdata' and ffi.typeof(t) == tuple_ctype then
		if xq.gen_id then
			error("Can't use id generation with tuple.", 2)
		end
		error("Task from tuple creation is not implemented yet", 2)
	else
		error("Wrong argument to put. Expected table or tuple", 2)
	end

	-- here we have table with keys
	if xq.gen_id then
		t[ xq.key.name ] = xq.gen_id()
	else
		if not t[ xq.key.name ] then
			error("Primary key is mandatory",2)
		end
	end

	-- delayed or ttl or default ttl
	if opts.delay then
		if not xq.features.delayed then
			error("Feature delayed is not enabled",2)
		end
		if opts.ttl then
			error("Features ttl and delay are mutually exclusive",2)
		end
		t[ xq.fieldmap.status ] = 'W'
		t[ xq.fieldmap.runat ]  = xq.timeoffset(opts.delay)

		if opts.wait then
			error("Are you crazy? Call of :put({...}, { wait = <>, delay = <> }) looks weird", 2)
		end
	elseif opts.ttl then
		if not xq.features.ttl then
			error("Feature ttl is not enabled",2)
		end
		t[ xq.fieldmap.status ] = 'R'
		t[ xq.fieldmap.runat ]  = xq.timeoffset(opts.ttl)
	elseif xq.features.ttl_default then
		t[ xq.fieldmap.status ] = 'R'
		t[ xq.fieldmap.runat ]  = xq.timeoffset(xq.features.ttl_default)
	elseif xq.have_runat then
		t[ xq.fieldmap.status ] = 'R'
		t[ xq.fieldmap.runat ]  = xq.NEVER
	else
		t[ xq.fieldmap.status ] = 'R'
	end

	local tuple = xq.tuple(t)
	local key = tuple[ xq.key.no ]

	-- The waiter is registered BEFORE the insert so that it is already in
	-- place by the time the task becomes visible. The entry it replaces (if
	-- any) is remembered so that it can be put back when the insert fails.
	local wait, pkey, prev_wait
	if opts.wait then
		wait = { cond = fiber.cond() }
		pkey = xq:packkey(key)
		prev_wait = xq.put_wait[pkey]
		xq.put_wait[pkey] = wait
	end

	local inserted, err = pcall(function()
		xq:atomic(key, function()
			t = self:insert(tuple)
		end)
	end)
	if not inserted then
		-- Insert failed (e.g. duplicate primary key): do not leave our waiter
		-- behind and do not steal the slot of a waiter registered earlier.
		if wait and xq.put_wait[pkey] == wait then
			xq.put_wait[pkey] = prev_wait
		end
		-- level 0 keeps string messages untouched; error objects pass through as is
		error(err, 0)
	end
	xq:wakeup(t)

	if wait then
		-- wait.task / wait.processed are expected to be filled in by the
		-- notify_producer function (defined elsewhere in this file)
		local signalled = wait.cond:wait(opts.wait)
		fiber.testcancel()
		if signalled and wait.task then
			return xq.retwrap(wait.task), wait.processed
		end
	end
	return xq.retwrap(t)
end
1339

1340
--[[
1341
* `space:wait(id, timeout)`
1342
        - `id`:
1343
                + `string` | `number` - primary key
1344
        - `timeout` - number of seconds to wait
1345
                * the calling fiber is blocked for up to `timeout` seconds, until the task
1346
                has been processed or the timeout is reached.
1347
]]
1348

1349
-- Statuses for which methods:wait() actually blocks; for any other status
-- it returns the task immediately.
local wait_for = { R = true, T = true, W = true }
1354

1355
---Blocks the calling fiber until the task identified by `key` is reported
---through its `xq.put_wait` entry, or until `timeout` seconds have passed.
---
---Raises when the task does not exist. Returns right away (second value
---`false`) when the task status is not one of R, T or W.
---@param key table|scalar|box.tuple
---@param timeout number
---@return table|box.tuple, boolean? has_been_processed
function methods:wait(key, timeout)
	local xq = self.xq
	key = xq:getkey(key)

	local current = self:get(key)
	if not current then
		error(("Task {%s} was not found"):format(key))
	end

	-- nothing to wait for: hand back what we have
	if not wait_for[ current[xq.fields.status] ] then
		return xq.retwrap(current), false
	end

	-- join the waiter already registered for this key, or register a new one
	local packed = xq:packkey(key)
	local waiter = xq.put_wait[packed]
	if not waiter then
		waiter = { cond = fiber.cond() }
		xq.put_wait[packed] = waiter
	end

	-- waiter.task / waiter.processed are expected to be filled in by the
	-- notify_producer function (defined elsewhere in this file)
	local signalled = waiter.cond:wait(timeout)
	fiber.testcancel()

	if not (signalled and waiter.task) then
		-- timed out (or woken without data): return the task as it was read
		-- before the wait started
		return xq.retwrap(current), false
	end
	return xq.retwrap(waiter.task), waiter.processed
end
1387

1388
--[[
1389
* `space:take(timeout)`
1390
* `space:take(timeout, opts)`
1391
* `space:take(opts)`
1392
        - `timeout` - number of seconds to wait for new task
1393
                + choose reasonable time
1394
                + beware of **readahead** size (see tarantool docs)
1395
        - `tube` - name of the tube worker wants to take task from (feature tube must be enabled)
1396
        - returns task tuple or table (see retval) or nothing on timeout
1397
        - *TODO*: ttr must be there
1398
]]
1399

1400
---Takes the first unlocked task in status `R` and switches it to `T`.
---
---Accepted call forms: `take(timeout)`, `take(timeout, opts)`, `take(opts)`.
---When no task is available the calling fiber waits (on the tube channel when
---`opts.tube` is given, on `xq.take_wait` otherwise) until it is woken or
---`timeout` seconds have passed. Returns nothing on timeout.
---@param timeout? number|{ timeout: number?, ttr: number?, tube: string? }
---@param opts? { timeout: number?, ttr: number?, tube: string? }
---@return table|box.tuple?
function methods:take(timeout, opts)
	local xq = self.xq
	timeout = timeout or 0

	if type(timeout) == 'table' then
		opts = timeout
		timeout = opts.timeout or 0
	else
		opts = opts or {}
	end
	assert(timeout >= 0, "timeout required")

	local ttr
	if opts.ttr then
		if not xq.features.ttr then
			error("Feature ttr is not enabled",2)
		end
		ttr = opts.ttr
	elseif xq.features.ttr_default then
		ttr = xq.features.ttr_default
	end

	local index
	local start_with

	local tube_chan
	if opts.tube then
		if not xq.features.tube then
			error("Feature tube is not enabled", 2)
		end

		assert(type(opts.tube) == 'string', "opts.tube must be a string")

		index = xq.tube_index
		start_with = {opts.tube, 'R'}
		tube_chan = xq.take_chans[opts.tube] or fiber.channel()
		xq.take_chans[opts.tube] = tube_chan
	else
		index = xq.index
		start_with = {'R'}
	end
	---@cast index -nil

	local now = fiber.time()
	local key
	local found
	while not found do
		for _,t in index:pairs(start_with, { iterator = box.index.EQ }) do
			key = t[ xq.key.no ]
			if not xq._lock[ key ] then
				-- found key
				xq._lock[ key ] = true
				found = t
				break
			end
		end
		if not found then
			local left = (now + timeout) - fiber.time()
			if left <= 0 then goto finish end

			-- Another taker of the same tube may have closed our channel while
			-- this fiber was already woken but not yet running. :get() on a
			-- closed channel returns immediately, which would turn this loop
			-- into a busy spin that never yields. Re-acquire a live channel.
			if tube_chan and tube_chan:is_closed() then
				tube_chan = xq.take_chans[opts.tube] or fiber.channel()
				xq.take_chans[opts.tube] = tube_chan
			end

			local chan = tube_chan or xq.take_wait
			chan:get(left)
			if box.session.storage.destroyed then goto finish end
		end
	end
	::finish::

	-- If we were last reader from the tube
	-- we remove channel. Writers will get false
	-- on :put if they exists.
	if tube_chan and not tube_chan:has_readers() then
		tube_chan:close()
		-- Unregister only our own channel: the slot may already hold a newer
		-- channel that another taker is waiting on.
		if xq.take_chans[opts.tube] == tube_chan then
			xq.take_chans[opts.tube] = nil
		end
	end
	if not found then return end

	-- The update runs under pcall so that the in-memory lock on the key is
	-- always released below, whether the update succeeded or not.
	local r,e = pcall(function()
		local sid = box.session.id()
		local peer = box.session.storage.peer

		if xq.debug then
			log.info("Take {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
		end

		local update = {
			{ '=', xq.fields.status, 'T' },
		}
		-- TODO: update more fields

		if ttr then
			table.insert(update,{ '=', xq.fields.runat, xq.timeoffset(ttr)})
			xq.runat_chan:put(true,0)
		end
		local t = self:update({key},update)

		-- remember which session holds the task
		if not xq.bysid[ sid ] then xq.bysid[ sid ] = {} end
		xq.taken[ key ] = sid
		xq.bysid[ sid ][ key ] = key

		return t
	end)

	xq._lock[ key ] = nil
	if not r then
		error(e)
	end
	return xq.retwrap(e)
end
1511

1512
--[[
1513
* `space:release(id, [attr])`
1514
        - `id`:
1515
                + `string` | `number` - primary key
1516
                + *TODO*: `tuple` - key will be extracted using index
1517
                + *TODO*: composite pk
1518
        - `attr`
1519
                + `update` - table for update, like in space:update
1520
                + `ttl` - timeout for time to live
1521
                + `delay` - number of seconds
1522
                        * if set, task will become `W` instead of `R` for `delay` seconds
1523
]]
1524

1525
---Returns a task to the queue: status becomes `W` when `attr.delay` is given,
---`R` otherwise. The task is first looked up through `xq:check_owner(key)`
---(defined elsewhere in this file).
---
---Operations from `attr.update` are applied first, in the order given,
---followed by the status/runat changes made here.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
---@return table|box.tuple
function methods:release(key, attr)
	local xq = self.xq
	key = xq:getkey(key)

	attr = attr or {}

	local t = xq:check_owner(key)
	local old = t[ xq.fields.status ]
	local update = {}
	if attr.update then
		-- ipairs, not pairs: update operations are order-sensitive and
		-- pairs gives no ordering guarantee
		for _, op in ipairs(attr.update) do update[#update + 1] = op end
	end

	-- delayed or ttl or default ttl
	if attr.delay then
		if not xq.features.delayed then
			error("Feature delayed is not enabled",2)
		end
		if attr.ttl then
			error("Features ttl and delay are mutually exclusive",2)
		end
		table.insert(update, { '=', xq.fields.status, 'W' })
		table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.delay) })
	elseif attr.ttl then
		if not xq.features.ttl then
			error("Feature ttl is not enabled",2)
		end
		table.insert(update, { '=', xq.fields.status, 'R' })
		table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.ttl) })
	elseif xq.features.ttl_default then
		table.insert(update, { '=', xq.fields.status, 'R' })
		table.insert(update, { '=', xq.fields.runat, xq.timeoffset(xq.features.ttl_default) })
	elseif xq.have_runat then
		table.insert(update, { '=', xq.fields.status, 'R' })
		table.insert(update, { '=', xq.fields.runat, xq.NEVER })
	else
		table.insert(update, { '=', xq.fields.status, 'R' })
	end

	xq:atomic(key,function()
		t = self:update({key}, update)

		---@cast t box.tuple
		xq:wakeup(t)
		if xq.have_runat then
			xq.runat_chan:put(true,0)
		end

		log.info("Rel: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
			key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
	end)

	xq:putback(t)

	return t
end
1584

1585
---Acknowledges a task. Depending on the queue features the task is
---  * turned into a zombie (`Z`) when `features.zombie` is on (runat is set from
---    `attr.delay`, or from `features.zombie_delay` when configured);
---  * marked done (`D`) when `features.keep` is on;
---  * deleted otherwise.
---`attr.delay` requires `features.zombie`. Operations from `attr.update` are
---applied first, in the order given.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
---@return table|box.tuple
function methods:ack(key, attr)
	-- features.zombie
	-- features.keep
	local xq = self.xq
	key = xq:getkey(key)

	attr = attr or {}

	local t = xq:check_owner(key)
	local old = t[ xq.fields.status ]
	local delete = false
	local update = {}
	if attr.update then
		-- ipairs, not pairs: update operations are order-sensitive and
		-- pairs gives no ordering guarantee
		for _, op in ipairs(attr.update) do update[#update + 1] = op end
	end

	-- delayed or ttl or default ttl
	if attr.delay then
		if not xq.features.zombie then
			error("Feature zombie is not enabled",2)
		end
		table.insert(update, { '=', xq.fields.status, 'Z' })
		table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.delay) })
	elseif xq.features.zombie then
		table.insert(update, { '=', xq.fields.status, 'Z' })
		if xq.features.zombie_delay then
			table.insert(update, { '=', xq.fields.runat, xq.timeoffset(xq.features.zombie_delay) })
		end
	elseif xq.features.keep then
		table.insert(update, { '=', xq.fields.status, 'D' })
	else
		-- do remove task
		delete = true
	end

	xq:atomic(key,function()
		-- user-supplied operations are applied even when the task is about
		-- to be deleted right after
		if #update > 0 then
			t = self:update({key}, update)
			---@cast t box.tuple
			xq:wakeup(t)
			if xq.have_runat then
				xq.runat_chan:put(true,0)
			end
			log.info("Ack: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
				key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
		end

		if delete then
			t = self:delete{key}
			log.info("Ack: %s->delete {%s} +%s from %s/sid=%s/fid=%s", old,
				key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
		end
	end)

	xq:putback(t) -- in real drop form taken key

	return t
end
1646

1647
---Buries a task: its status is set to `B`. Operations from `attr.update`
---are applied first, in the order given. Returns nothing.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
function methods:bury(key, attr)
	attr = attr or {}

	local xq = self.xq
	key = xq:getkey(key)
	local t = xq:check_owner(key)

	local update = {}
	if attr.update then
		-- ipairs, not pairs: update operations are order-sensitive and
		-- pairs gives no ordering guarantee
		for _, op in ipairs(attr.update) do update[#update + 1] = op end
	end
	table.insert(update, { '=', xq.fields.status, 'B' })

	xq:atomic(key,function()
		t = self:update({key}, update)

		xq:wakeup(t)
		if xq.have_runat then
			xq.runat_chan:put(true,0)
		end
		log.info("Bury {%s} by %s, sid=%s, fid=%s", key, box.session.storage.peer, box.session.id(), fiber.id())
	end)

	xq:putback(t)
end
1674

1675
---Puts a single task back into status `R`.
---@param self table space object carrying the `xq` state
---@param key table|scalar|box.tuple
---@param attr? table forwarded to `xq:putback`
local function kick_task(self, key, attr)
	local xq   = self.xq
	key  = xq:getkey(key)
	local peer = box.session.storage.peer
	local sid  = box.session.id()
	attr       = attr or {}

	if xq.debug then
		log.info("Kick {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
	end

	local t = self:update({key}, {{ '=', xq.fields.status, 'R' }})
	-- The task is ready again: notify takers the same way put/release do
	-- after changing a task. update() returns nil when the key no longer
	-- exists, in which case there is nobody to notify about.
	if t then
		xq:wakeup(t)
	end
	-- NOTE(review): every other caller in this file passes the task tuple to
	-- xq:putback; here the key and attr are passed instead - confirm this is
	-- intended.
	xq:putback(key, attr)
end
1689

1690
---Kicks buried tasks back to ready.
---
---A numeric first argument is treated as a COUNT: up to that many tasks in
---status `B` are kicked. Any other value is treated as the key (or task) of
---the single task to kick.
---@param nr_tasks_or_task number|table|scalar|box.tuple
---@param attr? table
function methods:kick(nr_tasks_or_task, attr)
	attr = attr or {}

	-- single task addressed directly
	if type(nr_tasks_or_task) ~= 'number' then
		kick_task(self, nr_tasks_or_task, attr)
		return
	end

	local kicked = 0
	for _, buried in self.xq.index:pairs({'B'}, { iterator = box.index.EQ }) do
		if kicked + 1 > nr_tasks_or_task then break end
		kick_task(self, buried, attr)
		kicked = kicked + 1
		-- give other fibers a chance to run after every 500 kicked tasks
		if kicked % 500 == 0 then fiber.sleep(0) end
	end
end
1704

1705
---Deletes a task from the space regardless of its current status.
---Raises when no task with the given key exists.
---@param key table|scalar|box.tuple
---@return box.tuple|table
function methods:kill(key)
	local xq = self.xq
	key = xq:getkey(key)

	if not self:get(key) then
		error(("Task by %s not found to kill"):format(key))
	end

	local killer = box.session.storage.peer
	if xq.debug then
		log.info("Kill {%s} by %s, sid=%s, fid=%s", key, killer, box.session.id(), fiber.id())
	end

	local removed = self:delete(key)
	---@cast removed -nil
	-- Kill is treated as a synonym of Bury: the removed tuple is re-labelled
	-- as `B` (a tuple-level update, nothing is written to the space) before
	-- it is handed to putback and returned.
	local buried = removed:update({{ '=', xq.fields.status, 'B' }})
	xq:putback(buried)
	return xq.retwrap(buried)
end
1727

1728
---Truncates the queue space, resets the statistics and re-installs the queue
---methods on the space.
---
---The methods have to be restored because, as can be seen in
---on_replace_dd_truncate:
---https://github.com/tarantool/tarantool/blob/0b7cc52607b2290d2f35cc68ee1a8243988c2735/src/box/alter.cc#L2239
---tarantool deletes the space and restores it with the same indexes but
---without methods.
---@return any # whatever the original (default) truncate returned
function methods:truncate()
	local xq = self.xq
	xq.ready = xq.ready or fiber.channel(0)

	local stat = xq._stat
	for status in pairs(stat.counts) do
		stat.counts[status] = 0LL
	end
	for transition in pairs(stat.transition) do
		stat.transition[transition] = nil
	end
	-- Per-tube statistics describe the same (now empty) space and must be
	-- reset together with the global ones, otherwise stats() keeps reporting
	-- stale per-tube numbers after a truncate.
	for _, tube_stat in pairs(stat.tube or {}) do
		for status in pairs(tube_stat.counts or {}) do
			tube_stat.counts[status] = 0LL
		end
		-- presumably per-tube entries mirror the global layout and also carry
		-- a `transition` table; nothing is done when they do not
		for transition in pairs(tube_stat.transition or {}) do
			tube_stat.transition[transition] = nil
		end
	end

	local ret = xq._default_truncate(self)

	-- re-install queue methods on the (re-created) space object
	local meta = debug.getmetatable(self)
	for k,v in pairs(methods) do meta[k] = v end
	xq:make_ready()

	return ret
end
1750

1751
-- One-letter task status -> human readable name, used by methods:stats(true).
local pretty_st = {
	['R'] = "Ready",
	['T'] = "Taken",
	['W'] = "Waiting",
	['B'] = "Buried",
	['Z'] = "Zombie",
	['D'] = "Done",
}

-- Metatable attached to the counter tables returned by methods:stats().
local shortmap = {
	__serialize = 'map',
}
25✔
1761

1762
---Returns a deep copy of the queue statistics (`xq._stat`).
---
---Every known status gets an entry in `counts` (0 when absent), both in the
---global counters and in each per-tube entry of `stats.tube`. With `pretty`
---set, the one-letter status keys are replaced by their readable names
---("Ready", "Taken", ...).
---@param pretty? boolean
---@return table stats
function methods:stats(pretty)
	local stats = table.deepcopy(self.xq._stat)

	-- Fills in missing statuses, optionally renames the keys, and tags the
	-- table with `shortmap`. The same treatment is applied to the global and
	-- to the per-tube counters, in both modes.
	local function normalize(counts)
		for short, long in pairs(pretty_st) do
			local n = counts[short] or 0LL
			if pretty then
				counts[short] = nil
				counts[long] = n
			else
				counts[short] = n
			end
		end
		setmetatable(counts, shortmap)
	end

	normalize(stats.counts)
	for _, tube_stat in pairs(stats.tube) do
		normalize(tube_stat.counts)
	end

	return stats
end
1784

1785
-- Calling the module table itself, `M(space, opts)`, forwards to
-- `M.upgrade(space, opts, 1)`. The result of M.upgrade is not propagated:
-- the call returns nothing.
setmetatable(M, {
	__call = function(_, space, opts)
		M.upgrade(space, opts, 1)
	end,
})

1791
return M
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc