• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

tarantool / expirationd / 17674387513

12 Sep 2025 11:25AM UTC coverage: 84.706% (-9.6%) from 94.344%
17674387513

Pull #173

github

vakhov
ci: update GitHub Actions workflows (runners, fixes, stability)
Pull Request #173: Sync with EE: selected fixes and improvements

40 of 85 new or added lines in 2 files covered. (47.06%)

8 existing lines in 1 file now uncovered.

432 of 510 relevant lines covered (84.71%)

881.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.65
/expirationd/init.lua
1
--- expirationd - data expiration with custom quirks.
2
--
3
-- @module expirationd
4

5
-- ========================================================================= --
6
-- local support functions
7
-- ========================================================================= --
8

9
local checks = require("checks")
12✔
10
local fun = require("fun")
12✔
11
local log = require("log")
12✔
12
local fiber = require("fiber")
12✔
13
local is_metrics_package, metrics = pcall(require, "metrics")
12✔
14
local is_hotreload_package, hotreload = pcall(require, "cartridge.hotreload")
12✔
15
local strategy_lifetime_all = require("expirationd.strategy.lifetime_all")
12✔
16

17
-- get fiber id function
18
--- Return the id of a live fiber, or 0 when there is no usable fiber.
--
-- @param fiber
--     Fiber object or nil.
--
-- @return Fiber id, or 0 if `fiber` is nil or its status is "dead".
local function get_fiber_id(fiber)
    local is_alive = fiber ~= nil and fiber:status() ~= "dead"
    if not is_alive then
        return 0
    end
    return fiber:id()
end
25

26
-- This method adds an issue to the cluster instance that is displayed in the admin panel
27
-- and metrics. The method calls a global function that must be declared in the role module.
28
-- If the function is not declared, then instead of an issue, a warning is written to the log.
29
--- Record a warning for a task and, if the hook exists, raise a cluster issue.
--
-- The text is written to the log with `log.warn`, stored in `task.alert`
-- and passed to the global `expirationd_enable_issue(task_name, text)` when
-- that global is defined.
--
-- @param task
--     Task instance; `task.name` is put into the message prefix.
-- @param message
--     Message text. It is treated as a `string.format` template only when
--     extra arguments are given; otherwise it is used verbatim, so error
--     texts that happen to contain `%` cannot break formatting.
-- @param ...
--     Optional values for the format template.
local function add_issue(task, message, ...)
    local prefix = ('Expirationd warning, task "%s": '):format(task.name)

    local result_msg
    if select('#', ...) == 0 then
        -- No format arguments: the message is plain text (typically an error
        -- returned by one of the checks), possibly not even a string.
        result_msg = prefix .. tostring(message)
    else
        result_msg = prefix .. tostring(message):format(...)
    end

    log.warn(result_msg)
    task.alert = result_msg

    local enable_issue = rawget(_G, 'expirationd_enable_issue')
    if enable_issue ~= nil then
        enable_issue(task.name, result_msg)
    end
end
39

40
-- This method removes a previously added issue from the cluster instance, which is displayed in
41
-- the admin panel and metrics. The method calls a global function that must be declared in the
42
-- role module. If the function is not declared, then nothing is called.
43
--- Clear a task's alert and, if the hook exists, drop the cluster issue.
--
-- Calls the global `expirationd_disable_issue(task_name)` only when that
-- global is defined.
--
-- @param task
--     Task instance.
local function remove_issue(task)
    task.alert = nil

    local disable_issue = rawget(_G, 'expirationd_disable_issue')
    if disable_issue == nil then
        return
    end
    disable_issue(task.name)
end
50

51
-- Names of the globals that hold module state (see stash_get).
local stash_names = {
    cfg = '__expirationd_cfg',
    metrics_stats = '__expirationd_metrics_stats',
    task_continue = '__expirationd_task_continue',
}

-- When cartridge.hotreload is available, whitelist every stash global,
-- one call per name.
if is_hotreload_package then
    for _, global_name in pairs(stash_names) do
        hotreload.whitelist_globals({ global_name })
    end
end
62

63
-- get a stash instance, initialize if needed
64
--- Fetch the stash table stored in the global `name`, creating it if needed.
--
-- @param name
--     Name of the global holding the stash.
--
-- @return The stash table (the same table on every call for a given name).
local function stash_get(name)
    local instance = rawget(_G, name)
    if not instance then
        instance = {}
    end
    rawset(_G, name, instance)
    return instance
end
69

70
-- Tasks known to this module instance, keyed by task name.
local task_list = {}
-- State kept in global stashes (see stash_get).
local task_continue = stash_get(stash_names.task_continue)
local cfg = stash_get(stash_names.cfg)
local metrics_stats = stash_get(stash_names.metrics_stats)

-- Metrics are on by default; an already stored value is left untouched.
if cfg.metrics == nil then
    cfg.metrics = true
end
12✔
77

78
-- Default values used when building a task.
local constants = {
    -- number of tuples checked per iteration
    default_tuples_per_iteration = 1024,
    -- time budget for a full index scan, in seconds
    default_full_scan_time = 3600,
    -- upper bound for a worker delay, in seconds
    max_delay = 1,
    -- interval between worker checks
    check_interval = 1,
    -- run expirationd even when started on a replica (off by default)
    force = false,
    -- assumed vinyl space size for the first iteration
    default_vinyl_assumed_space_len = 10 ^ 7,
    -- multiplier applied when the assumed vinyl space size is recalculated
    default_vinyl_assumed_space_len_factor = 2,
    -- default on_full_scan_* callback: does nothing
    default_on_full_scan = function() end,
    -- default start_key callback: no key
    start_key = function() return nil end,
    -- default process_while callback: always continue
    process_while = function() return true end,
    -- default iterator type for the scan
    iterator_type = "ALL",
    -- by default a batch of tuples is not wrapped into a single transaction
    atomic_iteration = false,
}
104

105
--- Tell whether the installed metrics package has the API this module uses.
--
-- Checks that the package was loaded, that it exposes `unregister_callback`
-- and that the counter collector has a `remove` method.
--
-- @return true or false
local function is_metrics_v_0_11_installed()
    local has_callback_api = is_metrics_package
        and metrics.unregister_callback ~= nil
    if not has_callback_api then
        return false
    end

    local counter_collector = require('metrics.collectors.counter')
    if counter_collector.remove then
        return true
    end
    return false
end
112

113
--- Register expirationd collectors and the stats callback in metrics.
--
-- Raises "metrics >= 0.11.0 is required" (error level 3) when the metrics
-- package is missing or too old. A previously registered callback and
-- collectors found in the stash are unregistered first.
local function metrics_enable()
    if not is_metrics_v_0_11_installed() then
        error("metrics >= 0.11.0 is required", 3)
    end

    -- Workaround for a cartridge role reload:
    --
    -- Metrics package does not lose observation after a cartridge reload since
    -- 0.13.0. expirationd does not automatically restart tasks after the
    -- cartridge reload. So, we are acting here as if all expirationd tasks
    -- have been killed: reset all collectors and the callback.
    local stale_callback = metrics_stats.callback
    if stale_callback then
        metrics.unregister_callback(stale_callback)
    end
    metrics_stats.callback = nil

    local stale_collectors = metrics_stats.collectors
    if stale_collectors then
        for _, entry in pairs(stale_collectors) do
            metrics.registry:unregister(entry.collector)
        end
    end
    metrics_stats.collectors = nil

    -- { statistics key, collector name, collector description }
    local collector_specs = {
        {
            "checked_count",
            "expirationd_checked_count",
            "expirationd task's a number of checked tuples",
        },
        {
            "expired_count",
            "expirationd_expired_count",
            "expirationd task's a number of expired tuples",
        },
        {
            "restarts",
            "expirationd_restarts",
            "expirationd task's a number of restarts",
        },
        {
            "working_time",
            "expirationd_working_time",
            "expirationd task's operation time",
        },
    }

    local collectors = {}
    for _, spec in ipairs(collector_specs) do
        collectors[spec[1]] = {
            collector = metrics.counter(spec[2], spec[3]),
            -- last value reported per task, used to compute increments
            task_value = {},
        }
    end
    metrics_stats.collectors = collectors

    -- Counters only grow, so each run adds the difference between the
    -- current task statistics and the previously reported value.
    local function collect_task_stats()
        for task_name, task in pairs(task_list) do
            for stat_name, value in pairs(task:statistics()) do
                local entry = metrics_stats.collectors[stat_name]
                local delta = value - (entry.task_value[task_name] or 0)
                entry.collector:inc(delta, {name = task_name})
                entry.task_value[task_name] = value
            end
        end
    end

    metrics.register_callback(collect_task_stats)
    metrics_stats.callback = collect_task_stats
end
175

176
--- Unregister expirationd collectors and the stats callback from metrics.
--
-- Collectors are removed first, then the callback; both stash fields are
-- cleared.
local function metrics_disable()
    local registry = metrics.registry
    for _, entry in pairs(metrics_stats.collectors) do
        registry:unregister(entry.collector)
    end
    metrics_stats.collectors = nil

    local callback = metrics_stats.callback
    metrics.unregister_callback(callback)
    metrics_stats.callback = nil
end
184

185
-- Try to switch metrics on at load time. The stored option becomes the
-- first result of pcall, i.e. whether metrics_enable finished without error.
if cfg.metrics then
    cfg.metrics = (pcall(metrics_enable))
end
189

190
-- ========================================================================= --
191
-- Task local functions
192
-- ========================================================================= --
193

194
-- ------------------------------------------------------------------------- --
195
-- Task fibers
196
-- ------------------------------------------------------------------------- --
197
--- Check that the task's space and index exist.
--
-- @param task
--     Task instance; `task.space` and `task.index` are a name or an id.
--
-- @return true on success, otherwise false and an error message.
local function check_space_and_index_exist(task)
    local space = box.space[task.space]
    if space == nil then
        local kind = type(task.space) == "string" and "name " or "id "
        return false, "Space with " .. kind .. task.space .. " does not exist"
    end

    if space.index[task.index] == nil then
        local kind = type(task.index) == "string" and "name " or "id "
        return false, "Index with " .. kind .. task.index .. " does not exist"
    end

    return true
end
214

215
--- Resolve a function by name.
--
-- A global with that name wins: it is returned if it is a function, and nil
-- is returned if it is anything else. Otherwise, when
-- `box.schema.func.exists(func_name)` is true, a wrapper is returned that
-- calls `box.func[func_name]` with its arguments packed into a table.
--
-- @param func_name
--     Function name; anything but a string yields nil.
--
-- @return A function, or nil when nothing suitable was found.
local function load_function(func_name)
    if type(func_name) ~= 'string' then
        return nil
    end

    local global_value = rawget(_G, func_name)
    if global_value ~= nil then
        if type(global_value) == 'function' then
            return global_value
        end
        return nil
    end

    if not box.schema.func.exists(func_name) then
        return nil
    end

    return function(...)
        return box.func[func_name]:call({...})
    end
end
235

236
--- Replace function names in task options with the functions themselves.
--
-- Only options whose value is a string are resolved (via load_function).
-- The task is modified only if every name has been resolved.
--
-- @param task
--     Task instance.
--
-- @return true on success, otherwise false and an error message.
local function load_functions(task)
    local func_options = {
        'is_tuple_expired',
        'iterate_with',
        'on_full_scan_complete',
        'on_full_scan_error',
        'on_full_scan_start',
        'on_full_scan_success',
        'process_expired_tuple',
        'process_while',
    }

    local loaded = {}
    for _, option_name in ipairs(func_options) do
        local func_name = task[option_name]
        if type(func_name) == 'string' then
            local func = load_function(func_name)
            if func == nil then
                return false, ('Function "%s" (for option "%s") -- not loaded'):format(func_name, option_name)
            end
            loaded[option_name] = func
        end
    end

    -- Apply the result only after all lookups succeeded.
    for option_name, func in pairs(loaded) do
        task[option_name] = func
    end

    return true
end
264

265
--- Make sure the task has an `is_tuple_expired` function.
--
-- When the option is not set, a default one is requested from
-- `strategy_lifetime_all.get_method_is_tuple_expired` using `task.space`
-- and `task.args.time_create_field`.
--
-- @param task
--     Task instance.
--
-- @return true on success, otherwise false and the error.
local function check_is_tuple_expired(task)
    if task.is_tuple_expired ~= nil then
        return true
    end

    -- Read the field name under pcall: `task.args` may be nil (or not
    -- indexable), and the caller expects `false, err` rather than a raised
    -- error.
    local args_ok, time_create_field = pcall(function()
        return task.args.time_create_field
    end)
    if not args_ok then
        return false, time_create_field
    end

    local ok, result = pcall(
            strategy_lifetime_all.get_method_is_tuple_expired,
            task.space,
            time_create_field
    )
    if not ok then
        return false, result
    end

    task.is_tuple_expired = result
    return true
end
281

282
--- Validate the task's space and index before a worker is started.
--
-- Checks, in order: the space and index exist; the index type is TREE or
-- HASH; a functional index on a memtx space is used only on a supported
-- Tarantool version (unless `task.force_allow_functional_index` is set);
-- an iterator can be created with the task's start key and iterator type.
--
-- @param task
--     Task instance.
--
-- @return true, nil on success, otherwise false and the error.
local function check_space_and_index(task)
    local ok, err = check_space_and_index_exist(task)
    if not ok then
        return false, err
    end

    local space = box.space[task.space]
    local index = space.index[task.index]
    local is_supported_type = index.type == "TREE" or index.type == "HASH"
    if not is_supported_type then
        return false, "Not supported index type, expected TREE or HASH"
    end

    if space.engine == "memtx" and index.func ~= nil then
        -- _TARANTOOL looks like "<major>.<minor>.<patch>-<rest>"
        local version = rawget(_G, "_TARANTOOL"):split('-', 1)[1]
        local version_parts = version:split('.', 2)

        local major = tonumber(version_parts[1])
        local minor = tonumber(version_parts[2])
        local patch = tonumber(version_parts[3])
        -- https://github.com/tarantool/expirationd/issues/101
        -- fixed since 2.8.4 and 2.10
        local supported = (major == 2 and minor == 8 and patch >= 4) or
            (major == 2 and minor >= 10) or
            (major >= 3)
        if not (supported or task.force_allow_functional_index) then
            return false, "Functional indices are not supported for" ..
                          " Tarantool < 2.8.4, see" ..
                          " options.force_allow_functional_index"
        end
    end

    -- Dry run: creating the iterator reveals a bad start key or iterator type.
    ok, err = pcall(function()
        index:pairs(task.start_key(), {iterator = task.iterator_type})
    end)
    if ok then
        return true, nil
    end
    return false, err
end
323

324
-- get all fields in key(composite possible) from a tuple
325
--- Build a key for an index from a tuple (composite keys are supported).
--
-- @param space
--     Space name or id.
-- @param index
--     Index name or id.
-- @param tuple
--     Tuple to take the key fields from.
--
-- @return Table with the tuple fields listed in the index parts.
local function construct_key(space, index, tuple)
    local key_parts = box.space[space].index[index].parts
    local function field_of_part(part)
        return tuple[part.fieldno]
    end
    return fun.map(field_of_part, key_parts):totable()
end
331

332
-- do expiration process on tuple
333
--- Check one tuple and process it if it has expired.
--
-- Updates the task's checked/expired counters.
--
-- @param task
--     Task instance.
-- @param tuple
--     Tuple to check.
local function expiration_process(task, tuple)
    task.checked_tuples_count = task.checked_tuples_count + 1

    local expired = task.is_tuple_expired(task.args, tuple)
    if not expired then
        return
    end

    task.expired_tuples_count = task.expired_tuples_count + 1
    task.process_expired_tuple(task.space, task.args, tuple, task)
end
340

341
-- yield for some time
342
--- Sleep between two batches.
--
-- The delay is `tuples_per_iteration * full_scan_time / len`, capped by
-- `task.iteration_delay`.
--
-- @param task
--     Task instance.
-- @param len
--     Number of tuples (real or assumed) in the scanned space.
local function suspend_basic(task, len)
    local scan_budget = task.tuples_per_iteration * task.full_scan_time
    fiber.sleep(math.min(scan_budget / len, task.iteration_delay))
end
347

348
--- Sleep between two batches, based on the current length of the index.
--
-- Nothing happens when the index is empty.
--
-- @param task
--     Task instance.
local function suspend(task)
    local index = box.space[task.space].index[task.index]
    local tuple_count = index:len()
    if tuple_count <= 0 then
        return
    end
    suspend_basic(task, tuple_count)
end
356

357
--- Default worker iteration: one full pass over the task's iterator.
--
-- Tuples are handled in batches of `task.tuples_per_iteration`; the worker
-- sleeps after every full batch. With `task.atomic_iteration` each batch is
-- wrapped in box.begin()/box.commit().
--
-- @param task
--     Task instance.
--
-- @return true when the pass was interrupted because `task.worker_cancelled`
--     is set (atomic mode only); nothing otherwise.
local function default_do_worker_iteration(task)
    -- full index scan loop
    local space_len = task.vinyl_assumed_space_len
    -- tuples checked in the current batch
    local checked_tuples_count = 0
    -- tuples checked during the whole pass
    local vinyl_checked_tuples_count = 0
    if task.atomic_iteration then
        -- Check before starting the transaction,
        -- since a transaction can be long.
        if task.worker_cancelled then
            return true
        end
        box.begin()
    end
    for _, tuple in task:iterate_with() do
        checked_tuples_count = checked_tuples_count + 1
        vinyl_checked_tuples_count = vinyl_checked_tuples_count + 1
        expiration_process(task, tuple)
        -- find out if the worker can go to sleep
        -- if the batch is full
        if checked_tuples_count >= task.tuples_per_iteration then
            if task.atomic_iteration then
                box.commit()
                -- The suspend functions can be long.
                if task.worker_cancelled then
                    return true
                end
            end
            checked_tuples_count = 0
            if box.space[task.space].engine == "vinyl" then
                -- The vinyl branch works with an assumed length that is
                -- multiplied by the factor once more tuples than assumed
                -- have been seen.
                -- NOTE(review): an assumed length of 0 stays 0 after the
                -- multiplication - confirm this is intended.
                if vinyl_checked_tuples_count > space_len then
                    space_len = task.vinyl_assumed_space_len_factor * space_len
                end
                suspend_basic(task, space_len)
            else
                suspend(task)
            end
            if task.atomic_iteration then
                -- Check before starting the transaction,
                -- since a transaction can be long.
                if task.worker_cancelled then
                    return true
                end
                box.begin()
            end
        end
    end
    -- commit the last (possibly partial) batch
    if task.atomic_iteration then
        box.commit()
    end
    -- remember the number of tuples seen as the assumption for the next pass
    if box.space[task.space].engine == "vinyl" then
        task.vinyl_assumed_space_len = vinyl_checked_tuples_count
    end
end
410

411
--- Body of the worker fiber: run full scans forever.
--
-- A scan is performed when the instance is writable (`box.info.ro` is
-- false), or the space is temporary/local, or `task.force` is set.
-- The on_full_scan_* callbacks are invoked around every scan. An error
-- raised by the scan is re-raised after the callbacks and a rollback.
--
-- @param task
--     Task instance.
local function worker_loop(task)
    -- detach worker from the guardian and attach it to sched fiber
    fiber.name(string.format("worker of %q", task.name), { truncate = true })

    -- https://www.tarantool.io/en/doc/latest/reference/configuration/#confval-read_only
    local space = box.space[task.space]
    local is_ro_writable = space.temporary or space.is_local
    -- true when the previous loop turn skipped the scan
    local is_skipped = false
    while true do
        local box_info = box.info
        if not box_info.ro or is_ro_writable or task.force then
            if is_skipped then
                -- Wait for maximum upstream lag * 2 if we became a master.
                -- It helps to avoid a conflict due to parallel processing
                -- of remaining transactions from the old master and from the
                -- new master.
                local max_lag = 0
                for i = 1, table.maxn(box_info.replication) do
                    local r = box_info.replication[i]
                    -- NOTE(review): assumes `r.upstream.lag` is a number
                    -- whenever `r.upstream` is present - confirm.
                    if r and r.upstream and max_lag < r.upstream.lag then
                        max_lag = r.upstream.lag
                    end
                end
                if max_lag > 0 then
                    fiber.sleep(max_lag * 2)
                end
            end
            is_skipped = false

            task.on_full_scan_start(task)
            local state, err = pcall(task.do_worker_iteration, task)
            -- Following functions are on_full_scan*,
            -- but we probably did not complete the full scan,
            -- so we should check for cancellation here.
            if task.worker_cancelled then
                fiber.self():cancel()
            end
            if state then
                task.on_full_scan_success(task)
            else
                task.on_full_scan_error(task, err)
            end

            task.on_full_scan_complete(task)
            if not state then
                -- drop whatever the failed iteration left in a transaction
                -- and let the error terminate this fiber
                box.rollback()
                error(err)
            end
        else
            is_skipped = true
        end

        -- If we do not check the fiber for cancellation,
        -- then the fiber may fall asleep for a long time, depending on `full_scan_delay`.
        -- And a fiber that wants to stop this task can also freeze, a kind of deadlock.
        if task.worker_cancelled then
            fiber.self():cancel()
        end
        -- Full scan iteration is complete, yield
        remove_issue(task)
        fiber.sleep(task.full_scan_delay)
    end
end
474

475
--- Validate the task and start its worker fiber.
--
-- The checks run in a fixed order; the first failure is reported with
-- add_issue and no worker is started.
--
-- @param task
--     Task instance.
local function run_worker_loop(task)
    local preflight_checks = {
        check_space_and_index_exist,
        load_functions,
        check_is_tuple_expired,
        check_space_and_index,
    }

    for _, check in ipairs(preflight_checks) do
        local ok, err = check(task)
        if not ok then
            add_issue(task, err)
            return
        end
    end

    -- create worker fiber
    task.worker_fiber = fiber.create(worker_loop, task)

    log.info("expirationd: task %q restarted", task.name)
    task.restarts = task.restarts + 1
end
507

508
--- Body of the guardian fiber: keep the task's worker running.
--
-- Every `constants.check_interval` seconds it starts a worker if the task
-- has no live one. Until a worker completes a scan the task carries the
-- "Task is not running" issue.
--
-- @param task
--     Task instance.
local function guardian_loop(task)
    add_issue(task, 'Task is not running')

    -- detach the guardian from the creator and attach it to sched
    local fiber_name = string.format("guardian of %q", task.name)
    fiber.name(fiber_name, { truncate = true })

    while true do
        local worker_id = get_fiber_id(task.worker_fiber)
        -- 0 means there is no live worker fiber
        if worker_id == 0 then
            run_worker_loop(task)
        end
        fiber.sleep(constants.check_interval)
    end
end
522

523
-- ------------------------------------------------------------------------- --
524
-- Task management
525
-- ------------------------------------------------------------------------- --
526

527
-- {{{ Task instance methods
528

529
--- Task instance methods.
530
--
531
-- NOTE: task object contains a number of properties that are available to users.
532
-- However these properties are not a part of expirationd API. Property name
533
-- can be changed or property itself can be removed in future version. Be
534
-- careful!
535
--
536
-- @section Methods
537
--
538
local Task_methods = {
    --- Start a task. It continues processing from a last tuple if the task
    --  was previously stopped by @{task.stop} and the task has default
    --  `start_key` and `iterate_with` functions.
    --
    -- @param  self
    --     Task instance.
    --
    -- @return None
    --
    -- @function task.start
    start = function (self)
        self:stop()
        -- stop() leaves `worker_cancelled` set once it has cancelled a live
        -- worker. Clear it, otherwise every worker spawned for this task
        -- afterwards sees the stale flag: an atomic worker returns before
        -- scanning anything, a non-atomic one cancels itself after each scan.
        self.worker_cancelled = false
        self.guardian_fiber = fiber.create(guardian_loop, self)
    end,

    --- Stop a task.
    --
    -- Cancels the guardian fiber first (so the worker is not restarted),
    -- then the worker fiber, and waits until both are dead.
    --
    -- @param  self
    --     Task instance.
    --
    -- @return None
    --
    -- @function task.stop
    stop = function (self)
        remove_issue(self)

        if (get_fiber_id(self.guardian_fiber) ~= 0) then
            self.guardian_fiber:cancel()
            while self.guardian_fiber:status() ~= "dead" do
                fiber.sleep(0.01)
            end
            self.guardian_fiber = nil
        end
        if (get_fiber_id(self.worker_fiber) ~= 0) then
            self.worker_cancelled = true
            -- With atomic_iteration the worker is not cancelled from the
            -- outside; it checks `worker_cancelled` between transactions
            -- and finishes on its own.
            if not self.atomic_iteration then
                self.worker_fiber:cancel()
            end
            while self.worker_fiber:status() ~= "dead" do
                fiber.sleep(0.01)
            end
            self.worker_fiber = nil
        end
    end,

    --- Restart a task.
    --
    -- Unlike @{task.start}, the saved scan position is dropped, so the
    -- task does not continue from the last processed tuple.
    --
    -- @param  self
    --     Task instance.
    --
    -- @return None
    --
    -- @function task.restart
    restart = function (self)
        self:stop()
        task_continue[self.name] = nil
        self:start()
    end,

    --- Kill a task.
    --
    -- Stop a task and delete it from list of tasks.
    --
    -- @param  self
    --     Task instance.
    --
    -- @return None
    --
    -- @function task.kill
    kill = function (self)
        self:stop()
        -- drop the task's label values from every collector
        if metrics_stats.collectors then
            for _, c in pairs(metrics_stats.collectors) do
                c.collector:remove({name = self.name})
                c.task_value[self.name] = nil
            end
        end
        task_list[self.name] = nil
        task_continue[self.name] = nil
    end,

    --- Get statistics about a task.
    --
    -- @param  self
    --     Task instance.
    --
    -- @return Response of the following structure:
    --
    -- ```
    -- {
    --     checked_count = number,
    --     expired_count = number,
    --     restarts = number,
    --     working_time = number,
    -- }
    -- ```
    --
    -- where:
    --
    -- `checked_count` is a number of tuples checked for expiration (expired + skipped).
    --
    -- `expired_count` is a number of expired tuples.
    --
    -- `restarts` is a number of restarts since start. From the start `restarts` is equal to 1.
    --
    -- `working_time` is a task's operation time.
    --
    -- @function task.statistics
    statistics = function (self)
        return {
            checked_count = self.checked_tuples_count,
            expired_count = self.expired_tuples_count,
            restarts      = self.restarts,
            working_time  = math.floor(fiber.time() - self.start_time),
        }
    end,
}
656

657
-- }}} Task instance methods
658

659
--- create new expiration task
660
--- Create a task object with default settings.
--
-- Fields that have no default (`guardian_fiber`, `worker_fiber`, `space`,
-- `index`, `args`, `is_tuple_expired`, `process_expired_tuple`,
-- `iterate_with`) are left unset.
--
-- @param name
--     Task name.
--
-- @return Task instance with Task_methods available through `__index`.
local function create_task(name)
    local on_full_scan = constants.default_on_full_scan

    local task = {
        name       = name,
        start_time = fiber.time(),

        -- statistics
        expired_tuples_count = 0,
        checked_tuples_count = 0,
        restarts             = 0,

        -- set by task:stop() to ask the worker to finish
        worker_cancelled = false,

        -- timing and batching
        iteration_delay                = constants.max_delay,
        full_scan_delay                = constants.max_delay,
        tuples_per_iteration           = constants.default_tuples_per_iteration,
        full_scan_time                 = constants.default_full_scan_time,
        vinyl_assumed_space_len        = constants.default_vinyl_assumed_space_len,
        vinyl_assumed_space_len_factor = constants.default_vinyl_assumed_space_len_factor,

        -- callbacks
        on_full_scan_error    = on_full_scan,
        on_full_scan_success  = on_full_scan,
        on_full_scan_start    = on_full_scan,
        on_full_scan_complete = on_full_scan,
        start_key             = constants.start_key,
        process_while         = constants.process_while,

        -- iteration mode
        iterator_type    = constants.iterator_type,
        atomic_iteration = constants.atomic_iteration,
    }

    return setmetatable(task, { __index = Task_methods })
end
693

694
-- get task for table
695
--- Find a task by name.
--
-- @param name
--     Task name.
--
-- @return Task instance.
--
-- @raise "task name is nil" or "task '<name>' doesn't exist".
local function get_task(name)
    if name == nil then
        error("task name is nil")
    end

    local task = task_list[name]
    if task == nil then
        -- tostring keeps the message intact for names that cannot be
        -- concatenated (booleans, tables, ...)
        error("task '" .. tostring(name) .. "' doesn't exist")
    end

    return task
end
707

708
-- default process_expired_tuple function
709
-- luacheck: ignore unused args
710
local function default_tuple_drop(space, args, tuple)
    -- Default process_expired_tuple: delete the tuple by the key built
    -- from index 0 of the space. `args` is not used.
    local target_space = box.space[space]
    target_space:delete(construct_key(space, 0, tuple))
end
713

714
--- Build a key to continue a scan from a saved tuple.
--
-- The key is built only if the index parts have not changed since they were
-- saved: same number of parts and every saved part option equal to the
-- current one.
--
-- @param tuple
--     Last processed tuple, or nil.
-- @param old_parts
--     Index parts saved together with the tuple.
-- @param task
--     Task instance.
--
-- @return Key table, or nil when the scan cannot be continued.
local function create_continue_key(tuple, old_parts, task)
    local index = box.space[task.space].index[task.index]

    if tuple == nil then
        return nil
    end
    if #old_parts ~= #index.parts then
        return nil
    end

    local current_parts = index.parts
    for part_no, old_part in ipairs(old_parts) do
        local current_part = current_parts[part_no]
        for option, saved_value in pairs(old_part) do
            -- a missing option (nil) differs from any saved value as well
            if current_part[option] ~= saved_value then
                return nil
            end
        end
    end

    return construct_key(task.space, task.index, tuple)
end
731

732
-- Maps the task's iterator type (constant or name) to the iterator used
-- when a scan is continued from a saved key.
local continue_iterators_map = {}
do
    local function map_all(iterator_types, continue_iterator)
        for _, iterator_type in ipairs(iterator_types) do
            continue_iterators_map[iterator_type] = continue_iterator
        end
    end

    -- default start_key() == nil, so it's ok that EQ -> GE because the
    -- continue logic does not work with non-default start_key callback
    map_all({box.index.ALL, box.index.GE, box.index.GT, box.index.EQ,
             "ALL", "GE", "GT", "EQ"}, box.index.GE)
    map_all({box.index.LE, box.index.LT, box.index.REQ,
             "LE", "LT", "REQ"}, box.index.LE)
end
743

744
--- Prepare the continue state for a new scan of the task.
--
-- With a non-default `start_key` the continue logic is off: the saved state
-- is dropped. Otherwise a fresh state (without a tuple) is stored for the
-- task, and the key/iterator to resume from are derived from the previous
-- state if it was made for the same space, index and iterator type.
--
-- @param task
--     Task instance.
--
-- @return key and iterator to continue from, or nil, nil.
local function create_continue_state(task)
    if task.start_key ~= constants.start_key then
        task_continue[task.name] = nil
        return nil, nil
    end

    local resume_key, resume_iterator
    local saved = task_continue[task.name]
    local is_same_scan = saved
        and saved.space == task.space
        and saved.index == task.index
        and saved.it == task.iterator_type
    if is_same_scan then
        resume_key = create_continue_key(saved.tuple, saved.index_parts, task)
        resume_iterator = continue_iterators_map[task.iterator_type]
    end

    local index = box.space[task.space].index[task.index]
    -- the `tuple` field is filled in while the scan goes on
    local fresh_state = {
        space = task.space,
        index = task.index,
        index_parts = table.deepcopy(index.parts),
        it = task.iterator_type,
    }
    task_continue[task.name] = fresh_state

    if resume_key ~= nil and resume_iterator ~= nil then
        return resume_key, resume_iterator
    end
    return nil, nil
end
772

773
-- default iterate_with function
774
--- Default `iterate_with`: iterate over the task's index.
--
-- Iteration starts from the continue key/iterator when create_continue_state
-- returns them, otherwise from `task.start_key()` with `task.iterator_type`,
-- and lasts while `task:process_while()` returns true.
--
-- @param task
--     Task instance.
--
-- @return Iterator triplet (function, param, state).
local function default_iterate_with(task)
    local continue_key, continue_it = create_continue_state(task)
    local index = box.space[task.space].index[task.index]
    local iter, param, state = index:pairs(continue_key or task.start_key(),
                                           { iterator = continue_it or task.iterator_type })
       :take_while(
            function()
                return task:process_while()
            end
        )

    -- A continue state exists only for the default start_key (see
    -- create_continue_state). In that case wrap the iterator to remember the
    -- last returned tuple and to drop the state once the iterator is
    -- exhausted.
    if task_continue[task.name] then
        return function(p, s)
            local i, tuple = iter(p, s)
            if tuple then
                task_continue[task.name].tuple = tuple
            else
                task_continue[task.name] = nil
            end
            return i, tuple
        end, param, state
    else
        return iter, param, state
    end
end
799

800
-- ========================================================================= --
801
-- Expiration daemon constants
802
-- ========================================================================= --
803
--
804
-- {{{ Module constants
805
--
806

807
-- Declared ahead of the assignment below; presumably so that the LDoc
-- section header can sit between the declaration and the documented
-- value - TODO confirm.
local _VERSION

--- Module constants
--
-- @section Constants
--

--- Current module version in format `MAJOR.MINOR.PATCH`.
_VERSION = require('expirationd.version')
24✔
816

817
--
818
-- }}} Module constants
819

820
-- ========================================================================= --
821
-- Expiration daemon management functions
822
-- ========================================================================= --
823
--
824
-- {{{ Module functions
825
--
826

827
--- Module functions
828
--
829
-- @section Functions
830

831
--- Configure expirationd.
832
--
833
-- Since version 1.2.0.
834
--
835
-- How to set up a configuration option:
836
--
837
-- ```
838
-- expirationd.cfg({metrics = true})
839
-- ```
840
--
841
-- How to get an option value:
842
--
843
-- ```
844
-- print(expirationd.cfg.metrics)
845
-- true
846
-- ```
847
--
848
-- @table options
849
--
850
-- @bool[opt] options.metrics
851
--     Enable or disable stats collection by [metrics][1]. metrics >= 0.11.0
852
--     is required. It is enabled by default.
853
--
854
--     If enabled it creates four counter collectors, see @{task.statistics}:
855
--
856
--     1. `expirationd_checked_count`
857
--
858
--     2. `expirationd_expired_count`
859
--
860
--     3. `expirationd_restarts`
861
--
862
--     4. `expirationd_working_time`
863
--
864
--     Labeled with `name = task_name`.
865
--
866
--     [1]: https://github.com/tarantool/metrics/
867
--
868
-- @return None
869
--
870
-- @function expirationd.cfg
871
local function expirationd_cfg(self, options)
    -- checks() validates the arguments of this very function, so it has to
    -- be called directly from here.
    checks('table', {
        metrics = '?boolean',
    })

    local wanted = options.metrics

    -- Nothing requested, or the requested value is already in effect.
    if wanted == nil or cfg.metrics == wanted then
        return
    end

    if wanted then
        metrics_enable()
    else
        metrics_disable()
    end
    -- Store the new value only after the switch above went through.
    rawset(cfg, 'metrics', wanted)
end
889

890
--- Run a scheduled task to check and process (expire) tuples in a given space.
891
--
892
-- How expirationd works in general:
893
--
894
-- 1. Process min(`space_length`, `tuples_per_iteration`) tuples at once.
895
--
896
-- 2. Sleep `tuples_per_iteration` × `full_scan_time` / `space_length` (but not
897
--    beyond 1 second).
898
--
899
-- 3. Repeat 1-2 until the whole space is traversed.
900
--
901
-- 4. Sleep 1 second.
902
--
903
-- 5. Repeat 1-4.
904
--
905
-- NOTE: By default expirationd does not start tasks on a read-only instance
906
-- for non-local persistent spaces, see the `force` option.
907
--
908
-- NOTE: By default expirationd continues processing from a last processed
909
-- tuple if a task with same name has not been killed properly with
910
-- @{task.kill} or @{kill}. This behavior only works with default functions
911
-- `start_key` and `iterate_with`. You need to set at least one function if
912
-- another behavior is needed, see `start_key` and `iterate_with` options.
913
--
914
-- @string name
915
--     Task name.
916
-- @string space
917
--     Space to look in for expired tuples. `space` can be numeric or
918
--     string. It can be a space id or a space name, respectively.
919
-- @func is_tuple_expired
920
--     Function, must accept tuple and return `true` or `false` (is tuple
921
--     expired or not), receives `args` and `tuple` as arguments.
922
--
923
--
924
-- Example of function:
925
--
926
-- ```
927
-- local function is_tuple_expired(args, tuple)
928
--     local tuple_expire_time = get_field(tuple, args.field_no)
929
--     local current_time = fiber.time()
930
--     return current_time >= tuple_expire_time
931
-- end
932
-- ```
933
--
934
-- @table[opt] options
935
--     Table with named options.
936
-- @param[opt] options.args
937
--     Passed to `is_tuple_expired()` and `process_expired_tuple()` as
938
--     an additional context.
939
-- @boolean[opt] options.atomic_iteration
940
--     False (default) to process each tuple as a single transaction and true
941
--     to process tuples from each batch in a single transaction.
942
-- @boolean[opt] options.force
943
--     By default expirationd processes tasks for all types of spaces only on
944
--     a writable instance. It does not process tasks on a read-only instance
945
--     for [non-local persistent spaces][1]. It means that expirationd will not
946
--     start the task processing on a replica for regular spaces. Set the
947
--     option to `true` to force enable the task processing anyway.
948
--
949
--     [1]: https://www.tarantool.io/en/doc/latest/reference/configuration/#confval-read_only
950
--
951
-- @boolean[opt] options.force_allow_functional_index
952
--     By default expirationd returns an error on iteration through a functional
953
--     index for Tarantool < 2.8.4 because it may cause a crash, see
954
--     https://github.com/tarantool/expirationd/issues/101
955
--     You can skip the error using the option if you know what you are doing
956
--     (implement your own `iterate_with` as example).
957
-- @number[opt] options.full_scan_delay
958
--     Sleep time between full scans (in seconds). It is allowed to pass an FFI
959
--     number: `1LL`, `1ULL` etc. Default value is 1 sec.
960
-- @number[opt] options.full_scan_time
961
--     Time required for a full index scan (in seconds). It is allowed to pass
962
--     an FFI number: `1LL`, `1ULL` etc. `full_scan_time` used for calculation
963
--     of time during which fiber sleeps between iterations. Default value is
964
--     3600.
965
-- @string[opt] options.index
966
--     Name or id of the index to iterate on. If omitted, will use the primary
967
--     index. If there's no index with this name, will throw an error.
968
--     Supported index types are TREE and HASH, using other types will result
969
--     in an error.
970
-- @func[opt] options.iterate_with
971
--     Function which returns an iterator object which provides tuples to
972
--     check, considering the `start_key`, `process_while` and other options.
973
--     When option is nil default function is used. Function must accept a task
974
--     instance object. Default function returns iterator returned by
975
--     [index_object:pairs()][1], where `index` is a primary index or index
976
--     that specified with argument `options.index`:
977
--
978
-- Example of function:
979
--
980
-- ```
981
--  local function iterate_with()
982
--      return index:pairs(option.start_key(), {
983
--         iterator = option.iterator_type
984
--      }):take_while(
985
--             function()
986
--                 return option.process_while()
987
--             end
988
--        )
989
--  end
990
-- ```
991
--
992
--     [1]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_index/pairs/.
993
--
994
-- @number[opt] options.iteration_delay
995
--     Max sleep time between iterations (in seconds). It is allowed to pass
996
--     an FFI number: `1LL`, `1ULL` etc. Default value is 1 sec.
997
--     Fiber sleeps min(`tuples_per_iteration` × `full_scan_time` / `space_length`, `iteration_delay`).
998
-- @string[opt] options.iterator_type
999
--     Type of the iterator to use, as string or box.index constant, for
1000
--     example, `EQ` or `box.index.EQ`, default is `box.index.ALL`. See more
1001
--     about index iterators in [index_object:pairs()][1].
1002
--
1003
--     [1]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_index/pairs/.
1004
--
1005
-- @func[opt] options.on_full_scan_complete
1006
--     Function to call after completing a full scan iteration. Default value
1007
--     is a function that does nothing.
1008
-- @func[opt] options.on_full_scan_error
1009
--     Function to call after terminating a full scan due to an error. Default
1010
--     value is a function that does nothing.
1011
--
1012
-- Example of function:
1013
--
1014
-- ```
1015
-- local function on_full_scan_error()
1016
--     pcall(fiber.sleep, 1)
1017
-- end
1018
-- ```
1019
-- @func[opt] options.on_full_scan_start
1020
--     Function to call before starting a full scan iteration. Default value
1021
--     is a function that does nothing.
1022
-- @func[opt] options.on_full_scan_success
1023
--     Function to call after successfully completing a full scan iteration.
1024
--     Default value is a function that does nothing.
1025
-- @func[opt] options.process_expired_tuple
1026
--     Applied to expired tuples, receives `space`, `args`, `tuple` as
1027
--     arguments. When `process_expired_tuple` is not passed (or `nil` passed),
1028
--     tuples are removed.
1029
--
1030
-- Example of function:
1031
--
1032
-- ```
1033
-- local function put_tuple_to_archive(space, args, tuple)
1034
--     box.space[space]:delete{tuple[1]}
1035
--     local email = tuple[2]
1036
--     if args.archive_space ~= nil and email ~= nil then
1037
--         box.space[args.archive_space]:replace{email, fiber.time()}
1038
--     end
1039
-- end
1040
-- ```
1041
--
1042
-- @func[opt] options.process_while
1043
--     Function to call before checking each tuple. If it returns false, the
1044
--     task will stop until next full scan. Default is a function that always
1045
--     returns `true`.
1046
--
1047
-- Example of function:
1048
--
1049
-- ```
1050
-- local function process_while()
1051
--     return false
1052
-- end
1053
-- ```
1054
--
1055
-- @param[opt] options.start_key
1056
--     Start iterating from the tuple with this index value. Or when iterator
1057
--     is 'EQ', iterate over tuples with this index value. Must be a value of
1058
--     the same data type as the index field or fields, or a function which
1059
--     returns such value. If omitted or nil, all tuples will be checked.
1060
-- @number[opt] options.tuples_per_iteration
1061
--     Number of tuples to check in one batch (iteration). It is allowed to
1062
--     pass an FFI number: `1LL`, `1ULL` etc. Default value is 1024.
1063
-- @number[opt] options.vinyl_assumed_space_len_factor
1064
--     Factor for recalculation of vinyl space size. Vinyl space size can't be
1065
--     counted (since many operations, `upsert` for example, are applied when
1066
--     you address some data), so you should count (approximate space size)
1067
--     tuples with the first start. `vinyl_assumed_space_len` is approximate
1068
--     count for first run and `vinyl_assumed_space_len_factor` for next
1069
--     milestone (after we've reached next milestone is `*` and so on). It is
1070
--     allowed to pass an FFI number: `1LL`, `1ULL` etc. Default value is 2.
1071
-- @number[opt] options.vinyl_assumed_space_len
1072
--     Assumed size of vinyl space (in the first iteration).
1073
--     Vinyl space size can't be counted (since many operations, `upsert` for
1074
--     example, are applied when you address some data), so you should count
1075
--     (approximate space size) tuples with the first start.
1076
--     `vinyl_assumed_space_len` is approximate count for first run and
1077
--     `vinyl_assumed_space_len_factor` for next milestone (after we've reached
1078
--     next milestone is `*` and so on). It is allowed to pass an FFI number:
1079
--     `1LL`, `1ULL` etc. Default value is 10^7.
1080
--
1081
-- @return task instance
1082
--
1083
-- @usage
1084
--
1085
-- local expirationd = require('expirationd')
1086
--
1087
-- box.cfg{}
1088
--
1089
-- local space = box.space.old
1090
-- local job_name = "clean_all"
1091
--
1092
-- local function is_expired(args, tuple)
1093
--     return true
1094
-- end
1095
--
1096
-- local function delete_tuple(space, args, tuple)
1097
--     box.space[space]:delete{tuple[1]}
1098
-- end
1099
--
1100
-- expirationd.start(job_name, space.id, is_expired, {
1101
--     process_expired_tuple = delete_tuple,
1102
--     args = nil,
1103
--     tuples_per_iteration = 50,
1104
--     full_scan_time = 3600
1105
-- })
1106
--
1107
-- @function expirationd.start
1108
local function expirationd_run_task(name, space, is_tuple_expired, options)
    -- checks() validates the arguments of this very function, so it has to
    -- be called directly from here.
    checks('string', 'number|string', '?string|function', {
        args = '?',
        atomic_iteration = '?boolean',
        force = '?boolean',
        force_allow_functional_index = '?boolean',
        full_scan_delay = '?number|cdata',
        full_scan_time = '?number|cdata',
        index = '?number|string',
        iterate_with = '?string|function',
        iteration_delay = '?number|cdata',
        iterator_type = '?number|string',
        on_full_scan_complete = '?string|function',
        on_full_scan_error = '?string|function',
        on_full_scan_start = '?string|function',
        on_full_scan_success = '?string|function',
        process_expired_tuple = '?string|function',
        process_while = '?string|function',
        start_key = '?',
        tuples_per_iteration = '?number|cdata',
        vinyl_assumed_space_len_factor = '?number|cdata',
        vinyl_assumed_space_len = '?number|cdata',
    })

    -- A task with this name already exists: restart it. The continue state
    -- is saved before kill() and written back afterwards so the new task
    -- can pick it up.
    local running = task_list[name]
    if running ~= nil then
        log.info("restart task %q", name)
        local saved_state = task_continue[name]
        running:kill(name)
        task_continue[name] = saved_state
    end

    options = options or {}

    local task = create_task(name)

    -- Fields that are always assigned (with a fallback where one exists).
    task.space = space
    task.index = options.index or 0
    task.force_allow_functional_index = options.force_allow_functional_index
    task.is_tuple_expired = is_tuple_expired
    task.process_expired_tuple = options.process_expired_tuple or default_tuple_drop
    task.iterate_with = options.iterate_with or default_iterate_with
    task.args = options.args
    task.do_worker_iteration = default_do_worker_iteration

    -- start_key is always stored as a function: a callable is invoked on
    -- every use, any other value is returned as is.
    if options.start_key ~= nil then
        local key_is_callable = type(options.start_key) == "function"
        task.start_key = key_is_callable
            and function() return options.start_key() end
            or function() return options.start_key end
    end

    -- Options that must be strictly positive.
    local per_iteration = options.tuples_per_iteration
    if per_iteration ~= nil then
        if per_iteration <= 0 then
            error("Invalid tuples per iteration parameter")
        end
        task.tuples_per_iteration = per_iteration
    end

    local scan_time = options.full_scan_time
    if scan_time ~= nil then
        if scan_time <= 0 then
            error("Invalid full scan time")
        end
        task.full_scan_time = scan_time
    end

    -- Options copied verbatim, but only when given, so that whatever
    -- create_task() put into the task stays in place otherwise. The
    -- comparison is against nil because `false` is a valid value here.
    local passthrough = {
        'iterator_type',
        'process_while',
        'atomic_iteration',
        'force',
        'vinyl_assumed_space_len',
        'vinyl_assumed_space_len_factor',
        'iteration_delay',
        'full_scan_delay',
        'on_full_scan_start',
        'on_full_scan_success',
        'on_full_scan_complete',
        'on_full_scan_error',
    }
    for _, field in ipairs(passthrough) do
        local value = options[field]
        if value ~= nil then
            task[field] = value
        end
    end

    -- Register the task, then launch it.
    task_list[name] = task
    task:start()

    return task
end
1240

1241
-- Obsolete positional form of expirationd.start().
--
-- Logs a deprecation notice, packs the positional arguments into an options
-- table (with `force = false`) and delegates to expirationd_run_task().
--
-- @return whatever expirationd_run_task() returns
local function run_task_obsolete(name, space, is_tuple_expired,
                                 process_expired_tuple, args,
                                 tuples_per_iteration, full_scan_time)
    log.info("expirationd.run_task() is obsolete, please consider a switching to expirationd.start()")

    local start_options = {
        process_expired_tuple = process_expired_tuple,
        args = args,
        full_scan_time = full_scan_time,
        tuples_per_iteration = tuples_per_iteration,
        force = false,
    }
    return expirationd_run_task(name, space, is_tuple_expired, start_options)
end
1259

1260
--- Kill an existing task.
1261
--
1262
-- @string name
1263
--     Task name.
1264
--
1265
-- @return None
1266
--
1267
-- @function expirationd.kill
1268
local function expirationd_kill_task(name)
    -- checks() validates the arguments of this very function.
    checks('string')

    -- Look the task up by name and forward whatever kill() returns.
    local task = get_task(name)
    return task:kill()
end
1273

1274
--- Return a list with task's names.
1275
--
1276
-- @return Response of the following structure:
1277
--
1278
-- ```
1279
-- {
1280
--     "expirationd-1",
1281
--     "expirationd-2",
1282
--     "expirationd-3",
1283
-- }
1284
-- ```
1285
--
1286
-- @function expirationd.tasks
1287
local function expirationd_show_task_list()
    -- Iterate over task_list and keep only the first value of every step
    -- (the key of the entry), collected into a plain array.
    local function first_value(key)
        return key
    end

    return fun.iter(task_list):map(first_value):totable()
end
1290

1291
--- Return task statistics in table.
1292
--
1293
-- @string[opt] name
1294
--     Task name. If `name` is nil, then return map of `name`:`stats`, else
1295
--     return map with stats.
1296
--
1297
-- @return Response of the following structure:
1298
--
1299
-- ```
1300
-- {
1301
--     checked_count = number,
1302
--     expired_count = number,
1303
--     restarts = number,
1304
--     working_time = number,
1305
-- }
1306
-- ```
1307
--
1308
-- where:
1309
--
1310
-- `checked_count` is a number of tuples checked for expiration (expired + skipped).
1311
--
1312
-- `expired_count` is a number of expired tuples.
1313
--
1314
-- `restarts` is a number of restarts since start. From the start
1315
-- `restarts` is equal to 1.
1316
--
1317
-- `working_time` is a task's operation time.
1318
--
1319
-- @function expirationd.stats
1320
local function expirationd_task_stats(name)
    -- checks() validates the arguments of this very function.
    checks('?string')

    -- No name given: collect the statistics of every registered task,
    -- keyed by task name.
    if name == nil then
        local stats_by_name = {}
        for task_name, task in pairs(task_list) do
            stats_by_name[task_name] = task:statistics()
        end
        return stats_by_name
    end

    -- A name was given: statistics of that single task.
    return get_task(name):statistics()
end
1332

1333
--- Get task by name.
1334
--
1335
-- @string name
1336
--     Task name.
1337
--
1338
-- @return task instance
1339
--
1340
-- @function expirationd.task
1341
local function expirationd_get_task(name)
    -- checks() validates the arguments of this very function.
    checks('string')

    -- Thin public wrapper: the lookup itself lives in get_task().
    return get_task(name)
end
1346

1347
--- Reload module.
1348
--
1349
-- Update expirationd version in a running Tarantool and restart all tasks.
1350
-- Reload process step by step: remove expirationd module from
1351
-- `package.loaded`, import new version of expirationd using `require` and
1352
-- finally restart all tasks.
1353
--
1354
-- @return None
1355
--
1356
-- @function expirationd.update
1357
local function expirationd_update()
    -- Empty the currently loaded module table and make every field access
    -- fail until the update is finished.
    local old_module = require("expirationd")
    table.clear(old_module)
    setmetatable(old_module, {
        __index = function()
            error("Wait until update is done before using expirationd", 2)
        end
    })

    -- Drop the cached module so that require() loads it anew.
    package.loaded["expirationd"] = nil
    local new_module = require("expirationd")

    -- Detach the current task list and snapshot the continue states.
    local old_tasks = task_list
    task_list = {}
    local saved_continue = table.deepcopy(task_continue)

    for _, task in pairs(old_tasks) do
        task:kill()
        -- kill() resets a continue state, we should restore the state
        local state = saved_continue[task.name]
        if state then
            task_continue[task.name] = state
        end

        -- NOTE(review): only these five options are forwarded to the
        -- restarted task; confirm that leaving out the rest is intended.
        local restart_options = {
            process_expired_tuple = task.process_expired_tuple,
            args = task.args,
            tuples_per_iteration = task.tuples_per_iteration,
            full_scan_time = task.full_scan_time,
            force = task.force,
        }
        new_module.start(task.name, task.space, task.is_tuple_expired, restart_options)
    end

    -- update old function table to represent new reloaded expirationd
    -- some kind of dirty hack if user forgot to require new expirationd
    setmetatable(old_module, nil)
    for key, value in pairs(new_module) do
        old_module[key] = value
    end
end
1391

1392
-- Obsolete alias of expirationd.stats(): logs a deprecation notice and
-- forwards all arguments and results unchanged.
local function task_stats_obsolete(...)
    local notice = "expirationd.task_stats() is obsolete, please consider a switching to expirationd.stats()"
    log.info(notice)
    return expirationd_task_stats(...)
end
1396

1397
-- Obsolete alias of expirationd.kill(): logs a deprecation notice and
-- forwards all arguments and results unchanged.
local function kill_task_obsolete(...)
    local notice = "expirationd.kill_task() is obsolete, please consider a switching to expirationd.kill()"
    log.info(notice)
    return expirationd_kill_task(...)
end
1401

1402
-- Obsolete alias of expirationd.task(): logs a deprecation notice and
-- forwards all arguments and results unchanged.
local function get_task_obsolete(...)
    local notice = "expirationd.get_task() is obsolete, please consider a switching to expirationd.task()"
    log.info(notice)
    return expirationd_get_task(...)
end
1406

1407
-- Obsolete alias of expirationd.tasks(): logs a deprecation notice and
-- returns the list of task names.
local function get_tasks_obsolete(...)
    log.info("expirationd.get_tasks() is obsolete, please consider a switching to expirationd.tasks()")
    -- Delegate to the task list function that the notice points to. The
    -- by-name getter used here before requires a string argument, so a
    -- plain get_tasks() call raised instead of listing the tasks.
    return expirationd_show_task_list(...)
end
1411

1412
-- Obsolete alias of expirationd.tasks(): logs a deprecation notice and
-- returns the list of task names.
local function show_task_list_obsolete(...)
    log.info("expirationd.show_task_list() is obsolete, please consider a switching to expirationd.tasks()")
    -- Delegate to the task list function that the notice points to. The
    -- by-name getter used here before requires a string argument, so a
    -- plain show_task_list() call raised instead of listing the tasks.
    return expirationd_show_task_list(...)
end
1416

1417
-- Public face of the configuration: reads fall through to `cfg`, direct
-- writes raise an error, calling it runs expirationd_cfg(), and
-- serialization returns `cfg` itself.
local cfg_view = setmetatable({}, {
    __index = cfg,
    __newindex = function() error("Use expirationd.cfg{} instead", 2) end,
    __call = expirationd_cfg,
    __serialize = function() return cfg end,
})

return {
    cfg = cfg_view,
    start = expirationd_run_task,
    stats = expirationd_task_stats,
    update = expirationd_update,
    kill = expirationd_kill_task,
    task = expirationd_get_task,
    tasks = expirationd_show_task_list,

    -- Obsolete function names, use the ones above instead
    task_stats = task_stats_obsolete,
    kill_task = kill_task_obsolete,
    get_task = get_task_obsolete,
    get_tasks = get_tasks_obsolete,
    run_task = run_task_obsolete,
    show_task_list = show_task_list_obsolete,

    _VERSION = _VERSION,
}
12✔
1440

1441
-- }}} Module functions
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc