• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moonlibs / xqueue / 10019924251

20 Jul 2024 11:43AM UTC coverage: 60.571%. First build
10019924251

Pull #19

github

Vladislav Grubov
Merge branch 'origin/rw-survival' into tube-stats
Pull Request #19: Implements per tube statistics

36 of 86 new or added lines in 1 file covered. (41.86%)

530 of 875 relevant lines covered (60.57%)

8.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.57
/xqueue.lua
1
local M = {}
1✔
2

3
local ffi = require 'ffi'
1✔
4
local log = require 'log'
1✔
5
local fiber = require 'fiber'
1✔
6
local clock = require 'clock'
1✔
7

8
-- ctype of a box tuple, captured once from an empty tuple; compared against
-- ffi.typeof(arg) to recognise tuple cdata arguments.
local tuple_ctype = ffi.typeof(box.tuple.new())
1✔
9

10
-- Ten years expressed in seconds. During upgrade the smallest positive runat
-- found in the runat index is compared against this value; anything below it
-- is reported as a "monotonic runat" and the upgrade is aborted.
local monotonic_max_age = 10*365*86400;
1✔
11

12
--- Clears the leading integer keys of a table in place.
-- Every index from 0 up to the length reported by `#t` is set to nil;
-- all other keys are left untouched. Returns nothing.
-- Raises an error (reported at the caller's position) when `t` is not a table.
local function table_clear(t)
        if type(t) == 'table' then
                -- the upper bound is evaluated once, before any slot is cleared
                for idx = 0, #t do
                        t[idx] = nil
                end
                return
        end

        local got = 'no value'
        if t ~= nil then
                got = type(t)
        end
        error("bad argument #1 to 'clear' (table expected, got "..got..")",2)
end
20

21
--- Reports whether `t` has an element at its first array position.
-- Runs exactly one step of the `ipairs` iterator: the step yields an index
-- only when the first slot holds a non-nil value.
local function is_array(t)
        local step, subject, control = ipairs(t)
        -- parentheses keep only the first returned value (the index)
        return (step(subject, control)) ~= nil
end
28

29
-- Session id => peer string; filled by the on_connect trigger and cleared by
-- the on_disconnect trigger so the disconnect log line can name the peer.
local peers = {}
1✔
30

31
do
        -- Key under which the value returned by box.session.on_connect is kept
        -- in _G. The previously stored value is handed back as the second
        -- argument (presumably so a module reload replaces the old trigger
        -- instead of stacking a new one; confirm against Tarantool docs).
        local slot = "\0xq.on_connect"

        -- Remembers the peer of the new session and logs the connection.
        local function remember_peer()
                local sid = box.session.id()
                local peer = box.session.peer()
                box.session.storage.peer = box.session.peer()
                peers[ sid ] = peer
                log.info("connected %s, sid=%s, fid=%s", peer, sid, fiber.id() )
        end

        rawset(_G, slot, box.session.on_connect(remember_peer, rawget(_G, slot)))
end
1✔
38

39
do
        -- Key under which the value returned by box.session.on_disconnect is
        -- kept in _G. The previously stored value is handed back as the second
        -- argument (presumably so a module reload replaces the old trigger
        -- instead of stacking a new one; confirm against Tarantool docs).
        local slot = "\0xq.on_disconnect"

        -- Drops the remembered peer of the closing session and logs it.
        local function forget_peer()
                local sid = box.session.id()
                local peer = peers[ sid ]
                peers[ sid ] = nil
                log.info("disconnected %s, sid=%s, fid=%s", peer, sid, fiber.id() )
        end

        rawset(_G, slot, box.session.on_disconnect(forget_peer, rawget(_G, slot)))
end
1✔
45

46

47
--[[
48

49
Field sets:
50
1. id, status (minimal required configuration)
51
2. id, status, priority
52
3. id, status, runat
53
4. id, status, priority, runat
54

55
Primary index variants:
56
1. index(status, [ id ])
57
2. index(status, priority, [ id ])
58

59

60
Format:
61
local format = box.space._space.index.name:get(space_name)[ 7 ]
62

63
Status:
64
        R - ready - task is ready to be taken
65
                created by put without delay
66
                turned on by release/kick without delay
67
                turned on from W when delay passed
68
                deleted after ttl if ttl is enabled
69

70
        T - taken - task is taken by consumer. may not be taken by other.
71
                turned into R after ttr if ttr is enabled
72

73
        W - waiting - task is not ready to be taken. waiting for its `delay`
74
                requires `runat`
75
                `delay` may be set during put, release, kick
76
                turned into R after delay
77

78
        B - buried - task was temporary discarded from queue by consumer
79
                may be revived using kick by administrator
80
                use it in unexpected conditions, when manual intervention is required
81
                Without mandatory monitoring (stats.buried) usage of buried is useless and awry
82

83
        Z - zombie - task was processed and ack'ed and *temporary* kept for delay
84

85
        D - done - task was processed and ack'ed and permanently left in database
86
                enabled when keep feature is set
87

88
        X - reserved for statistics
89

90
(TODO: reload/upgrade and feature switch)
91

92
Interface:
93

94
# Creator methods:
95

96
        M.upgrade(space, {
97
                format = {
98
                        -- space format. applied to space.format() if passed
99
                },
100
                fields = {
101
                        -- id is always taken from pk
102
                        status   = 'status_field_name'    | status_field_no,
103
                        runat    = 'runat_field_name'     | runat_field_no,
104
                        priority = 'priority_field_name'  | priority_field_no,
105
                },
106
                features = {
107
                        id = 'auto_increment' | 'uuid' | 'required' | function
108
                                -- auto_increment - if pk is number, then use it for auto_increment
109
                                -- uuid - if pk is string, then use uuid for id
110
                                -- required - primary key MUST be present in tuple during put
111
                                -- function - function will be called to acquire id for task
112

113
                        buried = true,           -- if true, support bury/kick
114
                        delayed = true,          -- if true, support delayed tasks, requires `runat`
115

116
                        keep = true,             -- if true, keep ack'ed tasks in [D]one state, instead of deleting
117
                                -- mutually exclusive with zombie
118

119
                        zombie = true|number,    -- requires `runat` field
120
                                -- if number, then with default zombie delay, otherwise only if set delay during ack
121
                                -- mutually exclusive with keep
122

123
                        ttl    = true|number,    -- requires `runat` field
124
                                -- if number, then with default ttl, otherwise only if set during put/release
125
                        ttr    = true|number,    -- requires `runat` field
126
                                -- if number, then with default ttr, otherwise only if set
127
                },
128
        })
129

130
Producer methods:
131
        sp:put({...}, [ attr ]) (array or table (if have format) or tuple)
132

133
Consumer methods:
134

135
* id:
136
        - array of keyfields of pk
137
        - table of named keyfields
138
        - tuple
139

140
sp:take(timeout) -> tuple
141
sp:ack(id, [ attr ])
142
        attr.update (only if support zombie)
143
sp:release(id, [ attr ])
144
        attr.ttr (only if support ttr)
145
        attr.ttl (only if support ttl)
146
        attr.update
147
sp:bury(id, [ attr ])
148
        attr.ttl (only if support ttl)
149
        attr.update
150

151
Admin methods:
152

153
sp:queue_stats()
154
sp:kick(N | id, [attr]) -- put buried task id or N oldest buried tasks to [R]eady
155

156

157
]]
158

159
local json = require 'json'
1✔
160
-- Make the encoder emit nil for values it cannot serialise instead of failing.
-- NOTE(review): presumably needed because upgrade() logs json.encode(opts) and
-- opts may carry functions (features.id, worker) - confirm.
json.cfg{ encode_invalid_as_nil = true }
1✔
161

162

163
--- Checks whether a declared field/index type matches a required type class.
-- `ref` is either 'str' or 'num' (matched against every spelling listed in
-- the alias sets below) or any other value, which is compared to `src`
-- directly. Always returns a boolean.
local typeeq
do
        -- spellings accepted for each type class
        local aliases = {
                str = { STR = true, str = true, string = true },
                num = { NUM = true, num = true, number = true, unsigned = true },
        }

        function typeeq(src, ref)
                local accepted = aliases[ref]
                if accepted == nil then
                        -- not a known class: plain comparison
                        return src == ref
                end
                return accepted[src] == true
        end
end
172

173
--- Deep structural equality of two values.
-- Tables are equal when they hold the same set of keys and every value is
-- equal under this same comparison. Strings, numbers, booleans and nil are
-- compared with `==`; cdata values must share the same ctype and compare
-- equal. Values of different Lua types are never equal.
---@param a any
---@param b any
---@param depth? number error level used when an unsupported type is met (default 2)
---@return boolean
local function is_eq(a, b, depth)
        depth = depth or 2
        local t = type(a)
        if t ~= type(b) then return false end
        if t == 'table' then
                -- one more stack frame sits between a nested call and the original
                -- caller, so the error level grows with the recursion (level 0,
                -- meaning "no position", is passed through unchanged)
                local nested_depth = depth > 0 and depth + 1 or depth
                for k,v in pairs(a) do
                        local other = b[k]
                        if other == nil then return false end
                        if not is_eq(v, other, nested_depth) then return false end
                end
                -- values were already compared above; only keys that exist in `b`
                -- but not in `a` can still make the tables differ
                for k in pairs(b) do
                        if a[k] == nil then return false end
                end
                return true
        elseif t == 'string' or t == 'number' or t == 'boolean' or t == 'nil' then
                return a == b
        elseif t == 'cdata' then
                return ffi.typeof(a) == ffi.typeof(b) and a == b
        else
                -- functions, userdata and threads have no structural equality
                error("Wrong types for equality", depth)
        end
end
194

195

196
--- Compiles a converter from a tuple (any positionally indexed value) into a
--- plain table.
-- For every entry `k` of `qformat` the generated function copies `t[k]` into
-- the result under the entry's `name`. Entries without a name (for example
-- ones that only describe an index part) are kept under their position `k`
-- so no value is dropped. The generated function returns nil for a falsy
-- argument and raises 'excess args' when called with more than one argument.
-- Field names are emitted as quoted Lua string literals, so quotes or
-- backslashes in a name cannot break (or inject into) the generated source.
---@param qformat table array of field descriptors ({ name = ... })
---@return fun(t: any): table?
local function _tuple2table ( qformat )
        local rows = {}
        for k,v in ipairs(qformat) do
                local key
                if v.name ~= nil then
                        key = string.format("%q", tostring(v.name))
                else
                        key = tostring(k)
                end
                rows[#rows + 1] = "\t["..key.."] = t["..tostring(k).."];\n"
        end
        local fun = "return function(t,...) "..
                "if select('#',...) > 0 then error('excess args',2) end "..
                "return t and {\n"..table.concat(rows, "").."} or nil end\n"
        return dostring(fun)
end
206

207
--- Compiles a converter from a table into a box tuple.
-- For every entry of `qformat`, in order, the generated function reads the
-- value stored under the entry's `name` and substitutes msgpack NULL when it
-- is nil, then passes the resulting array to box.tuple.new. Entries without a
-- name (for example ones that only describe an index part) are read from
-- their position instead, which keeps every later field at its own position.
-- The generated function returns nil for a falsy argument.
-- Field names are emitted as quoted Lua string literals, so quotes or
-- backslashes in a name cannot break (or inject into) the generated source.
---@param qformat table array of field descriptors ({ name = ... })
---@return fun(t: table?): any
local function _table2tuple ( qformat )
        local rows = {}
        for k,v in ipairs(qformat) do
                local key
                if v.name ~= nil then
                        key = string.format("%q", tostring(v.name))
                else
                        key = tostring(k)
                end
                rows[#rows + 1] = "\tt["..key.."] == nil and NULL or t["..key.."];\n"
        end
        local fun = "local NULL = require'msgpack'.NULL return function(t) return "..
                "t and box.tuple.new({\n"..table.concat(rows, "").."}) or nil end\n"
        return dostring(fun)
end
217

218
---@class xqueue.space
219
local methods = {}
1✔
220

221
---@class PrimaryKeyField:table
222
---@field no number position in tuple
223
---@field name string name of the field
224
---@field type "uuid"|"string"|"number"|"unsigned"|"integer"|"boolean" type of the field
225

226
---@class xqFeatures: table
227
---@field id "auto_increment"|"time64"|"uuid"| fun(): scalar (Default: uuid)
228
---@field retval "tuple" | "table"
229
---@field buried boolean
230
---@field delayed boolean
231
---@field keep boolean
232
---@field tube boolean
233
---@field zombie boolean|number
234
---@field zombie_delay number
235
---@field ttl boolean|number
236
---@field ttr boolean|number
237
---@field ttl_default number?
238
---@field ttr_default number?
239

240
---@class xq:table
241
---@field NEVER integer (Default: 0)
242
---@field atomic fun(self: xq, key: scalar, fun: fun(...:any): ...?): ...
243
---@field bysid table<number,table<string,string>> mapping sid => {key => key}
244
---@field taken table<string,number> mapping key => sid
245
---@field _lock table<string,boolean> locks key => boolean (in atomic)
246
---@field put_wait table<string,{cond:fiber.cond,task:box.tuple?,processed: boolean?}> mapping key => fiber.cond for producer
247
---@field take_wait fiber.channel
248
---@field take_chans table<string,fiber.channel> mapping tube => fiber.channel
249
---@field debug boolean
250
---@field have_runat boolean
251
---@field gen_id? fun(): scalar
252
---@field getkey fun(self: xq, arg: table|scalar|box.tuple): scalar
253
---@field packkey fun(self: xq, key: any): string
254
---@field tube_index? boxIndex
255
---@field index boxIndex
256
---@field key PrimaryKeyField
257
---@field fieldmap table<string,number>
258
---@field timeoffset fun(delta: number): number
259
---@field features xqFeatures
260
---@field fields table<string, string|number>
261
---@field tuple fun(tbl: table): box.tuple
262
---@field table fun(tuple: box.tuple): table
263
---@field retwrap fun(t: box.tuple|table): table|box.tuple
264
---@field wakeup fun(self: xq, t: box.tuple|table)
265
---@field runat_chan fiber.channel
266
---@field check_owner fun(self: xq, key: tuple_type|box.tuple): box.tuple
267
---@field put_back fun(key: table|box.tuple)
268
---@field _stat { counts: table, transition: table }
269
---@field putback fun(self: xq, task: table|box.tuple)
270
---@field _default_truncate fun(space: boxSpaceObject)
271
---@field _on_repl replaceTrigger
272
---@field _on_dis fun()
273

274

275
---@class xqueue.space: boxSpaceObject
276
---@field xq xq xqueue specific storage
277

278
---@class xqueue.fields
279
---@field status? string|number Xqueue name for the Status field
280
---@field runat? string|number Xqueue name for the RunAt field
281
---@field priority? string|number Xqueue name for Task priority field
282
---@field tube? string|number Xqueue name for Tube field
283

284
---@class xqueue.features
285
---@field id 'uuid' | 'auto_increment' | 'required' | 'time64' | (fun(): number|string) Mandatory Field for TaskID generation
286
---@field retval? 'table'|'tuple' Type of return Value of the task in xqueue methods.
287
---(Default: table when space with format, tuple when space is without format)
288
---@field buried? boolean should xqueue allow burying of the tasks (default: false)
289
---@field keep? boolean should xqueue keep :ack'ed tasks in the Space or not
290
---@field delayed? boolean should xqueue allow delayed tasks (requires runat field and index) (default: false)
291
---@field zombie? boolean|number should xqueue temporarily keep :ack'ed tasks in the Space (default: false). Mutually exclusive with keep
292
---when zombie is configured with number, then this value is treated as zombie_delay (requires runat to be present)
293
---@field ttl? boolean|number should xqueue allow Time-To-Live on tasks. When specified with number, this value used for ttl_default.
294
---Requires runat field and index.
295
---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttr_default.
296
---Requires runat field and index.
297

298
---@class xqueue.upgrade.options
299
---@field format? boxSpaceFormat
300
---@field fields xqueue.fields
301
---@field debug? boolean
302
---@field tube_stats? string[] List of tube names for per-tube statistics
303
---@field features xqueue.features
304
---@field worker? fun(task: box.tuple|table) simple ad-hoc worker callback
305
---@field workers? number (number of workers to spawn)
306

307
---Upgrades given space to xqueue instance
308
---@param space xqueue.space
309
---@param opts xqueue.upgrade.options
310
---@param depth? number
311
function M.upgrade(space,opts,depth)
1✔
312
        depth = depth or 0
3✔
313
        log.info("xqueue upgrade(%s,%s)", space.name, json.encode(opts))
3✔
314
        if not opts.fields then error("opts.fields required",2) end
3✔
315
        if opts.format then
3✔
316
                -- todo: check if already have such format
317
                local format_av = box.space._space.index.name:get(space.name)[ 7 ]
×
318
                if not is_eq(format_av, opts.format) then
×
319
                        space:format(opts.format)
×
320
                else
321
                        print("formats are equal")
×
322
                end
323
        end
324

325
        -- This variable will be defined later
326
        local taken_mt
327

328
        local self = {}
3✔
329
        if space.xq then
3✔
330
                self.taken = space.xq.taken
×
331
                self._stat = space.xq._stat
×
332
                self.bysid = space.xq.bysid
×
333
                self._lock = space.xq._lock
×
334
                self.take_wait = space.xq.take_wait
×
335
                self.take_chans = space.xq.take_chans or setmetatable({}, { __mode = 'v' })
×
336
                self.put_wait = space.xq.put_wait or setmetatable({}, { __mode = 'v' })
×
337
                self._on_repl  = space.xq._on_repl
×
338
                self._on_dis = space.xq._on_dis
×
339
        else
340
                self.taken = {}
3✔
341
                self.bysid = {}
3✔
342
                -- byfid = {};
343
                self._lock = {}
3✔
344
                self.put_wait = setmetatable({}, { __mode = 'v' })
3✔
345
                self.take_wait = fiber.channel(0)
3✔
346
                self.take_chans = setmetatable({}, { __mode = 'v' })
3✔
347
        end
348
        setmetatable(self.bysid, {
6✔
349
                __serialize='map',
350
                __newindex = function(t, key, val)
351
                        if type(val) == 'table' then
4✔
352
                                rawset(t, key, setmetatable(val, taken_mt))
4✔
353
                        else
NEW
354
                                rawset(t, key, val)
×
355
                        end
356
                end
357
        })
358
        self.debug = not not opts.debug
3✔
359

360
        if not self._default_truncate then
3✔
361
                self._default_truncate = space.truncate
3✔
362
        end
363

364
        local format_av = box.space._space.index.name:get(space.name)[ 7 ]
6✔
365
        local format = {}
3✔
366
        local have_format = false
3✔
367
        local have_runat = false
3✔
368
        for no,f in pairs(format_av) do
15✔
369
                format[ f.name ] = {
12✔
370
                        name = f.name;
12✔
371
                        type = f.type;
12✔
372
                        no   = no;
12✔
373
                }
12✔
374
                format[ no ] = format[ f.name ];
12✔
375
                have_format = true
12✔
376
                self.have_format = true
12✔
377
        end
378
        for _,idx in pairs(space.index) do
21✔
379
                for _,part in pairs(idx.parts) do
50✔
380
                        format[ part.fieldno ] = format[ part.fieldno ] or { no = part.fieldno }
32✔
381
                        format[ part.fieldno ].type = part.type
32✔
382
                end
383
        end
384

385
        -- dd(format)
386

387
        -- 1. fields check
388
        local fields = {}
3✔
389
        local fieldmap = {}
3✔
390
        for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"}}) do
15✔
391
                local fname,ftype = f[1], f[2]
12✔
392
                local num = opts.fields[fname]
12✔
393
                if num then
12✔
394
                        if type(num) == 'string' then
6✔
395
                                if format[num] then
6✔
396
                                        fields[fname] = format[num].no;
6✔
397
                                else
398
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
399
                                end
400
                        elseif type(num) == 'number' then
×
401
                                if format[num] then
×
402
                                        fields[fname] = num
×
403
                                else
404
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
405
                                end
406
                        else
407
                                error(string.format("wrong type %s for field %s, number or string required",type(num),fname),2 + depth)
×
408
                        end
409
                        -- check type
410
                        if format[num] then
6✔
411
                                if not typeeq(format[num].type, ftype) then
12✔
412
                                        error(string.format("type mismatch for field %s, required %s, got %s",fname, ftype, format[fname].type),2+depth)
×
413
                                end
414
                        end
415
                        fieldmap[fname] = format[num].name
6✔
416
                end
417
        end
418

419
        -- dd(fields)
420

421
        -- 2. index check
422

423
        local pk = space.index[0]
3✔
424
        if #pk.parts ~= 1 then
3✔
425
                error("Composite primary keys are not supported yet")
×
426
        end
427
        local pktype
428
        if typeeq( format[pk.parts[1].fieldno].type, 'str' ) then
6✔
429
                pktype = 'string'
1✔
430
                taken_mt = { __serialize = 'map' }
1✔
431
        elseif typeeq( format[pk.parts[1].fieldno].type, 'num' ) then
4✔
432
                pktype = 'number'
2✔
433
                taken_mt = {
2✔
434
                        __serialize = 'map',
435
                        __newindex = function(t, key, val)
436
                                return rawset(t, tostring(ffi.cast("uint64_t", key)), val)
80✔
437
                        end,
438
                        __index = function(t, key)
439
                                return rawget(t, tostring(ffi.cast("uint64_t", key)))
60✔
440
                        end
441
                }
2✔
442
        else
443
                error("Unknown key type "..format[pk.parts[1].fieldno].type)
×
444
        end
445
        setmetatable(self.taken, taken_mt)
3✔
446

447
        local pkf = {
3✔
448
                no   = pk.parts[1].fieldno;
3✔
449
                name = format[pk.parts[1].fieldno].name;
3✔
450
                ['type'] = pktype;
3✔
451
        }
452
        self.key = pkf
3✔
453
        self.fields = fields
3✔
454
        self.fieldmap = fieldmap
3✔
455

456

457
        --- Extracts the primary key value from a task-like argument.
        -- Accepts:
        --   * a table: read at position `self.key.no` when its first array slot
        --     is filled, otherwise read by field name `self.key.name`;
        --   * a box tuple: read at position `self.key.no`;
        --   * a bare value whose Lua type equals `self.key.type`: returned as is.
        -- Anything else raises an error reported two frames above this method.
        function self:getkey(arg)
                local kind = type(arg)

                if kind == 'table' then
                        local field = is_array(arg) and self.key.no or self.key.name
                        return arg[ field ]
                end

                if kind == 'cdata' and ffi.typeof(arg) == tuple_ctype then
                        return arg[ self.key.no ]
                end

                if kind == self.key.type then
                        return arg
                end

                error("Wrong key/task argument. Expected table or tuple or key", 3)
        end
474

475
        --- Normalises a primary key value.
        -- cdata keys are cast to uint64_t and returned in string form; every
        -- other key is returned unchanged. The first argument is ignored.
        function self.packkey(_, key)
                if type(key) ~= 'cdata' then
                        return key
                end
                return tostring(ffi.cast("uint64_t", key))
        end
482

483
        do
484
                local filter
485
                if fields.priority then
3✔
486
                        filter = function(index)
487
                                return #index.parts >= 3
×
488
                                        and index.parts[1].fieldno == fields.status
×
489
                                        and index.parts[2].fieldno == fields.priority
×
490
                                        and index.parts[3].fieldno == self.key.no
×
491
                        end
492
                else
493
                        filter = function(index)
494
                                return #index.parts >= 2
7✔
495
                                        and index.parts[1].fieldno == fields.status
4✔
496
                                        and index.parts[2].fieldno == self.key.no
7✔
497
                        end
498
                end
499
                for i,index in pairs(space.index) do
7✔
500
                        if type(i) == 'number' and filter(index) then
14✔
501
                                self.index = index
3✔
502
                                break
3✔
503
                        end
504
                end
505
                if not self.index then
3✔
506
                        if fields.priority then
×
507
                                error("not found index by status + priority + id",2+depth)
×
508
                        else
509
                                error("not found index by status + id",2+depth)
×
510
                        end
511
                end
512

513
                if fields.tube then
3✔
514
                        for n,index in pairs(space.index) do
2✔
515
                                if type(n) == 'number' and index.parts[1].fieldno == fields.tube then
2✔
516
                                        local not_match = false
1✔
517
                                        for i = 2, #index.parts do
3✔
518
                                                if index.parts[i].fieldno ~= self.index.parts[i-1].fieldno then
2✔
519
                                                        not_match = true
×
520
                                                        break
521
                                                end
522
                                        end
523
                                        if not not_match then
1✔
524
                                                self.tube_index = index
1✔
525
                                                break
1✔
526
                                        end
527
                                end
528
                        end
529
                        if not self.tube_index then
1✔
530
                                if fields.priority then
×
531
                                        error("not found index by tube + status + priority + id",2+depth)
×
532
                                else
533
                                        error("not found index by tube + status + id",2+depth)
×
534
                                end
535
                        end
536
                end
537
        end
538

539
        ---@type boxIndex
540
        local runat_index
541
        if fields.runat then
3✔
542
                for _,index in pairs(space.index) do
7✔
543
                        if type(_) == 'number' then
7✔
544
                                if index.parts[1].fieldno == fields.runat then
7✔
545
                                        -- print("found",index.name)
546
                                        runat_index = index
2✔
547
                                        break
2✔
548
                                end
549
                        end
550
                end
551
                if not runat_index then
2✔
552
                        error(string.format("fields.runat requires tree index with this first field in it"),2+depth)
×
553
                else
554
                        local t = runat_index:pairs({0},{iterator = box.index.GT}):nth(1)
4✔
555
                        if t then
2✔
556
                                if t[ self.fields.runat ] < monotonic_max_age then
×
557
                                        error("!!! Queue contains monotonic runat. Consider updating tasks (https://github.com/moonlibs/xqueue/issues/2)")
×
558
                                end
559
                        end
560
                        have_runat = true
2✔
561
                end
562
        end
563
        self.have_runat = have_runat
3✔
564

565
        ---@type table<string, { counts: {}, transition: {} }>
566
        local stat_tube = {}
3✔
567
        if self.fields.tube and type(opts.tube_stats) == 'table' then
3✔
NEW
568
                for _, tube_name in ipairs(opts.tube_stats) do
×
NEW
569
                        if type(tube_name) == 'string' then
×
NEW
570
                                stat_tube[tube_name] = {
×
571
                                        counts = {},
572
                                        transition = {},
573
                                }
574
                        end
575
                end
576
        end
577

578
        if not self._stat then
3✔
579
                self._stat = {
3✔
580
                        counts = {};
3✔
581
                        transition = {};
3✔
582
                        tube = stat_tube;
3✔
583
                }
3✔
584
                -- TODO: benchmark index:count()
585
                if self.fields.tube then
3✔
586
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
3✔
NEW
587
                                local s = t[self.fields.status]
×
NEW
588
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
589

NEW
590
                                local tube = t[self.fields.tube]
×
NEW
591
                                if stat_tube[tube] then
×
NEW
592
                                        stat_tube[tube].counts[s] = (stat_tube[tube].counts[s] or 0LL) + 1
×
593
                                end
594
                        end
595
                else
596
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
6✔
NEW
597
                                local s = t[self.fields.status]
×
NEW
598
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
599
                        end
600
                end
601
        end
602

603
        -- 3. features check
604

605
        local features = {}
3✔
606

607
        local gen_id
608
        opts.features = opts.features or {}
3✔
609
        if not opts.features.id then
3✔
610
                opts.features.id = 'uuid'
×
611
        end
612
        -- 'auto_increment' | 'time64' | 'uuid' | 'required' | function
613
        if opts.features.id == 'auto_increment' then
3✔
614
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
×
615
                        error("For auto_increment numeric pk is mandatory",2+depth)
×
616
                end
617
                local pk_fno = pk.parts[1].fieldno
×
618
                gen_id = function()
619
                        local max = pk:max()
×
620
                        if max then
×
621
                                return max[pk_fno] + 1
×
622
                        else
623
                                return 1
×
624
                        end
625
                end
626
        elseif opts.features.id == 'time64' then
3✔
627
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
4✔
628
                        error("For time64 numeric pk is mandatory",2+depth)
×
629
                end
630
                gen_id = function()
631
                        local key = clock.realtime64()
13✔
632
                        while true do
633
                                local exists = pk:get(key)
13✔
634
                                if not exists then
13✔
635
                                        return key
13✔
636
                                end
637
                                key = key + 1
×
638
                        end
639
                end
640
        elseif opts.features.id == 'uuid' then
1✔
641
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'str')) then
2✔
642
                        error("For uuid string pk is mandatory",2+depth)
×
643
                end
644
                local uuid = require'uuid'
1✔
645
                gen_id = function()
646
                        while true do
647
                                local key = uuid.str()
2✔
648
                                local exists = pk:get(key)
2✔
649
                                if not exists then
2✔
650
                                        return key
2✔
651
                                end
652
                        end
653
                end
654
        elseif opts.features.id == 'required' then
×
655
                -- gen_id = function()
656
                --         -- ???
657
                --         error("TODO")
658
                -- end
659
        elseif type(opts.features.id) == 'function'
×
660
                or (debug.getmetatable(opts.features.id)
×
661
                and debug.getmetatable(opts.features.id).__call)
×
662
        then
663
                gen_id = opts.features.id
×
664
        else
665
                error(string.format(
×
666
                        "Wrong type for features.id %s, may be 'auto_increment' | 'time64' | 'uuid' | 'required' | function",
667
                        opts.features.id
×
668
                ), 2+depth)
×
669
        end
670

671
        if not opts.features.retval then
3✔
672
                opts.features.retval = have_format and 'table' or 'tuple'
3✔
673
        end
674

675
        if format then
3✔
676
                self.tuple = _table2tuple(format)
6✔
677
                self.table = _tuple2table(format)
6✔
678
        end
679

680
        if opts.features.retval == 'table' then
3✔
681
                self.retwrap = self.table
3✔
682
        else
683
                self.retwrap = function(t) return t end
×
684
        end
685

686
        features.buried = not not opts.features.buried
3✔
687
        features.keep   = not not opts.features.keep
3✔
688
        if opts.features.delayed then
3✔
689
                if not have_runat then
2✔
690
                        error(string.format("Delayed feature requires runat field and index" ),2+depth)
×
691
                end
692
                features.delayed = true
2✔
693
        else
694
                features.delayed = false
1✔
695
        end
696

697
        if opts.features.zombie then
3✔
698
                if features.keep then
1✔
699
                        error(string.format("features keep and zombie are mutually exclusive" ),2+depth)
×
700
                end
701
                if not have_runat then
1✔
702
                        error(string.format("feature zombie requires runat field and index" ),2+depth)
×
703
                end
704

705
                features.zombie = true
1✔
706
                if type(opts.features.zombie) == 'number' then
1✔
707
                        features.zombie_delay = opts.features.zombie
×
708
                end
709
        else
710
                features.zombie = false
2✔
711
        end
712

713
        if opts.features.ttl then
3✔
714
                if not have_runat then
×
715
                        error(string.format("feature ttl requires runat field and index" ),2+depth)
×
716
                end
717

718
                features.ttl = true
×
719
                if type(opts.features.ttl) == 'number' then
×
720
                        features.ttl_default = opts.features.ttl
×
721
                end
722
        else
723
                features.ttl = false
3✔
724
        end
725

726
        if opts.features.ttr then
3✔
727
                if not have_runat then
×
728
                        error(string.format("feature ttr requires runat field and index" ),2+depth)
×
729
                end
730

731
                features.ttr = true
×
732
                if type(opts.features.ttr) == 'number' then
×
733
                        features.ttr_default = opts.features.ttr
×
734
                end
735
        else
736
                features.ttr = false
3✔
737
        end
738

739
        if fields.tube then
3✔
740
                features.tube = true
1✔
741
        end
742

743
        self.gen_id = gen_id
3✔
744
        self.features = features
3✔
745
        self.space = space.id
3✔
746

747
        --- Returns the wall-clock timestamp lying `delta` seconds after now.
        -- A `delta` that tonumber() cannot convert is treated as 0.
        function self.timeoffset(delta)
                local shift = tonumber(delta)
                if not shift then
                        shift = 0
                end
                return clock.realtime() + shift
        end
751
        --- Tells whether the timestamp `time` is strictly in the past
        -- according to clock.realtime().
        function self.timeready(time)
                local now = clock.realtime()
                return now > time
        end
754
        --- Returns how many seconds are left until the timestamp `time`
        -- (negative when `time` has already passed).
        function self.timeremaining(time)
                local now = clock.realtime()
                return time - now
        end
757
        -- self.NEVER = -1ULL
758
        self.NEVER = 0
3✔
759

760
        --- Extracts the primary key value from task `t`.
        -- The first argument (the receiver when called with `:`) is ignored.
        function self.keyfield(_, t)
                local fieldno = pkf.no
                return t[fieldno]
        end
763
        --- Returns the value used to index the in-memory bookkeeping tables for task `t`.
        -- Here it is the same primary key field that keyfield reads.
        function self.keypack(_, t)
                local fieldno = pkf.no
                return t[fieldno]
        end
766

767
        -- Notify producer if it is still waiting us.
768
        -- Producer waits only for successfully processed task
769
        -- or for task which would never be processed.
770
        -- Discarding to process task depends on consumer's logic.
771
        -- Also task can be deleted from space by exceeding TTL.
772
        -- Producer should not be notified if queue is able to process task in future.
773
        local function notify_producer(key, task)
                local pkey = self:packkey(key)
                local waiter = self.put_wait[pkey]
                if not waiter then
                        -- nobody is waiting for this task
                        return
                end

                -- `verdict` stays nil while the task may still be processed later;
                -- in that case the producer is left waiting.
                local status = task.status
                local verdict
                if status == 'Z' or status == 'D' then
                        -- task was processed
                        verdict = true
                elseif space:get{key} then
                        -- task is still in space: only a buried task is final
                        if status == 'B' then
                                verdict = false
                        end
                elseif status == 'T' then
                        -- task left the space while taken: it was acked
                        verdict = true
                elseif status == 'R' then
                        -- task left the space while ready: it was killed by TTL
                        verdict = false
                end

                if verdict == nil then
                        return
                end

                waiter.task = task
                waiter.processed = verdict
                waiter.cond:broadcast()
        end
812

813
        self.ready = fiber.channel(0)
3✔
814

815
        --- Fiber body wrapper for background service fibers.
        -- Waits until the instance is writable, runs `func(...)` under pcall,
        -- logs any error it raised and starts over. The loop ends when the
        -- space disappears from box.space or another xqueue object has been
        -- attached to it.
        -- @param func  function to run while the instance is read-write
        -- @param ...   arguments forwarded to `func` on every run
        local function rw_fiber_f(func, ...)
                local xq = self
                repeat
                        if box.info.ro then
                                log.verbose("awaiting rw")
                                repeat
                                        if box.ctl.wait_rw then
                                                -- wait_rw raises when the timeout expires; on a long-lived
                                                -- read-only instance that must not kill this fiber.
                                                pcall(box.ctl.wait_rw, 1)
                                                -- pcall also swallows cancellation, so re-check it explicitly.
                                                fiber.testcancel()
                                        else
                                                fiber.sleep(0.001)
                                        end
                                until not box.info.ro
                        end

                        local ok, err = pcall(func, ...)
                        if not ok then
                                log.error("%s", err)
                        end
                        -- pcall above may have caught a cancellation error
                        fiber.testcancel()
                until (not box.space[space.name]) or space.xq ~= xq
        end
836

837
        -- Spawn `opts.workers` (default 1) background fibers, each of which takes
        -- tasks from the queue and feeds them to the user supplied `opts.worker`.
        if opts.worker then
                local total = opts.workers or 1
                local handler = opts.worker
                for n = 1, total do
                        fiber.create(rw_fiber_f, function(space, xq)
                                local fname = space.name .. '.xq.wrk' .. tostring(n)
                                ---@diagnostic disable-next-line: undefined-field
                                if package.reload then fname = fname .. '.' .. package.reload.count end
                                fiber.name(string.sub(fname, 1, 32))

                                -- wait until the space is upgraded and the queue is declared ready
                                repeat fiber.sleep(0.001) until space.xq
                                if xq.ready then xq.ready:get() end

                                log.info("I am worker %s", n)
                                if box.info.ro then
                                        log.info("Shutting down on ro instance")
                                        return
                                end

                                while box.space[space.name] and space.xq == xq do
                                        local task = space:take(1)
                                        if task then
                                                local key = xq:getkey(task)
                                                local ok, err = pcall(handler, task)
                                                if ok then
                                                        -- handler finished without releasing the task itself
                                                        if xq.taken[ key ] then
                                                                space:ack(task)
                                                        end
                                                else
                                                        log.error("Worker for {%s} has error: %s", key, err)
                                                end
                                                -- still taken here: the handler failed (or ack did not untake it)
                                                if xq.taken[ key ] then
                                                        log.error("Worker for {%s} not released task", key)
                                                        space:release(task)
                                                end
                                        end
                                        fiber.yield()
                                end
                                log.info("worker %s ended", n)
                        end, space, self)
                end
        end
876

877
        -- Background "runat" fiber: walks the runat index and applies the
        -- time-driven transitions (W->R for delayed tasks, removal of ready tasks
        -- by ttl, removal of zombies, T->R autorelease by ttr).
        if have_runat then
                self.runat_chan = fiber.channel(0)
                self.runat = fiber.create(rw_fiber_f, function(space,xq,runat_index)
                        local fname = space.name .. '.xq'
                        if package.reload then fname = fname .. '.' .. package.reload.count end
                        fiber.name(string.sub(fname,1,32))
                        -- wait until the space is upgraded and the queue is declared ready
                        repeat fiber.sleep(0.001) until space.xq
                        if xq.ready then xq.ready:get() end
                        local chan = xq.runat_chan
                        log.info("Runat started")
                        if box.info.ro then
                                log.info("Shutting down on ro instance")
                                return
                        end
                        -- upper bound of tasks handled in one pass
                        local maxrun = 1000
                        local curwait
                        local collect = {}
                        while box.space[space.name] and space.xq == xq do
                                -- One pass; returns the number of seconds to wait before the next one.
                                local r,e = pcall(function()
                                        local remaining
                                        for _,t in runat_index:pairs({0},{iterator = box.index.GT}) do
                                                if xq.timeready( t[ xq.fields.runat ] ) then
                                                        table.insert(collect,t)
                                                else
                                                        -- first task that is not due yet: remember how long to sleep
                                                        remaining = xq.timeremaining(t[ xq.fields.runat ])
                                                        break
                                                end
                                                -- batch is full: come back immediately for the rest
                                                if #collect >= maxrun then remaining = 0 break end
                                        end

                                        for _,t in ipairs(collect) do
                                                if t[xq.fields.status] == 'W' then
                                                        log.info("Runat: W->R %s",xq:keyfield(t))
                                                        -- TODO: default ttl?
                                                        local u = space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.status,'R' },
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                        xq:wakeup(u)
                                                elseif t[xq.fields.status] == 'R' and xq.features.ttl then
                                                        local key = xq:keyfield(t)
                                                        log.info("Runat: Kill R by ttl %s (%+0.2fs)", key, fiber.time() - t[ xq.fields.runat ])
                                                        t = space:delete{key}
                                                        notify_producer(key, t)
                                                elseif t[xq.fields.status] == 'Z' and xq.features.zombie then
                                                        log.info("Runat: Kill Zombie %s",xq:keyfield(t))
                                                        space:delete{ xq:keyfield(t) }
                                                elseif t[xq.fields.status] == 'T' and xq.features.ttr then
                                                        local key = xq:keypack(t)
                                                        local sid = xq.taken[ key ]
                                                        local peer = peers[sid] or sid

                                                        log.info("Runat: autorelease T->R by ttr %s taken by %s",xq:keyfield(t), peer)
                                                        local u = space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.status,'R' },
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                        xq.taken[ key ] = nil
                                                        if sid then
                                                                -- The per-session table may be missing; indexing it blindly
                                                                -- would abort the pass after the task was already released
                                                                -- and skip the wakeup below.
                                                                local owned = xq.bysid[ sid ]
                                                                if owned then
                                                                        owned[ key ] = nil
                                                                else
                                                                        log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, sid)
                                                                end
                                                        end
                                                        xq:wakeup(u)
                                                else
                                                        log.error("Runat: unsupported status %s for %s",t[xq.fields.status], tostring(t))
                                                        -- take the task out of the runat schedule so it is not revisited
                                                        space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                end
                                        end

                                        if remaining then
                                                if remaining >= 0 and remaining < 1 then
                                                        return remaining
                                                end
                                        end
                                        return 1
                                end)

                                table_clear(collect)

                                if r then
                                        curwait = e
                                else
                                        curwait = 1
                                        log.error("Runat/ERR: %s",e)
                                end
                                -- give other fibers a chance before an immediate next pass
                                if curwait == 0 then fiber.sleep(0) end
                                -- sleep until the timeout expires or someone puts into runat_chan
                                chan:get(curwait)
                        end
                        log.info("Runat ended")
                end,space,self,runat_index)
        end
974

975
        --- Second half of self:atomic: drops the per-key lock and then either
        -- forwards the results of the protected call or re-raises its error
        -- (level 3).
        local function atomic_tail(self, key, status, ...)
                self._lock[key] = nil
                if status then
                        return ...
                end
                error((...), 3)
        end

        --- Runs `fun(...)` while `key` is marked in self._lock.
        -- The mark is removed whether `fun` returns or raises.
        -- Returns whatever `fun` returns.
        function self:atomic(key, fun, ...)
                self._lock[key] = true
                return atomic_tail(self, key, pcall(fun, ...))
        end
986

987
        --- Verifies that task `key` exists and is taken by the current session.
        -- Raises (level 2) when the task is missing, not taken at all, or taken
        -- by another session. Returns the task tuple otherwise.
        function self:check_owner(key)
                local tuple = space:get(key)
                if not tuple then
                        error(string.format( "Task {%s} was not found", key ),2)
                end

                local owner = self.taken[key]
                if not owner then
                        error(string.format( "Task %s not taken by any", key ),2)
                end

                local me = box.session.id()
                if owner ~= me then
                        error(string.format( "Task %s taken by %d. Not you (%d)", key, owner, me ),2)
                end

                return tuple
        end
1000

1001
        --- Forgets that `task` is taken (both in self.taken and in the owner's
        -- self.bysid table), then notifies a producer waiting for it, if any.
        -- Bookkeeping inconsistencies are logged, not raised.
        function self:putback(task)
                local key = task[ self.key.no ]
                local owner = self.taken[ key ]

                if not owner then
                        log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() )
                else
                        self.taken[ key ] = nil
                        local owned = self.bysid[ owner ]
                        if owned then
                                owned[ key ] = nil
                        else
                                log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, owner)
                        end
                end

                notify_producer(key, task)
        end
1017

1018
        --- Wakes up one consumer waiting for a task, if `t` is in status 'R'.
        -- A consumer waiting on the task's tube channel is preferred; otherwise
        -- the common take_wait channel is used.
        function self:wakeup(t)
                local fmap = self.fieldmap
                if t[fmap.status] ~= 'R' then
                        return
                end

                if fmap.tube then
                        -- we may have consumers in the tubes:
                        local tube_chan = self.take_chans[t[fmap.tube]]
                        if tube_chan and tube_chan:has_readers() and tube_chan:put(true, 0) then
                                -- we have successfully notified consumer
                                return
                        end
                        -- otherwise fallback to default channel:
                end

                local common = self.take_wait
                if common:has_readers() then
                        common:put(true,0)
                end
        end
1033

1034
        --- Releases every fiber blocked on the self.ready channel and then
        -- drops the channel (self.ready becomes nil).
        function self:make_ready()
                while true do
                        local gate = self.ready
                        if not (gate and gate:has_readers()) then
                                break
                        end
                        gate:put(true,0)
                        -- let the released reader run before checking for more
                        fiber.sleep(0)
                end
                self.ready = nil
        end
1041

1042
        local meta = debug.getmetatable(space)
3✔
1043
        for k,v in pairs(methods) do meta[k] = v end
36✔
1044

1045
        -- Triggers must set right before updating space
1046
        -- because raising error earlier leads to trigger inconsistency
1047
        -- on_replace trigger that keeps the statistics up to date:
        --   * self._stat.counts      - number of tasks per status
        --   * self._stat.transition  - number of "old-new" status transitions
        --   * self._stat.tube[name]  - the same two tables per tube, maintained only
        --     for tubes that already have a table with a `counts` table in it
        -- A missing old/new tuple (insert/delete) is accounted as status 'X'.
        self._on_repl = space:on_replace(function(old, new)
                local old_st, new_st
                local old_tube, new_tube
                local old_tube_stat, new_tube_stat
                local counts = self._stat.counts
                local tube_ss = self._stat.tube

                if old then
                        old_st = old[self.fields.status] --[[@as string]]
                        if counts[old_st] and counts[old_st] > 0 then
                                counts[old_st] = counts[old_st] - 1
                        else
                                log.error(
                                        "Have not valid statistic by status: %s with value: %s",
                                        old_st, tostring(counts[old_st])
                                )
                        end
                        if self.fields.tube then
                                old_tube = old[self.fields.tube]
                                old_tube_stat = tube_ss[old_tube]
                                if type(old_tube_stat) == 'table' and type(old_tube_stat.counts) == 'table' then
                                        if (tonumber(old_tube_stat.counts[old_st]) or 0) > 0 then
                                                old_tube_stat.counts[old_st] = old_tube_stat.counts[old_st] - 1
                                        else
                                                -- three values are reported, so the format needs three placeholders
                                                log.error(
                                                        "Have not valid statistic by tube/status: tube %s, status %s with value: %s",
                                                        tostring(old_tube), tostring(old_st), tostring(old_tube_stat.counts[old_st])
                                                )
                                        end
                                else
                                        -- statistics are not tracked for this tube
                                        old_tube_stat = nil
                                end
                        end
                else
                        old_st = 'X'
                end

                if new then
                        new_st = new[self.fields.status] --[[@as string]]
                        counts[new_st] = (counts[new_st] or 0LL) + 1
                        if counts[new_st] < 0 then
                                log.error("Statistic overflow by task type: %s", new_st)
                        end
                        if self.fields.tube then
                                new_tube = new[self.fields.tube]
                                new_tube_stat = tube_ss[new_tube]
                                if type(new_tube_stat) == 'table' and type(new_tube_stat.counts) == 'table' then
                                        if (tonumber(new_tube_stat.counts[new_st]) or 0) >= 0 then
                                                new_tube_stat.counts[new_st] = (new_tube_stat.counts[new_st] or 0LL) + 1
                                        else
                                                -- %s with tostring() instead of %q: logging must never raise
                                                -- inside the trigger, whatever the tube value is
                                                log.error(
                                                        "Have not valid statistic by tube/status: tube %s, status %s with value: %s",
                                                        tostring(new_tube), tostring(new_st), tostring(new_tube_stat.counts[new_st])
                                                )
                                        end
                                else
                                        -- statistics are not tracked for this tube
                                        new_tube_stat = nil
                                end
                        end
                else
                        new_st = 'X'
                end

                local field = old_st.."-"..new_st
                self._stat.transition[field] = (self._stat.transition[field] or 0LL) + 1
                if old_tube_stat then
                        if not new_tube_stat or old_tube_stat == new_tube_stat then
                                -- no new tube or new and old tubes are the same
                                old_tube_stat.transition[field] = (old_tube_stat.transition[field] or 0LL) + 1
                        else
                                -- nil != old_tube != new_tube != nil
                                -- cross tube transition ?
                                -- Can this be backoff with tube change?
                                -- The task leaves the old tube ("<st>-S") and enters the new one ("S-<st>").
                                local old_field = old_st.."-S"
                                local new_field = "S-"..new_st
                                old_tube_stat.transition[old_field] = (old_tube_stat.transition[old_field] or 0LL) + 1
                                new_tube_stat.transition[new_field] = (new_tube_stat.transition[new_field] or 0LL) + 1
                        end
                elseif new_tube_stat then
                        -- old_tube_stat == nil
                        new_tube_stat.transition[field] = (new_tube_stat.transition[field] or 0LL) + 1
                end
        end, self._on_repl)
73✔
1130

1131
        -- on_disconnect trigger: every task still taken by the closing session
        -- is returned to status 'R' (and its runat reset when the queue has a
        -- runat field), then the session's bookkeeping table is dropped.
        self._on_dis = box.session.on_disconnect(function()
                local sid = box.session.id()
                local peer = box.session.storage.peer

                log.info("%s: disconnected %s, sid=%s, fid=%s", space.name, peer, sid, fiber.id() )
                box.session.storage.destroyed = true

                local owned = self.bysid[sid]
                if not owned then
                        return
                end

                -- keep draining until the table is really empty
                while next(owned) do
                        for key,realkey in pairs(owned) do
                                self.taken[key] = nil
                                owned[key] = nil
                                local t = space:get(realkey)
                                if not t then
                                        log.error( "Rst: {%s}: taken not found", realkey )
                                elseif t[ self.fields.status ] ~= 'T' then
                                        log.error( "Rst: %s->? {%s}: wrong status", t[self.fields.status], realkey )
                                else
                                        local ops = { { '=',self.fields.status,'R' } }
                                        if self.have_runat then
                                                ops[2] = { '=', self.fields.runat, self.NEVER }
                                        end
                                        self:wakeup(space:update({ realkey }, ops))
                                        log.info("Rst: T->R {%s}", realkey )
                                end
                        end
                end
                self.bysid[sid] = nil
        end, self._on_dis)
6✔
1162

1163
        rawset(space,'xq',self)
3✔
1164

1165
        log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status)
3✔
1166

1167

1168
        --- Start-up check of the queue.
        -- While the instance is an orphan the check is postponed to a helper
        -- fiber. On a writable instance every task found in status 'T' that is
        -- neither taken nor locked in this process is returned to 'R'. Finally
        -- the queue is declared ready.
        function self:starttest()
                if box.info.status == 'orphan' then
                        fiber.create(function()
                                -- poll with a slowly growing pause until the instance leaves orphan mode
                                local attempt = 0
                                repeat
                                        fiber.sleep(attempt/1e5)
                                        attempt = attempt + 1
                                until box.info.status ~= 'orphan'
                                self:starttest()
                        end)
                        return
                end

                if not box.info.ro then
                        -- FIXME: with stable iterators add yield
                        local space = box.space[self.space]
                        box.begin()
                        for _,t in self.index:pairs({'T'},{ iterator = box.index.EQ }) do
                                local key = t[ self.key.no ]
                                if not (self.taken[key] or self._lock[key]) then
                                        local runat_op
                                        if self.have_runat then
                                                -- NOTE(review): self.ttl_default is not assigned anywhere in the
                                                -- visible code (only features.ttl_default is) - confirm the source.
                                                runat_op = {
                                                        '=',self.fields.runat,
                                                        self.ttl_default and self.timeoffset( self.ttl_default ) or self.NEVER
                                                }
                                        end
                                        space:update({key}, {
                                                { '=', self.fields.status, 'R' },
                                                runat_op
                                        })
                                        log.info("Start: T->R (%s)", key)
                                end
                        end
                        box.commit()
                else
                        log.info("Server is ro, not resetting statuses")
                end

                self:make_ready()
        end
1205
        self:starttest()
3✔
1206
end
1207

1208

1209
--[[
1210
* `space:put`
1211
        - `task` - table or array or tuple
1212
                + `table`
1213
                        * **requires** space format
1214
                        * suitable for id generation
1215
                + `array`
1216
                        * ignores space format
1217
                        * for id generation use `NULL` (**not** `nil`)
1218
                + `tuple`
1219
                        * ignores space format
1220
                        * **can't** be used with id generation
1221
        - `attr` - table of attributes
1222
                + `delay` - number of seconds
1223
                        * if set, task will become `W` instead of `R` for `delay` seconds
1224
                + `ttl` - number of seconds
1225
                        * if set, task will be discarded after ttl seconds unless was taken
1226
                + `wait` - number of seconds
1227
                        * if set, the calling fiber will be blocked for up to `wait` seconds, until the task
                        has been processed or the timeout is reached.
1229

1230
```lua
1231
box.space.myqueue:put{ name="xxx"; data="yyy"; }
1232
box.space.myqueue:put{ "xxx","yyy" }
1233
box.space.myqueue:put(box.tuple.new{ 1,"xxx","yyy" })
1234

1235
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5 })
1236
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 })
1237
```
1238
]]
1239

1240
---Puts a new task into the queue.
---
---Only tables with named fields are currently accepted: arrays (when id
---generation is enabled) and tuples raise "not implemented" errors.
---The passed table is modified in place: primary key (when generated),
---status and, when applicable, runat fields are written into it.
---
---All option checks are performed before the task table is touched and
---before an id is generated, so a rejected call has no side effects.
---
---When `opts.wait` is set, the calling fiber blocks for up to `opts.wait`
---seconds on a condition registered in `xq.put_wait` under the packed key.
---If the insert fails, that registration is rolled back so that no stale
---waiter is left behind (and a previously registered waiter is restored).
---
---@param t table|box.tuple task as a table with named fields
---@param opts? { delay: number?, ttl: number?, wait: number? }
---@return table|box.tuple, boolean? has_been_processed
function methods:put(t, opts)
        local xq = self.xq
        opts = opts or {}

        if type(t) == 'table' then
                if xq.gen_id and is_array(t) then
                        error("Task from array creation is not implemented yet", 2)
                end
        elseif type(t) == 'cdata' and ffi.typeof(t) == tuple_ctype then
                if xq.gen_id then
                        error("Can't use id generation with tuple.", 2)
                end
                error("Task from tuple creation is not implemented yet", 2)
        else
                error("Wrong argument to put. Expected table or tuple", 2)
        end

        -- here we have table with keys
        if not xq.gen_id and not t[ xq.key.name ] then
                error("Primary key is mandatory",2)
        end

        -- validate options before mutating the task or consuming an id
        if opts.delay then
                if not xq.features.delayed then
                        error("Feature delayed is not enabled",2)
                end
                if opts.ttl then
                        error("Features ttl and delay are mutually exclusive",2)
                end
                if opts.wait then
                        error("Are you crazy? Call of :put({...}, { wait = <>, delay = <> }) looks weird", 2)
                end
        elseif opts.ttl then
                if not xq.features.ttl then
                        error("Feature ttl is not enabled",2)
                end
        end

        if xq.gen_id then
                t[ xq.key.name ] = xq.gen_id()
        end

        -- delayed or ttl or default ttl
        if opts.delay then
                t[ xq.fieldmap.status ] = 'W'
                t[ xq.fieldmap.runat ]  = xq.timeoffset(opts.delay)
        elseif opts.ttl then
                t[ xq.fieldmap.status ] = 'R'
                t[ xq.fieldmap.runat ]  = xq.timeoffset(opts.ttl)
        elseif xq.features.ttl_default then
                t[ xq.fieldmap.status ] = 'R'
                t[ xq.fieldmap.runat ]  = xq.timeoffset(xq.features.ttl_default)
        elseif xq.have_runat then
                t[ xq.fieldmap.status ] = 'R'
                t[ xq.fieldmap.runat ]  = xq.NEVER
        else
                t[ xq.fieldmap.status ] = 'R'
        end

        local tuple = xq.tuple(t)
        local key = tuple[ xq.key.no ]

        -- The waiter is registered before the insert, exactly as before,
        -- but the previous entry is remembered so it can be restored.
        local wait, pkey, prev_wait
        if opts.wait then
                wait = { cond = fiber.cond() }
                pkey = xq:packkey(key)
                prev_wait = xq.put_wait[pkey]
                xq.put_wait[pkey] = wait
        end

        local ok, err = pcall(xq.atomic, xq, key, function()
                t = self:insert(tuple)
        end)
        if not ok then
                -- insert failed: do not leave our waiter in put_wait
                if wait and xq.put_wait[pkey] == wait then
                        xq.put_wait[pkey] = prev_wait
                end
                -- level 0 keeps the original error value/message untouched
                error(err, 0)
        end
        xq:wakeup(t)

        if wait then
                -- local func notify_producer will send to us some data
                local signalled = wait.cond:wait(opts.wait)
                fiber.testcancel()
                if signalled and wait.task then
                        return xq.retwrap(wait.task), wait.processed
                end
        end
        return xq.retwrap(t)
end
1325

1326
--[[
1327
* `space:wait(id, timeout)`
1328
        - `id`:
1329
                + `string` | `number` - primary key
1330
        - `timeout` - number of seconds to wait
1331
                * the calling fiber is blocked for up to `timeout` seconds, until the task
                has been processed or the timeout is reached.
1333
]]
1334

1335
-- Statuses for which methods:wait actually blocks; a task in any other
-- status is returned immediately.
local wait_for = {}
for _, status in ipairs({ 'R', 'T', 'W' }) do
        wait_for[status] = true
end
1340

1341
---Blocks the calling fiber until the given task is reported back through
---`xq.put_wait` or `timeout` seconds pass.
---
---Raises if the task does not exist. A task whose status is not listed in
---`wait_for` is returned immediately with `false`.
---@param key table|scalar|box.tuple
---@param timeout number
---@return table|box.tuple, boolean? has_been_processed
function methods:wait(key, timeout)
        local xq = self.xq
        key = xq:getkey(key)

        local task = self:get(key)
        if not task then
                error(("Task {%s} was not found"):format(key))
        end

        if not wait_for[ task[xq.fields.status] ] then
                return xq.retwrap(task), false
        end

        -- reuse the waiter already registered for this key, if any
        local pkey = xq:packkey(key)
        local waiter = xq.put_wait[pkey]
        if not waiter then
                waiter = { cond = fiber.cond() }
                xq.put_wait[pkey] = waiter
        end

        -- local func notify_producer will send to us some data
        local signalled = waiter.cond:wait(timeout)
        fiber.testcancel()

        if not (signalled and waiter.task) then
                -- timeout (or nothing delivered): hand back the task as read before waiting
                return xq.retwrap(task), false
        end
        return xq.retwrap(waiter.task), waiter.processed
end
1373

1374
--[[
1375
* `space:take(timeout)`
1376
* `space:take(timeout, opts)`
1377
* `space:take(opts)`
1378
        - `timeout` - number of seconds to wait for new task
1379
                + choose reasonable time
1380
                + beware of **readahead** size (see tarantool docs)
1381
        - `tube` - name of the tube worker wants to take task from (feature tube must be enabled)
1382
        - returns task tuple or table (see retval) or nothing on timeout
1383
        - *TODO*: ttr must be there
1384
]]
1385

1386
---Takes the first ready (`R`) task that is not locked by another fiber and
---switches it to `T`, waiting for up to `timeout` seconds for one to appear.
---
---May be called as `take(timeout)`, `take(timeout, opts)` or `take(opts)`.
---Returns nothing when no task could be taken in time.
---@param timeout? number|{ timeout: number?, ttr: number?, tube: string? }
---@param opts? { timeout: number?, ttr: number?, tube: string? }
---@return table|box.tuple?
function methods:take(timeout, opts)
        local xq = self.xq

        -- normalise the (timeout, opts) / (opts) calling conventions
        if type(timeout) == 'table' then
                opts = timeout
                timeout = opts.timeout or 0
        else
                timeout = timeout or 0
                opts = opts or {}
        end
        assert(timeout >= 0, "timeout required")

        local ttr = opts.ttr
        if ttr then
                if not xq.features.ttr then
                        error("Feature ttr is not enabled",2)
                end
        else
                ttr = xq.features.ttr_default
        end

        local index, start_with, tube_chan
        local tube = opts.tube
        if tube then
                if not xq.features.tube then
                        error("Feature tube is not enabled", 2)
                end

                assert(type(tube) == 'string', "opts.tube must be a string")

                index = xq.tube_index
                start_with = { tube, 'R' }
                tube_chan = xq.take_chans[tube]
                if not tube_chan then
                        tube_chan = fiber.channel()
                end
                xq.take_chans[tube] = tube_chan
        else
                index = xq.index
                start_with = { 'R' }
        end
        ---@cast index -nil

        local deadline = fiber.time() + timeout
        local key
        local found
        while not found do
                for _, candidate in index:pairs(start_with, { iterator = box.index.EQ }) do
                        local candidate_key = candidate[ xq.key.no ]
                        if not xq._lock[ candidate_key ] then
                                -- found key
                                xq._lock[ candidate_key ] = true
                                key = candidate_key
                                found = candidate
                                break
                        end
                end
                if not found then
                        local left = deadline - fiber.time()
                        if left <= 0 then break end

                        local chan = tube_chan or xq.take_wait
                        chan:get(left)
                        if box.session.storage.destroyed then break end
                end
        end

        -- If we were last reader from the tube
        -- we remove channel. Writers will get false
        -- on :put if they exists.
        if tube_chan and not tube_chan:has_readers() then
                tube_chan:close()
                xq.take_chans[tube] = nil
        end
        if not found then return end

        local ok, res = pcall(function()
                local sid = box.session.id()
                local peer = box.session.storage.peer

                if xq.debug then
                        log.info("Take {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
                end

                -- TODO: update more fields
                local ops = {
                        { '=', xq.fields.status, 'T' },
                }
                if ttr then
                        ops[#ops + 1] = { '=', xq.fields.runat, xq.timeoffset(ttr) }
                        xq.runat_chan:put(true,0)
                end
                local taken = self:update({key}, ops)

                -- remember which session owns the task
                local owned = xq.bysid[ sid ]
                if not owned then
                        owned = {}
                        xq.bysid[ sid ] = owned
                end
                xq.taken[ key ] = sid
                owned[ key ] = key

                return taken
        end)

        -- the lock is released whether or not the update succeeded
        xq._lock[ key ] = nil
        if not ok then
                error(res)
        end
        return xq.retwrap(res)
end
1497

1498
--[[
1499
* `space:release(id, [attr])`
1500
        - `id`:
1501
                + `string` | `number` - primary key
1502
                + *TODO*: `tuple` - key will be extracted using index
1503
                + *TODO*: composite pk
1504
        - `attr`
1505
                + `update` - table for update, like in space:update
1506
                + `ttl` - timeout for time to live
1507
                + `delay` - number of seconds
1508
                        * if set, task will become `W` instead of `R` for `delay` seconds
1509
]]
1510

1511
---Returns a task held by the caller back to the queue.
---
---The task is fetched through `xq:check_owner(key)`. User supplied
---`attr.update` operations are applied first, in the order given, followed
---by the status/runat operations chosen from `attr.delay`, `attr.ttl`,
---the default ttl or the presence of a runat field.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
---@return table|box.tuple
function methods:release(key, attr)
        local xq = self.xq
        key = xq:getkey(key)

        attr = attr or {}

        local t = xq:check_owner(key)
        local old = t[ xq.fields.status ]
        local update = {}
        if attr.update then
                -- ipairs, not pairs: the order of update operations is significant
                -- and pairs gives no ordering guarantee
                for _, op in ipairs(attr.update) do
                        update[#update + 1] = op
                end
        end

        -- delayed or ttl or default ttl
        if attr.delay then
                if not xq.features.delayed then
                        error("Feature delayed is not enabled",2)
                end
                if attr.ttl then
                        error("Features ttl and delay are mutually exclusive",2)
                end
                table.insert(update, { '=', xq.fields.status, 'W' })
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.delay) })
        elseif attr.ttl then
                if not xq.features.ttl then
                        error("Feature ttl is not enabled",2)
                end
                table.insert(update, { '=', xq.fields.status, 'R' })
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.ttl) })
        elseif xq.features.ttl_default then
                table.insert(update, { '=', xq.fields.status, 'R' })
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(xq.features.ttl_default) })
        elseif xq.have_runat then
                table.insert(update, { '=', xq.fields.status, 'R' })
                table.insert(update, { '=', xq.fields.runat, xq.NEVER })
        else
                table.insert(update, { '=', xq.fields.status, 'R' })
        end

        xq:atomic(key,function()
                t = self:update({key}, update)

                ---@cast t box.tuple
                xq:wakeup(t)
                if xq.have_runat then
                        xq.runat_chan:put(true,0)
                end

                log.info("Rel: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
                        key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
        end)

        xq:putback(t)

        return t
end
1570

1571
---Acknowledges a task held by the caller.
---
---Depending on the enabled features the task becomes a zombie (`Z`,
---optionally with a runat computed from `attr.delay` or the configured
---zombie delay), is kept as done (`D`), or is deleted. User supplied
---`attr.update` operations are applied first, in the order given; when the
---task is to be deleted they are applied before the delete.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
---@return table|box.tuple
function methods:ack(key, attr)
        -- features.zombie
        -- features.keep
        local xq = self.xq
        key = xq:getkey(key)

        attr = attr or {}

        local t = xq:check_owner(key)
        local old = t[ xq.fields.status ]
        local delete = false
        local update = {}
        if attr.update then
                -- ipairs, not pairs: the order of update operations is significant
                -- and pairs gives no ordering guarantee
                for _, op in ipairs(attr.update) do
                        update[#update + 1] = op
                end
        end

        -- delayed or ttl or default ttl
        if attr.delay then
                if not xq.features.zombie then
                        error("Feature zombie is not enabled",2)
                end
                table.insert(update, { '=', xq.fields.status, 'Z' })
                table.insert(update, { '=', xq.fields.runat, xq.timeoffset(attr.delay) })
        elseif xq.features.zombie then
                table.insert(update, { '=', xq.fields.status, 'Z' })
                if xq.features.zombie_delay then
                        table.insert(update, { '=', xq.fields.runat, xq.timeoffset(xq.features.zombie_delay) })
                end
        elseif xq.features.keep then
                table.insert(update, { '=', xq.fields.status, 'D' })
        else
                -- do remove task
                delete = true
        end

        xq:atomic(key,function()
                if #update > 0 then
                        t = self:update({key}, update)
                        ---@cast t box.tuple
                        xq:wakeup(t)
                        if xq.have_runat then
                                xq.runat_chan:put(true,0)
                        end
                        log.info("Ack: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
                                key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
                end

                if delete then
                        t = self:delete{key}
                        log.info("Ack: %s->delete {%s} +%s from %s/sid=%s/fid=%s", old,
                                key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
                end
        end)

        -- hand the (updated or deleted) task to putback once the caller is done with it
        xq:putback(t)

        return t
end
1632

1633
---Buries a task held by the caller: applies the optional `attr.update`
---operations (in the order given) and sets the status to `B`.
---Does not return the task.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
function methods:bury(key, attr)
        attr = attr or {}

        local xq = self.xq
        key = xq:getkey(key)
        local t = xq:check_owner(key)

        local update = {}
        if attr.update then
                -- ipairs, not pairs: the order of update operations is significant
                -- and pairs gives no ordering guarantee
                for _, op in ipairs(attr.update) do
                        update[#update + 1] = op
                end
        end
        table.insert(update, { '=', xq.fields.status, 'B' })

        xq:atomic(key,function()
                t = self:update({key}, update)

                xq:wakeup(t)
                if xq.have_runat then
                        xq.runat_chan:put(true,0)
                end
                log.info("Bury {%s} by %s, sid=%s, fid=%s", key, box.session.storage.peer, box.session.id(), fiber.id())
        end)

        xq:putback(t)
end
1660

1661
-- Moves a single task to `R` and passes its key to xq:putback.
-- `key` may be anything xq:getkey accepts.
local function kick_task(self, key, attr)
        local xq = self.xq
        attr = attr or {}
        key = xq:getkey(key)

        local peer = box.session.storage.peer
        local sid = box.session.id()
        if xq.debug then
                log.info("Kick {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
        end

        local ready_op = { '=', xq.fields.status, 'R' }
        self:update({ key }, { ready_op })
        xq:putback(key, attr)
end
1675

1676
---Kicks buried tasks back to ready.
---
---When given a number, kicks up to that many tasks found under status `B`
---in `xq.index`, yielding after every 500 kicked tasks. Otherwise the
---argument is treated as a single task (or its key) to kick.
function methods:kick(nr_tasks_or_task, attr)
        attr = attr or {}

        if type(nr_tasks_or_task) ~= 'number' then
                kick_task(self, nr_tasks_or_task, attr)
                return
        end

        local limit = nr_tasks_or_task
        local ordinal = 1
        for _, buried in self.xq.index:pairs({'B'},{ iterator = box.index.EQ }) do
                if ordinal > limit then break end
                kick_task(self, buried, attr)
                -- give other fibers a chance on long runs
                if ordinal % 500 == 0 then fiber.sleep(0) end
                ordinal = ordinal + 1
        end
end
1690

1691
---Deletes a task regardless of its current status and returns it with the
---status field set to `B`. Raises if the task does not exist.
---@param key table|scalar|box.tuple
---@return box.tuple|table
function methods:kill(key)
        local xq = self.xq
        key = xq:getkey(key)

        if not self:get(key) then
                error(("Task by %s not found to kill"):format(key))
        end

        local peer = box.session.storage.peer
        if xq.debug then
                log.info("Kill {%s} by %s, sid=%s, fid=%s", key, peer, box.session.id(), fiber.id())
        end

        local removed = self:delete(key)
        ---@cast removed -nil
        ---Kill is treated as a synonym of Bury
        local buried = removed:update({{ '=', xq.fields.status, 'B' }})
        xq:putback(buried)
        return xq.retwrap(buried)
end
1713

1714
-- Special remap of truncate: resets the statistics (global and per tube)
-- and re-installs the queue methods on the space.
function methods:truncate()
        local stat = self.xq._stat
        for status in pairs(stat.counts) do
                stat.counts[status] = 0LL
        end
        for transition in pairs(stat.transition) do
                stat.transition[transition] = nil
        end
        -- Per tube statistics must be reset together with the global ones,
        -- otherwise methods:stats keeps reporting counts of removed tasks.
        -- NOTE(review): only `counts` is visibly used for tube entries in this
        -- file section; `transition` is reset when present - confirm the layout.
        for _, tube_stat in pairs(stat.tube or {}) do
                for status in pairs(tube_stat.counts or {}) do
                        tube_stat.counts[status] = 0LL
                end
                for transition in pairs(tube_stat.transition or {}) do
                        tube_stat.transition[transition] = nil
                end
        end
        local ret = self.xq._default_truncate(self)
        local meta = debug.getmetatable(self)
        for k,v in pairs(methods) do meta[k] = v end
        -- Now we reset our methods after truncation because
        -- as we can see in on_replace_dd_truncate:
        -- https://github.com/tarantool/tarantool/blob/0b7cc52607b2290d2f35cc68ee1a8243988c2735/src/box/alter.cc#L2239
        -- tarantool deletes space and restores it with the same indexes
        -- but without methods
        return ret
end
1733

1734
-- Human readable names of the one-letter task statuses
-- (used by methods:stats when `pretty` is requested).
local pretty_st = {
        ["R"] = "Ready",
        ["T"] = "Taken",
        ["W"] = "Waiting",
        ["B"] = "Buried",
        ["Z"] = "Zombie",
        ["D"] = "Done",
}
1742

1743
---Returns a deep copy of the queue statistics in which every known status
---has a counter (missing ones are filled with `0LL`), both globally and in
---each tube. With `pretty` the one-letter status keys are replaced by their
---readable names from `pretty_st`.
---@param pretty? boolean
function methods:stats(pretty)
        local stats = table.deepcopy(self.xq._stat)

        -- fills in / renames the status counters of one `counts` table
        local function normalize(counts)
                for short, long in pairs(pretty_st) do
                        local value = counts[short] or 0LL
                        if pretty then
                                counts[long] = value
                                counts[short] = nil
                        else
                                counts[short] = value
                        end
                end
        end

        normalize(stats.counts)
        for _, tube_stat in pairs(stats.tube) do
                normalize(tube_stat.counts)
        end
        return stats
end
1763

1764
-- Calling the module itself, `M(space, opts)`, forwards to M.upgrade;
-- nothing is returned from the call.
local module_mt = {}

function module_mt.__call(_, space, opts)
        M.upgrade(space,opts,1)
end

setmetatable(M, module_mt)

return M
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc