• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ochaton / switchover / #96

pending completion
#96

push

ochaton
fix: Fixes ETCD endpoint selection and ignore_tarantool_quorum

28 of 28 new or added lines in 6 files covered. (100.0%)

5210 of 8029 relevant lines covered (64.89%)

963.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.92
/switchover/_mutex.lua
1
local G = require 'switchover._global'
21✔
2
local log = require 'log'
21✔
3
local json = require 'json'
21✔
4
local fiber = require 'fiber'
21✔
5

6

7
-- Mutex prototype table. Instances created by Mutex:new() get this table as
-- their metatable, so method lookups on an instance resolve here via __index.
local Mutex = {}
21✔
8
Mutex.__index = Mutex
21✔
9

10
--- Create a mutex object bound to an ETCD path.
-- No request is sent to ETCD here; the instance only remembers `path`.
-- @param path  ETCD path of the lock key (required); later passed as-is to
--              G.etcd:set / G.etcd:rm by atomic() and release()
-- @return a new table `{ path = path }` whose metatable is `self`
-- Raises (via assert) when G.etcd is not set or when `path` is nil/false.
function Mutex:new(path)
        assert(G.etcd, "etcd required to create Mutex")
        return setmetatable({ path = assert(path, "mutex: path is required") }, self)
end
14

15
--- Run `func(...)` while holding the ETCD mutex stored at `self.path`.
--
-- The lock is taken by setting `self.path` to `opts.key` with
-- `prevExist = false` and the given TTL. While `func` runs, a background fiber
-- wakes up every `ttl / 2` and re-sets the key with `prevValue = key` so the
-- TTL is renewed. The fiber is stopped cooperatively by clearing
-- `self._leasing_f`; it notices that on its next wake-up.
--
-- @param opts  table with fields:
--   key                 (required) value written to the lock key; stored in self.key
--   ttl                 (required) TTL passed to ETCD; half of it is the renewal period
--   release_on_success  (optional) when truthy and func's first result is
--                       truthy, the lock key is removed before returning
-- @param func  function executed under the lock
-- @param ...   arguments forwarded to `func`
-- @return one of:
--   true, message   the lock was not acquired (`func` was NOT called)
--   ok, err         the first two values returned by `func`
--   false, error    `func` raised; `error` is the raised value
-- Raises (via assert) when `opts.key` or `opts.ttl` is missing.
function Mutex:atomic(opts, func, ...)
        local key = assert(opts.key, "Mutex: key is required")
        local ttl = assert(opts.ttl, "Mutex: ttl is required")
        self.key = key

        local res = G.etcd:set(self.path, key, { prevExist = false, ttl = ttl, quorum = true }, { leader = true })
        log.verbose("set %s => %s (ttl: %d): %s", self.path, key, ttl, json.encode(res))

        if res.action ~= 'create' then
                -- data is safe, so first arg is true
                return true, ("Mutex wasn't acquired: %s:%s"):format(res.cause, res.message)
        end

        self._leasing_f = fiber.create(function()
                -- give control back to the creator so self._leasing_f is assigned
                -- before the identity checks below are evaluated
                fiber.yield()
                repeat
                        fiber.sleep(ttl / 2)
                        fiber.testcancel()

                        -- atomic() has finished (or another leasing fiber took over): stop renewing
                        if self._leasing_f ~= fiber.self() then break end
                        local cas = G.etcd:set(self.path, key,
                                { prevValue = key, ttl = ttl, quorum = true },
                                { leader = true })
                        log.warn('CaS leasing: %s', json.encode(cas))
                until self._leasing_f ~= fiber.self()

                log.verbose("End CaS {%s => %s} leasing fiber", self.path, key)
        end)

        -- Capture pcall results in locals instead of packing them into a table:
        -- `{ pcall(...) }` followed by table.remove/unpack depends on `#`, which is
        -- unspecified when func returns nil as its first value (e.g. `nil, "reason"`),
        -- and could silently drop the error message.
        local called, ok, err = pcall(func, ...)
        -- safely remove _leasing fiber. It will be removed after ttl/2 timeout
        -- do not call fiber.cancel()!
        self._leasing_f = nil

        if not called then
                -- func raised: the second pcall result is the raised value
                return false, ok
        end

        if ok then
                if opts.release_on_success then
                        self.released = true
                        local var = G.etcd:rm(self.path, { prevValue = key, quorum = true }, {leader = true})
                        log.info("mutex:autorelease: %s", json.encode(var))
                end
        else
                log.verbose("mutex:atomic failed %s:%s", tostring(ok), tostring(err))
        end
        return ok, err
end
65

66
--- Remove the lock key from ETCD, at most once per Mutex instance.
--
-- The removal is conditional on the stored value being `self.key`
-- (`prevValue`), i.e. the value written by atomic(). A second call, or a call
-- after atomic() already auto-released, only logs a warning.
-- Returns nothing.
function Mutex:release()
        if self.released then
                log.warn("duplicate mutex release")
                return
        end
        if self.key == nil then
                -- self.key is only assigned inside atomic(). Without it the request
                -- would carry no prevValue condition and would presumably delete the
                -- key regardless of who holds the lock, so refuse to send it.
                log.warn("mutex release without acquired key: nothing to release")
                return
        end
        self.released = true
        -- NOTE(review): here `quorum` is passed in the third argument, while the
        -- other G.etcd calls in this file pass it in the second one - confirm
        -- which table the etcd client actually reads it from.
        local var = G.etcd:rm(self.path, { prevValue = self.key }, {leader = true, quorum = true})
        log.info("mutex:release: %s", json.encode(var))
end
75

76
-- Export the prototype as a callable module: `Mutex(path)` invokes
-- Mutex.new(Mutex, path), i.e. it is equivalent to `Mutex:new(path)`.
return setmetatable(Mutex, { __call = Mutex.new })
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc