• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moonlibs / xqueue / 10019880989

20 Jul 2024 11:34AM UTC coverage: 61.021%. First build
10019880989

Pull #19

github

Vladislav Grubov
tests, annotations and fixes
Pull Request #19: Implements per tube statistics

32 of 71 new or added lines in 1 file covered. (45.07%)

526 of 862 relevant lines covered (61.02%)

8.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.02
/xqueue.lua
1
local M = {}
1✔
2

3
local ffi = require 'ffi'
1✔
4
local log = require 'log'
1✔
5
local fiber = require 'fiber'
1✔
6
local clock = require 'clock'
1✔
7

8
local tuple_ctype = ffi.typeof(box.tuple.new())
1✔
9

10
local monotonic_max_age = 10*365*86400;
1✔
11

12
--- Remove the integer-indexed slots 0..#t from a table, in place.
-- Keys outside that range (including all string keys) are left untouched.
-- @param t table to wipe
-- Raises an error (reported at the caller's level) when `t` is not a table.
local function table_clear(t)
        local kind = type(t)
        if kind ~= 'table' then
                -- mimic the wording of the builtin argument check
                local got = 'no value'
                if t ~= nil then
                        got = kind
                end
                error("bad argument #1 to 'clear' (table expected, got "..got..")", 2)
        end

        -- the length is sampled once, before any slot is cleared
        local idx, last = 0, #t
        while idx <= last do
                t[idx] = nil
                idx = idx + 1
        end
end
20

21
--- Report whether the first step of `ipairs(t)` yields an index,
-- i.e. whether the table has a non-nil element at position 1.
-- @param t table to probe
-- @return boolean
local function is_array(t)
        local step, subject, start = ipairs(t)
        -- only the first return value (the index) matters here
        local first_index = step(subject, start)
        if first_index == nil then
                return false
        end
        return true
end
28

29
local peers = {}
1✔
30

31
do
        -- Hidden global slot holding whatever box.session.on_connect returned last time.
        local slot = "\0xq.on_connect"
        -- Value stored by an earlier load of this file, if any; it is handed to
        -- on_connect as the second argument (presumably the "old trigger" to be
        -- replaced -- confirm against the Tarantool trigger API).
        local previous = rawget(_G, slot)

        -- Remember the peer address of the freshly connected session.
        local function on_connect()
                local sid = box.session.id()
                local peer = box.session.peer()
                -- peer() is queried a second time for the session storage
                box.session.storage.peer = box.session.peer()
                peers[ sid ] = peer
                log.info("connected %s, sid=%s, fid=%s", peer, sid, fiber.id() )
        end

        rawset(_G, slot, box.session.on_connect(on_connect, previous))
end
1✔
38

39
do
        -- Hidden global slot holding whatever box.session.on_disconnect returned last time.
        local slot = "\0xq.on_disconnect"
        -- Value stored by an earlier load of this file, if any; it is handed to
        -- on_disconnect as the second argument (presumably the "old trigger" to be
        -- replaced -- confirm against the Tarantool trigger API).
        local previous = rawget(_G, slot)

        -- Forget the peer address recorded for the session that is going away.
        local function on_disconnect()
                local sid = box.session.id()
                local peer = peers[ sid ]
                peers[ sid ] = nil
                log.info("disconnected %s, sid=%s, fid=%s", peer, sid, fiber.id() )
        end

        rawset(_G, slot, box.session.on_disconnect(on_disconnect, previous))
end
1✔
45

46

47
--[[
48

49
Field sets:
50
1. id, status (minimal required configuration)
51
2. id, status, priority
52
3. id, status, runat
53
4. id, status, priority, runat
54

55
Primary index variants:
56
1. index(status, [ id ])
57
2. index(status, priority, [ id ])
58

59

60
Format:
61
local format = box.space._space.index.name:get(space_name)[ 7 ]
62

63
Status:
64
        R - ready - task is ready to be taken
65
                created by put without delay
66
                turned on by release/kick without delay
67
                turned on from W when delay passed
68
                deleted after ttl if ttl is enabled
69

70
        T - taken - task is taken by consumer. may not be taken by other.
71
                turned into R after ttr if ttr is enabled
72

73
        W - waiting - task is not ready to be taken. waiting for its `delay`
74
                requires `runat`
75
                `delay` may be set during put, release, kick
76
                turned into R after delay
77

78
        B - buried - task was temporary discarded from queue by consumer
79
                may be revived using kick by administrator
80
                use it in unexpected conditions, when manual intervention is required
81
                Without mandatory monitoring (stats.buried) usage of buried is useless and awry
82

83
        Z - zombie - task was processed and ack'ed and *temporary* kept for delay
84

85
        D - done - task was processed and ack'ed and permanently left in database
86
                enabled when keep feature is set
87

88
        X - reserved for statistics
89

90
(TODO: reload/upgrade and feature switch)
91

92
Interface:
93

94
# Creator methods:
95

96
        M.upgrade(space, {
97
                format = {
98
                        -- space format. applied to space.format() if passed
99
                },
100
                fields = {
101
                        -- id is always taken from pk
102
                        status   = 'status_field_name'    | status_field_no,
103
                        runat    = 'runat_field_name'     | runat_field_no,
104
                        priority = 'priority_field_name'  | priority_field_no,
105
                },
106
                features = {
107
                        id = 'auto_increment' | 'uuid' | 'required' | function
108
                                -- auto_increment - if pk is number, then use it for auto_increment
109
                                -- uuid - if pk is string, then use uuid for id
110
                                -- required - primary key MUST be present in tuple during put
111
                                -- function - function will be called to acquire id for task
112

113
                        buried = true,           -- if true, support bury/kick
114
                        delayed = true,          -- if true, support delayed tasks, requires `runat`
115

116
                        keep = true,             -- if true, keep ack'ed tasks in [D]one state, instead of deleting
117
                                -- mutually exclusive with zombie
118

119
                        zombie = true|number,    -- requires `runat` field
120
                                -- if number, then with default zombie delay, otherwise only if set delay during ack
121
                                -- mutually exclusive with keep
122

123
                        ttl    = true|number,    -- requires `runat` field
124
                                -- if number, then with default ttl, otherwise only if set during put/release
125
                        ttr    = true|number,    -- requires `runat` field
126
                                -- if number, then with default ttr, otherwise only if set
127
                },
128
        })
129

130
Producer methods:
131
        sp:put({...}, [ attr ]) (array or table (if have format) or tuple)
132

133
Consumer methods:
134

135
* id:
136
        - array of keyfields of pk
137
        - table of named keyfields
138
        - tuple
139

140
sp:take(timeout) -> tuple
141
sp:ack(id, [ attr ])
142
        attr.update (only if support zombie)
143
sp:release(id, [ attr ])
144
        attr.ttr (only if support ttr)
145
        attr.ttl (only if support ttl)
146
        attr.update
147
sp:bury(id, [ attr ])
148
        attr.ttl (only if support ttl)
149
        attr.update
150

151
Admin methods:
152

153
sp:queue_stats()
154
sp:kick(N | id, [attr]) -- put buried task id or N oldest buried tasks to [R]eady
155

156

157
]]
158

159
local json = require 'json'
1✔
160
json.cfg{ encode_invalid_as_nil = true }
1✔
161

162

163
--- Check a field type name against a reference type class.
-- For ref == 'str' the accepted spellings are 'STR', 'str' and 'string';
-- for ref == 'num' they are 'NUM', 'num', 'number' and 'unsigned'.
-- Any other `ref` is compared with `src` for plain equality.
-- @param src type name taken from a format or index part
-- @param ref 'str', 'num', or an exact type name
-- @return boolean
local function typeeq(src, ref)
        if ref == 'str' then
                if src == 'string' or src == 'str' or src == 'STR' then
                        return true
                end
                return false
        end

        if ref == 'num' then
                if src == 'unsigned' or src == 'number' or src == 'num' or src == 'NUM' then
                        return true
                end
                return false
        end

        -- no alias class requested: exact match only
        return src == ref
end
172

173
--- Deep structural equality for plain values.
-- Tables are compared key by key (recursively); strings, numbers, booleans
-- and nil are compared with `==`; cdata values must have the same ffi type
-- and compare equal. Values of different Lua types are never equal.
-- @param a first value
-- @param b second value
-- @param depth optional error level used when an unsupported type is met
--        (default 2); it is increased by one for every level of recursion so
--        the error keeps pointing at the original caller
-- @return boolean true when both values are structurally equal
-- Raises "Wrong types for equality" for functions, userdata and threads.
local function is_eq(a, b, depth)
        depth = depth or 2
        local t = type(a)
        if t ~= type(b) then return false end
        if t == 'table' then
                for k, v in pairs(a) do
                        local other = b[k]
                        if other == nil then return false end
                        if not is_eq(v, other, depth + 1) then return false end
                end
                -- every key of `a` was already compared above, so here it is
                -- enough to look for keys that exist only in `b`
                for k in pairs(b) do
                        if a[k] == nil then return false end
                end
                return true
        elseif t == 'string' or t == 'number' or t == 'boolean' or t == 'nil' then
                -- booleans show up in real formats (e.g. is_nullable = true)
                return a == b
        elseif t == 'cdata' then
                return ffi.typeof(a) == ffi.typeof(b) and a == b
        else
                error("Wrong types for equality", depth)
        end
end
194

195

196
--- Build a converter from a positional tuple/array to a table keyed by field name.
-- The converter is generated as Lua source and compiled with `dostring`.
-- Field names are embedded with %q so that a name containing quotes or
-- backslashes cannot break (or alter) the generated code.
-- @param qformat array of field descriptors; only `name` is read, the array
--        index is used as the field position
-- @return function(t): returns nil for a nil/false `t`, otherwise a new table
--         mapping each field name to t[position]; raises 'excess args' when
--         called with more than one argument
local function _tuple2table ( qformat )
        local rows = {}
        for k,v in ipairs(qformat) do
                rows[#rows + 1] = string.format("\t[%q] = t[%d];\n", v.name, k)
        end
        local fun = "return function(t,...) "..
                "if select('#',...) > 0 then error('excess args',2) end "..
                "return t and {\n"..table.concat(rows, "").."} or nil end\n"
        return dostring(fun)
end
206

207
--- Build a converter from a table keyed by field name to a box tuple.
-- The converter is generated as Lua source and compiled with `dostring`.
-- Field names are embedded with %q so that a name containing quotes or
-- backslashes cannot break (or alter) the generated code.
-- @param qformat array of field descriptors; only `name` is read, the array
--        order defines the field order of the tuple
-- @return function(t): returns nil for a nil/false `t`, otherwise
--         box.tuple.new() of the fields in format order, where a field that
--         is nil in `t` is replaced by msgpack.NULL
local function _table2tuple ( qformat )
        local rows = {}
        for _,v in ipairs(qformat) do
                local key = string.format("%q", v.name)
                rows[#rows + 1] = "\tt["..key.."] == nil and NULL or t["..key.."];\n"
        end
        local fun = "local NULL = require'msgpack'.NULL return function(t) return "..
                "t and box.tuple.new({\n"..table.concat(rows, "").."}) or nil end\n"
        return dostring(fun)
end
217

218
---@class xqueue.space
219
local methods = {}
1✔
220

221
---@class PrimaryKeyField:table
222
---@field no number position in tuple
223
---@field name string name of the field
224
---@field type "uuid"|"string"|"number"|"unsigned"|"integer"|"boolean" type of the field
225

226
---@class xqFeatures: table
227
---@field id "auto_increment"|"time64"|"uuid"| fun(): scalar (Default: uuid)
228
---@field retval "tuple" | "table"
229
---@field buried boolean
230
---@field delayed boolean
231
---@field keep boolean
232
---@field tube boolean
233
---@field zombie boolean|number
234
---@field zombie_delay number
235
---@field ttl boolean|number
236
---@field ttr boolean|number
237
---@field ttl_default number?
238
---@field ttr_default number?
239

240
---@class xq:table
241
---@field NEVER integer (Default: 0)
242
---@field atomic fun(self: xq, key: scalar, fun: fun(...:any): ...?): ...
243
---@field bysid table<number,table<string,string>> mapping sid => {key => key}
244
---@field taken table<string,number> mapping key => sid
245
---@field _lock table<string,boolean> locks key => boolean (in atomic)
246
---@field put_wait table<string,{cond:fiber.cond,task:box.tuple?,processed: boolean?}> mapping key => fiber.cond for producer
247
---@field take_wait fiber.channel
248
---@field take_chans table<string,fiber.channel> mapping tube => fiber.channel
249
---@field debug boolean
250
---@field have_runat boolean
251
---@field gen_id? fun(): scalar
252
---@field getkey fun(self: xq, arg: table|scalar|box.tuple): scalar
253
---@field packkey fun(self: xq, key: any): string
254
---@field tube_index? boxIndex
255
---@field index boxIndex
256
---@field key PrimaryKeyField
257
---@field fieldmap table<string,number>
258
---@field timeoffset fun(delta: number): number
259
---@field features xqFeatures
260
---@field fields table<string, string|number>
261
---@field tuple fun(tbl: table): box.tuple
262
---@field table fun(tuple: box.tuple): table
263
---@field retwrap fun(t: box.tuple|table): table|box.tuple
264
---@field wakeup fun(self: xq, t: box.tuple|table)
265
---@field runat_chan fiber.channel
266
---@field check_owner fun(self: xq, key: tuple_type|box.tuple): box.tuple
267
---@field put_back fun(key: table|box.tuple)
268
---@field _stat { counts: table, transition: table }
269
---@field putback fun(self: xq, task: table|box.tuple)
270
---@field _default_truncate fun(space: boxSpaceObject)
271
---@field _on_repl replaceTrigger
272
---@field _on_dis fun()
273

274

275
---@class xqueue.space: boxSpaceObject
276
---@field xq xq xqueue specific storage
277

278
---@class xqueue.fields
279
---@field status? string|number Xqueue name for the Status field
280
---@field runat? string|number Xqueue name for the RunAt field
281
---@field priority? string|number Xqueue name for Task priority field
282
---@field tube? string|number Xqueue name for Tube field
283

284
---@class xqueue.features
285
---@field id 'uuid' | 'auto_increment' | 'required' | 'time64' | (fun(): number|string) Mandatory Field for TaskID generation
286
---@field retval? 'table'|'tuple' Type of return Value of the task in xqueue methods.
287
---(Default: table when space with format, tuple when space is without format)
288
---@field buried? boolean should xqueue allow burying of the tasks (by default: true)
289
---@field keep? boolean should xqueue keep :ack'ed tasks in the Space or not
290
---@field delayed? boolean should xqueue allow delay tasks (requires runat field and index) (default: false)
291
---@field zombie? boolean|number should xqueue temporarily keep :ack'ed tasks in the Space (default: false). Mutually exclusive with keep
292
---when zombie is configured with number, then this value is treated as zombie_delay (requires runat to be present)
293
---@field ttl? boolean|number should xqueue allow Time-To-Live on tasks. When specified with number, this value used for ttl_default.
294
---Requires runat field and index.
295
---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttr_default.
296
---Requires runat field and index.
297

298
---@class xqueue.upgrade.options
299
---@field format? boxSpaceFormat
300
---@field fields xqueue.fields
301
---@field debug? boolean
302
---@field tube_stats? string[] List of tube names for per-tube statistics
303
---@field features xqueue.features
304
---@field worker? fun(task: box.tuple|table) simple ad-hoc worker callback
305
---@field workers? number (number of workers to spawn)
306

307
---Upgrades given space to xqueue instance
308
---@param space xqueue.space
309
---@param opts xqueue.upgrade.options
310
---@param depth? number
311
function M.upgrade(space,opts,depth)
1✔
312
        depth = depth or 0
3✔
313
        log.info("xqueue upgrade(%s,%s)", space.name, json.encode(opts))
3✔
314
        if not opts.fields then error("opts.fields required",2) end
3✔
315
        if opts.format then
3✔
316
                -- todo: check if already have such format
317
                local format_av = box.space._space.index.name:get(space.name)[ 7 ]
×
318
                if not is_eq(format_av, opts.format) then
×
319
                        space:format(opts.format)
×
320
                else
321
                        print("formats are equal")
×
322
                end
323
        end
324

325
        -- This variable will be defined later
326
        local taken_mt
327

328
        local self = {}
3✔
329
        if space.xq then
3✔
330
                self.taken = space.xq.taken
×
331
                self._stat = space.xq._stat
×
332
                self.bysid = space.xq.bysid
×
333
                self._lock = space.xq._lock
×
334
                self.take_wait = space.xq.take_wait
×
335
                self.take_chans = space.xq.take_chans or setmetatable({}, { __mode = 'v' })
×
336
                self.put_wait = space.xq.put_wait or setmetatable({}, { __mode = 'v' })
×
337
                self._on_repl  = space.xq._on_repl
×
338
                self._on_dis = space.xq._on_dis
×
339
        else
340
                self.taken = {}
3✔
341
                self.bysid = {}
3✔
342
                -- byfid = {};
343
                self._lock = {}
3✔
344
                self.put_wait = setmetatable({}, { __mode = 'v' })
3✔
345
                self.take_wait = fiber.channel(0)
3✔
346
                self.take_chans = setmetatable({}, { __mode = 'v' })
3✔
347
        end
348
        setmetatable(self.bysid, {
6✔
349
                __serialize='map',
350
                __newindex = function(t, key, val)
351
                        if type(val) == 'table' then
4✔
352
                                rawset(t, key, setmetatable(val, taken_mt))
4✔
353
                        else
NEW
354
                                rawset(t, key, val)
×
355
                        end
356
                end
357
        })
358
        self.debug = not not opts.debug
3✔
359

360
        if not self._default_truncate then
3✔
361
                self._default_truncate = space.truncate
3✔
362
        end
363

364
        local format_av = box.space._space.index.name:get(space.name)[ 7 ]
6✔
365
        local format = {}
3✔
366
        local have_format = false
3✔
367
        local have_runat = false
3✔
368
        for no,f in pairs(format_av) do
15✔
369
                format[ f.name ] = {
12✔
370
                        name = f.name;
12✔
371
                        type = f.type;
12✔
372
                        no   = no;
12✔
373
                }
12✔
374
                format[ no ] = format[ f.name ];
12✔
375
                have_format = true
12✔
376
                self.have_format = true
12✔
377
        end
378
        for _,idx in pairs(space.index) do
21✔
379
                for _,part in pairs(idx.parts) do
50✔
380
                        format[ part.fieldno ] = format[ part.fieldno ] or { no = part.fieldno }
32✔
381
                        format[ part.fieldno ].type = part.type
32✔
382
                end
383
        end
384

385
        -- dd(format)
386

387
        -- 1. fields check
388
        local fields = {}
3✔
389
        local fieldmap = {}
3✔
390
        for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"}}) do
15✔
391
                local fname,ftype = f[1], f[2]
12✔
392
                local num = opts.fields[fname]
12✔
393
                if num then
12✔
394
                        if type(num) == 'string' then
6✔
395
                                if format[num] then
6✔
396
                                        fields[fname] = format[num].no;
6✔
397
                                else
398
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
399
                                end
400
                        elseif type(num) == 'number' then
×
401
                                if format[num] then
×
402
                                        fields[fname] = num
×
403
                                else
404
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
405
                                end
406
                        else
407
                                error(string.format("wrong type %s for field %s, number or string required",type(num),fname),2 + depth)
×
408
                        end
409
                        -- check type
410
                        if format[num] then
6✔
411
                                if not typeeq(format[num].type, ftype) then
12✔
412
                                        error(string.format("type mismatch for field %s, required %s, got %s",fname, ftype, format[fname].type),2+depth)
×
413
                                end
414
                        end
415
                        fieldmap[fname] = format[num].name
6✔
416
                end
417
        end
418

419
        -- dd(fields)
420

421
        -- 2. index check
422

423
        local pk = space.index[0]
3✔
424
        if #pk.parts ~= 1 then
3✔
425
                error("Composite primary keys are not supported yet")
×
426
        end
427
        local pktype
428
        if typeeq( format[pk.parts[1].fieldno].type, 'str' ) then
6✔
429
                pktype = 'string'
1✔
430
                taken_mt = { __serialize = 'map' }
1✔
431
        elseif typeeq( format[pk.parts[1].fieldno].type, 'num' ) then
4✔
432
                pktype = 'number'
2✔
433
                taken_mt = {
2✔
434
                        __serialize = 'map',
435
                        __newindex = function(t, key, val)
436
                                return rawset(t, tostring(ffi.cast("uint64_t", key)), val)
80✔
437
                        end,
438
                        __index = function(t, key)
439
                                return rawget(t, tostring(ffi.cast("uint64_t", key)))
60✔
440
                        end
441
                }
2✔
442
        else
443
                error("Unknown key type "..format[pk.parts[1].fieldno].type)
×
444
        end
445
        setmetatable(self.taken, taken_mt)
3✔
446

447
        local pkf = {
3✔
448
                no   = pk.parts[1].fieldno;
3✔
449
                name = format[pk.parts[1].fieldno].name;
3✔
450
                ['type'] = pktype;
3✔
451
        }
452
        self.key = pkf
3✔
453
        self.fields = fields
3✔
454
        self.fieldmap = fieldmap
3✔
455

456

457
        function self:getkey(arg)
3✔
458
                local _type = type(arg)
25✔
459
                if _type == 'table' then
25✔
460
                        if is_array(arg) then
50✔
461
                                return arg[ self.key.no ]
×
462
                        else
463
                                return arg[ self.key.name ]
25✔
464
                                -- pass
465
                        end
466
                elseif _type == 'cdata' and ffi.typeof(arg) == tuple_ctype then
×
467
                        return arg[ self.key.no ]
×
468
                elseif _type == self.key.type then
×
469
                        return arg
×
470
                else
471
                        error("Wrong key/task argument. Expected table or tuple or key", 3)
×
472
                end
473
        end
474

475
        function self.packkey(_, key)
3✔
476
                if type(key) == 'cdata' then
25✔
477
                        return tostring(ffi.cast("uint64_t", key))
20✔
478
                else
479
                        return key
5✔
480
                end
481
        end
482

483
        do
484
                local filter
485
                if fields.priority then
3✔
486
                        filter = function(index)
487
                                return #index.parts >= 3
×
488
                                        and index.parts[1].fieldno == fields.status
×
489
                                        and index.parts[2].fieldno == fields.priority
×
490
                                        and index.parts[3].fieldno == self.key.no
×
491
                        end
492
                else
493
                        filter = function(index)
494
                                return #index.parts >= 2
7✔
495
                                        and index.parts[1].fieldno == fields.status
4✔
496
                                        and index.parts[2].fieldno == self.key.no
7✔
497
                        end
498
                end
499
                for i,index in pairs(space.index) do
7✔
500
                        if type(i) == 'number' and filter(index) then
14✔
501
                                self.index = index
3✔
502
                                break
3✔
503
                        end
504
                end
505
                if not self.index then
3✔
506
                        if fields.priority then
×
507
                                error("not found index by status + priority + id",2+depth)
×
508
                        else
509
                                error("not found index by status + id",2+depth)
×
510
                        end
511
                end
512

513
                if fields.tube then
3✔
514
                        for n,index in pairs(space.index) do
2✔
515
                                if type(n) == 'number' and index.parts[1].fieldno == fields.tube then
2✔
516
                                        local not_match = false
1✔
517
                                        for i = 2, #index.parts do
3✔
518
                                                if index.parts[i].fieldno ~= self.index.parts[i-1].fieldno then
2✔
519
                                                        not_match = true
×
520
                                                        break
521
                                                end
522
                                        end
523
                                        if not not_match then
1✔
524
                                                self.tube_index = index
1✔
525
                                                break
1✔
526
                                        end
527
                                end
528
                        end
529
                        if not self.tube_index then
1✔
530
                                if fields.priority then
×
531
                                        error("not found index by tube + status + priority + id",2+depth)
×
532
                                else
533
                                        error("not found index by tube + status + id",2+depth)
×
534
                                end
535
                        end
536
                end
537
        end
538

539
        ---@type boxIndex
540
        local runat_index
541
        if fields.runat then
3✔
542
                for _,index in pairs(space.index) do
7✔
543
                        if type(_) == 'number' then
7✔
544
                                if index.parts[1].fieldno == fields.runat then
7✔
545
                                        -- print("found",index.name)
546
                                        runat_index = index
2✔
547
                                        break
2✔
548
                                end
549
                        end
550
                end
551
                if not runat_index then
2✔
552
                        error(string.format("fields.runat requires tree index with this first field in it"),2+depth)
×
553
                else
554
                        local t = runat_index:pairs({0},{iterator = box.index.GT}):nth(1)
4✔
555
                        if t then
2✔
556
                                if t[ self.fields.runat ] < monotonic_max_age then
×
557
                                        error("!!! Queue contains monotonic runat. Consider updating tasks (https://github.com/moonlibs/xqueue/issues/2)")
×
558
                                end
559
                        end
560
                        have_runat = true
2✔
561
                end
562
        end
563
        self.have_runat = have_runat
3✔
564

565
        ---@type table<string, { counts: {}, transition: {} }>
566
        local stat_tube = {}
3✔
567
        if self.fields.tube and type(opts.tube_stats) == 'table' then
3✔
NEW
568
                for _, tube_name in ipairs(opts.tube_stats) do
×
NEW
569
                        if type(tube_name) == 'string' then
×
NEW
570
                                stat_tube[tube_name] = {
×
571
                                        counts = {},
572
                                        transition = {},
573
                                }
574
                        end
575
                end
576
        end
577

578
        if not self._stat then
3✔
579
                self._stat = {
3✔
580
                        counts = {};
3✔
581
                        transition = {};
3✔
582
                        tube = stat_tube;
3✔
583
                }
3✔
584
                -- TODO: benchmark index:count()
585
                if self.fields.tube then
3✔
586
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
3✔
NEW
587
                                local s = t[self.fields.status]
×
NEW
588
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
589

NEW
590
                                local tube = t[self.fields.tube]
×
NEW
591
                                if stat_tube[tube] then
×
NEW
592
                                        stat_tube[tube].counts[s] = (stat_tube[tube].counts[s] or 0LL) + 1
×
593
                                end
594
                        end
595
                else
596
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
6✔
NEW
597
                                local s = t[self.fields.status]
×
NEW
598
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
599
                        end
600
                end
601
        end
602

603
        -- 3. features check
604

605
        local features = {}
3✔
606

607
        local gen_id
608
        opts.features = opts.features or {}
3✔
609
        if not opts.features.id then
3✔
610
                opts.features.id = 'uuid'
×
611
        end
612
        -- 'auto_increment' | 'time64' | 'uuid' | 'required' | function
613
        if opts.features.id == 'auto_increment' then
3✔
614
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
×
615
                        error("For auto_increment numeric pk is mandatory",2+depth)
×
616
                end
617
                local pk_fno = pk.parts[1].fieldno
×
618
                gen_id = function()
619
                        local max = pk:max()
×
620
                        if max then
×
621
                                return max[pk_fno] + 1
×
622
                        else
623
                                return 1
×
624
                        end
625
                end
626
        elseif opts.features.id == 'time64' then
3✔
627
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
4✔
628
                        error("For time64 numeric pk is mandatory",2+depth)
×
629
                end
630
                gen_id = function()
631
                        local key = clock.realtime64()
13✔
632
                        while true do
633
                                local exists = pk:get(key)
13✔
634
                                if not exists then
13✔
635
                                        return key
13✔
636
                                end
637
                                key = key + 1
×
638
                        end
639
                end
640
        elseif opts.features.id == 'uuid' then
1✔
641
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'str')) then
2✔
642
                        error("For uuid string pk is mandatory",2+depth)
×
643
                end
644
                local uuid = require'uuid'
1✔
645
                gen_id = function()
646
                        while true do
647
                                local key = uuid.str()
2✔
648
                                local exists = pk:get(key)
2✔
649
                                if not exists then
2✔
650
                                        return key
2✔
651
                                end
652
                        end
653
                end
654
        elseif opts.features.id == 'required' then
×
655
                -- gen_id = function()
656
                --         -- ???
657
                --         error("TODO")
658
                -- end
659
        elseif type(opts.features.id) == 'function'
×
660
                or (debug.getmetatable(opts.features.id)
×
661
                and debug.getmetatable(opts.features.id).__call)
×
662
        then
663
                gen_id = opts.features.id
×
664
        else
665
                error(string.format(
×
666
                        "Wrong type for features.id %s, may be 'auto_increment' | 'time64' | 'uuid' | 'required' | function",
667
                        opts.features.id
×
668
                ), 2+depth)
×
669
        end
670

671
        -- Requested feature set, as passed by the caller.
        local wanted = opts.features

        -- Default return shape: 'table' when have_format is set, otherwise 'tuple'.
        if not wanted.retval then
                if have_format then
                        wanted.retval = 'table'
                else
                        wanted.retval = 'tuple'
                end
        end

        -- Table<->tuple converters are only built when a format is available.
        if format then
                self.tuple = _table2tuple(format)
                self.table = _tuple2table(format)
        end

        -- retwrap converts a stored tuple into what API methods return.
        if wanted.retval ~= 'table' then
                self.retwrap = function(task) return task end
        else
                self.retwrap = self.table
        end

        -- Plain boolean switches.
        features.buried = wanted.buried and true or false
        features.keep   = wanted.keep and true or false

        -- delayed needs the runat field and index.
        if wanted.delayed and not have_runat then
                error("Delayed feature requires runat field and index", 2+depth)
        end
        features.delayed = wanted.delayed and true or false

        -- zombie conflicts with keep and needs runat; a numeric value is stored as zombie_delay.
        local zombie = wanted.zombie
        if zombie then
                if features.keep then
                        error("features keep and zombie are mutually exclusive", 2+depth)
                end
                if not have_runat then
                        error("feature zombie requires runat field and index", 2+depth)
                end
                if type(zombie) == 'number' then
                        features.zombie_delay = zombie
                end
        end
        features.zombie = zombie and true or false

        -- ttl needs runat; a numeric value is stored as ttl_default.
        local ttl = wanted.ttl
        if ttl then
                if not have_runat then
                        error("feature ttl requires runat field and index", 2+depth)
                end
                if type(ttl) == 'number' then
                        features.ttl_default = ttl
                end
        end
        features.ttl = ttl and true or false

        -- ttr needs runat; a numeric value is stored as ttr_default.
        local ttr = wanted.ttr
        if ttr then
                if not have_runat then
                        error("feature ttr requires runat field and index", 2+depth)
                end
                if type(ttr) == 'number' then
                        features.ttr_default = ttr
                end
        end
        features.ttr = ttr and true or false

        -- The tube feature is switched on by the presence of a tube field.
        if fields.tube then
                features.tube = true
        end

        self.gen_id = gen_id
        self.features = features
        self.space = space.id
3✔
746

747
        -- Absolute realtime timestamp lying `delta` seconds from now.
        self.timeoffset = function(delta)
                local now = clock.realtime()
                return now + tonumber(delta)
        end
        -- True when `time` is strictly earlier than the current realtime clock.
        self.timeready = function(time)
                return time < clock.realtime()
        end
        -- Seconds left until `time` (negative once it has passed).
        self.timeremaining = function(time)
                return time - clock.realtime()
        end
        -- self.NEVER = -1ULL
        -- Value written into runat when no wake-up time is scheduled.
        self.NEVER = 0

        -- Both accessors read the primary-key field (pkf.no) of a tuple;
        -- the first argument is the receiver and is ignored.
        self.keyfield = function(_, tuple)
                return tuple[pkf.no]
        end
        self.keypack = function(_, tuple)
                return tuple[pkf.no]
        end
765

766
        -- Wake up a producer that is blocked waiting on the task with this key.
        -- A producer is notified only when the outcome is final:
        --   * status 'Z' or 'D'                 -> processed = true
        --   * task gone from space, status 'T'  -> processed = true  (acked)
        --   * task gone from space, status 'R'  -> processed = false (killed by TTL)
        --   * task still in space, status 'B'   -> processed = false (buried)
        -- In every other case the queue may still process the task later,
        -- so the producer is left waiting.
        local function notify_producer(key, task)
                local waiter = self.put_wait[self:packkey(key)]
                if not waiter then
                        -- nobody is waiting for this task
                        return
                end

                local status = task.status
                local processed
                if status == 'Z' or status == 'D' then
                        processed = true
                elseif not space:get{key} then
                        -- the task is no longer stored: either acked or expired
                        if status == 'T' then
                                processed = true
                        elseif status == 'R' then
                                processed = false
                        else
                                return
                        end
                elseif status == 'B' then
                        -- the task is still stored but buried
                        processed = false
                else
                        return
                end

                waiter.task = task
                waiter.processed = processed
                waiter.cond:broadcast()
        end
811

812
        -- Background fibers block on this channel until make_ready() releases them.
        self.ready = fiber.channel(0)

        if opts.worker then
                local handler = opts.worker

                -- Body of a single worker fiber: take a task, run the user handler,
                -- ack on success, release whatever is still marked as taken.
                local function worker_loop(space, xq, i)
                        local fname = space.name .. '.xq.wrk' .. tostring(i)
                        ---@diagnostic disable-next-line: undefined-field
                        if package.reload then fname = fname .. '.' .. package.reload.count end
                        fiber.name(string.sub(fname, 1, 32))

                        -- wait until the space is upgraded and the queue is released
                        repeat fiber.sleep(0.001) until space.xq
                        if xq.ready then xq.ready:get() end
                        log.info("I am worker %s",i)

                        if box.info.ro then
                                log.notice("Shutting down on ro instance")
                                return
                        end

                        -- run while the space exists and still belongs to this queue object
                        while box.space[space.name] and space.xq == xq do
                                local task = space:take(1)
                                if task then
                                        local key = xq:getkey(task)
                                        local ok, err = pcall(handler, task)
                                        if ok then
                                                -- the handler did not settle the task itself
                                                if xq.taken[ key ] then
                                                        space:ack(task)
                                                end
                                        else
                                                log.error("Worker for {%s} has error: %s", key, err)
                                        end
                                        if xq.taken[ key ] then
                                                log.error("Worker for {%s} not released task", key)
                                                space:release(task)
                                        end
                                end
                                fiber.yield()
                        end
                        log.info("worker %s ended", i)
                end

                for id = 1, opts.workers or 1 do
                        fiber.create(worker_loop, space, self, id)
                end
        end
853

854
        if have_runat then
                -- The runat fiber sleeps on this channel between scans (chan:get with a timeout).
                self.runat_chan = fiber.channel(0)
                -- Background fiber that walks the runat index and applies time-driven transitions:
                --   W -> R      delayed task became ready
                --   R -> gone   task killed by ttl (a waiting producer is notified)
                --   Z -> gone   zombie removed
                --   T -> R      taken task auto-released by ttr
                self.runat = fiber.create(function(space,xq,runat_index)
                        local fname = space.name .. '.xq'
                        if package.reload then fname = fname .. '.' .. package.reload.count end
                        fiber.name(string.sub(fname,1,32))
                        -- wait until the space is upgraded and the queue is released
                        repeat fiber.sleep(0.001) until space.xq
                        if xq.ready then xq.ready:get() end
                        local chan = xq.runat_chan
                        log.info("Runat started")
                        if box.info.ro then
                                log.notice("Shutting down on ro instance")
                                return
                        end
                        -- at most this many tuples are processed per scan
                        local maxrun = 1000
                        local curwait
                        local collect = {}
                        while box.space[space.name] and space.xq == xq do
                                -- One scan; returns the number of seconds to sleep before the next one.
                                local r,e = pcall(function()
                                        local remaining
                                        -- collect expired tuples first, then modify them
                                        for _,t in runat_index:pairs({0},{iterator = box.index.GT}) do
                                                if xq.timeready( t[ xq.fields.runat ] ) then
                                                        table.insert(collect,t)
                                                else
                                                        remaining = xq.timeremaining(t[ xq.fields.runat ])
                                                        break
                                                end
                                                if #collect >= maxrun then remaining = 0 break end
                                        end

                                        for _,t in ipairs(collect) do
                                                local status = t[xq.fields.status]
                                                if status == 'W' then
                                                        log.info("Runat: W->R %s",xq:keyfield(t))
                                                        -- TODO: default ttl?
                                                        local u = space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.status,'R' },
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                        -- update returns nil when the tuple is already gone
                                                        if u then
                                                                xq:wakeup(u)
                                                        end
                                                elseif status == 'R' and xq.features.ttl then
                                                        local key = xq:keyfield(t)
                                                        log.info("Runat: Kill R by ttl %s (%+0.2fs)", key, fiber.time() - t[ xq.fields.runat ])
                                                        local deleted = space:delete{key}
                                                        -- delete returns nil when the tuple is already gone;
                                                        -- notify_producer would fail on a nil task
                                                        if deleted then
                                                                notify_producer(key, deleted)
                                                        end
                                                elseif status == 'Z' and xq.features.zombie then
                                                        log.info("Runat: Kill Zombie %s",xq:keyfield(t))
                                                        space:delete{ xq:keyfield(t) }
                                                elseif status == 'T' and xq.features.ttr then
                                                        local key = xq:keypack(t)
                                                        local sid = xq.taken[ key ]
                                                        local peer = peers[sid] or sid

                                                        log.info("Runat: autorelease T->R by ttr %s taken by %s",xq:keyfield(t), peer)
                                                        local u = space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.status,'R' },
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                        xq.taken[ key ] = nil
                                                        -- the per-session table may be missing; putback guards it the same way
                                                        local owned = sid and self.bysid[ sid ]
                                                        if owned then
                                                                owned[ key ] = nil
                                                        end
                                                        if u then
                                                                xq:wakeup(u)
                                                        end
                                                else
                                                        log.error("Runat: unsupported status %s for %s",t[xq.fields.status], tostring(t))
                                                        -- drop the tuple out of the scanned range so it is not re-examined every scan
                                                        space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                end
                                        end

                                        -- sleep at most one second; sooner if the next tuple is due earlier
                                        if remaining then
                                                if remaining >= 0 and remaining < 1 then
                                                        return remaining
                                                end
                                        end
                                        return 1
                                end)

                                table_clear(collect)

                                if r then
                                        curwait = e
                                else
                                        curwait = 1
                                        log.error("Runat/ERR: %s",e)
                                end
                                -- log.info("Wait %0.2fs",curwait)
                                if curwait == 0 then fiber.sleep(0) end
                                chan:get(curwait)
                        end
                        log.info("Runat ended")
                end,space,self,runat_index)
        end
951

952
        -- Second half of self:atomic: drops the per-key lock, then either passes
        -- the pcall results through or re-raises the captured error (level 3).
        local function finish_atomic(xq, key, ok, ...)
                xq._lock[key] = nil
                if ok then
                        return ...
                end
                error((...), 3)
        end
        -- Runs fun(...) while self._lock[key] is set; the lock is cleared
        -- whether fun succeeds or raises.
        function self:atomic(key, fun, ...)
                self._lock[key] = true
                return finish_atomic(self, key, pcall(fun, ...))
        end
963

964
        -- Returns the stored tuple for `key` if the current session owns the task.
        -- Raises (level 2) when the task is missing, not taken, or taken by another session.
        function self:check_owner(key)
                local task = space:get(key)
                if not task then
                        error(("Task {%s} was not found"):format(key), 2)
                end

                local owner = self.taken[key]
                if not owner then
                        error(("Task %s not taken by any"):format(key), 2)
                end

                local me = box.session.id()
                if owner ~= me then
                        error(("Task %s taken by %d. Not you (%d)"):format(key, owner, me), 2)
                end
                return task
        end
977

978
        -- Forgets that `task` is taken (both in self.taken and in the per-session
        -- table) and then lets notify_producer decide whether a producer must be woken.
        function self:putback(task)
                local key = task[ self.key.no ]
                local owner_sid = self.taken[ key ]
                if not owner_sid then
                        log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() )
                else
                        self.taken[ key ] = nil
                        local owned = self.bysid[ owner_sid ]
                        if owned then
                                owned[ key ] = nil
                        else
                                log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, owner_sid)
                        end
                end

                notify_producer(key, task)
        end
994

995
        -- Signals one waiting consumer that tuple `t` is ready ('R').
        -- A consumer waiting on the task's tube channel is preferred; if none
        -- accepts the signal, the common take_wait channel is tried.
        function self:wakeup(t)
                local fm = self.fieldmap
                if t[fm.status] == 'R' then
                        if fm.tube then
                                -- we may have consumers in the tubes:
                                local tube_chan = self.take_chans[t[fm.tube]]
                                if tube_chan and tube_chan:has_readers() and tube_chan:put(true, 0) then
                                        -- a tube consumer has been notified
                                        return
                                end
                                -- otherwise fall back to the default channel
                        end
                        local common = self.take_wait
                        if common:has_readers() then
                                common:put(true,0)
                        end
                end
        end
1010

1011
        -- Releases every fiber blocked on self.ready, then drops the channel
        -- so later `if xq.ready` checks skip the wait.
        function self:make_ready()
                while true do
                        local gate = self.ready
                        if not (gate and gate:has_readers()) then
                                break
                        end
                        gate:put(true,0)
                        fiber.sleep(0)
                end
                self.ready = nil
        end
1018

1019
        -- Copy every queue method into the metatable obtained from the space object.
        local space_mt = debug.getmetatable(space)
        for name, impl in pairs(methods) do
                space_mt[name] = impl
        end
36✔
1021

1022
        -- Triggers must be set right before updating the space,
        -- because raising an error earlier leads to trigger inconsistency.
        --
        -- on_replace trigger that keeps queue statistics up to date:
        --   * self._stat.counts[status]       number of tuples per status
        --   * self._stat.transition["A-B"]    number of status transitions ('X' = no tuple)
        --   * self._stat.tube[tube].counts / .transition   same, per tube, for tubes
        --     that have a statistics table registered in self._stat.tube
        -- A tuple moving between two tracked tubes is recorded as "<old>-S" on the
        -- old tube and "S-<new>" on the new one.
        self._on_repl = space:on_replace(function(old, new)
                local old_st, new_st
                local old_tube, new_tube
                local old_tube_stat, new_tube_stat
                local counts = self._stat.counts
                local tube_ss = self._stat.tube

                if old then
                        -- the old tuple leaves its status (and tube) counters
                        old_st = old[self.fields.status] --[[@as string]]
                        if counts[old_st] and counts[old_st] > 0 then
                                counts[old_st] = counts[old_st] - 1
                        else
                                log.error(
                                        "Have not valid statistic by status: %s with value: %s",
                                        old_st, tostring(counts[old_st])
                                )
                        end
                        if self.fields.tube then
                                old_tube = old[self.fields.tube]
                                old_tube_stat = tube_ss[old_tube]
                                if type(old_tube_stat) == 'table' and type(old_tube_stat.counts) == 'table' then
                                        if (tonumber(old_tube_stat.counts[old_st]) or 0) > 0 then
                                                old_tube_stat.counts[old_st] = old_tube_stat.counts[old_st] - 1
                                        else
                                                -- three values are logged, so the format needs three placeholders
                                                log.error(
                                                        "Have not valid statistic by tube/status: tube %s status %s with value: %s",
                                                        old_tube, old_st, tostring(old_tube_stat.counts[old_st])
                                                )
                                        end
                                else
                                        -- this tube is not tracked
                                        old_tube_stat = nil
                                end
                        end
                else
                        old_st = 'X'
                end

                if new then
                        -- the new tuple enters its status (and tube) counters
                        new_st = new[self.fields.status] --[[@as string]]
                        counts[new_st] = (counts[new_st] or 0LL) + 1
                        if counts[new_st] < 0 then
                                log.error("Statistic overflow by task type: %s", new_st)
                        end
                        if self.fields.tube then
                                new_tube = new[self.fields.tube]
                                new_tube_stat = tube_ss[new_tube]
                                if type(new_tube_stat) == 'table' and type(new_tube_stat.counts) == 'table' then
                                        if (tonumber(new_tube_stat.counts[new_st]) or 0) >= 0 then
                                                new_tube_stat.counts[new_st] = (new_tube_stat.counts[new_st] or 0LL) + 1
                                        else
                                                -- three values are logged, so the format needs three placeholders
                                                log.error(
                                                        "Have not valid statistic tube/status: tube %q status %s with value: %s",
                                                        new_tube, new_st, tostring(new_tube_stat.counts[new_st])
                                                )
                                        end
                                else
                                        -- this tube is not tracked
                                        new_tube_stat = nil
                                end
                        end
                else
                        new_st = 'X'
                end

                -- global transition counter
                local field = old_st.."-"..new_st
                self._stat.transition[field] = (self._stat.transition[field] or 0LL) + 1

                -- per-tube transition counters
                if old_tube_stat then
                        if not new_tube_stat or old_tube_stat == new_tube_stat then
                                -- no new tube or new and old tubes are the same
                                old_tube_stat.transition[field] = (old_tube_stat.transition[field] or 0LL) + 1
                        else
                                -- nil != old_tube != new_tube != nil
                                -- cross tube transition ?
                                -- Can this be backoff with tube change?
                                local old_field = old_st.."-S"
                                local new_field = "S-"..new_st
                                old_tube_stat.transition[old_field] = (old_tube_stat.transition[old_field] or 0LL) + 1
                                new_tube_stat.transition[new_field] = (new_tube_stat.transition[new_field] or 0LL) + 1
                        end
                elseif new_tube_stat then
                        -- old_tube_stat == nil
                        new_tube_stat.transition[field] = (new_tube_stat.transition[field] or 0LL) + 1
                end
        end, self._on_repl)
73✔
1107

1108
        -- on_disconnect trigger: every task still taken by the closing session
        -- is returned to 'R' and a consumer is woken up for it.
        self._on_dis = box.session.on_disconnect(function()
                local sid = box.session.id()
                local peer = box.session.storage.peer

                log.info("%s: disconnected %s, sid=%s, fid=%s", space.name, peer, sid, fiber.id() )
                box.session.storage.destroyed = true

                local owned = self.bysid[sid]
                if not owned then
                        return
                end

                -- repeat the sweep until the per-session table is really empty
                while next(owned) do
                        for key, realkey in pairs(owned) do
                                self.taken[key] = nil
                                owned[key] = nil
                                local task = space:get(realkey)
                                if not task then
                                        log.error( "Rst: {%s}: taken not found", realkey )
                                elseif task[ self.fields.status ] ~= 'T' then
                                        log.error( "Rst: %s->? {%s}: wrong status", task[self.fields.status], realkey )
                                else
                                        local ops = { { '=',self.fields.status,'R' } }
                                        if self.have_runat then
                                                ops[2] = { '=', self.fields.runat, self.NEVER }
                                        end
                                        self:wakeup(space:update({ realkey }, ops))
                                        log.info("Rst: T->R {%s}", realkey )
                                end
                        end
                end
                self.bysid[sid] = nil
        end, self._on_dis)
6✔
1139

1140
        rawset(space,'xq',self)
3✔
1141

1142
        log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status)
3✔
1143

1144

1145
        -- Startup check: on a writable instance, return to 'R' every 'T' task that
        -- is neither taken nor locked, then release the fibers waiting on self.ready.
        -- While the instance is 'orphan' the check is retried from a background fiber.
        function self:starttest()
                if box.info.status == 'orphan' then
                        fiber.create(function()
                                -- poll with a slowly growing sleep until the instance leaves orphan
                                local attempt = 0
                                repeat
                                        fiber.sleep(attempt/1e5)
                                        attempt = attempt + 1
                                until box.info.status ~= 'orphan'
                                self:starttest()
                        end)
                        return
                end

                if box.info.ro then
                        log.info("Server is ro, not resetting statuses")
                else
                        -- FIXME: with stable iterators add yield
                        local target = box.space[self.space]
                        box.begin()
                        for _, task in self.index:pairs({'T'},{ iterator = box.index.EQ }) do
                                local key = task[ self.key.no ]
                                if not (self.taken[key] or self._lock[key]) then
                                        local ops = { { '=', self.fields.status, 'R' } }
                                        if self.have_runat then
                                                -- NOTE(review): put() reads the default ttl from
                                                -- self.features.ttl_default; confirm self.ttl_default
                                                -- is assigned somewhere, otherwise this is always NEVER.
                                                local runat
                                                if self.ttl_default then
                                                        runat = self.timeoffset( self.ttl_default )
                                                else
                                                        runat = self.NEVER
                                                end
                                                ops[2] = { '=', self.fields.runat, runat }
                                        end
                                        target:update({key}, ops)
                                        log.info("Start: T->R (%s)", key)
                                end
                        end
                        box.commit()
                end
                self:make_ready()
        end
1182
        self:starttest()
3✔
1183
end
1184

1185

1186
--[[
* `space:put`
        - `task` - table or array or tuple
                + `table`
                        * **requires** space format
                        * suitable for id generation
                + `array`
                        * ignores space format
                        * for id generation use `NULL` (**not** `nil`)
                + `tuple`
                        * ignores space format
                        * **can't** be used with id generation
        - `attr` - table of attributes
                + `delay` - number of seconds
                        * if set, task will become `W` instead of `R` for `delay` seconds
                + `ttl` - number of seconds
                        * if set, task will be discarded after ttl seconds unless it was taken
                + `wait` - number of seconds
                        * if set, the calling fiber is blocked for up to `wait` seconds,
                        until the task is processed or the timeout is reached.

Options are validated before the task table is modified and before an id
is generated, so a rejected call leaves the passed table untouched.

```lua
box.space.myqueue:put{ name="xxx"; data="yyy"; }
box.space.myqueue:put{ "xxx","yyy" }
box.space.myqueue:put(box.tuple.new{ 1,"xxx","yyy" })

box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5 })
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 })
```
]]

---@param t any[]|box.tuple
---@param opts? { delay: number?, ttl: number?, wait: number? }
---@return table|box.tuple, boolean? has_been_processed
function methods:put(t, opts)
        local xq = self.xq
        opts = opts or {}

        -- Only key/value tables are supported so far.
        if type(t) == 'table' then
                if xq.gen_id and is_array(t) then
                        error("Task from array creation is not implemented yet", 2)
                end
        elseif type(t) == 'cdata' and ffi.typeof(t) == tuple_ctype then
                if xq.gen_id then
                        error("Can't use id generation with tuple.", 2)
                end
                error("Task from tuple creation is not implemented yet", 2)
        else
                error("Wrong argument to put. Expected table or tuple", 2)
        end

        -- Validate the options first: nothing below may run for a rejected call.
        if opts.delay then
                if not xq.features.delayed then
                        error("Feature delayed is not enabled",2)
                end
                if opts.ttl then
                        error("Features ttl and delay are mutually exclusive",2)
                end
                if opts.wait then
                        error("Are you crazy? Call of :put({...}, { wait = <>, delay = <> }) looks weird", 2)
                end
        elseif opts.ttl and not xq.features.ttl then
                error("Feature ttl is not enabled",2)
        end

        -- here we have table with keys
        if xq.gen_id then
                t[ xq.key.name ] = xq.gen_id()
        elseif not t[ xq.key.name ] then
                error("Primary key is mandatory",2)
        end

        -- Initial runat: delay, explicit ttl, default ttl, or NEVER when the
        -- queue has a runat field; computed before the task table is touched.
        local runat
        if opts.delay then
                runat = xq.timeoffset(opts.delay)
        elseif opts.ttl then
                runat = xq.timeoffset(opts.ttl)
        elseif xq.features.ttl_default then
                runat = xq.timeoffset(xq.features.ttl_default)
        elseif xq.have_runat then
                runat = xq.NEVER
        end

        -- A delayed task starts as 'W', everything else as 'R'.
        t[ xq.fieldmap.status ] = opts.delay and 'W' or 'R'
        if runat ~= nil then
                t[ xq.fieldmap.runat ] = runat
        end

        local tuple = xq.tuple(t)
        local key = tuple[ xq.key.no ]

        -- Register the waiter before the insert so that no notification is missed.
        local wait
        if opts.wait then
                wait = { cond = fiber.cond() }
                xq.put_wait[xq:packkey(key)] = wait
        end

        xq:atomic(key, function()
                t = self:insert(tuple)
        end)
        xq:wakeup(t)

        if wait then
                -- local func notify_producer will send to us some data
                local ok = wait.cond:wait(opts.wait)
                fiber.testcancel()
                if ok and wait.task then
                        return xq.retwrap(wait.task), wait.processed
                end
        end
        return xq.retwrap(t)
end
1302

1303
--[[
* `space:wait(id, timeout)`
        - `id`:
                + `string` | `number` - primary key
        - `timeout` - number of seconds to wait
                * the calling fiber is blocked for up to `timeout` seconds,
                until the task is processed or the timeout is reached.
        - returns the task and a flag telling whether it has been processed;
          a task that is not in status `R`, `T` or `W` is returned
          immediately with `false`.
]]

-- Statuses in which a task may still be processed, so waiting makes sense.
local wait_for = {
        R = true,
        T = true,
        W = true,
}

---@param key table|scalar|box.tuple
---@param timeout number
---@return table|box.tuple, boolean? has_been_processed
function methods:wait(key, timeout)
        local xq = self.xq
        key = xq:getkey(key)
        local task = self:get(key)
        if not task then
                -- level 2: report the error at the caller, like the other API methods do
                error(("Task {%s} was not found"):format(key), 2)
        end

        local status = task[xq.fields.status]
        if not wait_for[status] then
                return xq.retwrap(task), false
        end

        -- Join an existing waiter for this key or register a new one.
        local pkey = xq:packkey(key)
        local wait = xq.put_wait[pkey]
        if not wait then
                wait = { cond = fiber.cond() }
                xq.put_wait[pkey] = wait
        end

        -- local func notify_producer will send to us some data
        local ok = wait.cond:wait(timeout)
        fiber.testcancel()
        if ok and wait.task then
                return xq.retwrap(wait.task), wait.processed
        end

        -- timed out: return the task as it was read before waiting
        return xq.retwrap(task), false
end
1350

1351
--[[
1352
* `space:take(timeout)`
1353
* `space:take(timeout, opts)`
1354
* `space:take(opts)`
1355
        - `timeout` - number of seconds to wait for new task
1356
                + choose reasonable time
1357
                + beware of **readahead** size (see tarantool docs)
1358
        - `tube` - name of the tube worker wants to take task from (feature tube must be enabled)
1359
        - returns task tuple or table (see retval) or nothing on timeout
1360
        - *TODO*: ttr must be there
1361
]]
1362

1363
---Takes the first unlocked task in status `R` (optionally restricted to one
---tube), switches it to status `T` and registers it as owned by the current
---session. Waits up to `timeout` seconds for a task to appear.
---Accepts `take(timeout)`, `take(timeout, opts)` and `take(opts)`.
---@param timeout? number|{ timeout: number?, ttr: number?, tube: string? }
---@param opts? { timeout: number?, ttr: number?, tube: string? }
---@return table|box.tuple?
function methods:take(timeout, opts)
        local queue = self.xq

        -- Normalise the calling conventions into (number timeout, table opts).
        timeout = timeout or 0
        if type(timeout) == 'table' then
                opts = timeout
                timeout = opts.timeout or 0
        else
                opts = opts or {}
        end
        assert(timeout >= 0, "timeout required")

        -- An explicit ttr requires the ttr feature; otherwise use the default.
        local ttr
        if opts.ttr then
                if not queue.features.ttr then
                        error("Feature ttr is not enabled",2)
                end
                ttr = opts.ttr
        elseif queue.features.ttr_default then
                ttr = queue.features.ttr_default
        end

        -- Pick the index, the lookup key and (for tubes) the wake-up channel.
        local index, start_with, tube_chan
        if opts.tube then
                if not queue.features.tube then
                        error("Feature tube is not enabled", 2)
                end

                assert(type(opts.tube) == 'string', "opts.tube must be a string")

                index = queue.tube_index
                start_with = { opts.tube, 'R' }
                tube_chan = queue.take_chans[opts.tube]
                if not tube_chan then
                        tube_chan = fiber.channel()
                end
                queue.take_chans[opts.tube] = tube_chan
        else
                index = queue.index
                start_with = { 'R' }
        end
        ---@cast index -nil

        local deadline = fiber.time() + timeout
        local key, found
        while not found do
                -- Scan ready tasks and claim the first one nobody has locked yet.
                for _, candidate in index:pairs(start_with, { iterator = box.index.EQ }) do
                        key = candidate[ queue.key.no ]
                        if not queue._lock[ key ] then
                                queue._lock[ key ] = true
                                found = candidate
                                break
                        end
                end

                if not found then
                        local left = deadline - fiber.time()
                        if left <= 0 then break end

                        -- Sleep until woken through the channel or until time runs out.
                        local chan = tube_chan or queue.take_wait
                        chan:get(left)
                        if box.session.storage.destroyed then break end
                end
        end

        -- If we were the last reader on the tube channel, close and forget it.
        -- Per the original note, writers then get false on :put.
        if tube_chan and not tube_chan:has_readers() then
                tube_chan:close()
                queue.take_chans[opts.tube] = nil
        end
        if not found then return end

        -- Runs under pcall so that the key lock is always released afterwards.
        local function mark_taken()
                local sid = box.session.id()
                local peer = box.session.storage.peer

                if queue.debug then
                        log.info("Take {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
                end

                local ops = {
                        { '=', queue.fields.status, 'T' },
                }
                -- TODO: update more fields

                if ttr then
                        ops[#ops + 1] = { '=', queue.fields.runat, queue.timeoffset(ttr) }
                        queue.runat_chan:put(true,0)
                end
                local taken = self:update({key}, ops)

                -- Record ownership both by key and by session.
                if not queue.bysid[ sid ] then queue.bysid[ sid ] = {} end
                queue.taken[ key ] = sid
                queue.bysid[ sid ][ key ] = key

                return taken
        end

        local ok, res = pcall(mark_taken)
        queue._lock[ key ] = nil
        if not ok then
                error(res)
        end
        return queue.retwrap(res)
end
1474

1475
--[[
1476
* `space:release(id, [attr])`
1477
        - `id`:
1478
                + `string` | `number` - primary key
1479
                + *TODO*: `tuple` - key will be extracted using index
1480
                + *TODO*: composite pk
1481
        - `attr`
1482
                + `update` - table for update, like in space:update
1483
                + `ttl` - timeout for time to live
1484
                + `delay` - number of seconds
1485
                        * if set, task will become `W` instead of `R` for `delay` seconds
1486
]]
1487

1488
---Releases a task owned by the caller (ownership is verified by
---`xq:check_owner`). The task becomes `W` with a run-at time when `attr.delay`
---is given, otherwise `R` (with a ttl-based run-at when ttl applies).
---Raises when `delay` is used without the `delayed` feature, when `ttl` is
---used without the `ttl` feature, or when both `delay` and `ttl` are given.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
---@return table|box.tuple
function methods:release(key, attr)
        local xq = self.xq
        key = xq:getkey(key)

        attr = attr or {}

        local t = xq:check_owner(key)
        local old = t[ xq.fields.status ]
        local update = {}
        if attr.update then
                -- Update operations are applied in order, so copy them with ipairs:
                -- pairs() gives no ordering guarantee, even for numeric keys.
                for _, op in ipairs(attr.update) do
                        update[#update + 1] = op
                end
        end

        -- delayed or ttl or default ttl
        if attr.delay then
                if not xq.features.delayed then
                        error("Feature delayed is not enabled",2)
                end
                if attr.ttl then
                        error("Features ttl and delay are mutually exclusive",2)
                end
                table.insert(update, { '=', xq.fields.status, 'W' })
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.delay) })
        elseif attr.ttl then
                if not xq.features.ttl then
                        error("Feature ttl is not enabled",2)
                end
                table.insert(update, { '=', xq.fields.status, 'R' })
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.ttl) })
        elseif xq.features.ttl_default then
                table.insert(update, { '=', xq.fields.status, 'R' })
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(xq.features.ttl_default) })
        elseif xq.have_runat then
                -- A runat field exists but no ttl applies: store xq.NEVER in it.
                table.insert(update, { '=', xq.fields.status, 'R' })
                table.insert(update, { '=', xq.fields.runat, xq.NEVER })
        else
                table.insert(update, { '=', xq.fields.status, 'R' })
        end

        xq:atomic(key,function()
                t = self:update({key}, update)

                ---@cast t box.tuple
                xq:wakeup(t)
                if xq.have_runat then
                        -- Non-blocking put on the runat channel.
                        xq.runat_chan:put(true,0)
                end

                log.info("Rel: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
                        key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
        end)

        -- NOTE(review): presumably drops the key from the caller's taken set;
        -- putback is defined outside this chunk, confirm there.
        xq:putback(t)

        return t
end
1547

1548
---Acknowledges a task owned by the caller (ownership is verified by
---`xq:check_owner`). Depending on the enabled features the task becomes a
---zombie (`Z`), is kept as done (`D`), or is deleted from the space.
---Raises when `attr.delay` is given but the `zombie` feature is disabled.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
---@return table|box.tuple
function methods:ack(key, attr)
        -- features.zombie
        -- features.keep
        local xq = self.xq
        key = xq:getkey(key)

        attr = attr or {}

        local t = xq:check_owner(key)
        local old = t[ xq.fields.status ]
        local delete = false
        local update = {}
        if attr.update then
                -- Update operations are applied in order, so copy them with ipairs:
                -- pairs() gives no ordering guarantee, even for numeric keys.
                for _, op in ipairs(attr.update) do
                        update[#update + 1] = op
                end
        end

        -- explicit zombie delay, default zombie, keep, or delete
        if attr.delay then
                if not xq.features.zombie then
                        error("Feature zombie is not enabled",2)
                end
                table.insert(update, { '=', xq.fields.status, 'Z' })
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.delay) })
        elseif xq.features.zombie then
                table.insert(update, { '=', xq.fields.status, 'Z' })
                if xq.features.zombie_delay then
                        table.insert(update, { '=', xq.fields.runat, xq.timeoffset(xq.features.zombie_delay) })
                end
        elseif xq.features.keep then
                table.insert(update, { '=', xq.fields.status, 'D' })
        else
                -- do remove task
                delete = true
        end

        xq:atomic(key,function()
                -- Caller-supplied update operations are applied even when the task
                -- is deleted right afterwards.
                if #update > 0 then
                        t = self:update({key}, update)
                        ---@cast t box.tuple
                        xq:wakeup(t)
                        if xq.have_runat then
                                -- Non-blocking put on the runat channel.
                                xq.runat_chan:put(true,0)
                        end
                        log.info("Ack: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
                                key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
                end

                if delete then
                        t = self:delete{key}
                        log.info("Ack: %s->delete {%s} +%s from %s/sid=%s/fid=%s", old,
                                key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
                end
        end)

        -- Per the original note, this in effect drops the key from the taken set.
        xq:putback(t)

        return t
end
1609

1610
---Buries a task owned by the caller (ownership is verified by
---`xq:check_owner`): applies the optional caller-supplied update operations
---and sets the status to `B`. Returns nothing.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
function methods:bury(key, attr)
        attr = attr or {}

        local xq = self.xq
        key = xq:getkey(key)
        local t = xq:check_owner(key)

        local update = {}
        if attr.update then
                -- Update operations are applied in order, so copy them with ipairs:
                -- pairs() gives no ordering guarantee, even for numeric keys.
                for _, op in ipairs(attr.update) do
                        update[#update + 1] = op
                end
        end
        -- The status change goes last, after any caller-supplied operations.
        table.insert(update, { '=', xq.fields.status, 'B' })

        xq:atomic(key,function()
                t = self:update({key}, update)

                xq:wakeup(t)
                if xq.have_runat then
                        -- Non-blocking put on the runat channel.
                        xq.runat_chan:put(true,0)
                end
                log.info("Bury {%s} by %s, sid=%s, fid=%s", key, box.session.storage.peer, box.session.id(), fiber.id())
        end)

        xq:putback(t)
end
1637

1638
-- Puts a single task back to status `R`.
-- `key` may be anything `xq:getkey` accepts; `attr` is forwarded to
-- `xq:putback` unchanged (an empty table when omitted).
local function kick_task(self, key, attr)
        local queue = self.xq
        key = queue:getkey(key)

        local peer = box.session.storage.peer
        local sid = box.session.id()
        attr = attr or {}

        if queue.debug then
                log.info("Kick {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
        end

        local make_ready = { { '=', queue.fields.status, 'R' } }
        self:update({ key }, make_ready)
        queue:putback(key, attr)
end
1652

1653
---Kicks tasks back to status `R`.
---When `nr_tasks_or_task` is a number, up to that many tasks found under
---`{'B'}` in `xq.index` are kicked, yielding after every 500th task.
---Any other value is passed to `kick_task` as the task to kick.
function methods:kick(nr_tasks_or_task, attr)
        attr = attr or {}

        if type(nr_tasks_or_task) ~= 'number' then
                kick_task(self, nr_tasks_or_task, attr)
                return
        end

        -- 1-based ordinal of the task about to be kicked.
        local ordinal = 1
        for _, buried in self.xq.index:pairs({'B'}, { iterator = box.index.EQ }) do
                if ordinal > nr_tasks_or_task then
                        break
                end
                kick_task(self, buried, attr)
                if ordinal % 500 == 0 then
                        -- Yield so a long kick does not monopolise the fiber scheduler.
                        fiber.sleep(0)
                end
                ordinal = ordinal + 1
        end
end
1667

1668
---Deletes the task identified by `key` from the space and returns it with
---the status field set to `B`. Raises an error when no such task exists.
---@param key table|scalar|box.tuple
---@return box.tuple|table
function methods:kill(key)
        local queue = self.xq
        key = queue:getkey(key)

        if not self:get(key) then
                error(string.format("Task by %s not found to kill", key))
        end

        local peer = box.session.storage.peer
        if queue.debug then
                log.info("Kill {%s} by %s, sid=%s, fid=%s", key, peer, box.session.id(), fiber.id())
        end

        local removed = self:delete(key)
        ---@cast removed -nil
        -- Kill is treated as a synonym of Bury: the returned tuple is marked
        -- `B` even though the stored task is already gone.
        local buried = removed:update({{ '=', queue.fields.status, 'B' }})
        queue:putback(buried)
        return queue.retwrap(buried)
end
1690

1691
-- Special remap of truncate: resets the queue statistics (global and
-- per-tube), truncates the space, then re-installs the queue methods on the
-- space metatable. Returns whatever the default truncate returned.
function methods:truncate()
        local stat = self.xq._stat

        -- Global counters: keep the status keys, zero the values.
        for status in pairs(stat.counts) do
                stat.counts[status] = 0LL
        end
        for transition in pairs(stat.transition) do
                stat.transition[transition] = nil
        end

        -- Per-tube counters must be reset too, otherwise stats() keeps reporting
        -- tasks that the truncate below has removed.
        -- NOTE(review): `tube_stat.counts` is what methods:stats reads; a per-tube
        -- `transition` table is assumed to mirror the global one and is only
        -- cleared when it is present.
        if type(stat.tube) == 'table' then
                for _, tube_stat in pairs(stat.tube) do
                        if type(tube_stat) == 'table' then
                                if type(tube_stat.counts) == 'table' then
                                        for status in pairs(tube_stat.counts) do
                                                tube_stat.counts[status] = 0LL
                                        end
                                end
                                if type(tube_stat.transition) == 'table' then
                                        for transition in pairs(tube_stat.transition) do
                                                tube_stat.transition[transition] = nil
                                        end
                                end
                        end
                end
        end

        local ret = self.xq._default_truncate(self)
        local meta = debug.getmetatable(self)
        for k,v in pairs(methods) do meta[k] = v end
        -- Now we reset our methods after truncation because
        -- as we can see in on_replace_dd_truncate:
        -- https://github.com/tarantool/tarantool/blob/0b7cc52607b2290d2f35cc68ee1a8243988c2735/src/box/alter.cc#L2239
        -- tarantool deletes space and restores it with the same indexes
        -- but without methods
        return ret
end
1710

1711
-- Human-readable names for the single-letter task statuses; used by
-- `methods:stats` when a pretty report is requested.
local pretty_st = {
        ["R"] = "Ready",
        ["T"] = "Taken",
        ["W"] = "Waiting",
        ["B"] = "Buried",
        ["Z"] = "Zombie",
        ["D"] = "Done",
}
1719

1720
---Returns a deep copy of the queue statistics in which every status known to
---`pretty_st` has a counter (0 when absent), both globally and for each tube.
---With `pretty` set, the counters are re-keyed from status letters to their
---human-readable names.
---@param pretty? boolean
function methods:stats(pretty)
        local snapshot = table.deepcopy(self.xq._stat)

        -- Fills in the counter for one status in one counts table,
        -- renaming the key when a pretty report was requested.
        local function normalize(counts, code, label)
                local value = counts[code] or 0LL
                if pretty then
                        counts[label] = value
                        counts[code] = nil
                else
                        counts[code] = value
                end
        end

        for code, label in pairs(pretty_st) do
                normalize(snapshot.counts, code, label)
                for _, tube_stat in pairs(snapshot.tube) do
                        normalize(tube_stat.counts, code, label)
                end
        end

        return snapshot
end
1740

1741
-- Makes the module table callable: `M(space, opts)` forwards to
-- `M.upgrade(space, opts, 1)` and returns nothing.
local module_mt = {}

function module_mt.__call(_, space, opts)
        M.upgrade(space, opts, 1)
end

setmetatable(M, module_mt)

return M
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc