
ochaton / switchover — build #96 (push) · pending completion

Commit by ochaton: fix: Fixes ETCD endpoint selection and ignore_tarantool_quorum

28 of 28 new or added lines in 6 files covered. (100.0%)

5210 of 8029 relevant lines covered (64.89%)

963.0 hits per line

Source File

/switchover/auto.lua (24.12% covered)
local M = {}

local log = require 'switchover._log'
local json = require 'json'
local fun = require 'fun'
local clock = require 'clock'
local G = require 'switchover._global'
local e = require 'switchover._error'
local Mutex = require 'switchover._mutex'

local etcd_switch = require 'switchover.heal'.etcd_switch
local switchover_discovery = require 'switchover.discovery'
local switchover_resolve = require 'switchover._resolve'

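-- diff_vclock compares two vclocks component-wise: it returns a table of
-- absolute per-replica-id LSN differences and the sum of those differences
-- (the "diff signature" used to rank candidates).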
local function diff_vclock(v1, v2)
	local diffv = {}
	local diff_sign = 0
	local maxid = 0
	for id in pairs(v1) do
		if maxid < id then maxid = id end
	end
	for id in pairs(v2) do
		if maxid < id then maxid = id end
	end
	for id = 1, maxid do
		diffv[id] = math.abs((v1[id] or 0) - (v2[id] or 0))
		diff_sign = diff_sign + diffv[id]
	end
	return diffv, diff_sign
end

---@class CandidateInfo
---@field public replica Tarantool
---@field public n_downstreams number
---@field public score number
---@field public vclock table<number,number>
---@field public dsign number diff signature with master
---@field public dvclock table<number,number> diff vclock with master
---@field public max_achievable_vclock table<number,number>
---@field public mdsign number diff signature with max achievable
---@field public rwf_num number 0 - reload will be ok, 1 - reload will freeze
---@field public failover_priority number failover weight; the instance with the greater failover priority is chosen

---@param repl SwitchoverReplicaset
---@param tt Tarantool
---@param max_vclock table
---@param failover_priority number
---@return CandidateInfo
local function build_candidate_info(repl, tt, max_vclock, failover_priority)
	local diffv, diff_sign = diff_vclock(max_vclock, tt:vclock())
	log.info("%s => dv:%s ds:%s", tt.masked_endpoint, json.encode(diffv), diff_sign)

	local max_achievable_vclock = repl:max_achievable_vclock(tt)
	local _, mdsign = diff_vclock(max_vclock, max_achievable_vclock)

	return {
		replica = tt,
		n_downstreams = #tt:followed_downstreams(),
		score = repl:stability_score(tt),
		vclock = tt:vclock(),
		dsign = diff_sign,
		dvclock = diffv,
		max_achievable_vclock = max_achievable_vclock,
		mdsign = mdsign,
		failover_priority = failover_priority,
		-- reload will freeze (0 - no, 1 - yes)
		rwf_num = tt:reload_will_freeze() and 1 or 0,
	}
end

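-- failover runs under the ETCD mutex taken in M.run. It re-checks the mutex
-- value, fences the previous master (box.cfg{read_only=true}) if it is still
-- reachable, switches the master key in ETCD to the candidate, waits for the
-- candidate to reach the max achievable vclock and promotes it by clearing
-- read_only. If the candidate ends up orphaned, its replication is restarted.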
local function failover(args)
	---@type SwitchoverReplicaset
	local repl = assert(args.replicaset, "replicaset is required")
	---@type CandidateInfo
	local candidate_info = assert(args.candidate_info, "candidate is required")
	---@type Cluster
	local etcd_shard = assert(args.etcd_shard, "shard is required")
	---@type number
	local timeout = assert(args.timeout, "timeout is required")
	---@type boolean
	local no_orphan_fix = args.no_orphan_fix
	---@type Tarantool
	local previous_master = args.previous_master
	---@type number
	local fence_timeout = args.fence_timeout or 3

	local mutex_value = G.etcd:get(args.mutex_key, { quorum = true }, { leader = true })
	local deadline = clock.time()+timeout

	if mutex_value ~= args.mutex_value then
		error("ETCD mutex value changed")
	end

	-- check loading of previous master:
		-- loading => box.cfg{read_only=true}
		-- not loading => rollback ETCD to etcd_master
	if previous_master then
		if previous_master:cfg({update=true}).read_only ~= true then
			local f_started = clock.time()
			log.warn("[%s] Start to fence previous master: %s", etcd_shard.name, previous_master)

			local ok, err = pcall(function()
				previous_master.conn:eval([[ box.cfg{read_only=true} ]], {}, { timeout = fence_timeout })
			end)
			if not ok then
				log.error("[%s] fence %s eval failed: %s", etcd_shard.name, previous_master.endpoint, err)
			else
				log.warn("[%s] fencing %s complete in %.3fs", etcd_shard.name, etcd_shard:master_name(), clock.time()-f_started)
			end
		end

		if previous_master:cfg({update=true}).read_only ~= true then
			log.error("[%s] Failed to fence master: %s", etcd_shard.name, previous_master)
			return true, "Previous master was discovered, but auto failed to fence it"
		end
	end

	local candidate = candidate_info.replica
	log.warn("Changing etcd master to %s", candidate.endpoint)

	etcd_switch {
		etcd_shard = etcd_shard,
		etcd_master = etcd_shard:master(),
		etcd_master_name = etcd_shard:master_name(),
		candidate_uuid = candidate:uuid(),
	}

	local optimal_rcq
	local current_rcq = candidate:cfg().replication_connect_quorum or #candidate:replication()

	-- TODO: rework this part. RCQ=1 (because instance already bootstrapped)
	if current_rcq > #repl.replica_list and not no_orphan_fix then
		local n_upstreams = #candidate:followed_upstreams()
		optimal_rcq = #repl.replica_list

		if n_upstreams < optimal_rcq-1 then
			log.warn("Can't fix RCQ: too few connected upstreams (got %d, required %d)",
				n_upstreams+1, optimal_rcq -- we count self here
			)
			optimal_rcq = nil
		else
			log.warn("Will fix replication_connect_quorum: %d -> %d", current_rcq, optimal_rcq)
		end
	end

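	-- Wait on the candidate until it catches up to the max achievable vclock,
	-- then promote it by clearing read_only (and applying optimal_rcq if it
	-- was computed above). The eval returns box.info on success.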
	local target_vclock = candidate_info.max_achievable_vclock
	local new_info, time_or_err = candidate.conn:eval([[
		local target_vclock, timeout, rcq = ...
		local log = require 'log'
		local clock = require 'clock'
		local j = require 'json'
		local f = require 'fiber'
		local s = f.time()
		local deadline = s+timeout

		local function vclock_reached()
			for id, lsn in pairs(target_vclock) do
				if (box.info.vclock[id] or 0) < lsn then
					return false
				end
			end
			return true
		end

		while not vclock_reached() and clock.time() < deadline do f.sleep(0.001) end

		if not vclock_reached() then
			log.error("switchover: (data safe) switchover failed. vclock %s wasn't reached",
				j.encode(target_vclock))
			return false, ("vclock was not reached in: %.4fs"):format(f.time()-s), box.info
		end

		if deadline < clock.time() then
			log.warn("switchover: (data safe) Timeout reached. Node won't be promoted to master")
			return false, ("timed out. no promote was done: %.4fs"):format(f.time()-s), box.info
		end

		log.warn("switchover: (success) vclock on candidate successfully reached. "
			.."Calling box.cfg{ read_only = false } (node will become master)")

		box.cfg{ read_only = false, replication_connect_quorum = rcq }
		return box.info, f.time() - s
	]], {target_vclock, timeout, optimal_rcq}, { timeout = 2*timeout })

	if deadline < clock.time() then
		return false, "Timeout was reached"
	end

	if not new_info then
		return true, time_or_err
	end

	if candidate:ro({update=true}) then
		if candidate:cfg().read_only then
			return true, "Autopromote was finished, but someone changed cfg.read_only to true"
		end

		if candidate:status() ~= 'orphan' then
			return true, "Autopromote was finished, but replica stayed in ro: status:"..candidate:status()
		end

		log.warn("Replica is orphan. Cannot change replication_connect_quorum. Trying to restart replication")

		local ok, err = pcall(function()
			candidate.conn:eval([[
				repl = box.cfg.replication
				box.cfg{ replication = {} }
				box.cfg{ replication = repl }
				repl = nil
			]], {}, { timeout = timeout })
		end)

		if not ok then
			return true, ("Restart replication failed: %s"):format(err)
		end

		candidate:_get{update = true}

		if candidate:ro() then
			return true, ("Candidate is still in RO state. Can't do anything about it")
		end

		log.info("Candidate was promoted successfully to master")
	end

	if not args.no_reload then
		candidate:package_reload()
	end

	return true
end

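-- M.run implements `switchover auto`: it resolves the shard from ETCD,
-- discovers the replicaset, refuses to act while the ETCD master still has
-- live downstreams (unless args.promote_master_uuid is set), ranks the
-- remaining replicas as candidates and performs the failover under a mutex.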
function M.run(args)
	assert(args.command == "auto", "command must be auto")

	if not G.etcd then
		e.panic("ETCD is required for switchover auto")
	end

	local how, etcd_cluster, shard = switchover_resolve.generic({args.shard}, 'shard')
	if how ~= 'etcd' then
		e.panic("ETCD must be used")
		return
	end
	if shard == nil then
		e.panic("No shard discovered from %s", args.shard)
		return
	end

	assert(shard, "shard must be defined")

	if shard:is_proxy() then
		e.panic("Cannot failover proxy")
	end

	---@type Cluster
	local etcd_shard = shard --[[@as Cluster]]

	log.prefix(("[%s]"):format(shard.name))

	local repl = switchover_discovery.discovery({
		endpoints = shard and shard:endpoints(),
		discovery_timeout = args.discovery_timeout,
	})

	if #repl:masters() > 1 then
		e.panic("Too many masters in replicaset: %d", #repl:masters())
	end

	if repl:master() then
		log.warn("Master is discovered: %s. Nothing to do", repl:master())
		repl:destroy()
		return 0
	end

	local etcd_master = shard:master()
	assert(etcd_master.instance_uuid, "instance_uuid is not set by etcd for etcd_master")

	if not args.no_orphan_fix then
		local rcq, is_set = shard:RCQ()
		if is_set then
			log.warn("Replication connect quorum is set to %d in ETCD. Cannot change it", rcq)
			args.no_orphan_fix = true
		else
			log.warn("Replication Connect Quorum is not set in ETCD: def:%d", rcq)
		end
	end

	if args.promote_master_uuid then
		args.auto_with_promote = nil
	end
	if args.auto_with_promote then
		args.promote_master_uuid = etcd_master.instance_uuid
		args.auto_with_promote = nil
	end

	if args.promote_master_uuid and etcd_master.instance_uuid ~= args.promote_master_uuid then
		log.warn("ETCD master has been changed %s -> %s. Refusing auto promote",
			args.promote_master_uuid, etcd_master.instance_uuid
		)
		repl:destroy()
		return 1
	end

	local previous_master = repl.replicas[etcd_master.instance_uuid]
	if previous_master then
		log.info("ETCD master is connected: %s (status: %s, up: %s)",
			previous_master, previous_master:status(), previous_master:info().uptime)
	elseif args.promote_master_uuid then
		log.error("Required to promote %s leader, but it is down. Refusing auto promote", args.promote_master_uuid)
		repl:destroy()
		return 1
	end

	local candidate
	local n_live_downstreams = 0
	for _, r in ipairs(repl.replica_list) do
		local mu = r:replicates_from(etcd_master.instance_uuid)
		if mu then
			log.warn("Master is replicated by %s -> %s %s:%s %s",
				etcd_master.endpoint, r.endpoint,
				mu.upstream.status,
				tonumber(mu.upstream.lag) and ("%.3fs"):format(tonumber(mu.upstream.lag)) or mu.upstream.lag,
				json.encode((mu.downstream or {}).vclock)
			)
			n_live_downstreams = n_live_downstreams + 1
		end
	end

	-- Check that replication graph is complete with repl.replica_list

	if n_live_downstreams > 0 and not args.promote_master_uuid then
		log.warn("Master has %d live downstreams. Refusing autoswitch", n_live_downstreams)
		repl:destroy()
		return 1
	end

	local max_vclock = repl:max_vclock()
	if not args.promote_master_uuid then
		assert(n_live_downstreams == 0)
		log.warn("0 replicas replicate data from ETCD Master. Searching for best candidate to vclock %s",
			json.encode(max_vclock))
	end

	---Promote order contains the list of Candidates (not the ETCD master) that may become the next Leader.
	---@type CandidateInfo[]
	local promote_order = {}

	for _, r in ipairs(repl.replica_list) do
		local instance_autofailover_role = "candidate"
		local instance_autofailover_priority = 1
		local ei = etcd_shard:instance_by_uuid(r:uuid())
		if ei then
			instance_autofailover_role = ei.autofailover.role
			instance_autofailover_priority = ei.autofailover.priority or 1
		end

		if ({orphan=true,running=true})[r:status()] and r ~= previous_master and instance_autofailover_role ~= "witness" then
			table.insert(promote_order, build_candidate_info(repl, r, max_vclock, instance_autofailover_priority))
		end
	end

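	-- Rank candidates: with equal stability scores, prefer the smallest diff
	-- signature against the max achievable vclock, then the higher failover
	-- priority; otherwise prefer an instance whose package.reload will not
	-- freeze, then the higher stability score.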
	-- if ndws > RF then fastest
	-- otherwise take max_achievable_vclock
	table.sort(promote_order, function(a, b)
		if a.score == b.score then
			if a.mdsign == b.mdsign then
				-- if diff signature is equal use failover priority
				return a.failover_priority > b.failover_priority
			end
			-- choose with lowest diff signature
			return a.mdsign < b.mdsign
		end

		-- if reload will freeze, choose the one where it will not
		-- if possible
		if a.rwf_num ~= b.rwf_num then
			return a.rwf_num < b.rwf_num
		end
		-- choose with most downstreams
		return a.score > b.score
	end)

	candidate = promote_order[1]
	if args.promote_master_uuid then
		---@type CandidateInfo[]
		-- promote_master_uuid must contain etcd_master.uuid
		-- and etcd_master_uuid must be previous_master
		-- previous_master can never be a candidate.
		-- so we check that our previous_master==etcd_master==promote_master_uuid is not in `promote_order`
		local cnds = fun.iter(promote_order)
			---@param cnd CandidateInfo
			:grep(function(cnd) return cnd.replica:uuid() == args.promote_master_uuid end)
			:totable()
		if #cnds ~= 0 then
			log.error("promote_master found in candidate list (but it must not; maybe etcd changed) promote_master_uuid = %s",
				args.promote_master_uuid
			)
			repl:destroy()
			return 1
		end

		local next_candidate = build_candidate_info(repl, previous_master, max_vclock, 1)
		candidate = next_candidate
		previous_master = nil
	end

	if not candidate then
		log.error("No candidate is ready to become master")
		repl:destroy()
		return 1
	end
	---@cast candidate CandidateInfo
	log.warn("Choosing candidate: %s / %s",
		candidate.replica:name(),
		json.encode(candidate.replica:vclock())
	)

	if candidate.mdsign ~= 0 then
		log.error("At least %d operations will be lost during failover. Failing over to vclock %s",
			candidate.mdsign, json.encode(candidate.max_achievable_vclock))
	else
		log.warn("vclock %s is achievable. No operations will be lost",
			json.encode(max_vclock))
	end

	local mutex_key = shard:switchover_path()
	log.info("Taking mutex key: %s", mutex_key)

	local mutex_value = ('switchover:%s:%s'):format(repl.uuid, candidate.replica:uuid())
	local timeout = math.max(args.autofailover_timeout or 0, 2.5)

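	-- Perform the failover under an ETCD mutex so concurrent switchover runs
	-- cannot race: the lock TTL is twice the failover timeout and the lock is
	-- released on success.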
	local ok, err = Mutex:new(mutex_key)
		:atomic(
			{ -- key
				key = mutex_value,
				ttl = 2*timeout,
				release_on_success = true,
			},
			failover, -- function
			{
				replicaset = repl,
				candidate_info = candidate,
				etcd_shard = shard,
				mutex_key = mutex_key,
				mutex_value = mutex_value,
				timeout = timeout,
				no_orphan_fix = args.no_orphan_fix,
				previous_master = previous_master,
			}
		)

	if err then
		if ok then
			log.warn("Switchover failed but replicaset is consistent. Reason: %s", err)
		else
			log.error("ALERT: Switchover ruined your replicaset. Restore it by yourself. Reason: %s", err)
		end

		switchover_discovery.run {
			command = 'discovery',
			endpoints = shard:endpoints(),
			discovery_timeout = args.discovery_timeout,
		}
		repl:destroy()
		return 1
	else
		-- Everything is fine:
		log.info("Candidate %s/%s was auto promoted", candidate.replica:id(), candidate.replica.endpoint)

		local routers = etcd_cluster and etcd_cluster:routers()
		if etcd_cluster and not args.no_reload_routers and routers then
			log.info("Found routers in ETCD: %s", table.concat(routers:get_shard_names(), ','))

			local proxy = switchover_discovery.cluster {
				cluster = routers,
				discovery_timeout = args.discovery_timeout,
			}

			for router_name, msg in pairs(proxy) do
				local router = msg.replicaset:master()
				if not router then
					log.warn('Router %s is not discovered', router_name)
				elseif router.can_package_reload then
					log.info("Calling package.reload on router %s", router_name)
					router:package_reload()
				else
					log.info("Router is discovered but package.reload() not found: %s", router)
				end
			end
		end

		repl:destroy()
		return 0
	end
end

return M