• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moonlibs / xqueue / 10021554272

20 Jul 2024 04:21PM UTC coverage: 66.098%. First build
10021554272

Pull #19

github

Vladislav Grubov
fix Readme
Pull Request #19: Implements per tube statistics

38 of 90 new or added lines in 1 file covered. (42.22%)

1162 of 1758 relevant lines covered (66.1%)

18.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.1
/xqueue.lua
1
local M = {}
3✔
2

3
local ffi = require 'ffi'
3✔
4
local log = require 'log'
3✔
5
local fiber = require 'fiber'
3✔
6
local clock = require 'clock'
3✔
7

8
local tuple_ctype = ffi.typeof(box.tuple.new())
3✔
9

10
local monotonic_max_age = 10*365*86400;
3✔
11

12
---Wipes the integer slots of a table in place.
---Every index from 0 up to the length `#t` (measured once, before any
---slot is removed) is set to nil. Keys outside of that range are not touched.
---Raises "bad argument #1 to 'clear'" at the caller's level when `t` is not a table.
---@param t table table to wipe
local function table_clear(t)
        local argtype = type(t)
        if argtype ~= 'table' then
                -- mimic the wording of builtin argument errors: a missing argument
                -- is reported as 'no value' rather than 'nil'
                local got = 'no value'
                if t ~= nil then
                        got = argtype
                end
                error("bad argument #1 to 'clear' (table expected, got "..got..")",2)
        end
        local last, slot = #t, 0
        while slot <= last do
                t[slot] = nil
                slot = slot + 1
        end
end
20

21
---Tells whether a table has an array part, i.e. whether `ipairs` would
---yield at least one element for it.
---Note that an empty table is therefore reported as "not an array".
---@param t table
---@return boolean
local function is_array(t)
        local step, subject, start = ipairs(t)
        -- only the first value produced by the first iteration step matters:
        -- it is the index, or nil when there is nothing to iterate
        return step(subject, start) ~= nil
end
28

29
local peers = {}
3✔
30

31
-- Session connect trigger: remembers the peer address of every new session,
-- both in the session storage and in the module-level `peers` table, keyed
-- by session id.
-- The handle returned by box.session.on_connect is kept in _G under a
-- "\0xq.on_connect" key and passed back as the second argument on the next
-- load of this file.
rawset(_G,"\0xq.on_connect",box.session.on_connect(function()
        local sid = box.session.id()
        local peer = box.session.peer()
        -- reuse the value fetched above instead of asking box.session.peer() twice,
        -- so that session storage and `peers` always hold the same address
        box.session.storage.peer = peer
        peers[ sid ] = peer
        log.info("connected %s, sid=%s, fid=%s", peer, sid, fiber.id() )
end,rawget(_G,"\0xq.on_connect")))
9✔
38

39
-- Session disconnect trigger: forgets the peer address stored in `peers`
-- for the closing session and logs the event.
-- The handle returned by box.session.on_disconnect is kept in _G under a
-- "\0xq.on_disconnect" key and passed back as the second argument on the
-- next load of this file.
do
        local slot = "\0xq.on_disconnect"
        local function forget_peer()
                local session_id = box.session.id()
                local address = peers[ session_id ]
                peers[ session_id ] = nil
                log.info("disconnected %s, sid=%s, fid=%s", address, session_id, fiber.id() )
        end
        rawset(_G, slot, box.session.on_disconnect(forget_peer, rawget(_G, slot)))
end
9✔
45

46

47
--[[
48

49
Field sets:
50
1. id, status (minimal required configuration)
51
2. id, status, priority
52
3. id, status, runat
53
4. id, status, priority, runat
54

55
Primary index variants:
56
1. index(status, [ id ])
57
2. index(status, priority, [ id ])
58

59

60
Format:
61
local format = box.space._space.index.name:get(space_name)[ 7 ]
62

63
Status:
64
        R - ready - task is ready to be taken
65
                created by put without delay
66
                turned on by release/kick without delay
67
                turned on from W when delay passed
68
                deleted after ttl if ttl is enabled
69

70
        T - taken - task is taken by consumer. may not be taken by other.
71
                turned into R after ttr if ttr is enabled
72

73
        W - waiting - task is not ready to be taken. waiting for its `delay`
74
                requires `runat`
75
                `delay` may be set during put, release, kick
76
                turned into R after delay
77

78
        B - buried - task was temporarily discarded from queue by consumer
79
                may be revived using kick by administrator
80
                use it in unpredicted conditions, when manual intervention is required
81
                Without mandatory monitoring (stats.buried) usage of buried is useless and awry
82

83
        Z - zombie - task was processed and ack'ed and *temporarily* kept for delay
84

85
        D - done - task was processed and ack'ed and permanently left in database
86
                enabled when keep feature is set
87

88
        X - reserved for statistics
89

90
(TODO: reload/upgrade and feature switch)
91

92
Interface:
93

94
# Creator methods:
95

96
        M.upgrade(space, {
97
                format = {
98
                        -- space format. applied to space.format() if passed
99
                },
100
                fields = {
101
                        -- id is always taken from pk
102
                        status   = 'status_field_name'    | status_field_no,
103
                        runat    = 'runat_field_name'     | runat_field_no,
104
                        priority = 'priority_field_name'  | priority_field_no,
105
                },
106
                features = {
107
                        id = 'auto_increment' | 'uuid' | 'required' | function
108
                                -- auto_increment - if pk is number, then use it for auto_increment
109
                                -- uuid - if pk is string, then use uuid for id
110
                                -- required - primary key MUST be present in tuple during put
111
                                -- function - function will be called to acquire id for task
112

113
                        buried = true,           -- if true, support bury/kick
114
                        delayed = true,          -- if true, support delayed tasks, requires `runat`
115

116
                        keep = true,             -- if true, keep ack'ed tasks in [D]one state, instead of deleting
117
                                -- mutually exclusive with zombie
118

119
                        zombie = true|number,    -- requires `runat` field
120
                                -- if number, then with default zombie delay, otherwise only if set delay during ack
121
                                -- mutually exclusive with keep
122

123
                        ttl    = true|number,    -- requires `runat` field
124
                                -- if number, then with default ttl, otherwise only if set during put/release
125
                        ttr    = true|number,    -- requires `runat` field
126
                                -- if number, then with default ttr, otherwise only if set
127
                },
128
        })
129

130
Producer methods:
131
        sp:put({...}, [ attr ]) (array or table (if have format) or tuple)
132

133
Consumer methods:
134

135
* id:
136
        - array of keyfields of pk
137
        - table of named keyfields
138
        - tuple
139

140
sp:take(timeout) -> tuple
141
sp:ack(id, [ attr ])
142
        attr.update (only if support zombie)
143
sp:release(id, [ attr ])
144
        attr.ttr (only if support ttr)
145
        attr.ttl (only if support ttl)
146
        attr.update
147
sp:bury(id, [ attr ])
148
        attr.ttl (only if support ttl)
149
        attr.update
150

151
Admin methods:
152

153
sp:queue_stats()
154
sp:kick(N | id, [attr]) -- put buried task id or N oldest buried tasks to [R]eady
155

156

157
]]
158

159
local json = require 'json'
3✔
160
json.cfg{ encode_invalid_as_nil = true }
3✔
161

162

163
---Checks whether a declared field type satisfies a required type class.
---For the classes 'str' and 'num' a fixed set of spellings is accepted;
---any other `ref` is compared with `src` literally.
---@param src any type name taken from a space format or an index part
---@param ref any required class: 'str', 'num' or an exact type name
---@return boolean
local typeeq
do
        -- accepted spellings per type class
        local accepted_by_class = {
                str = { STR = true, str = true, string = true },
                num = { NUM = true, num = true, number = true, unsigned = true },
        }
        typeeq = function(src, ref)
                local accepted = accepted_by_class[ref]
                if accepted == nil then
                        return src == ref
                end
                return accepted[src] == true
        end
end
172

173
---Deep structural equality of two values.
---Tables are compared key by key, recursively. Strings, numbers and booleans
---are compared with `==`; cdata values must additionally have the same ctype.
---Raises "Wrong types for equality" for any other type.
---@param a any
---@param b any
---@param depth? number error level used when an unsupported type is met (Default: 2)
---@return boolean
local function is_eq(a, b, depth)
        depth = depth or 2
        local t = type(a)
        if t ~= type(b) then return false end
        if t == 'table' then
                for k,v in pairs(a) do
                        -- a key missing from b is a mismatch. One more level is added for
                        -- the nested call so that an error still points at the outer caller
                        if b[k] == nil or not is_eq(v, b[k], depth + 1) then
                                return false
                        end
                end
                -- values under shared keys were compared above,
                -- only keys present in b alone are left to detect
                for k in pairs(b) do
                        if a[k] == nil then return false end
                end
                return true
        elseif t == 'string' or t == 'number' or t == 'boolean' then
                -- booleans do occur in space formats (e.g. is_nullable)
                return a == b
        elseif t == 'cdata' then
                return ffi.typeof(a) == ffi.typeof(b) and a == b
        else
                error("Wrong types for equality", depth)
        end
end
194

195

196
---Builds a converter from a tuple (or array) to a table keyed by field names.
---The converter source is generated from the array part of `qformat` and
---compiled with `dostring`: field number `k` of the input becomes the key
---`qformat[k].name` of the result.
---The produced function returns nil for a false/nil argument and raises
---'excess args' when called with more than one argument.
---@param qformat table array of field descriptors having a `name`
---@return fun(t: any): table?
local function _tuple2table ( qformat )
        local rows = {}
        for k,v in ipairs(qformat) do
                -- %q emits a properly escaped string literal, so that names containing
                -- quotes or backslashes can not break the generated code
                rows[#rows + 1] = string.format("\t[%q] = t[%d];\n", v.name, k)
        end
        local fun = "return function(t,...) "..
                "if select('#',...) > 0 then error('excess args',2) end "..
                "return t and {\n"..table.concat(rows).."} or nil end\n"
        return dostring(fun)
end
206

207
---Builds a converter from a table keyed by field names to a tuple.
---The converter source is generated from the array part of `qformat` and
---compiled with `dostring`: the values are placed in the order of `qformat`,
---a field absent from the table is stored as msgpack.NULL.
---The produced function returns nil for a false/nil argument, otherwise the
---result of box.tuple.new.
---@param qformat table array of field descriptors having a `name`
---@return fun(t: any): any
local function _table2tuple ( qformat )
        local rows = {}
        for _,v in ipairs(qformat) do
                -- %q emits a properly escaped string literal, so that names containing
                -- quotes or backslashes can not break the generated code
                rows[#rows + 1] = string.format("\tt[%q] == nil and NULL or t[%q];\n", v.name, v.name)
        end
        local fun = "local NULL = require'msgpack'.NULL return function(t) return "..
                "t and box.tuple.new({\n"..table.concat(rows).."}) or nil end\n"
        return dostring(fun)
end
217

218
---@class xqueue.space
219
local methods = {}
3✔
220

221
---@class PrimaryKeyField:table
222
---@field no number position in tuple
223
---@field name string name of the field
224
---@field type "uuid"|"string"|"number"|"unsigned"|"integer"|"boolean" type of the field
225

226
---@class xqFeatures: table
227
---@field id "auto_increment"|"time64"|"uuid"| fun(): scalar (Default: uuid)
228
---@field retval "tuple" | "table"
229
---@field buried boolean
230
---@field delayed boolean
231
---@field keep boolean
232
---@field tube boolean
233
---@field zombie boolean|number
234
---@field zombie_delay number
235
---@field ttl boolean|number
236
---@field ttr boolean|number
237
---@field ttl_default number?
238
---@field ttr_default number?
239

240
---@class xq:table
241
---@field NEVER integer (Default: 0)
242
---@field atomic fun(self: xq, key: scalar, fun: fun(...:any): ...?): ...
243
---@field bysid table<number,table<string,string>> mapping sid => {key => key}
244
---@field taken table<string,number> mapping key => sid
245
---@field _lock table<string,boolean> locks key => boolean (in atomic)
246
---@field put_wait table<string,{cond:fiber.cond,task:box.tuple?,processed: boolean?}> mapping key => fiber.cond for producer
247
---@field take_wait fiber.channel
248
---@field take_chans table<string,fiber.channel> mapping tube => fiber.channel
249
---@field debug boolean
250
---@field have_runat boolean
251
---@field gen_id? fun(): scalar
252
---@field getkey fun(self: xq, arg: table|scalar|box.tuple): scalar
253
---@field packkey fun(self: xq, key: any): string
254
---@field tube_index? boxIndex
255
---@field index boxIndex
256
---@field key PrimaryKeyField
257
---@field fieldmap table<string,number>
258
---@field timeoffset fun(delta: number): number
259
---@field features xqFeatures
260
---@field fields table<string, string|number>
261
---@field tuple fun(tbl: table): box.tuple
262
---@field table fun(tuple: box.tuple): table
263
---@field retwrap fun(t: box.tuple|table): table|box.tuple
264
---@field wakeup fun(self: xq, t: box.tuple|table)
265
---@field runat_chan fiber.channel
266
---@field check_owner fun(self: xq, key: tuple_type|box.tuple): box.tuple
267
---@field put_back fun(key: table|box.tuple)
268
---@field _stat { counts: table, transition: table }
269
---@field putback fun(self: xq, task: table|box.tuple)
270
---@field _default_truncate fun(space: boxSpaceObject)
271
---@field _on_repl replaceTrigger
272
---@field _on_dis fun()
273

274

275
---@class xqueue.space: boxSpaceObject
276
---@field xq xq xqueue specific storage
277

278
---@class xqueue.fields
279
---@field status? string|number Xqueue name for the Status field
280
---@field runat? string|number Xqueue name for the RunAt field
281
---@field priority? string|number Xqueue name for Task priority field
282
---@field tube? string|number Xqueue name for Tube field
283

284
---@class xqueue.features
285
---@field id 'uuid' | 'auto_increment' | 'required' | 'time64' | (fun(): number|string) Mandatory Field for TaskID generation
286
---@field retval? 'table'|'tuple' Type of return Value of the task in xqueue methods.
287
---(Default: table when space with format, tuple when space is without format)
288
---@field buried? boolean should xqueue allow burying of the tasks (by default: true)
289
---@field keep? boolean should xqueue keep :ack'ed tasks in the Space or not
290
---@field delayed? boolean should xqueue allow delay tasks (requires runat field and index) (default: false)
291
---@field zombie? boolean|number should xqueue temporarily keep :ack'ed tasks in the Space (default: false). Mutually exclusive with keep
292
---when zombie is configured with number, then this value is treated as zombie_delay (requires runat to be present)
293
---@field ttl? boolean|number should xqueue allow Time-To-Live on tasks. When specified with number, this value used for ttl_default.
294
---Requires runat field and index.
295
---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttr_default.
296
---Requires runat field and index.
297

298
---@class xqueue.upgrade.options
299
---@field format? boxSpaceFormat
300
---@field fields xqueue.fields
301
---@field debug? boolean
302
---@field tube_stats? string[] List of tube names for per-tube statistics
303
---@field features xqueue.features
304
---@field worker? fun(task: box.tuple|table) simple ad-hoc worker callback
305
---@field workers? number (number of workers to spawn)
306

307
---Upgrades given space to xqueue instance
308
---@param space xqueue.space
309
---@param opts xqueue.upgrade.options
310
---@param depth? number
311
function M.upgrade(space,opts,depth)
3✔
312
        depth = depth or 0
12✔
313
        log.info("xqueue upgrade(%s,%s)", space.name, json.encode(opts))
12✔
314
        if not opts.fields then error("opts.fields required",2) end
12✔
315
        if opts.format then
12✔
316
                -- todo: check if already have such format
317
                local format_av = box.space._space.index.name:get(space.name)[ 7 ]
×
318
                if not is_eq(format_av, opts.format) then
×
319
                        space:format(opts.format)
×
320
                else
321
                        print("formats are equal")
×
322
                end
323
        end
324

325
        -- This variable will be defined later
326
        local taken_mt
327

328
        local self = {}
12✔
329
        if space.xq then
12✔
330
                self.taken = space.xq.taken
×
331
                self._stat = space.xq._stat
×
332
                self.bysid = space.xq.bysid
×
333
                self._lock = space.xq._lock
×
334
                self.take_wait = space.xq.take_wait
×
335
                self.take_chans = space.xq.take_chans or setmetatable({}, { __mode = 'v' })
×
336
                self.put_wait = space.xq.put_wait or setmetatable({}, { __mode = 'v' })
×
337
                self._on_repl  = space.xq._on_repl
×
338
                self._on_dis = space.xq._on_dis
×
339
        else
340
                self.taken = {}
12✔
341
                self.bysid = {}
12✔
342
                -- byfid = {};
343
                self._lock = {}
12✔
344
                self.put_wait = setmetatable({}, { __mode = 'v' })
12✔
345
                self.take_wait = fiber.channel(0)
12✔
346
                self.take_chans = setmetatable({}, { __mode = 'v' })
12✔
347
        end
348
        setmetatable(self.bysid, {
24✔
349
                __serialize='map',
350
                __newindex = function(t, key, val)
351
                        if type(val) == 'table' then
18✔
352
                                rawset(t, key, setmetatable(val, taken_mt))
18✔
353
                        else
NEW
354
                                rawset(t, key, val)
×
355
                        end
356
                end
357
        })
358
        self.debug = not not opts.debug
12✔
359

360
        if not self._default_truncate then
12✔
361
                self._default_truncate = space.truncate
12✔
362
        end
363

364
        local format_av = box.space._space.index.name:get(space.name)[ 7 ]
24✔
365
        local format = {}
12✔
366
        local have_format = false
12✔
367
        local have_runat = false
12✔
368
        for no,f in pairs(format_av) do
68✔
369
                format[ f.name ] = {
48✔
370
                        name = f.name;
48✔
371
                        type = f.type;
48✔
372
                        no   = no;
48✔
373
                }
48✔
374
                format[ no ] = format[ f.name ];
48✔
375
                have_format = true
48✔
376
                self.have_format = true
48✔
377
        end
378
        for _,idx in pairs(space.index) do
92✔
379
                for _,part in pairs(idx.parts) do
246✔
380
                        format[ part.fieldno ] = format[ part.fieldno ] or { no = part.fieldno }
126✔
381
                        format[ part.fieldno ].type = part.type
126✔
382
                end
383
        end
384

385
        -- dd(format)
386

387
        -- 1. fields check
388
        local fields = {}
12✔
389
        local fieldmap = {}
12✔
390
        for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"}}) do
68✔
391
                local fname,ftype = f[1], f[2]
48✔
392
                local num = opts.fields[fname]
48✔
393
                if num then
48✔
394
                        if type(num) == 'string' then
24✔
395
                                if format[num] then
24✔
396
                                        fields[fname] = format[num].no;
24✔
397
                                else
398
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
399
                                end
400
                        elseif type(num) == 'number' then
×
401
                                if format[num] then
×
402
                                        fields[fname] = num
×
403
                                else
404
                                        error(string.format("unknown field %s for %s", num, fname),2 + depth)
×
405
                                end
406
                        else
407
                                error(string.format("wrong type %s for field %s, number or string required",type(num),fname),2 + depth)
×
408
                        end
409
                        -- check type
410
                        if format[num] then
24✔
411
                                if not typeeq(format[num].type, ftype) then
48✔
412
                                        error(string.format("type mismatch for field %s, required %s, got %s",fname, ftype, format[fname].type),2+depth)
×
413
                                end
414
                        end
415
                        fieldmap[fname] = format[num].name
24✔
416
                end
417
        end
418

419
        -- dd(fields)
420

421
        -- 2. index check
422

423
        local pk = space.index[0]
12✔
424
        if #pk.parts ~= 1 then
12✔
425
                error("Composite primary keys are not supported yet")
×
426
        end
427
        local pktype
428
        if typeeq( format[pk.parts[1].fieldno].type, 'str' ) then
24✔
429
                pktype = 'string'
3✔
430
                taken_mt = { __serialize = 'map' }
3✔
431
        elseif typeeq( format[pk.parts[1].fieldno].type, 'num' ) then
18✔
432
                pktype = 'number'
9✔
433
                taken_mt = {
9✔
434
                        __serialize = 'map',
435
                        __newindex = function(t, key, val)
436
                                return rawset(t, tostring(ffi.cast("uint64_t", key)), val)
258✔
437
                        end,
438
                        __index = function(t, key)
439
                                return rawget(t, tostring(ffi.cast("uint64_t", key)))
189✔
440
                        end
441
                }
9✔
442
        else
443
                error("Unknown key type "..format[pk.parts[1].fieldno].type)
×
444
        end
445
        setmetatable(self.taken, taken_mt)
12✔
446

447
        local pkf = {
12✔
448
                no   = pk.parts[1].fieldno;
12✔
449
                name = format[pk.parts[1].fieldno].name;
12✔
450
                ['type'] = pktype;
12✔
451
        }
452
        self.key = pkf
12✔
453
        self.fields = fields
12✔
454
        self.fieldmap = fieldmap
12✔
455

456

457
        function self:getkey(arg)
12✔
458
                local _type = type(arg)
81✔
459
                if _type == 'table' then
81✔
460
                        if is_array(arg) then
162✔
461
                                return arg[ self.key.no ]
×
462
                        else
463
                                return arg[ self.key.name ]
81✔
464
                                -- pass
465
                        end
466
                elseif _type == 'cdata' and ffi.typeof(arg) == tuple_ctype then
×
467
                        return arg[ self.key.no ]
×
468
                elseif _type == self.key.type then
×
469
                        return arg
×
470
                else
471
                        error("Wrong key/task argument. Expected table or tuple or key", 3)
×
472
                end
473
        end
474

475
        function self.packkey(_, key)
12✔
476
                if type(key) == 'cdata' then
81✔
477
                        return tostring(ffi.cast("uint64_t", key))
66✔
478
                else
479
                        return key
15✔
480
                end
481
        end
482

483
        do
484
                local filter
485
                if fields.priority then
12✔
486
                        filter = function(index)
487
                                return #index.parts >= 3
×
488
                                        and index.parts[1].fieldno == fields.status
×
489
                                        and index.parts[2].fieldno == fields.priority
×
490
                                        and index.parts[3].fieldno == self.key.no
×
491
                        end
492
                else
493
                        filter = function(index)
494
                                return #index.parts >= 2
27✔
495
                                        and index.parts[1].fieldno == fields.status
15✔
496
                                        and index.parts[2].fieldno == self.key.no
27✔
497
                        end
498
                end
499
                for i,index in pairs(space.index) do
35✔
500
                        if type(i) == 'number' and filter(index) then
54✔
501
                                self.index = index
12✔
502
                                break
12✔
503
                        end
504
                end
505
                if not self.index then
12✔
506
                        if fields.priority then
×
507
                                error("not found index by status + priority + id",2+depth)
×
508
                        else
509
                                error("not found index by status + id",2+depth)
×
510
                        end
511
                end
512

513
                if fields.tube then
12✔
514
                        for n,index in pairs(space.index) do
8✔
515
                                if type(n) == 'number' and index.parts[1].fieldno == fields.tube then
6✔
516
                                        local not_match = false
3✔
517
                                        for i = 2, #index.parts do
9✔
518
                                                if index.parts[i].fieldno ~= self.index.parts[i-1].fieldno then
6✔
519
                                                        not_match = true
×
520
                                                        break
521
                                                end
522
                                        end
523
                                        if not not_match then
3✔
524
                                                self.tube_index = index
3✔
525
                                                break
3✔
526
                                        end
527
                                end
528
                        end
529
                        if not self.tube_index then
3✔
530
                                if fields.priority then
×
531
                                        error("not found index by tube + status + priority + id",2+depth)
×
532
                                else
533
                                        error("not found index by tube + status + id",2+depth)
×
534
                                end
535
                        end
536
                end
537
        end
538

539
        ---@type boxIndex
540
        local runat_index
541
        if fields.runat then
12✔
542
                for _,index in pairs(space.index) do
36✔
543
                        if type(_) == 'number' then
30✔
544
                                if index.parts[1].fieldno == fields.runat then
30✔
545
                                        -- print("found",index.name)
546
                                        runat_index = index
9✔
547
                                        break
9✔
548
                                end
549
                        end
550
                end
551
                if not runat_index then
9✔
552
                        error(string.format("fields.runat requires tree index with this first field in it"),2+depth)
×
553
                else
554
                        local t = runat_index:pairs({0},{iterator = box.index.GT}):nth(1)
18✔
555
                        if t then
9✔
556
                                if t[ self.fields.runat ] < monotonic_max_age then
×
557
                                        error("!!! Queue contains monotonic runat. Consider updating tasks (https://github.com/moonlibs/xqueue/issues/2)")
×
558
                                end
559
                        end
560
                        have_runat = true
9✔
561
                end
562
        end
563
        self.have_runat = have_runat
12✔
564

565
        ---@type table<string, { counts: {}, transition: {} }>
566
        local stat_tube = {}
12✔
567
        if self.fields.tube and type(opts.tube_stats) == 'table' then
12✔
NEW
568
                for _, tube_name in ipairs(opts.tube_stats) do
×
NEW
569
                        if type(tube_name) == 'string' then
×
NEW
570
                                stat_tube[tube_name] = {
×
571
                                        counts = {},
572
                                        transition = {},
573
                                }
574
                        end
575
                end
576
        end
577

578
        if not self._stat then
12✔
579
                self._stat = {
12✔
580
                        counts = {};
12✔
581
                        transition = {};
12✔
582
                        tube = stat_tube;
12✔
583
                }
12✔
584
                if self.fields.tube then
12✔
585
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
9✔
NEW
586
                                local s = t[self.fields.status]
×
NEW
587
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
588

NEW
589
                                local tube = t[self.fields.tube]
×
NEW
590
                                if stat_tube[tube] then
×
NEW
591
                                        stat_tube[tube].counts[s] = (stat_tube[tube].counts[s] or 0LL) + 1
×
592
                                end
593
                        end
594
                else
595
                        for _, t in space:pairs(nil, { iterator = box.index.ALL }) do
27✔
NEW
596
                                local s = t[self.fields.status]
×
NEW
597
                                self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
×
598
                        end
599
                end
600
        else
NEW
601
                self._stat = { counts = {}, transition = {}, tube = stat_tube }
×
602
        end
603

604
        -- 3. features check
605

606
        local features = {}
12✔
607

608
        local gen_id
609
        opts.features = opts.features or {}
12✔
610
        if not opts.features.id then
12✔
611
                opts.features.id = 'uuid'
×
612
        end
613
        -- 'auto_increment' | 'time64' | 'uuid' | 'required' | function
614
        if opts.features.id == 'auto_increment' then
12✔
615
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
×
616
                        error("For auto_increment numeric pk is mandatory",2+depth)
×
617
                end
618
                local pk_fno = pk.parts[1].fieldno
×
619
                gen_id = function()
620
                        local max = pk:max()
×
621
                        if max then
×
622
                                return max[pk_fno] + 1
×
623
                        else
624
                                return 1
×
625
                        end
626
                end
627
        elseif opts.features.id == 'time64' then
12✔
628
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'num')) then
18✔
629
                        error("For time64 numeric pk is mandatory",2+depth)
×
630
                end
631
                gen_id = function()
632
                        local key = clock.realtime64()
42✔
633
                        while true do
634
                                local exists = pk:get(key)
42✔
635
                                if not exists then
42✔
636
                                        return key
42✔
637
                                end
638
                                key = key + 1
×
639
                        end
640
                end
641
        elseif opts.features.id == 'uuid' then
3✔
642
                if not (#pk.parts == 1 and typeeq( pk.parts[1].type, 'str')) then
6✔
643
                        error("For uuid string pk is mandatory",2+depth)
×
644
                end
645
                local uuid = require'uuid'
3✔
646
                gen_id = function()
647
                        while true do
648
                                local key = uuid.str()
6✔
649
                                local exists = pk:get(key)
6✔
650
                                if not exists then
6✔
651
                                        return key
6✔
652
                                end
653
                        end
654
                end
655
        elseif opts.features.id == 'required' then
×
656
                -- gen_id = function()
657
                --         -- ???
658
                --         error("TODO")
659
                -- end
660
        elseif type(opts.features.id) == 'function'
×
661
                or (debug.getmetatable(opts.features.id)
×
662
                and debug.getmetatable(opts.features.id).__call)
×
663
        then
664
                gen_id = opts.features.id
×
665
        else
666
                error(string.format(
×
667
                        "Wrong type for features.id %s, may be 'auto_increment' | 'time64' | 'uuid' | 'required' | function",
668
                        opts.features.id
×
669
                ), 2+depth)
×
670
        end
671

672
        if not opts.features.retval then
12✔
673
                opts.features.retval = have_format and 'table' or 'tuple'
9✔
674
        end
675

676
        if format then
12✔
677
                self.tuple = _table2tuple(format)
24✔
678
                self.table = _tuple2table(format)
24✔
679
        end
680

681
        if opts.features.retval == 'table' then
12✔
682
                self.retwrap = self.table
12✔
683
        else
684
                self.retwrap = function(t) return t end
×
685
        end
686

687
        features.buried = not not opts.features.buried
12✔
688
        features.keep   = not not opts.features.keep
12✔
689
        if opts.features.delayed then
12✔
690
                if not have_runat then
9✔
691
                        error(string.format("Delayed feature requires runat field and index" ),2+depth)
×
692
                end
693
                features.delayed = true
9✔
694
        else
695
                features.delayed = false
3✔
696
        end
697

698
        if opts.features.zombie then
12✔
699
                if features.keep then
3✔
700
                        error(string.format("features keep and zombie are mutually exclusive" ),2+depth)
×
701
                end
702
                if not have_runat then
3✔
703
                        error(string.format("feature zombie requires runat field and index" ),2+depth)
×
704
                end
705

706
                features.zombie = true
3✔
707
                if type(opts.features.zombie) == 'number' then
3✔
708
                        features.zombie_delay = opts.features.zombie
×
709
                end
710
        else
711
                features.zombie = false
9✔
712
        end
713

714
        if opts.features.ttl then
12✔
715
                if not have_runat then
×
716
                        error(string.format("feature ttl requires runat field and index" ),2+depth)
×
717
                end
718

719
                features.ttl = true
×
720
                if type(opts.features.ttl) == 'number' then
×
721
                        features.ttl_default = opts.features.ttl
×
722
                end
723
        else
724
                features.ttl = false
12✔
725
        end
726

727
        if opts.features.ttr then
12✔
728
                if not have_runat then
×
729
                        error(string.format("feature ttr requires runat field and index" ),2+depth)
×
730
                end
731

732
                features.ttr = true
×
733
                if type(opts.features.ttr) == 'number' then
×
734
                        features.ttr_default = opts.features.ttr
×
735
                end
736
        else
737
                features.ttr = false
12✔
738
        end
739

740
        if fields.tube then
12✔
741
                features.tube = true
3✔
742
        end
743

744
        self.gen_id = gen_id
12✔
745
        self.features = features
12✔
746
        self.space = space.id
12✔
747

748
        --- Absolute realtime timestamp `delta` seconds from now.
        -- A `delta` that cannot be converted to a number counts as 0.
        function self.timeoffset(delta)
                local shift = tonumber(delta)
                if not shift then
                        shift = 0
                end
                return clock.realtime() + shift
        end
752
        --- True when `time` is strictly earlier than the current realtime clock.
        function self.timeready(time)
                local now = clock.realtime()
                return time < now
        end
755
        --- Seconds left until `time` (negative when `time` is already in the past).
        function self.timeremaining(time)
                local now = clock.realtime()
                return time - now
        end
758
        -- self.NEVER = -1ULL
759
        self.NEVER = 0
12✔
760

761
        --- Value of the primary-key field of task `t`.
        -- The first argument (the receiver) is ignored.
        function self.keyfield(_, t)
                local fieldno = pkf.no
                return t[fieldno]
        end
764
        --- Key under which task `t` is tracked; here it is simply the
        -- primary-key field value. The first argument (the receiver) is ignored.
        function self.keypack(_, t)
                local fieldno = pkf.no
                return t[fieldno]
        end
767

768
        -- Notify the producer if it is still waiting for us.
769
        -- Producer waits only for successfully processed task
770
        -- or for task which would never be processed.
771
        -- Whether a task is discarded instead of processed depends on the consumer's logic.
772
        -- Also task can be deleted from space by exceeding TTL.
773
        -- Producer should not be notified if queue is able to process task in future.
774
        --- Wake up the producer waiting on `key`, if there is one and the
        -- task has reached a final outcome.
        -- @param key   primary key of the task
        -- @param task  last known state of the task (its `status` is inspected)
        local function notify_producer(key, task)
                local waiter = self.put_wait[self:packkey(key)]
                if not waiter then
                        -- no producer
                        return
                end

                local status = task.status
                local processed
                if status == 'Z' or status == 'D' then
                        -- task was processed
                        processed = true
                elseif not space:get{key} then
                        -- task is not present in space: either :ack or TTL
                        if status == 'T' then
                                -- task was acked
                                processed = true
                        elseif status == 'R' then
                                -- task was killed by TTL
                                processed = false
                        else
                                return
                        end
                elseif status == 'B' then
                        -- task is still in space and was buried
                        processed = false
                else
                        -- task is still in space and may be processed later
                        return
                end

                waiter.task = task
                waiter.processed = processed
                waiter.cond:broadcast()
        end
813

814
        self.ready = fiber.channel(0)
12✔
815

816
        --- Fiber body that runs `func(...)` only while the instance is writable.
        -- While `box.info.ro` is set it waits for the instance to become rw,
        -- then calls `func` under pcall and logs any error. The cycle repeats
        -- until the space is dropped or is bound to another xqueue object.
        -- @param func  function to run
        -- @param ...   arguments forwarded to `func`
        local function rw_fiber_f(func, ...)
                local xq = self
                repeat
                        if box.info.ro then
                                log.verbose("awaiting rw")
                                repeat
                                        if box.ctl.wait_rw then
                                                -- wait_rw raises on timeout; a timeout here only means
                                                -- "still read-only", so it must not kill the fiber.
                                                pcall(box.ctl.wait_rw, 1)
                                                -- pcall also swallows the cancellation error, re-raise it
                                                fiber.testcancel()
                                        else
                                                fiber.sleep(0.001)
                                        end
                                until not box.info.ro
                        end

                        local ok, err = pcall(func, ...)
                        if not ok then
                                log.error("%s", err)
                        end
                        fiber.testcancel()
                until (not box.space[space.name]) or space.xq ~= xq
        end
837

838
        -- Optional built-in consumers: start `opts.workers` (default 1) fibers,
        -- each taking tasks from the space and passing them to `opts.worker`.
        if opts.worker then
                local handler = opts.worker
                local total = opts.workers or 1
                for i = 1, total do
                        local function worker_main(space, xq)
                                local fname = space.name .. '.xq.wrk' .. tostring(i)
                                ---@diagnostic disable-next-line: undefined-field
                                if package.reload then
                                        fname = fname .. '.' .. package.reload.count
                                end
                                fiber.name(string.sub(fname, 1, 32))

                                -- wait until the space is bound to an xqueue object
                                while true do
                                        fiber.sleep(0.001)
                                        if space.xq then break end
                                end
                                if xq.ready then
                                        xq.ready:get()
                                end
                                log.info("I am worker %s",i)
                                if box.info.ro then
                                        log.info("Shutting down on ro instance")
                                        return
                                end

                                while box.space[space.name] and space.xq == xq do
                                        local task = space:take(1)
                                        if task then
                                                local key = xq:getkey(task)
                                                local ok, err = pcall(handler, task)
                                                if ok then
                                                        -- ack on behalf of a handler that did not finish the task itself
                                                        if xq.taken[ key ] then
                                                                space:ack(task)
                                                        end
                                                else
                                                        log.error("Worker for {%s} has error: %s", key, err)
                                                end
                                                -- still taken at this point: give the task back
                                                if xq.taken[ key ] then
                                                        log.error("Worker for {%s} not released task", key)
                                                        space:release(task)
                                                end
                                        end
                                        fiber.yield()
                                end
                                log.info("worker %s ended", i)
                        end

                        fiber.create(rw_fiber_f, worker_main, space, self)
                end
        end
877

878
        -- Background "runat" fiber: scans the runat index for tasks whose
        -- timestamp has passed and applies the transition matching their status
        -- (W->R, kill R by ttl, kill zombie, auto-release T->R by ttr).
        if have_runat then
                self.runat_chan = fiber.channel(0)
                self.runat = fiber.create(rw_fiber_f, function(space,xq,runat_index)
                        local fname = space.name .. '.xq'
                        if package.reload then fname = fname .. '.' .. package.reload.count end
                        fiber.name(string.sub(fname,1,32))
                        -- wait until the space is bound to an xqueue object
                        repeat fiber.sleep(0.001) until space.xq
                        if xq.ready then xq.ready:get() end
                        local chan = xq.runat_chan
                        log.info("Runat started")
                        if box.info.ro then
                                log.info("Shutting down on ro instance")
                                return
                        end
                        -- upper bound on tasks handled in a single pass
                        local maxrun = 1000
                        local curwait
                        local collect = {}
                        while box.space[space.name] and space.xq == xq do
                                -- One pass; returns the number of seconds to wait before the next one.
                                local r,e = pcall(function()
                                        local remaining
                                        -- Collect first, modify afterwards.
                                        for _,t in runat_index:pairs({0},{iterator = box.index.GT}) do
                                                if xq.timeready( t[ xq.fields.runat ] ) then
                                                        table.insert(collect,t)
                                                else
                                                        remaining = xq.timeremaining(t[ xq.fields.runat ])
                                                        break
                                                end
                                                if #collect >= maxrun then remaining = 0 break end
                                        end

                                        for _,t in ipairs(collect) do
                                                if t[xq.fields.status] == 'W' then
                                                        log.info("Runat: W->R %s",xq:keyfield(t))
                                                        -- TODO: default ttl?
                                                        local u = space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.status,'R' },
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                        xq:wakeup(u)
                                                elseif t[xq.fields.status] == 'R' and xq.features.ttl then
                                                        local key = xq:keyfield(t)
                                                        log.info("Runat: Kill R by ttl %s (%+0.2fs)", key, fiber.time() - t[ xq.fields.runat ])
                                                        t = space:delete{key}
                                                        notify_producer(key, t)
                                                elseif t[xq.fields.status] == 'Z' and xq.features.zombie then
                                                        log.info("Runat: Kill Zombie %s",xq:keyfield(t))
                                                        space:delete{ xq:keyfield(t) }
                                                elseif t[xq.fields.status] == 'T' and xq.features.ttr then
                                                        local key = xq:keypack(t)
                                                        local sid = xq.taken[ key ]
                                                        local peer = peers[sid] or sid

                                                        log.info("Runat: autorelease T->R by ttr %s taken by %s",xq:keyfield(t), peer)
                                                        local u = space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.status,'R' },
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                        xq.taken[ key ] = nil
                                                        if sid then
                                                                -- A missing bysid entry must not abort the pass
                                                                -- before consumers are woken up; log it instead.
                                                                local by_sid = xq.bysid[ sid ]
                                                                if by_sid then
                                                                        by_sid[ key ] = nil
                                                                else
                                                                        log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, sid)
                                                                end
                                                        end
                                                        xq:wakeup(u)
                                                else
                                                        log.error("Runat: unsupported status %s for %s",t[xq.fields.status], tostring(t))
                                                        -- reset runat so the task is not picked up again
                                                        space:update({ xq:keyfield(t) },{
                                                                { '=',xq.fields.runat, xq.NEVER }
                                                        })
                                                end
                                        end

                                        if remaining then
                                                if remaining >= 0 and remaining < 1 then
                                                        return remaining
                                                end
                                        end
                                        return 1
                                end)

                                table_clear(collect)

                                if r then
                                        curwait = e
                                else
                                        curwait = 1
                                        log.error("Runat/ERR: %s",e)
                                end
                                if curwait == 0 then fiber.sleep(0) end
                                -- sleep up to curwait seconds, or less if something arrives on the channel
                                chan:get(curwait)
                        end
                        log.info("Runat ended")
                end,space,self,runat_index)
        end
975

976
        --- Completion step of `self:atomic`: drops the lock on `key`, then
        -- either returns the results of the protected call or re-raises its
        -- error.
        -- @param self    queue object holding the `_lock` table
        -- @param key     key whose lock is released
        -- @param status  first result of pcall (true on success)
        -- @param ...     remaining pcall results: return values, or the error
        local function atomic_tail(self, key, status, ...)
                self._lock[key] = nil
                if status then
                        return ...
                end
                error((...), 3)
        end
983
        --- Run `fun(...)` with `key` marked as locked in `self._lock`.
        -- The lock is cleared whether `fun` returns or raises; on success its
        -- return values are passed through, on failure its error is re-raised.
        function self:atomic(key, fun, ...)
                local locks = self._lock
                locks[key] = true
                return atomic_tail(self, key, pcall(fun, ...))
        end
987

988
        --- Verify that task `key` exists and is taken by the current session.
        -- @param key  primary key of the task
        -- @return the task tuple
        -- Raises (at the caller's level) when the task is missing, is not
        -- taken at all, or is taken by a different session.
        function self:check_owner(key)
                local t = space:get(key)
                if not t then
                        error(string.format( "Task {%s} was not found", key ),2)
                end

                local owner = self.taken[key]
                if not owner then
                        error(string.format( "Task %s not taken by any", key ),2)
                end

                local me = box.session.id()
                if owner ~= me then
                        error(string.format( "Task %s taken by %d. Not you (%d)", key, owner, me ),2)
                end
                return t
        end
1001

1002
        --- Drop the "taken" bookkeeping for `task` and notify a waiting producer.
        -- Inconsistent bookkeeping is logged, not raised.
        function self:putback(task)
                local key = task[ self.key.no ]
                local owner_sid = self.taken[ key ]
                if not owner_sid then
                        log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() )
                else
                        self.taken[ key ] = nil
                        local owned = self.bysid[ owner_sid ]
                        if owned then
                                owned[ key ] = nil
                        else
                                log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, owner_sid)
                        end
                end

                notify_producer(key, task)
        end
1018

1019
        --- Wake one consumer waiting for a task, if `t` is in status 'R'.
        -- A consumer waiting on the task's own tube channel is tried first;
        -- if none was notified, the common `take_wait` channel is used.
        function self:wakeup(t)
                local fmap = self.fieldmap
                if t[fmap.status] ~= 'R' then
                        return
                end

                if fmap.tube then
                        -- we may have consumers in the tubes:
                        local tube_chan = self.take_chans[t[fmap.tube]]
                        if tube_chan and tube_chan:has_readers() then
                                if tube_chan:put(true, 0) then
                                        -- we have successfully notified consumer
                                        return
                                end
                        end
                        -- otherwise fallback to default channel:
                end

                local common = self.take_wait
                if common:has_readers() then
                        common:put(true,0)
                end
        end
1034

1035
        --- Release every fiber blocked on the `ready` channel, then drop the
        -- channel so that later `if xq.ready` checks are skipped.
        function self:make_ready()
                while true do
                        local gate = self.ready
                        if not (gate and gate:has_readers()) then
                                break
                        end
                        gate:put(true,0)
                        fiber.sleep(0)
                end
                self.ready = nil
        end
1042

1043
        local meta = debug.getmetatable(space)
12✔
1044
        for k,v in pairs(methods) do meta[k] = v end
152✔
1045

1046
        -- Triggers must be set right before updating the space
1047
        -- because raising error earlier leads to trigger inconsistency
1048
        -- on_replace trigger keeping the statistics up to date:
        --   * self._stat.counts[status]       - number of tasks per status
        --   * self._stat.transition["A-B"]    - number of status transitions
        --   * the same two tables per tube in self._stat.tube[tube], for tubes
        --     that already have a stat table there
        -- 'X' stands for "no tuple" (insert: "X-<st>", delete: "<st>-X").
        self._on_repl = space:on_replace(function(old, new)
                local old_st, new_st
                local old_tube, new_tube
                local old_tube_stat, new_tube_stat
                local counts = self._stat.counts
                local tube_ss = self._stat.tube

                if old then
                        old_st = old[self.fields.status] --[[@as string]]
                        if counts[old_st] and counts[old_st] > 0 then
                                counts[old_st] = counts[old_st] - 1
                        else
                                log.error(
                                        "Have not valid statistic by status: %s with value: %s",
                                        old_st, tostring(counts[old_st])
                                )
                        end
                        if self.fields.tube then
                                old_tube = old[self.fields.tube]
                                old_tube_stat = tube_ss[old_tube]
                                if type(old_tube_stat) == 'table' and type(old_tube_stat.counts) == 'table' then
                                        if (tonumber(old_tube_stat.counts[old_st]) or 0) > 0 then
                                                old_tube_stat.counts[old_st] = old_tube_stat.counts[old_st] - 1
                                        else
                                                -- three values are logged, so the format needs three placeholders
                                                log.error(
                                                        "Have not valid statistic by tube/status: tube %s, status %s with value: %s",
                                                        old_tube, old_st, tostring(old_tube_stat.counts[old_st])
                                                )
                                        end
                                else
                                        -- no usable stat table for this tube
                                        old_tube_stat = nil
                                end
                        end
                else
                        old_st = 'X'
                end

                if new then
                        new_st = new[self.fields.status] --[[@as string]]
                        counts[new_st] = (counts[new_st] or 0LL) + 1
                        if counts[new_st] < 0 then
                                log.error("Statistic overflow by task type: %s", new_st)
                        end
                        if self.fields.tube then
                                new_tube = new[self.fields.tube]
                                new_tube_stat = tube_ss[new_tube]
                                if type(new_tube_stat) == 'table' and type(new_tube_stat.counts) == 'table' then
                                        if (tonumber(new_tube_stat.counts[new_st]) or 0) >= 0 then
                                                new_tube_stat.counts[new_st] = (new_tube_stat.counts[new_st] or 0LL) + 1
                                        else
                                                -- three values are logged, so the format needs three placeholders
                                                log.error(
                                                        "Have not valid statistic by tube/status: tube %q, status %s with value: %s",
                                                        new_tube, new_st, tostring(new_tube_stat.counts[new_st])
                                                )
                                        end
                                else
                                        -- no usable stat table for this tube
                                        new_tube_stat = nil
                                end
                        end
                else
                        new_st = 'X'
                end

                local field = old_st.."-"..new_st
                self._stat.transition[field] = (self._stat.transition[field] or 0LL) + 1
                if old_tube_stat then
                        if not new_tube_stat or old_tube_stat == new_tube_stat then
                                -- no new tube or new and old tubes are the same
                                old_tube_stat.transition[field] = (old_tube_stat.transition[field] or 0LL) + 1
                        else
                                -- nil != old_tube != new_tube != nil
                                -- cross tube transition ?
                                -- Can this be backoff with tube change?
                                -- 'S' marks the side of the transition that lies in another tube.
                                local old_field = old_st.."-S"
                                local new_field = "S-"..new_st
                                old_tube_stat.transition[old_field] = (old_tube_stat.transition[old_field] or 0LL) + 1
                                new_tube_stat.transition[new_field] = (new_tube_stat.transition[new_field] or 0LL) + 1
                        end
                elseif new_tube_stat then
                        -- old_tube_stat == nil
                        new_tube_stat.transition[field] = (new_tube_stat.transition[field] or 0LL) + 1
                end
        end, self._on_repl)
243✔
1131

1132
        -- on_disconnect trigger: tasks still taken by the closing session are
        -- returned to status 'R' and waiting consumers are woken up.
        self._on_dis = box.session.on_disconnect(function()
                local sid = box.session.id()
                local peer = box.session.storage.peer

                log.info("%s: disconnected %s, sid=%s, fid=%s", space.name, peer, sid, fiber.id() )
                box.session.storage.destroyed = true

                local owned = self.bysid[sid]
                if not owned then
                        return
                end

                -- Re-scan until the table is empty.
                while next(owned) do
                        for key, realkey in pairs(owned) do
                                self.taken[key] = nil
                                owned[key] = nil
                                local t = space:get(realkey)
                                if not t then
                                        log.error( "Rst: {%s}: taken not found", realkey )
                                elseif t[ self.fields.status ] ~= 'T' then
                                        log.error( "Rst: %s->? {%s}: wrong status", t[self.fields.status], realkey )
                                else
                                        local ops = { { '=', self.fields.status, 'R' } }
                                        if self.have_runat then
                                                ops[2] = { '=', self.fields.runat, self.NEVER }
                                        end
                                        self:wakeup(space:update({ realkey }, ops))
                                        log.info("Rst: T->R {%s}", realkey )
                                end
                        end
                end
                self.bysid[sid] = nil
        end, self._on_dis)
42✔
1163

1164
        rawset(space,'xq',self)
12✔
1165

1166
        log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status)
12✔
1167

1168

1169
        --- Startup step: on a writable instance, put back to 'R' every task
        -- left in status 'T' that is neither tracked as taken nor locked, then
        -- release the fibers waiting on the `ready` channel.
        -- While the instance is an orphan the step is deferred to a background
        -- fiber that polls `box.info.status` and calls this method again.
        function self:starttest()
                local xq = self
                if box.info.status == 'orphan' then
                        fiber.create(function()
                                -- poll with a slowly growing delay
                                local attempt = 0
                                while true do
                                        fiber.sleep(attempt/1e5)
                                        attempt = attempt + 1
                                        if box.info.status ~= 'orphan' then
                                                break
                                        end
                                end
                                self:starttest()
                        end)
                        return
                end

                if not box.info.ro then
                        -- FIXME: with stable iterators add yield
                        local space = box.space[xq.space]
                        box.begin()
                        for _, t in xq.index:pairs({'T'},{ iterator = box.index.EQ }) do
                                local key = t[ xq.key.no ]
                                if not xq.taken[key] and not xq._lock[key] then
                                        local ops = { { '=', xq.fields.status, 'R' } }
                                        if xq.have_runat then
                                                -- NOTE(review): reads xq.ttl_default; the code visible here
                                                -- only sets features.ttl_default - confirm which is intended.
                                                local runat = xq.NEVER
                                                if xq.ttl_default then
                                                        runat = xq.timeoffset( xq.ttl_default )
                                                end
                                                ops[2] = { '=', xq.fields.runat, runat }
                                        end
                                        space:update({key}, ops)
                                        log.info("Start: T->R (%s)", key)
                                end
                        end
                        box.commit()
                else
                        log.info("Server is ro, not resetting statuses")
                end
                xq:make_ready()
        end
1206
        self:starttest()
12✔
1207
end
1208

1209

1210
--[[
1211
* `space:put`
1212
        - `task` - table or array or tuple
1213
                + `table`
1214
                        * **requires** space format
1215
                        * suitable for id generation
1216
                + `array`
1217
                        * ignores space format
1218
                        * for id generation use `NULL` (**not** `nil`)
1219
                + `tuple`
1220
                        * ignores space format
1221
                        * **can't** be used with id generation
1222
        - `attr` - table of attributes
1223
                + `delay` - number of seconds
1224
                        * if set, task will become `W` instead of `R` for `delay` seconds
1225
                + `ttl` - number of seconds
1226
                        * if set, task will be discarded after ttl seconds unless was taken
1227
                + `wait` - number of seconds
1228
                        * if set, the calling fiber is blocked for up to `wait` seconds, until the task
1229
                        has been processed or the timeout is reached.
1230

1231
```lua
1232
box.space.myqueue:put{ name="xxx"; data="yyy"; }
1233
box.space.myqueue:put{ "xxx","yyy" }
1234
box.space.myqueue:put(box.tuple.new{ 1,"xxx","yyy" })
1235

1236
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5 })
1237
box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 })
1238
```
1239
]]
1240

1241
--- Creates a new task from `t` and inserts it into the queue space.
--
-- The table `t` is modified in place: the primary key (when `xq.gen_id` is
-- configured), the status and, depending on the options, the runat field are
-- written into it before the tuple is built with `xq.tuple`.
--
-- Initial status:
--   * `opts.delay` -> 'W' with runat = now + delay (requires feature `delayed`)
--   * `opts.ttl`   -> 'R' with runat = now + ttl   (requires feature `ttl`)
--   * otherwise    -> 'R' (runat is `ttl_default`, or `xq.NEVER` when the
--                     queue has a runat field)
--
-- With `opts.wait` the calling fiber blocks for up to `opts.wait` seconds on a
-- condition variable registered in `xq.put_wait`; when it is signalled and a
-- task was attached to the waiter, that task and its `processed` flag are
-- returned instead of the freshly inserted tuple.
--
-- Raises (at the caller's level) on unsupported argument types, on a missing
-- primary key, on disabled features and on conflicting options.
---@param t any[]|box.tuple
---@param opts? { delay: number?, ttl: number?, wait: number? }
---@return table|box.tuple, boolean? has_been_processed
function methods:put(t, opts)
        local xq = self.xq
        opts = opts or {}

        if type(t) == 'table' then
                if xq.gen_id and is_array(t) then
                        error("Task from array creation is not implemented yet", 2)
                end
        elseif type(t) == 'cdata' and ffi.typeof(t) == tuple_ctype then
                if xq.gen_id then
                        error("Can't use id generation with tuple.", 2)
                end
                error("Task from tuple creation is not implemented yet", 2)
        else
                error("Wrong argument to put. Expected table or tuple", 2)
        end

        -- here we have table with keys
        if xq.gen_id then
                t[ xq.key.name ] = xq.gen_id()
        elseif not t[ xq.key.name ] then
                error("Primary key is mandatory",2)
        end

        -- delayed or ttl or default ttl
        if opts.delay then
                if not xq.features.delayed then
                        error("Feature delayed is not enabled",2)
                end
                if opts.ttl then
                        error("Features ttl and delay are mutually exclusive",2)
                end
                t[ xq.fieldmap.status ] = 'W'
                t[ xq.fieldmap.runat ]  = xq.timeoffset(opts.delay)

                if opts.wait then
                        error("Are you crazy? Call of :put({...}, { wait = <>, delay = <> }) looks weird", 2)
                end
        elseif opts.ttl then
                if not xq.features.ttl then
                        error("Feature ttl is not enabled",2)
                end
                t[ xq.fieldmap.status ] = 'R'
                t[ xq.fieldmap.runat ]  = xq.timeoffset(opts.ttl)
        elseif xq.features.ttl_default then
                t[ xq.fieldmap.status ] = 'R'
                t[ xq.fieldmap.runat ]  = xq.timeoffset(xq.features.ttl_default)
        elseif xq.have_runat then
                t[ xq.fieldmap.status ] = 'R'
                t[ xq.fieldmap.runat ]  = xq.NEVER
        else
                t[ xq.fieldmap.status ] = 'R'
        end

        local tuple = xq.tuple(t)
        local key = tuple[ xq.key.no ]

        -- The waiter is registered BEFORE the insert, as in the original code.
        -- The previous entry for this key (if any) is remembered so that it can
        -- be restored should the insert fail.
        local wait, pkey, prev_wait
        if opts.wait then
                pkey = xq:packkey(key)
                prev_wait = xq.put_wait[pkey]
                wait = { cond = fiber.cond() }
                xq.put_wait[pkey] = wait
        end

        local inserted, err = pcall(xq.atomic, xq, key, function()
                t = self:insert(tuple)
        end)
        if not inserted then
                -- A failed insert (e.g. duplicate key) must neither leak our waiter
                -- nor drop the waiter somebody else had registered for this key.
                if wait and xq.put_wait[pkey] == wait then
                        xq.put_wait[pkey] = prev_wait
                end
                -- level 0: rethrow the error unchanged
                error(err, 0)
        end
        xq:wakeup(t)

        if wait then
                -- The original comment states that the local function
                -- notify_producer (not visible from here) fills in the waiter.
                local ok = wait.cond:wait(opts.wait)
                fiber.testcancel()
                if ok and wait.task then
                        return xq.retwrap(wait.task), wait.processed
                end
        end
        return xq.retwrap(t)
end
1326

1327
--[[
1328
* `space:wait(id, timeout)`
1329
        - `id`:
1330
                + `string` | `number` - primary key
1331
        - `timeout` - number of seconds to wait
1332
                * the calling fiber will be blocked for up to `timeout` seconds until the task
1333
                is processed or the timeout is reached.
1334
]]
1335

1336
-- Statuses for which methods:wait blocks; a task in any other status is
-- returned immediately.
local wait_for = { R = true, T = true, W = true }
1341

1342
--- Blocks the calling fiber until the task identified by `key` is reported
--- through its `xq.put_wait` entry or `timeout` seconds elapse.
--
-- Raises when the task does not exist. When the task status is not listed in
-- `wait_for`, or when the wait ends without a task attached to the waiter, the
-- task fetched at the beginning of the call is returned together with `false`.
---@param key table|scalar|box.tuple
---@param timeout number
---@return table|box.tuple, boolean? has_been_processed
function methods:wait(key, timeout)
        local xq = self.xq
        key = xq:getkey(key)

        local task = self:get(key)
        if not task then
                error(("Task {%s} was not found"):format(key))
        end

        if not wait_for[ task[xq.fields.status] ] then
                return xq.retwrap(task), false
        end

        -- Reuse the waiter already registered for this key, or register a new one.
        local pkey = xq:packkey(key)
        local waiter = xq.put_wait[pkey]
        if not waiter then
                waiter = { cond = fiber.cond() }
                xq.put_wait[pkey] = waiter
        end

        -- The original comment states that the local function notify_producer
        -- (not visible from here) fills in `task` / `processed` on the waiter.
        local signalled = waiter.cond:wait(timeout)
        fiber.testcancel()
        if not (signalled and waiter.task) then
                return xq.retwrap(task), false
        end

        return xq.retwrap(waiter.task), waiter.processed
end
1374

1375
--[[
1376
* `space:take(timeout)`
1377
* `space:take(timeout, opts)`
1378
* `space:take(opts)`
1379
        - `timeout` - number of seconds to wait for new task
1380
                + choose reasonable time
1381
                + beware of **readahead** size (see tarantool docs)
1382
        - `tube` - name of the tube worker wants to take task from (feature tube must be enabled)
1383
        - returns task tuple or table (see retval) or nothing on timeout
1384
        - *TODO*: ttr must be there
1385
]]
1386

1387
--- Takes the first unlocked task in status 'R', switches it to 'T' and records
--- the current session as its owner.
--
-- Accepted call forms: `take(timeout)`, `take(timeout, opts)` and `take(opts)`.
-- When no task is available the fiber waits on the tube channel (if
-- `opts.tube` is given) or on `xq.take_wait` until `timeout` seconds have
-- passed. Returns nothing when no task was found.
---@param timeout? number|{ timeout: number?, ttr: number?, tube: string? }
---@param opts? { timeout: number?, ttr: number?, tube: string? }
---@return table|box.tuple?
function methods:take(timeout, opts)
        local xq = self.xq
        timeout = timeout or 0

        -- take(opts): the options table arrives in the first argument
        if type(timeout) == 'table' then
                opts = timeout
                timeout = opts.timeout or 0
        else
                opts = opts or {}
        end
        assert(timeout >= 0, "timeout required")

        -- explicit ttr wins over the queue-wide default
        local ttr = opts.ttr
        if ttr then
                if not xq.features.ttr then
                        error("Feature ttr is not enabled",2)
                end
        else
                ttr = xq.features.ttr_default
        end

        local index, start_with, tube_chan
        if opts.tube then
                if not xq.features.tube then
                        error("Feature tube is not enabled", 2)
                end

                assert(type(opts.tube) == 'string', "opts.tube must be a string")

                index = xq.tube_index
                start_with = {opts.tube, 'R'}
                tube_chan = xq.take_chans[opts.tube] or fiber.channel()
                xq.take_chans[opts.tube] = tube_chan
        else
                index = xq.index
                start_with = {'R'}
        end
        ---@cast index -nil

        local deadline = fiber.time() + timeout
        local key
        local found
        while true do
                for _, candidate in index:pairs(start_with, { iterator = box.index.EQ }) do
                        key = candidate[ xq.key.no ]
                        if not xq._lock[ key ] then
                                -- claim the key so that concurrent takers skip it
                                xq._lock[ key ] = true
                                found = candidate
                                break
                        end
                end
                if found then break end

                local left = deadline - fiber.time()
                if left <= 0 then break end

                local chan = tube_chan or xq.take_wait
                chan:get(left)
                if box.session.storage.destroyed then break end
        end

        -- If we were the last reader of the tube channel, close and forget it.
        -- (The original comment notes that writers then get false on :put.)
        if tube_chan and not tube_chan:has_readers() then
                tube_chan:close()
                xq.take_chans[opts.tube] = nil
        end
        if not found then return end

        local ok, res = pcall(function()
                local sid = box.session.id()
                local peer = box.session.storage.peer

                if xq.debug then
                        log.info("Take {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
                end

                local ops = {
                        { '=', xq.fields.status, 'T' },
                }
                -- TODO: update more fields

                if ttr then
                        ops[#ops + 1] = { '=', xq.fields.runat, xq.timeoffset(ttr) }
                        xq.runat_chan:put(true,0)
                end
                local taken = self:update({key}, ops)

                -- remember which session owns the task
                if not xq.bysid[ sid ] then xq.bysid[ sid ] = {} end
                xq.taken[ key ] = sid
                xq.bysid[ sid ][ key ] = key

                return taken
        end)

        -- the key lock is released whether or not the update succeeded
        xq._lock[ key ] = nil
        if not ok then
                error(res)
        end
        return xq.retwrap(res)
end
1498

1499
--[[
1500
* `space:release(id, [attr])`
1501
        - `id`:
1502
                + `string` | `number` - primary key
1503
                + *TODO*: `tuple` - key will be extracted using index
1504
                + *TODO*: composite pk
1505
        - `attr`
1506
                + `update` - table for update, like in space:update
1507
                + `ttl` - timeout for time to live
1508
                + `delay` - number of seconds
1509
                        * if set, task will become `W` instead of `R` for `delay` seconds
1510
]]
1511

1512
--- Returns a task owned by the current session back to the queue.
--
-- The task becomes 'W' when `attr.delay` is given (feature `delayed`), 'R'
-- otherwise; runat is set from `attr.delay`, `attr.ttl`, the `ttl_default`
-- feature or `xq.NEVER`, in that order of precedence. Operations listed in
-- `attr.update` are applied before the status/runat operations.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
---@return table|box.tuple
function methods:release(key, attr)
        local xq = self.xq
        key = xq:getkey(key)
        attr = attr or {}

        local t = xq:check_owner(key)
        local old = t[ xq.fields.status ]

        local update = {}
        -- appends one '=' operation to the update list
        local function assign(field, value)
                update[#update + 1] = { '=', field, value }
        end

        if attr.update then
                for _, op in pairs(attr.update) do
                        update[#update + 1] = op
                end
        end

        -- delayed or ttl or default ttl
        if attr.delay then
                if not xq.features.delayed then
                        error("Feature delayed is not enabled",2)
                end
                if attr.ttl then
                        error("Features ttl and delay are mutually exclusive",2)
                end
                assign(xq.fields.status, 'W')
                assign(xq.fields.runat, xq.timeoffset(attr.delay))
        elseif attr.ttl then
                if not xq.features.ttl then
                        error("Feature ttl is not enabled",2)
                end
                assign(xq.fields.status, 'R')
                assign(xq.fields.runat, xq.timeoffset(attr.ttl))
        elseif xq.features.ttl_default then
                assign(xq.fields.status, 'R')
                assign(xq.fields.runat, xq.timeoffset(xq.features.ttl_default))
        elseif xq.have_runat then
                assign(xq.fields.status, 'R')
                assign(xq.fields.runat, xq.NEVER)
        else
                assign(xq.fields.status, 'R')
        end

        xq:atomic(key, function()
                t = self:update({key}, update)

                ---@cast t box.tuple
                xq:wakeup(t)
                if xq.have_runat then
                        xq.runat_chan:put(true,0)
                end

                log.info("Rel: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
                        key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
        end)

        xq:putback(t)

        return t
end
1571

1572
--- Acknowledges a task owned by the current session.
--
-- Outcome, in order of precedence:
--   * `attr.delay` (feature `zombie` required) -> status 'Z', runat = now + delay
--   * feature `zombie`                         -> status 'Z' (runat from `zombie_delay` if set)
--   * feature `keep`                           -> status 'D'
--   * otherwise                                -> the task is deleted
-- Operations from `attr.update` are applied first; when the task is to be
-- deleted they are still applied before the delete.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
---@return table|box.tuple
function methods:ack(key, attr)
        -- features.zombie
        -- features.keep
        local xq = self.xq
        key = xq:getkey(key)
        attr = attr or {}

        local t = xq:check_owner(key)
        local old = t[ xq.fields.status ]

        local update = {}
        if attr.update then
                for _, op in pairs(attr.update) do
                        update[#update + 1] = op
                end
        end

        local remove = false
        if attr.delay then
                if not xq.features.zombie then
                        error("Feature zombie is not enabled",2)
                end
                update[#update + 1] = { '=', xq.fields.status, 'Z' }
                update[#update + 1] = { '=', xq.fields.runat, xq.timeoffset(attr.delay) }
        elseif xq.features.zombie then
                update[#update + 1] = { '=', xq.fields.status, 'Z' }
                if xq.features.zombie_delay then
                        update[#update + 1] = { '=', xq.fields.runat, xq.timeoffset(xq.features.zombie_delay) }
                end
        elseif xq.features.keep then
                update[#update + 1] = { '=', xq.fields.status, 'D' }
        else
                -- nothing asks to keep the task: remove it
                remove = true
        end

        xq:atomic(key, function()
                if #update > 0 then
                        t = self:update({key}, update)
                        ---@cast t box.tuple
                        xq:wakeup(t)
                        if xq.have_runat then
                                xq.runat_chan:put(true,0)
                        end
                        log.info("Ack: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
                                key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
                end

                if remove then
                        t = self:delete{key}
                        log.info("Ack: %s->delete {%s} +%s from %s/sid=%s/fid=%s", old,
                                key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
                end
        end)

        -- the original comment says this drops the key from the taken set
        xq:putback(t)

        return t
end
1633

1634
--- Moves a task owned by the current session to status 'B'.
-- Operations from `attr.update` are applied before the status change.
-- Returns nothing.
---@param key table|scalar|box.tuple
---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] }
function methods:bury(key, attr)
        local xq = self.xq
        attr = attr or {}

        key = xq:getkey(key)
        local t = xq:check_owner(key)

        local ops = {}
        if attr.update then
                for _, op in pairs(attr.update) do
                        ops[#ops + 1] = op
                end
        end
        ops[#ops + 1] = { '=', xq.fields.status, 'B' }

        xq:atomic(key, function()
                t = self:update({key}, ops)

                xq:wakeup(t)
                if xq.have_runat then
                        xq.runat_chan:put(true,0)
                end
                log.info("Bury {%s} by %s, sid=%s, fid=%s", key, box.session.storage.peer, box.session.id(), fiber.id())
        end)

        xq:putback(t)
end
1661

1662
--- Sets the status of a single task to 'R' and hands it to `xq:putback`.
-- `key` may be anything accepted by `xq:getkey`.
-- NOTE(review): the other callers in this file pass the task tuple to
-- `xq:putback`; here the key and `attr` are passed instead - confirm against
-- the definition of putback.
local function kick_task(self, key, attr)
        local xq = self.xq
        key = xq:getkey(key)

        local sid  = box.session.id()
        local peer = box.session.storage.peer
        attr = attr or {}

        if xq.debug then
                log.info("Kick {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
        end

        local ops = { { '=', xq.fields.status, 'R' } }
        self:update({key}, ops)
        xq:putback(key, attr)
end
1676

1677
--- Kicks tasks back to status 'R'.
-- When `nr_tasks_or_task` is a number, up to that many tasks found under
-- {'B'} in `xq.index` are kicked, yielding after every 500th one; otherwise
-- the argument identifies a single task to kick.
function methods:kick(nr_tasks_or_task, attr)
        attr = attr or {}

        if type(nr_tasks_or_task) ~= 'number' then
                kick_task(self, nr_tasks_or_task, attr)
                return
        end

        local ordinal = 1 -- 1-based number of the task about to be kicked
        for _, buried in self.xq.index:pairs({'B'}, { iterator = box.index.EQ }) do
                if ordinal > nr_tasks_or_task then
                        break
                end
                kick_task(self, buried, attr)
                if ordinal % 500 == 0 then
                        fiber.sleep(0)
                end
                ordinal = ordinal + 1
        end
end
1691

1692
--- Deletes the task identified by `key`, whatever its status.
-- Raises when the task does not exist. The returned value is the deleted
-- tuple with its status field replaced by 'B'.
---@param key table|scalar|box.tuple
---@return box.tuple|table
function methods:kill(key)
        local xq = self.xq
        key = xq:getkey(key)

        if not self:get(key) then
                error(("Task by %s not found to kill"):format(key))
        end

        local peer = box.session.storage.peer
        if xq.debug then
                log.info("Kill {%s} by %s, sid=%s, fid=%s", key, peer, box.session.id(), fiber.id())
        end

        local removed = self:delete(key)
        ---@cast removed -nil
        -- Kill is treated as a synonym of Bury: the status is rewritten on the
        -- already deleted tuple before it is handed to putback.
        local buried = removed:update({{ '=', xq.fields.status, 'B' }})
        xq:putback(buried)
        return xq.retwrap(buried)
end
1714

1715
--- Special remap of truncate: resets the statistics and re-installs the
--- xqueue methods on the space.
--
-- Every status counter is set to 0 and every transition counter is removed,
-- both for the queue-wide statistics and for each per-tube entry in
-- `stat.tube`. The original space truncate (`xq._default_truncate`) is then
-- called and its result returned.
function methods:truncate()
        local stat = self.xq._stat

        for status in pairs(stat.counts) do
                stat.counts[status] = 0LL
        end
        for transition in pairs(stat.transition) do
                stat.transition[transition] = nil
        end

        -- Per-tube statistics must be reset as well, otherwise methods:stats keeps
        -- reporting tasks that no longer exist. `counts` is the field read by
        -- methods:stats.
        -- NOTE(review): a per-tube `transition` table is assumed to mirror the
        -- queue-wide one; it is only touched when it is present and a table.
        for _, tube_stat in pairs(stat.tube or {}) do
                if type(tube_stat.counts) == 'table' then
                        for status in pairs(tube_stat.counts) do
                                tube_stat.counts[status] = 0LL
                        end
                end
                if type(tube_stat.transition) == 'table' then
                        for transition in pairs(tube_stat.transition) do
                                tube_stat.transition[transition] = nil
                        end
                end
        end

        local ret = self.xq._default_truncate(self)

        -- Now we reset our methods after truncation because
        -- as we can see in on_replace_dd_truncate:
        -- https://github.com/tarantool/tarantool/blob/0b7cc52607b2290d2f35cc68ee1a8243988c2735/src/box/alter.cc#L2239
        -- tarantool deletes space and restores it with the same indexes
        -- but without methods
        local meta = debug.getmetatable(self)
        for name, method in pairs(methods) do
                meta[name] = method
        end

        return ret
end
1734

1735
-- Human-readable names of the one-letter task statuses; methods:stats uses
-- them as keys of the counters when called with `pretty`.
local pretty_st = {
        R = 'Ready',   T = 'Taken',
        W = 'Waiting', B = 'Buried',
        Z = 'Zombie',  D = 'Done',
}
1743

1744
-- Metatable carrying the `__serialize = 'map'` hint; methods:stats attaches it
-- to the counts tables it returns.
local shortmap = { __serialize = 'map' }
3✔
1745

1746
--- Returns a deep copy of the queue statistics (`xq._stat`).
--
-- Every status listed in `pretty_st` gets a counter (0 when absent), both in
-- the queue-wide `counts` and in the `counts` of every entry of `tube`.
-- With `pretty` the one-letter status keys are replaced by their long names.
---@param pretty? boolean
function methods:stats(pretty)
        local stats = table.deepcopy(self.xq._stat)

        if pretty then
                -- move every counter from the short key to the long one
                for short, long in pairs(pretty_st) do
                        stats.counts[long] = stats.counts[short] or 0LL
                        stats.counts[short] = nil
                        for _, tube_stat in pairs(stats.tube) do
                                tube_stat.counts[long] = tube_stat.counts[short] or 0LL
                                tube_stat.counts[short] = nil
                        end
                end
        else
                for _, tube_stat in pairs(stats.tube) do
                        setmetatable(tube_stat.counts, shortmap)
                end
                -- keep the short keys, only fill in the missing counters
                for short in pairs(pretty_st) do
                        stats.counts[short] = stats.counts[short] or 0LL
                        for _, tube_stat in pairs(stats.tube) do
                                tube_stat.counts[short] = tube_stat.counts[short] or 0LL
                        end
                end
        end

        setmetatable(stats.counts, shortmap)
        return stats
end
1768

1769
-- Makes the module table callable: `M(space, opts)` forwards to
-- `M.upgrade(space, opts, 1)`. The handler returns nothing.
setmetatable(M,{
6✔
1770
        __call = function(_, space, opts)
1771
                -- NOTE(review): the meaning of the trailing `1` is defined by M.upgrade,
                -- which is not visible from here.
                M.upgrade(space,opts,1)
×
1772
        end
1773
})
1774

1775
return M
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc