• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moonlibs / xqueue / 10019881435

20 Jul 2024 11:34AM UTC coverage: 61.066% (+5.2%) from 55.916%
10019881435

push

github

web-flow
Merge 2de0410f2 into dddd83e75

32 of 71 new or added lines in 1 file covered. (45.07%)

186 existing lines in 1 file now uncovered.

527 of 863 relevant lines covered (61.07%)

8.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.07
/xqueue.lua
1
local M = {}
1✔
2

3
local ffi = require 'ffi'
1✔
4
local log = require 'log'
1✔
5
local fiber = require 'fiber'
1✔
6
local clock = require 'clock'
1✔
7

8
local tuple_ctype = ffi.typeof(box.tuple.new())
1✔
9

10
local monotonic_max_age = 10*365*86400;
1✔
11

12
local function table_clear(t)
13
        if type(t) ~= 'table' then
20✔
14
                error("bad argument #1 to 'clear' (table expected, got "..(t ~= nil and type(t) or 'no value')..")",2)
×
15
        end
16
        local count = #t
20✔
17
        for i=0, count do t[i]=nil end
23✔
18
        return
20✔
19
end
20

21
local function is_array(t)
22
        local gen,param,state = ipairs(t)
40✔
23
        -- print(gen,param,state)
24
        local v = gen(param,state)
40✔
25
        -- print(v)
26
        return v ~= nil
40✔
27
end
28

29
local peers = {}
1✔
30

31
rawset(_G,"\0xq.on_connect",box.session.on_connect(function()
2✔
32
        local sid = box.session.id()
×
33
        local peer = box.session.peer()
×
34
        box.session.storage.peer = box.session.peer()
×
35
        peers[ sid ] = peer
×
36
        log.info("connected %s, sid=%s, fid=%s", peer, sid, fiber.id() )
×
37
end,rawget(_G,"\0xq.on_connect")))
1✔
38

39
rawset(_G,"\0xq.on_disconnect",box.session.on_disconnect(function()
2✔
40
        local sid = box.session.id()
×
41
        local peer = peers[ sid ]
×
42
        peers[ sid ] = nil
×
43
        log.info("disconnected %s, sid=%s, fid=%s", peer, sid, fiber.id() )
×
44
end,rawget(_G,"\0xq.on_disconnect")))
1✔
45

46

47
--[[
48

49
Field sets:
50
1. id, status (minimal required configuration)
51
2. id, status, priority
52
3. id, status, runat
53
4. id, status, priority, runat
54

55
Primary index variants:
56
1. index(status, [ id ])
57
2. index(status, priority, [ id ])
58

59

60
Format:
61
local format = box.space._space.index.name:get(space_name)[ 7 ]
62

63
Status:
64
        R - ready - task is ready to be taken
65
                created by put without delay
66
                turned on by release/kick without delay
67
                turned on from W when delay passed
68
                deleted after ttl if ttl is enabled
69

70
        T - taken - task is taken by consumer. may not be taken by other.
71
                turned into R after ttr if ttr is enabled
72

73
        W - waiting - task is not ready to be taken. waiting for its `delay`
74
                requires `runat`
75
                `delay` may be set during put, release, kick
76
                turned into R after delay
77

78
        B - buried - task was temporary discarded from queue by consumer
79
                may be revived using kick by administrator
80
                use it in unpredicted conditions, when man intervention is required
81
                Without mandatory monitoring (stats.buried) usage of buried is useless and awry
82

83
        Z - zombie - task was processed and ack'ed and *temporary* kept for delay
84

85
        D - done - task was processed and ack'ed and permanently left in database
86
                enabled when keep feature is set
87

88
        X - reserved for statistics
89

90
(TODO: reload/upgrade and feature switch)
91

92
Interface:
93

94
# Creator methods:
95

96
        M.upgrade(space, {
97
                format = {
98
                        -- space format. applied to space.format() if passed
99
                },
100
                fields = {
101
                        -- id is always taken from pk
102
                        status   = 'status_field_name'    | status_field_no,
103
                        runat    = 'runat_field_name'     | runat_field_no,
104
                        priority = 'priority_field_name'  | priority_field_no,
105
                },
106
                features = {
107
                        id = 'auto_increment' | 'uuid' | 'required' | function
108
                                -- auto_increment - if pk is number, then use it for auto_increment
109
                                -- uuid - if pk is string, then use uuid for id
110
                                -- required - primary key MUST be present in tuple during put
111
                                -- function - funciton will be called to aquire id for task
112

113
                        buried = true,           -- if true, support bury/kick
114
                        delayed = true,          -- if true, support delayed tasks, requires `runat`
115

116
                        keep = true,             -- if true, keep ack'ed tasks in [D]one state, instead of deleting
117
                                -- mutually exclusive with zombie
118

119
                        zombie = true|number,    -- requires `runat` field
120
                                -- if number, then with default zombie delay, otherwise only if set delay during ack
121
                                -- mutually exclusive with keep
122

123
                        ttl    = true|number,    -- requires `runat` field
124
                                -- if number, then with default ttl, otherwise only if set during put/release
125
                        ttr    = true|number,    -- requires `runat` field
126
                                -- if number, then with default ttl, otherwise only if set
127
                },
128
        })
129

130
Producer methods:
131
        sp:put({...}, [ attr ]) (array or table (if have format) or tuple)
132

133
Consumer methods:
134

135
* id:
136
        - array of keyfields of pk
137
        - table of named keyields
138
        - tuple
139

140
sp:take(timeout) -> tuple
141
sp:ack(id, [ attr ])
142
        attr.update (only if support zombie)
143
sp:release(id, [ attr ])
144
        attr.ttr (only if support ttr)
145
        attr.ttl (only if support ttl)
146
        attr.update
147
sp:bury(id, [ attr ])
148
        attr.ttl (only if support ttl)
149
        attr.update
150

151
Admin methods:
152

153
sp:queue_stats()
154
sp:kick(N | id, [attr]) -- put buried task id or N oldest buried tasks to [R]eady
155

156

157
]]
158

159
local json = require 'json'
1✔
160
json.cfg{ encode_invalid_as_nil = true }
1✔
161

162

163
local function typeeq(src, ref)
164
        if ref == 'str' then
14✔
165
                return src == 'STR' or src == 'str' or src == 'string'
8✔
166
        elseif ref == 'num' then
6✔
167
                return src == 'NUM' or src == 'num' or src == 'number' or src == 'unsigned'
6✔
168
        else
UNCOV
169
                return src == ref
×
170
        end
171
end
172

173
local function is_eq(a, b, depth)
174
        local t = type(a)
×
175
        if t ~= type(b) then return false end
×
UNCOV
176
        if t == 'table' then
×
177
                for k,v in pairs(a) do
×
178
                        if b[k] == nil then return end
×
179
                        if not is_eq(v,b[k]) then return false end
×
180
                end
181
                for k,v in pairs(b) do
×
182
                        if a[k] == nil then return end
×
183
                        if not is_eq(v,a[k]) then return false end
×
184
                end
185
                return true
×
UNCOV
186
        elseif t == 'string' or t == 'number' then
×
187
                return a == b
×
UNCOV
188
        elseif t == 'cdata' then
×
UNCOV
189
                return ffi.typeof(a) == ffi.typeof(b) and a == b
×
190
        else
UNCOV
191
                error("Wrong types for equality", depth or 2)
×
192
        end
193
end
194

195

196
local function _tuple2table ( qformat )
197
        local rows = {}
3✔
198
        for k,v in ipairs(qformat) do
15✔
199
                table.insert(rows,"\t['"..v.name.."'] = t["..tostring(k).."];\n")
12✔
200
        end
201
        local fun = "return function(t,...) "..
3✔
202
                "if select('#',...) > 0 then error('excess args',2) end "..
3✔
203
                "return t and {\n"..table.concat(rows, "").."} or nil end\n"
3✔
204
        return dostring(fun)
3✔
205
end
206

207
local function _table2tuple ( qformat )
208
        local rows = {}
3✔
209
        for _,v in ipairs(qformat) do
15✔
210
                table.insert(rows,"\tt['"..v.name.."'] == nil and NULL or t['"..v.name.."'];\n")
12✔
211
        end
212
        local fun = "local NULL = require'msgpack'.NULL return function(t) return "..
3✔
213
                "t and box.tuple.new({\n"..table.concat(rows, "").."}) or nil end\n"
3✔
214
        -- print(fun)
215
        return dostring(fun)
3✔
216
end
217

218
---@class xqueue.space
219
local methods = {}
1✔
220

221
---@class PrimaryKeyField:table
222
---@field no number position in tuple
223
---@field name string name of the field
224
---@field type "uuid"|"string"|"number"|"unsigned"|"integer"|"boolean" type of the field
225

226
---@class xqFeatures: table
227
---@field id "auto_increment"|"time64"|"uuid"| fun(): scalar (Default: uuid)
228
---@field retval "tuple" | "table"
229
---@field buried boolean
230
---@field delayed boolean
231
---@field keep boolean
232
---@field tube boolean
233
---@field zombie boolean|number
234
---@field zombie_delay number
235
---@field ttl boolean|number
236
---@field ttr boolean|number
237
---@field ttl_default number?
238
---@field ttr_default number?
239

240
---@class xq:table
241
---@field NEVER integer (Default: 0)
242
---@field atomic fun(self: xq, key: scalar, fun: fun(...:any): ...?): ...
243
---@field bysid table<number,table<string,string>> mapping sid => {key => key}
244
---@field taken table<string,number> mapping key => sid
245
---@field _lock table<string,boolean> locks key => boolean (in atomic)
246
---@field put_wait table<string,{cond:fiber.cond,task:box.tuple?,processed: boolean?}> mapping key => fiber.cond for producer
247
---@field take_wait fiber.channel
248
---@field take_chans table<string,fiber.channel> mapping tube => fiber.channel
249
---@field debug boolean
250
---@field have_runat boolean
251
---@field gen_id? fun(): scalar
252
---@field getkey fun(self: xq, arg: table|scalar|box.tuple): scalar
253
---@field packkey fun(self: xq, key: any): string
254
---@field tube_index? boxIndex
255
---@field index boxIndex
256
---@field key PrimaryKeyField
257
---@field fieldmap table<string,number>
258
---@field timeoffset fun(delta: number): number
259
---@field features xqFeatures
260
---@field fields table<string, string|number>
261
---@field tuple fun(tbl: table): box.tuple
262
---@field table fun(tuple: box.tuple): table
263
---@field retwrap fun(t: box.tuple|table): table|box.tuple
264
---@field wakeup fun(self: xq, t: box.tuple|table)
265
---@field runat_chan fiber.channel
266
---@field check_owner fun(self: xq, key: tuple_type|box.tuple): box.tuple
267
---@field put_back fun(key: table|box.tuple)
268
---@field _stat { counts: table, transition: table }
269
---@field putback fun(self: xq, task: table|box.tuple)
270
---@field _default_truncate fun(space: boxSpaceObject)
271
---@field _on_repl replaceTrigger
272
---@field _on_dis fun()
273

274

275
---@class xqueue.space: boxSpaceObject
276
---@field xq xq xqueue specific storage
277

278
---@class xqueue.fields
279
---@field status? string|number Xqueue name for the Status field
280
---@field runat? string|number Xqueue name for the RunAt field
281
---@field priority? string|number Xqueue name for Task priority field
282
---@field tube? string|number Xqueue name for Tube field
283

284
---@class xqueue.features
285
---@field id 'uuid' | 'auto_increment' | 'required' | 'time64' | (fun(): number|string) Mandatory Field for TaskID generation
286
---@field retval? 'table'|'tuple' Type of return Value of the task in xqueue methods.
287
---(Default: table when space with format, tuple when space is without format)
288
---@field buried? boolean should xqueue allow buring of the tasks (by default: true)
289
---@field keep? boolean should xqueue keep :ack'ed tasks in the Space or not
290
---@field delayed? boolean should xqueue allow delay tasks (requires runat field and index) (defauled: false)
291
---@field zombie? boolean|number should xqueue temporarily keep :ack'ed tasks in the Space (default: false). Mutually exclusive with keep
292
---when zombie is configured with number, then this value is treated as zombie_delay (requires runat to be present)
293
---@field ttl? boolean|number should xqueue allow Time-To-Live on tasks. When specified with number, this value used for ttl_default.
294
---Requires runat field and index.
295
---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttl_default.
296
---Requires runat field and index.
297

298
---@class xqueue.upgrade.options
299
---@field format? boxSpaceFormat
300
---@field fields xqueue.fields
301
---@field debug? boolean
302
---@field tube_stats? string[] List of tube names for per-tube statistics
303
---@field features xqueue.features
304
---@field worker? fun(task: box.tuple|table) simple ad-hoc worker callback
305
---@field workers? number (number of workers to spawn)
306

307
---Upgrades given space to xqueue instance
308
---@param space xqueue.space
309
---@param opts xqueue.upgrade.options
310
---@param depth? number
311
function M.upgrade(space,opts,depth)
1✔
312
        depth = depth or 0
3✔
313
        log.info("xqueue upgrade(%s,%s)", space.name, json.encode(opts))
3✔
314
        if not opts.fields then error("opts.fields required",2) end
3✔
315
        if opts.format then
3✔
316
                -- todo: check if already have such format
317
                local format_av = box.space._space.index.name:get(space.name)[ 7 ]
×
318
                if not is_eq(format_av, opts.format) then
×
319
                        space:format(opts.format)
×
320
                else
321
                        print("formats are equal")
×
322
                end
323
        end
324

325
        -- This variable will be defined later
326
        local taken_mt
327

328
        local self = {}
3✔
329
        if space.xq then
3✔
330
                self.taken = space.xq.taken
×
331
                self._stat = space.xq._stat
×
332
                self.bysid = space.xq.bysid
×
333
                self._lock = space.xq._lock
×
334
                self.take_wait = space.xq.take_wait
×
335
                self.take_chans = space.xq.take_chans or setmetatable({}, { __mode = 'v' })
×
336
                self.put_wait = space.xq.put_wait or setmetatable({}, { __mode = 'v' })
×
337
                self._on_repl  = space.xq._on_repl
×
338
                self._on_dis = space.xq._on_dis
×
339
        else
340
                self.taken = {}
3✔
341
                self.bysid = {}
3✔
342
                -- byfid = {};
343
                self._lock = {}
3✔
344
                self.put_wait = setmetatable({}, { __mode = 'v' })
3✔
345
                self.take_wait = fiber.channel(0)
3✔
346
                self.take_chans = setmetatable({}, { __mode = 'v' })
3✔
347
        end
348
        setmetatable(self.bysid, {
6✔
349
                __serialize='map',
350
                __newindex = function(t, key, val)
351
                        if type(val) == 'table' then
4✔
352
                                rawset(t, key, setmetatable(val, taken_mt))
4✔
353
                        else
NEW
354
                                rawset(t, key, val)
×
355
                        end
356
                end
357
        })
358
        self.debug = not not opts.debug
3✔
359

360
        if not self._default_truncate then
3✔
361
                self._default_truncate = space.truncate
3✔
362
        end
363

364
        local format_av = box.space._space.index.name:get(space.name)[ 7 ]
6✔
365
        local format = {}
3✔
366
        local have_format = false
3✔
367
        local have_runat = false
3✔
368
        for no,f in pairs(format_av) do
15✔
369
                format[ f.name ] = {
12✔
370
                        name = f.name;
12✔
371
                        type = f.type;
12✔
372
                        no   = no;
12✔
373
                }
12✔
374
                format[ no ] = format[ f.name ];
12✔
375
                have_format = true
12✔
376
                self.have_format = true
12✔
377
        end
378
        for _,idx in pairs(space.index) do
21✔
379
                for _,part in pairs(idx.parts) do
50✔
380
                        format[ part.fieldno ] = format[ part.fieldno ] or { no = part.fieldno }
32✔
381
                        format[ part.fieldno ].type = part.type
32✔
382
                end
383
        end
384

385
        -- dd(format)
386

387
        -- 1. fields check
388
        local fields = {}
3✔
389
        local fieldmap = {}
3✔
390
        for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"}}) do
15✔
391
                local fname,ftype = f[1], f[2]
12✔
392
                local num = opts.fields[fname]
12✔
393
                if num then
12✔
394
                        if type(num) == 'string' then
6✔
395
                                if format[num] then
6✔
396
                                        fields[fname] = format[num].no;
6✔
397
                                else
398
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
399
                                end
400
                        elseif type(num) == 'number' then
×
401
                                if format[num] then
×
402
                                        fields[fname] = num
×
403
                                else
404
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
405
                                end
406
                        else
407
                                error(string.format("wrong type %s for field %s, number or string required",type(num),fname),2 + depth)
×
408
                        end
409
                        -- check type
410
                        if format[num] then
6✔
411
                                if not typeeq(format[num].type, ftype) then
12✔
412
                                        error(string.format("type mismatch for field %s, required %s, got %s",fname, ftype, format[fname].type),2+depth)
×
413
                                end
414
                        end
415
                        fieldmap[fname] = format[num].name
6✔
416
                end
417
        end
418

419
        -- dd(fields)
420

421
        -- 2. index check
422

423
        local pk = space.index[0]
3✔
424
        if #pk.parts ~= 1 then
3✔
425
                error("Composite primary keys are not supported yet")
×
426
        end
427
        local pktype
428
        if typeeq( format[pk.parts[1].fieldno].type, 'str' ) then
6✔
429
                pktype = 'string'
1✔
430
                taken_mt = { __serialize = 'map' }
1✔
431
        elseif typeeq( format[pk.parts[1].fieldno].type, 'num' ) then
4✔
432
                pktype = 'number'
2✔
433
                taken_mt = {
2✔
434
                        __serialize = 'map',
435
                        __newindex = function(t, key, val)
436
                                return rawset(t, tostring(ffi.cast("uint64_t", key)), val)
80✔
437
                        end,
438
                        __index = function(t, key)
439
                                return rawget(t, tostring(ffi.cast("uint64_t", key)))
60✔
440
                        end
441
                }
2✔
442
        else
443
                error("Unknown key type "..format[pk.parts[1].fieldno].type)
×
444
        end
445
        setmetatable(self.taken, taken_mt)
3✔
446

447
        local pkf = {
3✔
448
                no   = pk.parts[1].fieldno;
3✔
449
                name = format[pk.parts[1].fieldno].name;
3✔
450
                ['type'] = pktype;
3✔
451
        }
452
        self.key = pkf
3✔
453
        self.fields = fields
3✔
454
        self.fieldmap = fieldmap
3✔
455

456

457
        function self:getkey(arg)
3✔
458
                local _type = type(arg)
25✔
459
                if _type == 'table' then
25✔
460
                        if is_array(arg) then
50✔
461
                                return arg[ self.key.no ]
×
462
                        else
463
                                return arg[ self.key.name ]
25✔
464
                                -- pass
465
                        end
UNCOV
466
                elseif _type == 'cdata' and ffi.typeof(arg) == tuple_ctype then
×
UNCOV
467
                        return arg[ self.key.no ]
×
UNCOV
468
                elseif _type == self.key.type then
×
UNCOV
469
                        return arg
×
470
                else
UNCOV
471
                        error("Wrong key/task argument. Expected table or tuple or key", 3)
×
472
                end
473
        end
474

475
        function self.packkey(_, key)
3✔
476
                if type(key) == 'cdata' then
25✔
477
                        return tostring(ffi.cast("uint64_t", key))
20✔
478
                else
479
                        return key
5✔
480
                end
481
        end
482

483
        do
484
                local filter
485
                if fields.priority then
3✔
486
                        filter = function(index)
UNCOV
487
                                return #index.parts >= 3
×
UNCOV
488
                                        and index.parts[1].fieldno == fields.status
×
UNCOV
489
                                        and index.parts[2].fieldno == fields.priority
×
UNCOV
490
                                        and index.parts[3].fieldno == self.key.no
×
491
                        end
492
                else
493
                        filter = function(index)
494
                                return #index.parts >= 2
7✔
495
                                        and index.parts[1].fieldno == fields.status
4✔
496
                                        and index.parts[2].fieldno == self.key.no
7✔
497
                        end
498
                end
499
                for i,index in pairs(space.index) do
7✔
500
                        if type(i) == 'number' and filter(index) then
14✔
501
                                self.index = index
3✔
502
                                break
3✔
503
                        end
504
                end
505
                if not self.index then
3✔
506
                        if fields.priority then
×
507
                                error("not found index by status + priority + id",2+depth)
×
508
                        else
509
                                error("not found index by status + id",2+depth)
×
510
                        end
511
                end
512

513
                if fields.tube then
3✔
514
                        for n,index in pairs(space.index) do
2✔
515
                                if type(n) == 'number' and index.parts[1].fieldno == fields.tube then
2✔
516
                                        local not_match = false
1✔
517
                                        for i = 2, #index.parts do
3✔
518
                                                if index.parts[i].fieldno ~= self.index.parts[i-1].fieldno then
2✔
519
                                                        not_match = true
×
520
                                                        break
521
                                                end
522
                                        end
523
                                        if not not_match then
1✔
524
                                                self.tube_index = index
1✔
525
                                                break
1✔
526
                                        end
527
                                end
528
                        end
529
                        if not self.tube_index then
1✔
UNCOV
530
                                if fields.priority then
×
UNCOV
531
                                        error("not found index by tube + status + priority + id",2+depth)
×
532
                                else
UNCOV
533
                                        error("not found index by tube + status + id",2+depth)
×
534
                                end
535
                        end
536
                end
537
        end
538

539
        ---@type boxIndex
540
        local runat_index
541
        if fields.runat then
3✔
542
                for _,index in pairs(space.index) do
7✔
543
                        if type(_) == 'number' then
7✔
544
                                if index.parts[1].fieldno == fields.runat then
7✔
545
                                        -- print("found",index.name)
546
                                        runat_index = index
2✔
547
                                        break
2✔
548
                                end
549
                        end
550
                end
551
                if not runat_index then
2✔
UNCOV
552
                        error(string.format("fields.runat requires tree index with this first field in it"),2+depth)
×
553
                else
554
                        local t = runat_index:pairs({0},{iterator = box.index.GT}):nth(1)
4✔
555
                        if t then
2✔
UNCOV
556
                                if t[ self.fields.runat ] < monotonic_max_age then
×
UNCOV
557
                                        error("!!! Queue contains monotonic runat. Consider updating tasks (https://github.com/moonlibs/xqueue/issues/2)")
×
558
                                end
559
                        end
560
                        have_runat = true
2✔
561
                end
562
        end
563
        self.have_runat = have_runat
3✔
564

565
        ---@type table<string, { counts: {}, transition: {} }>
566
        local stat_tube = {}
3✔
567
        if self.fields.tube and type(opts.tube_stats) == 'table' then
3✔
NEW
568
                for _, tube_name in ipairs(opts.tube_stats) do
×
NEW
569
                        if type(tube_name) == 'string' then
×
NEW
570
                                stat_tube[tube_name] = {
×
571
                                        counts = {},
572
                                        transition = {},
573
                                }
574
                        end
575
                end
576
        end
577

578
        if not self._stat then
3✔
579
                self._stat = {
3✔
580
                        counts = {};
3✔
581
                        transition = {};
3✔
582
                        tube = stat_tube;
3✔
583
                }
3✔
584
                -- TODO: benchmark index:count()
585
                if self.fields.tube then
3✔
586
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
3✔
NEW
587
                                local s = t[self.fields.status]
×
NEW
588
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
589

NEW
590
                                local tube = t[self.fields.tube]
×
NEW
591
                                if stat_tube[tube] then
×
NEW
592
                                        stat_tube[tube].counts[s] = (stat_tube[tube].counts[s] or 0LL) + 1
×
593
                                end
594
                        end
595
                else
596
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
6✔
NEW
597
                                local s = t[self.fields.status]
×
NEW
598
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
599
                        end
600
                end
601
        end
602

603
        -- 3. features check
604

605
        local features = {}
3✔
606

607
        local gen_id
608
        opts.features = opts.features or {}
3✔
609
        if not opts.features.id then
3✔
UNCOV
610
                opts.features.id = 'uuid'
×
611
        end
612
        -- 'auto_increment' | 'time64' | 'uuid' | 'required' | function
613
        if opts.features.id == 'auto_increment' then
3✔
614
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
×
UNCOV
615
                        error("For auto_increment numeric pk is mandatory",2+depth)
×
616
                end
617
                local pk_fno = pk.parts[1].fieldno
×
618
                gen_id = function()
UNCOV
619
                        local max = pk:max()
×
UNCOV
620
                        if max then
×
UNCOV
621
                                return max[pk_fno] + 1
×
622
                        else
623
                                return 1
×
624
                        end
625
                end
626
        elseif opts.features.id == 'time64' then
3✔
627
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
4✔
UNCOV
628
                        error("For time64 numeric pk is mandatory",2+depth)
×
629
                end
630
                gen_id = function()
631
                        local key = clock.realtime64()
13✔
632
                        while true do
633
                                local exists = pk:get(key)
13✔
634
                                if not exists then
13✔
635
                                        return key
13✔
636
                                end
UNCOV
637
                                key = key + 1
×
638
                        end
639
                end
640
        elseif opts.features.id == 'uuid' then
1✔
641
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'str')) then
2✔
UNCOV
642
                        error("For uuid string pk is mandatory",2+depth)
×
643
                end
644
                local uuid = require'uuid'
1✔
645
                gen_id = function()
646
                        while true do
647
                                local key = uuid.str()
2✔
648
                                local exists = pk:get(key)
2✔
649
                                if not exists then
2✔
650
                                        return key
2✔
651
                                end
652
                        end
653
                end
654
        elseif opts.features.id == 'required' then
×
655
                -- gen_id = function()
656
                --         -- ???
657
                --         error("TODO")
658
                -- end
UNCOV
659
        elseif type(opts.features.id) == 'function'
×
UNCOV
660
                or (debug.getmetatable(opts.features.id)
×
UNCOV
661
                and debug.getmetatable(opts.features.id).__call)
×
662
        then
663
                gen_id = opts.features.id
×
664
        else
UNCOV
665
                error(string.format(
×
666
                        "Wrong type for features.id %s, may be 'auto_increment' | 'time64' | 'uuid' | 'required' | function",
UNCOV
667
                        opts.features.id
×
668
                ), 2+depth)
×
669
        end
670

671
        if not opts.features.retval then
3✔
672
                opts.features.retval = have_format and 'table' or 'tuple'
3✔
673
        end
674

675
        if format then
3✔
676
                self.tuple = _table2tuple(format)
6✔
677
                self.table = _tuple2table(format)
6✔
678
        end
679

680
        if opts.features.retval == 'table' then
3✔
681
                self.retwrap = self.table
3✔
682
        else
UNCOV
683
                self.retwrap = function(t) return t end
×
684
        end
685

686
        features.buried = not not opts.features.buried
3✔
687
        features.keep   = not not opts.features.keep
3✔
688
        if opts.features.delayed then
3✔
689
                if not have_runat then
2✔
UNCOV
690
                        error(string.format("Delayed feature requires runat field and index" ),2+depth)
×
691
                end
692
                features.delayed = true
2✔
693
        else
694
                features.delayed = false
1✔
695
        end
696

697
        if opts.features.zombie then
3✔
698
                if features.keep then
1✔
UNCOV
699
                        error(string.format("features keep and zombie are mutually exclusive" ),2+depth)
×
700
                end
701
                if not have_runat then
1✔
UNCOV
702
                        error(string.format("feature zombie requires runat field and index" ),2+depth)
×
703
                end
704

705
                features.zombie = true
1✔
706
                if type(opts.features.zombie) == 'number' then
1✔
UNCOV
707
                        features.zombie_delay = opts.features.zombie
×
708
                end
709
        else
710
                features.zombie = false
2✔
711
        end
712

713
        if opts.features.ttl then
3✔
UNCOV
714
                if not have_runat then
×
UNCOV
715
                        error(string.format("feature ttl requires runat field and index" ),2+depth)
×
716
                end
717

UNCOV
718
                features.ttl = true
×
UNCOV
719
                if type(opts.features.ttl) == 'number' then
×
UNCOV
720
                        features.ttl_default = opts.features.ttl
×
721
                end
722
        else
723
                features.ttl = false
3✔
724
        end
725

726
        if opts.features.ttr then
3✔
UNCOV
727
                if not have_runat then
×
728
                        error(string.format("feature ttr requires runat field and index" ),2+depth)
×
729
                end
730

UNCOV
731
                features.ttr = true
×
UNCOV
732
                if type(opts.features.ttr) == 'number' then
×
733
                        features.ttr_default = opts.features.ttr
×
734
                end
735
        else
736
                features.ttr = false
3✔
737
        end
738

739
        if fields.tube then
3✔
740
                features.tube = true
1✔
741
        end
742

743
        self.gen_id = gen_id
3✔
744
        self.features = features
3✔
745
        self.space = space.id
3✔
746

747
        function self.timeoffset(delta)
3✔
748
                delta = tonumber(delta) or 0
8✔
749
                return clock.realtime() + delta
8✔
750
        end
751
        function self.timeready(time)
3✔
752
                return time < clock.realtime()
11✔
753
        end
754
        function self.timeremaining(time)
3✔
755
                return time - clock.realtime()
8✔
756
        end
757
        -- self.NEVER = -1ULL
758
        self.NEVER = 0
3✔
759

760
        function self.keyfield(_,t)
3✔
761
                return t[pkf.no]
12✔
762
        end
763
        function self.keypack(_,t)
3✔
UNCOV
764
                return t[pkf.no]
×
765
        end
766

767
        -- Notify producer if it is still waiting us.
768
        -- Producer waits only for successfully processed task
769
        -- or for task which would never be processed.
770
        -- Discarding to process task depends on consumer's logic.
771
        -- Also task can be deleted from space by exceeding TTL.
772
        -- Producer should not be notified if queue is able to process task in future.
773
        local function notify_producer(key, task)
774
                local pkey = self:packkey(key)
25✔
775
                local wait = self.put_wait[pkey]
25✔
776
                if not wait then
25✔
777
                        -- no producer
778
                        return
25✔
779
                end
780

UNCOV
781
                if task.status == 'Z' or task.status == 'D' then
×
782
                        -- task was processed
UNCOV
783
                        wait.task = task
×
UNCOV
784
                        wait.processed = true
×
UNCOV
785
                        wait.cond:broadcast()
×
UNCOV
786
                        return
×
787
                end
UNCOV
788
                if not space:get{key} then
×
789
                        -- task is not present in space
790
                        -- it could be TTL or :ack
UNCOV
791
                        if task.status == 'T' then
×
792
                                -- task was acked:
UNCOV
793
                                wait.task = task
×
UNCOV
794
                                wait.processed = true
×
UNCOV
795
                                wait.cond:broadcast()
×
UNCOV
796
                        elseif task.status == 'R' then
×
797
                                -- task was killed by TTL
UNCOV
798
                                wait.task = task
×
UNCOV
799
                                wait.processed = false
×
UNCOV
800
                                wait.cond:broadcast()
×
801
                        end
802
                else
803
                        -- task is still in space
UNCOV
804
                        if task.status == 'B' then
×
805
                                -- task was buried
806
                                wait.task = task
×
UNCOV
807
                                wait.processed = false
×
808
                                wait.cond:broadcast()
×
809
                        end
810
                end
811
        end
812

813
        self.ready = fiber.channel(0)
3✔
814

815
        if opts.worker then
3✔
816
                local workers = opts.workers or 1
×
UNCOV
817
                local worker = opts.worker
×
NEW
818
                for id = 1,workers do
×
819
                        fiber.create(function(space,xq,i)
×
820
                                local fname = space.name .. '.xq.wrk' .. tostring(i)
×
821
                                ---@diagnostic disable-next-line: undefined-field
UNCOV
822
                                if package.reload then fname = fname .. '.' .. package.reload.count end
×
UNCOV
823
                                fiber.name(string.sub(fname,1,32))
×
824
                                repeat fiber.sleep(0.001) until space.xq
×
825
                                if xq.ready then xq.ready:get() end
×
826
                                log.info("I am worker %s",i)
×
UNCOV
827
                                if box.info.ro then
×
UNCOV
828
                                        log.notice("Shutting down on ro instance")
×
UNCOV
829
                                        return
×
830
                                end
UNCOV
831
                                while box.space[space.name] and space.xq == xq do
×
832
                                        local task = space:take(1)
×
833
                                        if task then
×
834
                                                local key = xq:getkey(task)
×
UNCOV
835
                                                local r,e = pcall(worker,task)
×
UNCOV
836
                                                if not r then
×
UNCOV
837
                                                        log.error("Worker for {%s} has error: %s", key, e)
×
838
                                                else
UNCOV
839
                                                        if xq.taken[ key ] then
×
UNCOV
840
                                                                space:ack(task)
×
841
                                                        end
842
                                                end
843
                                                if xq.taken[ key ] then
×
844
                                                        log.error("Worker for {%s} not released task", key)
×
845
                                                        space:release(task)
×
846
                                                end
847
                                        end
848
                                        fiber.yield()
×
849
                                end
850
                                log.info("worker %s ended", i)
×
NEW
851
                        end,space,self,id)
×
852
                end
853
        end
854

855
        if have_runat then
3✔
856
                self.runat_chan = fiber.channel(0)
2✔
857
                self.runat = fiber.create(function(space,xq,runat_index)
4✔
858
                        local fname = space.name .. '.xq'
2✔
859
                        if package.reload then fname = fname .. '.' .. package.reload.count end
2✔
860
                        fiber.name(string.sub(fname,1,32))
4✔
861
                        repeat fiber.sleep(0.001) until space.xq
2✔
862
                        if xq.ready then xq.ready:get() end
2✔
863
                        local chan = xq.runat_chan
2✔
864
                        log.info("Runat started")
2✔
865
                        if box.info.ro then
2✔
UNCOV
866
                                log.notice("Shutting down on ro instance")
×
UNCOV
867
                                return
×
868
                        end
869
                        local maxrun = 1000
2✔
870
                        local curwait
871
                        local collect = {}
2✔
872
                        while box.space[space.name] and space.xq == xq do
20✔
873
                                local r,e = pcall(function()
40✔
874
                                        -- print("runat loop 2 ",box.time64())
875
                                        local remaining
876
                                        for _,t in runat_index:pairs({0},{iterator = box.index.GT}) do
66✔
877

878
                                                -- print("checking ",t)
879
                                                if xq.timeready( t[ xq.fields.runat ] ) then
33✔
880
                                                        table.insert(collect,t)
3✔
881
                                                else
882
                                                        remaining = xq.timeremaining(t[ xq.fields.runat ])
24✔
883
                                                        break
8✔
884
                                                end
885
                                                if #collect >= maxrun then remaining = 0 break end
3✔
886
                                        end
887

888
                                        for _,t in ipairs(collect) do
23✔
889
                                                -- log.info("Runat: %s, %s", _, t)
890
                                                if t[xq.fields.status] == 'W' then
6✔
891
                                                        log.info("Runat: W->R %s",xq:keyfield(t))
4✔
892
                                                        -- TODO: default ttl?
893
                                                        local u = space:update({ xq:keyfield(t) },{
8✔
894
                                                                { '=',xq.fields.status,'R' },
2✔
895
                                                                { '=',xq.fields.runat, xq.NEVER }
2✔
896
                                                        })
897
                                                        xq:wakeup(u)
4✔
898
                                                elseif t[xq.fields.status] == 'R' and xq.features.ttl then
2✔
UNCOV
899
                                                        local key = xq:keyfield(t)
×
UNCOV
900
                                                        log.info("Runat: Kill R by ttl %s (%+0.2fs)", key, fiber.time() - t[ xq.fields.runat ])
×
UNCOV
901
                                                        t = space:delete{key}
×
UNCOV
902
                                                        notify_producer(key, t)
×
903
                                                elseif t[xq.fields.status] == 'Z' and xq.features.zombie then
2✔
904
                                                        log.info("Runat: Kill Zombie %s",xq:keyfield(t))
2✔
905
                                                        space:delete{ xq:keyfield(t) }
4✔
UNCOV
906
                                                elseif t[xq.fields.status] == 'T' and xq.features.ttr then
×
UNCOV
907
                                                        local key = xq:keypack(t)
×
UNCOV
908
                                                        local sid = xq.taken[ key ]
×
UNCOV
909
                                                        local peer = peers[sid] or sid
×
910

UNCOV
911
                                                        log.info("Runat: autorelease T->R by ttr %s taken by %s",xq:keyfield(t), peer)
×
UNCOV
912
                                                        local u = space:update({ xq:keyfield(t) },{
×
UNCOV
913
                                                                { '=',xq.fields.status,'R' },
×
UNCOV
914
                                                                { '=',xq.fields.runat, xq.NEVER }
×
915
                                                        })
UNCOV
916
                                                        xq.taken[ key ] = nil
×
UNCOV
917
                                                        if sid then
×
UNCOV
918
                                                                self.bysid[ sid ][ key ] = nil
×
919
                                                        end
UNCOV
920
                                                        xq:wakeup(u)
×
921
                                                else
UNCOV
922
                                                        log.error("Runat: unsupported status %s for %s",t[xq.fields.status], tostring(t))
×
UNCOV
923
                                                        space:update({ xq:keyfield(t) },{
×
924
                                                                { '=',xq.fields.runat, xq.NEVER }
×
925
                                                        })
926
                                                end
927
                                        end
928

929
                                        if remaining then
20✔
930
                                                if remaining >= 0 and remaining < 1 then
8✔
931
                                                        return remaining
8✔
932
                                                end
933
                                        end
934
                                        return 1
12✔
935
                                end)
936

937
                                table_clear(collect)
20✔
938

939
                                if r then
20✔
940
                                        curwait = e
20✔
941
                                else
942
                                        curwait = 1
×
943
                                        log.error("Runat/ERR: %s",e)
×
944
                                end
945
                                -- log.info("Wait %0.2fs",curwait)
946
                                if curwait == 0 then fiber.sleep(0) end
20✔
947
                                chan:get(curwait)
20✔
948
                        end
949
                        log.info("Runat ended")
×
950
                end,space,self,runat_index)
4✔
951
        end
952

953
        local function atomic_tail(self, key, status, ...)
954
                self._lock[key] = nil
39✔
955
                if not status then
39✔
UNCOV
956
                        error((...), 3)
×
957
                end
958
                return ...
39✔
959
        end
960
        function self:atomic(key, fun, ...)
3✔
961
                self._lock[key] = true
39✔
962
                return atomic_tail(self, key, pcall(fun, ...))
78✔
963
        end
964

965
        function self:check_owner(key)
3✔
966
                local t = space:get(key)
48✔
967
                if not t then
24✔
968
                        error(string.format( "Task {%s} was not found", key ),2)
×
969
                end
970
                if not self.taken[key] then
44✔
UNCOV
971
                        error(string.format( "Task %s not taken by any", key ),2)
×
972
                end
973
                if self.taken[key] ~= box.session.id() then
44✔
974
                        error(string.format( "Task %s taken by %d. Not you (%d)", key, self.taken[key], box.session.id() ),2)
×
975
                end
976
                return t
24✔
977
        end
978

979
        function self:putback(task)
3✔
980
                local key = task[ self.key.no ]
25✔
981
                local sid = self.taken[ key ]
25✔
982
                if sid then
25✔
983
                        self.taken[ key ] = nil
24✔
984
                        if self.bysid[ sid ] then
24✔
985
                                self.bysid[ sid ][ key ] = nil
44✔
986
                        else
UNCOV
987
                                log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, sid)
×
988
                        end
989
                else
990
                        log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() )
1✔
991
                end
992

993
                notify_producer(key, task)
25✔
994
        end
995

996
        function self:wakeup(t)
3✔
997
                if t[self.fieldmap.status] ~= 'R' then return end
68✔
998
                if self.fieldmap.tube then
23✔
999
                        -- we may have consumers in the tubes:
1000
                        local tube_chan = self.take_chans[t[self.fieldmap.tube]]
30✔
1001
                        if tube_chan and tube_chan:has_readers() and tube_chan:put(true, 0) then
15✔
1002
                                -- we have successfully notified consumer
UNCOV
1003
                                return
×
1004
                        end
1005
                        -- otherwise fallback to default channel:
1006
                end
1007
                if self.take_wait:has_readers() then
23✔
1008
                        self.take_wait:put(true,0)
3✔
1009
                end
1010
        end
1011

1012
        function self:make_ready()
3✔
1013
                while self.ready and self.ready:has_readers() do
3✔
UNCOV
1014
                        self.ready:put(true,0)
×
UNCOV
1015
                        fiber.sleep(0)
×
1016
                end
1017
                self.ready = nil
3✔
1018
        end
1019

1020
        local meta = debug.getmetatable(space)
3✔
1021
        for k,v in pairs(methods) do meta[k] = v end
36✔
1022

1023
        -- Triggers must set right before updating space
1024
        -- because raising error earlier leads to trigger inconsistency
1025
        self._on_repl = space:on_replace(function(old, new)
6✔
1026
                local old_st, new_st
1027
                local old_tube, new_tube
1028
                local old_tube_stat, new_tube_stat
1029
                local counts = self._stat.counts
67✔
1030
                local tube_ss = self._stat.tube
67✔
1031

1032
                if old then
67✔
1033
                        old_st = old[self.fields.status] --[[@as string]]
52✔
1034
                        if counts[old_st] and counts[old_st] > 0 then
52✔
1035
                                counts[old_st] = counts[old_st] - 1
52✔
1036
                        else
UNCOV
1037
                                log.error(
×
1038
                                        "Have not valid statistic by status: %s with value: %s",
UNCOV
1039
                                        old_st, tostring(counts[old_st])
×
1040
                                )
1041
                        end
1042
                        if self.fields.tube then
52✔
1043
                                old_tube = old[self.fields.tube]
30✔
1044
                                old_tube_stat = tube_ss[old_tube]
30✔
1045
                                if type(old_tube_stat) == 'table' and type(old_tube_stat.counts) == 'table' then
30✔
NEW
1046
                                        if (tonumber(old_tube_stat.counts[old_st]) or 0) > 0 then
×
NEW
1047
                                                old_tube_stat.counts[old_st] = old_tube_stat.counts[old_st] - 1
×
1048
                                        else
NEW
1049
                                                log.error(
×
1050
                                                        "Have not valid statistic by tubs/status: tube %s with value: %s",
NEW
1051
                                                        old_tube, old_st, tostring(tube_ss[old_tube].counts[old_st])
×
1052
                                                )
1053
                                        end
1054
                                else
1055
                                        old_tube_stat = nil
30✔
1056
                                end
1057
                        end
1058
                else
1059
                        old_st = 'X'
15✔
1060
                end
1061

1062
                if new then
67✔
1063
                        new_st = new[self.fields.status] --[[@as string]]
59✔
1064
                        counts[new_st] = (counts[new_st] or 0LL) + 1
59✔
1065
                        if counts[new_st] < 0 then
59✔
UNCOV
1066
                                log.error("Statistic overflow by task type: %s", new_st)
×
1067
                        end
1068
                        if self.fields.tube then
59✔
1069
                                new_tube = new[self.fields.tube]
35✔
1070
                                new_tube_stat = tube_ss[new_tube]
35✔
1071
                                if type(new_tube_stat) == 'table' and type(new_tube_stat.counts) == 'table' then
35✔
NEW
1072
                                        if (tonumber(new_tube_stat.counts[new_st]) or 0) >= 0 then
×
NEW
1073
                                                new_tube_stat.counts[new_st] = (new_tube_stat.counts[new_st] or 0LL) + 1
×
1074
                                        else
NEW
1075
                                                log.error(
×
1076
                                                        "Have not valid statistic tube/status: tube %q with value: %s",
NEW
1077
                                                        new_tube, new_st, tostring(new_tube_stat.counts[new_st])
×
1078
                                                )
1079
                                        end
1080
                                else
1081
                                        new_tube_stat = nil
35✔
1082
                                end
1083
                        end
1084
                else
1085
                        new_st = 'X'
8✔
1086
                end
1087

1088
                local field = old_st.."-"..new_st
67✔
1089
                self._stat.transition[field] = (self._stat.transition[field] or 0LL) + 1
67✔
1090
                if old_tube_stat then
67✔
NEW
1091
                        if not new_tube_stat or old_tube_stat == new_tube_stat then
×
1092
                                -- no new tube or new and old tubes are the same
NEW
1093
                                old_tube_stat.transition[field] = (old_tube_stat.transition[field] or 0LL) + 1
×
1094
                        else
1095
                                -- nil != old_tube != new_tube != nil
1096
                                -- cross tube transition ?
1097
                                -- Can this be backoff with tube change?
NEW
1098
                                local old_field = old_st.."-S"
×
NEW
1099
                                local new_field = "S-"..new_st
×
NEW
1100
                                old_tube_stat.transition[old_field] = (old_tube_stat.transition[old_field] or 0LL) + 1
×
NEW
1101
                                new_tube_stat.transition[new_field] = (new_tube_stat.transition[new_field] or 0LL) + 1
×
1102
                        end
1103
                elseif new_tube_stat then
67✔
1104
                        -- old_tube_stat == nil
NEW
1105
                        new_tube_stat.transition[field] = (new_tube_stat.transition[field] or 0LL) + 1
×
1106
                end
1107
        end, self._on_repl)
73✔
1108

1109
        self._on_dis = box.session.on_disconnect(function()
6✔
UNCOV
1110
                local sid = box.session.id()
×
UNCOV
1111
                local peer = box.session.storage.peer
×
1112

UNCOV
1113
                log.info("%s: disconnected %s, sid=%s, fid=%s", space.name, peer, sid, fiber.id() )
×
UNCOV
1114
                box.session.storage.destroyed = true
×
1115
                if self.bysid[sid] then
×
UNCOV
1116
                        local old = self.bysid[sid]
×
1117
                        while next(old) do
×
UNCOV
1118
                                for key,realkey in pairs(old) do
×
UNCOV
1119
                                        self.taken[key] = nil
×
UNCOV
1120
                                        old[key] = nil
×
1121
                                        local t = space:get(realkey)
×
1122
                                        if t then
×
1123
                                                if t[ self.fields.status ] == 'T' then
×
1124
                                                        self:wakeup(space:update({ realkey }, {
×
1125
                                                                { '=',self.fields.status,'R' },
×
UNCOV
1126
                                                                self.have_runat and { '=', self.fields.runat, self.NEVER } or nil
×
1127
                                                        }))
UNCOV
1128
                                                        log.info("Rst: T->R {%s}", realkey )
×
1129
                                                else
UNCOV
1130
                                                        log.error( "Rst: %s->? {%s}: wrong status", t[self.fields.status], realkey )
×
1131
                                                end
1132
                                        else
1133
                                                log.error( "Rst: {%s}: taken not found", realkey )
×
1134
                                        end
1135
                                end
1136
                        end
UNCOV
1137
                        self.bysid[sid] = nil
×
1138
                end
1139
        end, self._on_dis)
6✔
1140

1141
        rawset(space,'xq',self)
3✔
1142

1143
        log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status)
3✔
1144

1145

1146
        function self:starttest()
3✔
1147
                local xq = self
3✔
1148
                if box.info.status == 'orphan' then
3✔
1149
                        fiber.create(function()
×
1150
                                local x = 0
×
1151
                                repeat
UNCOV
1152
                                        fiber.sleep(x/1e5)
×
1153
                                        x = x + 1
×
UNCOV
1154
                                until box.info.status ~= 'orphan'
×
1155
                                self:starttest()
×
1156
                        end)
UNCOV
1157
                        return
×
1158
                end
1159
                if box.info.ro then
3✔
UNCOV
1160
                        log.info("Server is ro, not resetting statuses")
×
1161
                else
1162
                        -- FIXME: with stable iterators add yield
1163
                        local space = box.space[self.space]
3✔
1164
                        box.begin()
3✔
1165
                        for _,t in self.index:pairs({'T'},{ iterator = box.index.EQ }) do
9✔
UNCOV
1166
                                local key = t[ xq.key.no ]
×
UNCOV
1167
                                if not self.taken[key] and not self._lock[key] then
×
UNCOV
1168
                                        local update = {
×
1169
                                                { '=', self.fields.status, 'R' },
×
UNCOV
1170
                                                self.have_runat and {
×
1171
                                                        '=',self.fields.runat,
×
UNCOV
1172
                                                        self.ttl_default and self.timeoffset( self.ttl_default ) or self.NEVER
×
UNCOV
1173
                                                } or nil
×
1174
                                        }
UNCOV
1175
                                        space:update({key}, update)
×
1176
                                        log.info("Start: T->R (%s)", key)
×
1177
                                end
1178
                        end
1179
                        box.commit()
3✔
1180
                end
1181
                self:make_ready()
3✔
1182
        end
1183
        self:starttest()
3✔
1184
end
1185

1186

1187
--[[
1188
* `space:put`
1189
        - `task` - table or array or tuple
1190
                + `table`
1191
                        * **requires** space format
1192
                        * suitable for id generation
1193
                + `array`
1194
                        * ignores space format
1195
                        * for id generation use `NULL` (**not** `nil`)
1196
                + `tuple`
1197
                        * ignores space format
1198
                        * **can't** be used with id generation
1199
        - `attr` - table of attributes
1200
                + `delay` - number of seconds
1201
                        * if set, task will become `W` instead of `R` for `delay` seconds
1202
                + `ttl` - number of seconds
1203
                        * if set, task will be discarded after ttl seconds unless was taken
1204
                + `wait` - number of seconds
1205
                        * if set, callee fiber will be blocked up to `wait` seconds until task won't
1206
                        be processed or timeout reached.
1207

1208
```lua
1209
box.space.myqueue:put{ name="xxx"; data="yyy"; }
1210
box.space.myqueue:put{ "xxx","yyy" }
1211
box.space.myqueue:put(box.tuple.new{ 1,"xxx","yyy" })
1212

1213
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5 })
1214
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 })
1215
```
1216
]]
1217

1218
---@param t any[]|box.tuple
1219
---@param opts? { delay: number?, ttl: number?, wait: number? }
1220
---@return table|box.tuple, boolean? has_been_processed
1221
function methods:put(t, opts)
1✔
1222
        local xq = self.xq
15✔
1223
        opts = opts or {}
15✔
1224
        if type(t) == 'table' then
15✔
1225
                if xq.gen_id then
15✔
1226
                        if is_array(t) then
30✔
UNCOV
1227
                                error("Task from array creation is not implemented yet", 2)
×
1228
                        else
1229
                                -- pass
1230
                        end
1231
                end
UNCOV
1232
        elseif type(t) == 'cdata' and ffi.typeof(t) == tuple_ctype then
×
1233
                if xq.gen_id then
×
1234
                        error("Can't use id generation with tuple.", 2)
×
1235
                end
1236
                error("Task from tuple creation is not implemented yet", 2)
×
1237
        else
1238
                error("Wrong argument to put. Expected table or tuple", 2)
×
1239
        end
1240
        -- here we have table with keys
1241
        if xq.gen_id then
15✔
1242
                t[ xq.key.name ] = xq.gen_id()
30✔
1243
        else
UNCOV
1244
                if not t[ xq.key.name ] then
×
UNCOV
1245
                        error("Primary key is mandatory",2)
×
1246
                end
1247
        end
1248

1249
        -- delayed or ttl or default ttl
1250
        if opts.delay then
15✔
1251
                if not xq.features.delayed then
1✔
1252
                        error("Feature delayed is not enabled",2)
×
1253
                end
1254
                if opts.ttl then
1✔
UNCOV
1255
                        error("Features ttl and delay are mutually exclusive",2)
×
1256
                end
1257
                t[ xq.fieldmap.status ] = 'W'
1✔
1258
                t[ xq.fieldmap.runat ]  = xq.timeoffset(opts.delay)
2✔
1259

1260
                if opts.wait then
1✔
UNCOV
1261
                        error("Are you crazy? Call of :put({...}, { wait = <>, delay = <> }) looks weird", 2)
×
1262
                end
1263
        elseif opts.ttl then
14✔
UNCOV
1264
                if not xq.features.ttl then
×
UNCOV
1265
                        error("Feature ttl is not enabled",2)
×
1266
                end
UNCOV
1267
                t[ xq.fieldmap.status ] = 'R'
×
UNCOV
1268
                t[ xq.fieldmap.runat ]  = xq.timeoffset(opts.ttl)
×
1269
        elseif xq.features.ttl_default then
14✔
UNCOV
1270
                t[ xq.fieldmap.status ] = 'R'
×
UNCOV
1271
                t[ xq.fieldmap.runat ]  = xq.timeoffset(xq.features.ttl_default)
×
1272
        elseif xq.have_runat then
14✔
1273
                t[ xq.fieldmap.status ] = 'R'
12✔
1274
                t[ xq.fieldmap.runat ]  = xq.NEVER
12✔
1275
        else
1276
                t[ xq.fieldmap.status ] = 'R'
2✔
1277
        end
1278

1279
        local tuple = xq.tuple(t)
15✔
1280
        local key = tuple[ xq.key.no ]
15✔
1281

1282
        local wait
1283
        if opts.wait then
15✔
UNCOV
1284
                wait = { cond = fiber.cond() }
×
UNCOV
1285
                xq.put_wait[xq:packkey(key)] = wait
×
1286
        end
1287

1288
        xq:atomic(key, function()
30✔
1289
                t = self:insert(tuple)
45✔
1290
        end)
1291
        xq:wakeup(t)
15✔
1292

1293
        if wait then
15✔
1294
                -- local func notify_producer will send to us some data
UNCOV
1295
                local ok = wait.cond:wait(opts.wait)
×
UNCOV
1296
                fiber.testcancel()
×
UNCOV
1297
                if ok and wait.task then
×
UNCOV
1298
                        return xq.retwrap(wait.task), wait.processed
×
1299
                end
1300
        end
1301
        return xq.retwrap(t)
15✔
1302
end
1303

1304
--[[
1305
* `space:wait(id, timeout)`
1306
        - `id`:
1307
                + `string` | `number` - primary key
1308
        - `timeout` - number of seconds to wait
1309
                * callee fiber will be blocked up to `timeout` seconds until task won't
1310
                be processed or timeout reached.
1311
]]
1312

1313
local wait_for = {
1✔
1314
        R = true,
1315
        T = true,
1316
        W = true,
1317
}
1318

1319
---@param key table|scalar|box.tuple
1320
---@param timeout number
1321
---@return table|box.tuple, boolean? has_been_processed
1322
function methods:wait(key, timeout)
1✔
UNCOV
1323
        local xq = self.xq
×
UNCOV
1324
        key = xq:getkey(key)
×
1325
        local task = self:get(key)
×
1326
        if not task then
×
UNCOV
1327
                error(("Task {%s} was not found"):format(key))
×
1328
        end
1329

UNCOV
1330
        local status = task[xq.fields.status]
×
UNCOV
1331
        if not wait_for[status] then
×
UNCOV
1332
                return xq.retwrap(task), false
×
1333
        end
1334

UNCOV
1335
        local pkey = xq:packkey(key)
×
1336
        local wait = xq.put_wait[pkey]
×
UNCOV
1337
        if not wait then
×
UNCOV
1338
                wait = { cond = fiber.cond() }
×
UNCOV
1339
                xq.put_wait[pkey] = wait
×
1340
        end
1341

1342
        -- local func notify_producer will send to us some data
UNCOV
1343
        local ok = wait.cond:wait(timeout)
×
UNCOV
1344
        fiber.testcancel()
×
1345
        if ok and wait.task then
×
1346
                return xq.retwrap(wait.task), wait.processed
×
1347
        end
1348

1349
        return xq.retwrap(task), false
×
1350
end
1351

1352
--[[
1353
* `space:take(timeout)`
1354
* `space:take(timeout, opts)`
1355
* `space:take(opts)`
1356
        - `timeout` - number of seconds to wait for new task
1357
                + choose reasonable time
1358
                + beware of **readahead** size (see tarantool docs)
1359
        - `tube` - name of the tube worker wants to take task from (feature tube must be enabled)
1360
        - returns task tuple or table (see retval) or nothing on timeout
1361
        - *TODO*: ttr must be there
1362
]]
1363

1364
---@param timeout? number|{ timeout: number?, ttr: number?, tube: string? }
1365
---@param opts? { timeout: number?, ttr: number?, tube: string? }
1366
---@return table|box.tuple?
1367
function methods:take(timeout, opts)
1✔
1368
        local xq = self.xq
38✔
1369
        timeout = timeout or 0
38✔
1370

1371
        if type(timeout) == 'table' then
38✔
1372
                opts = timeout
27✔
1373
                timeout = opts.timeout or 0
27✔
1374
        else
1375
                opts = opts or {}
11✔
1376
        end
1377
        assert(timeout >= 0, "timeout required")
38✔
1378

1379
        local ttr
1380
        if opts.ttr then
38✔
1381
                if not xq.features.ttr then
×
1382
                        error("Feature ttr is not enabled",2)
×
1383
                end
UNCOV
1384
                ttr = opts.ttr
×
1385
        elseif xq.features.ttr_default then
38✔
UNCOV
1386
                ttr = xq.features.ttr_default
×
1387
        end
1388

1389
        local index
1390
        local start_with
1391

1392
        local tube_chan
1393
        if opts.tube then
38✔
1394
                if not xq.features.tube then
20✔
UNCOV
1395
                        error("Feature tube is not enabled", 2)
×
1396
                end
1397

1398
                assert(type(opts.tube) == 'string', "opts.tube must be a string")
20✔
1399

1400
                index = xq.tube_index
20✔
1401
                start_with = {opts.tube, 'R'}
20✔
1402
                tube_chan = xq.take_chans[opts.tube] or fiber.channel()
20✔
1403
                xq.take_chans[opts.tube] = tube_chan
20✔
1404
        else
1405
                index = xq.index
18✔
1406
                start_with = {'R'}
18✔
1407
        end
1408
        ---@cast index -nil
1409

1410
        local now = fiber.time()
38✔
1411
        local key
1412
        local found
1413
        while not found do
79✔
1414
                for _,t in index:pairs(start_with, { iterator = box.index.EQ }) do
165✔
1415
                        key = t[ xq.key.no ]
24✔
1416
                        if not xq._lock[ key ] then
24✔
1417
                                -- found key
1418
                                xq._lock[ key ] = true
24✔
1419
                                found = t
24✔
1420
                                break
24✔
1421
                        end
1422
                end
1423
                if not found then
55✔
1424
                        local left = (now + timeout) - fiber.time()
62✔
1425
                        if left <= 0 then goto finish end
31✔
1426

1427
                        (tube_chan or xq.take_wait):get(left)
17✔
1428
                        if box.session.storage.destroyed then goto finish end
34✔
1429
                end
1430
        end
1431
        ::finish::
×
1432

1433
        -- If we were last reader from the tube
1434
        -- we remove channel. Writers will get false
1435
        -- on :put if they exists.
1436
        if tube_chan and not tube_chan:has_readers() then
38✔
1437
                tube_chan:close()
20✔
1438
                xq.take_chans[opts.tube] = nil
20✔
1439
        end
1440
        if not found then return end
38✔
1441

1442
        local r,e = pcall(function()
48✔
1443
                local sid = box.session.id()
24✔
1444
                local peer = box.session.storage.peer
48✔
1445

1446
                -- print("Take ",key," for ",peer," sid=",sid, "; fid=",fiber.id() )
1447
                if xq.debug then
24✔
1448
                        log.info("Take {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
9✔
1449
                end
1450

1451
                local update = {
24✔
1452
                        { '=', xq.fields.status, 'T' },
24✔
1453
                }
1454
                -- TODO: update more fields
1455

1456
                if ttr then
24✔
UNCOV
1457
                        table.insert(update,{ '=', xq.fields.runat, xq.timeoffset(ttr)})
×
UNCOV
1458
                        xq.runat_chan:put(true,0)
×
1459
                end
1460
                local t = self:update({key},update)
48✔
1461

1462
                if not xq.bysid[ sid ] then xq.bysid[ sid ] = {} end
24✔
1463
                xq.taken[ key ] = sid
24✔
1464
                xq.bysid[ sid ][ key ] = key
24✔
1465

1466
                return t
24✔
1467
        end)
1468

1469
        xq._lock[ key ] = nil
24✔
1470
        if not r then
24✔
UNCOV
1471
                error(e)
×
1472
        end
1473
        return xq.retwrap(e)
24✔
1474
end
1475

1476
--[[
1477
* `space:release(id, [attr])`
1478
        - `id`:
1479
                + `string` | `number` - primary key
1480
                + *TODO*: `tuple` - key will be extracted using index
1481
                + *TODO*: composite pk
1482
        - `attr`
1483
                + `update` - table for update, like in space:update
1484
                + `ttl` - timeout for time to live
1485
                + `delay` - number of seconds
1486
                        * if set, task will become `W` instead of `R` for `delay` seconds
1487
]]
1488

1489
---@param key table|scalar|box.tuple
1490
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
1491
---@return table|box.tuple
1492
function methods:release(key, attr)
1✔
1493
        local xq = self.xq
13✔
1494
        key = xq:getkey(key)
26✔
1495

1496
        attr = attr or {}
13✔
1497

1498
        local t = xq:check_owner(key)
13✔
1499
        local old = t[ xq.fields.status ]
13✔
1500
        local update = {}
13✔
1501
        if attr.update then
13✔
1502
                for _,v in pairs(attr.update) do table.insert(update,v) end
2✔
1503
        end
1504

1505
        -- delayed or ttl or default ttl
1506
        if attr.delay then
13✔
1507
                if not xq.features.delayed then
6✔
UNCOV
1508
                        error("Feature delayed is not enabled",2)
×
1509
                end
1510
                if attr.ttl then
6✔
1511
                        error("Features ttl and delay are mutually exclusive",2)
×
1512
                end
1513
                table.insert(update, { '=', xq.fields.status, 'W' })
6✔
1514
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.delay) })
12✔
1515
        elseif attr.ttl then
7✔
UNCOV
1516
                if not xq.features.ttl then
×
1517
                        error("Feature ttl is not enabled",2)
×
1518
                end
UNCOV
1519
                table.insert(update, { '=', xq.fields.status, 'R' })
×
UNCOV
1520
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.ttl) })
×
1521
        elseif xq.features.ttl_default then
7✔
UNCOV
1522
                table.insert(update, { '=', xq.fields.status, 'R' })
×
UNCOV
1523
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(xq.features.ttl_default) })
×
1524
        elseif xq.have_runat then
7✔
1525
                table.insert(update, { '=', xq.fields.status, 'R' })
6✔
1526
                table.insert(update, { '=', xq.fields.runat, xq.NEVER })
6✔
1527
        else
1528
                table.insert(update, { '=', xq.fields.status, 'R' })
1✔
1529
        end
1530

1531
        xq:atomic(key,function()
26✔
1532
                t = self:update({key}, update)
39✔
1533

1534
                ---@cast t box.tuple
1535
                xq:wakeup(t)
13✔
1536
                if xq.have_runat then
13✔
1537
                        xq.runat_chan:put(true,0)
12✔
1538
                end
1539

1540
                log.info("Rel: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
26✔
1541
                        key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
26✔
1542
        end)
1543

1544
        xq:putback(t)
13✔
1545

1546
        return t
13✔
1547
end
1548

1549
---@param key table|scalar|box.tuple
1550
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
1551
---@return table|box.tuple
1552
function methods:ack(key, attr)
1✔
1553
        -- features.zombie
1554
        -- features.keep
1555
        local xq = self.xq
10✔
1556
        key = xq:getkey(key)
20✔
1557

1558
        attr = attr or {}
10✔
1559

1560
        local t = xq:check_owner(key)
10✔
1561
        local old = t[ xq.fields.status ]
10✔
1562
        local delete = false
10✔
1563
        local update = {}
10✔
1564
        if attr.update then
10✔
1565
                for _,v in pairs(attr.update) do table.insert(update,v) end
2✔
1566
        end
1567

1568
        -- delayed or ttl or default ttl
1569
        if attr.delay then
10✔
1570
                if not xq.features.zombie then
1✔
UNCOV
1571
                        error("Feature zombie is not enabled",2)
×
1572
                end
1573
                table.insert(update, { '=', xq.fields.status, 'Z' })
1✔
1574
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.delay) })
2✔
1575
        elseif xq.features.zombie then
9✔
1576
                table.insert(update, { '=', xq.fields.status, 'Z' })
2✔
1577
                if xq.features.zombie_delay then
2✔
UNCOV
1578
                        table.insert(update, { '=', xq.fields.runat, xq.timeoffset(xq.features.zombie_delay) })
×
1579
                end
1580
        elseif xq.features.keep then
7✔
UNCOV
1581
                table.insert(update, { '=', xq.fields.status, 'D' })
×
1582
        else
1583
                -- do remove task
1584
                delete = true
7✔
1585
        end
1586

1587
        xq:atomic(key,function()
20✔
1588
                if #update > 0 then
10✔
1589
                        t = self:update({key}, update)
9✔
1590
                        ---@cast t box.tuple
1591
                        xq:wakeup(t)
3✔
1592
                        if xq.have_runat then
3✔
1593
                                xq.runat_chan:put(true,0)
3✔
1594
                        end
1595
                        log.info("Ack: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
6✔
1596
                                key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
6✔
1597
                end
1598

1599
                if delete then
10✔
1600
                        t = self:delete{key}
21✔
1601
                        log.info("Ack: %s->delete {%s} +%s from %s/sid=%s/fid=%s", old,
14✔
1602
                                key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
14✔
1603
                end
1604
        end)
1605

1606
        xq:putback(t) -- in real drop form taken key
10✔
1607

1608
        return t
10✔
1609
end
1610

1611
---@param key table|scalar|box.tuple
1612
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
1613
function methods:bury(key, attr)
1✔
1614
        attr = attr or {}
1✔
1615

1616
        local xq = self.xq
1✔
1617
        key = xq:getkey(key)
2✔
1618
        local t = xq:check_owner(key)
1✔
1619

1620
        local update = {}
1✔
1621
        if attr.update then
1✔
UNCOV
1622
                for _,v in pairs(attr.update) do table.insert(update,v) end
×
1623
        end
1624
        table.insert(update, { '=', xq.fields.status, 'B' })
1✔
1625

1626
        xq:atomic(key,function()
2✔
1627
                t = self:update({key}, update)
3✔
1628

1629
                xq:wakeup(t)
1✔
1630
                if xq.have_runat then
1✔
UNCOV
1631
                        xq.runat_chan:put(true,0)
×
1632
                end
1633
                log.info("Bury {%s} by %s, sid=%s, fid=%s", key, box.session.storage.peer, box.session.id(), fiber.id())
2✔
1634
        end)
1635

1636
        xq:putback(t)
1✔
1637
end
1638

1639
local function kick_task(self, key, attr)
1640
        local xq   = self.xq
1✔
1641
        key  = xq:getkey(key)
2✔
1642
        local peer = box.session.storage.peer
2✔
1643
        local sid  = box.session.id()
1✔
1644
        attr       = attr or {}
1✔
1645

1646
        if xq.debug then
1✔
1647
                log.info("Kick {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
1✔
1648
        end
1649

1650
        self:update({key}, {{ '=', xq.fields.status, 'R' }})
2✔
1651
        xq:putback(key, attr)
1✔
1652
end
1653

1654
function methods:kick(nr_tasks_or_task, attr)
1✔
1655
        attr = attr or {}
1✔
1656
        if type(nr_tasks_or_task) == 'number' then
1✔
UNCOV
1657
                local task_n = 1
×
1658
                for _, t in self.xq.index:pairs({'B'},{ iterator = box.index.EQ }) do
×
UNCOV
1659
                        if task_n > nr_tasks_or_task then break end
×
UNCOV
1660
                        kick_task(self, t, attr)
×
1661
                        if task_n % 500 == 0 then fiber.sleep(0) end
×
UNCOV
1662
                        task_n = task_n + 1
×
1663
                end
1664
        else
1665
                kick_task(self, nr_tasks_or_task, attr)
1✔
1666
        end
1667
end
1668

1669
---@param key table|scalar|box.tuple
1670
---@return box.tuple|table
1671
function methods:kill(key)
1✔
UNCOV
1672
        local xq     = self.xq
×
NEW
1673
        key = xq:getkey(key)
×
UNCOV
1674
        local task   = self:get(key)
×
NEW
1675
        if not task then
×
NEW
1676
                error(("Task by %s not found to kill"):format(key))
×
1677
        end
UNCOV
1678
        local peer   = box.session.storage.peer
×
1679

UNCOV
1680
        if xq.debug then
×
UNCOV
1681
                log.info("Kill {%s} by %s, sid=%s, fid=%s", key, peer, box.session.id(), fiber.id())
×
1682
        end
1683

NEW
1684
        task = self:delete(key)
×
1685
        ---@cast task -nil
1686
        ---Kill is treated as a synonim of Bury
NEW
1687
        task = task:update({{ '=', xq.fields.status, 'B' }})
×
NEW
1688
        xq:putback(task)
×
NEW
1689
        return xq.retwrap(task)
×
1690
end
1691

1692
-- special remap of truncate for deliting stats and saving methods
1693
function methods:truncate()
1✔
1694
        local stat = self.xq._stat
2✔
1695
        for status, _ in pairs(stat.counts) do
9✔
1696
                stat.counts[status] = 0LL
7✔
1697
        end
1698
        for transition, _ in pairs(stat.transition) do
16✔
1699
                stat.transition[transition] = nil
14✔
1700
        end
1701
        local ret = self.xq._default_truncate(self)
2✔
1702
        local meta = debug.getmetatable(self)
2✔
1703
        for k,v in pairs(methods) do meta[k] = v end
24✔
1704
        -- Now we reset our methods after truncation because
1705
        -- as we can see in on_replace_dd_truncate:
1706
        -- https://github.com/tarantool/tarantool/blob/0b7cc52607b2290d2f35cc68ee1a8243988c2735/src/box/alter.cc#L2239
1707
        -- tarantool deletes space and restores it with the same indexes
1708
        -- but without methods
1709
        return ret
2✔
1710
end
1711

1712
local pretty_st = {
1✔
1713
        R = "Ready",
1714
        T = "Taken",
1715
        W = "Waiting",
1716
        B = "Buried",
1717
        Z = "Zombie",
1718
        D = "Done",
1719
}
1720

1721
---@param pretty? boolean
1722
function methods:stats(pretty)
1✔
1723
        local stats = table.deepcopy(self.xq._stat)
1✔
1724
                for s, ps in pairs(pretty_st) do
7✔
1725
                        if pretty then
6✔
UNCOV
1726
                                stats.counts[ps] = stats.counts[s] or 0LL
×
UNCOV
1727
                                stats.counts[s]  = nil
×
NEW
1728
                                for _, tube_stat in pairs(stats.tube) do
×
NEW
1729
                                        tube_stat.counts[ps] = tube_stat.counts[s] or 0LL
×
NEW
1730
                                        tube_stat.counts[s] = nil
×
1731
                                end
1732
                        else
1733
                                stats.counts[s] = stats.counts[s] or 0LL
6✔
1734
                                for _, tube_stat in pairs(stats.tube) do
6✔
NEW
1735
                                        tube_stat.counts[s] = tube_stat.counts[s] or 0LL
×
1736
                                end
1737
                        end
1738
                end
1739
        return stats
1✔
1740
end
1741

1742
setmetatable(M,{
2✔
1743
        __call = function(_, space, opts)
1744
                M.upgrade(space,opts,1)
×
1745
        end
1746
})
1747

1748
return M
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc