ochaton / switchover / build #112 (push)

13 Mar 2025 06:24PM UTC
Coverage: 65.59% (+0.06%) from 65.531%

Commit "ci: debug" by Vladislav Grubov

5278 of 8047 relevant lines covered (65.59%)
1509.77 hits per line

Source file: /switchover/auto.lua (file coverage: 24.35%)

local M = {}

local log = require 'switchover._log'
local json = require 'json'
local fun = require 'fun'
local clock = require 'clock'
local G = require 'switchover._global'
local e = require 'switchover._error'
local Mutex = require 'switchover._mutex'

local etcd_switch = require 'switchover.heal'.etcd_switch
local switchover_discovery = require 'switchover.discovery'
local switchover_resolve = require 'switchover._resolve'
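
-- diff_vclock (below) computes the per-component absolute difference between
-- two vclocks and the sum of those differences (used as a "diff signature");
-- missing components count as 0. Illustrative example:
--   diff_vclock({10, 5}, {7, 5})  --> {3, 0}, 3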
local function diff_vclock(v1, v2)
        local diffv = {}
        local diff_sign = 0
        local maxid = 0
        for id in pairs(v1) do
                if maxid < id then maxid = id end
        end
        for id in pairs(v2) do
                if maxid < id then maxid = id end
        end
        for id = 1, maxid do
                diffv[id] = math.abs((v1[id] or 0) - (v2[id] or 0))
                diff_sign = diff_sign + diffv[id]
        end
        return diffv, diff_sign
end

---@class CandidateInfo
---@field public replica Tarantool
---@field public n_downstreams number
---@field public score number
---@field public vclock table<number,number>
---@field public dsign number diff signature with master
---@field public dvclock table<number,number> diff vclock with master
---@field public max_achievable_vclock table<number,number>
---@field public mdsign number diff signature with max achievable
---@field public rwf_num number 0 - reload will be ok, 1 - reload will freeze
---@field public failover_priority number failover weight; the instance with the greater failover priority is chosen

---@param repl SwitchoverReplicaset
---@param tt Tarantool
---@param max_vclock table
---@param failover_priority number
---@return CandidateInfo
local function build_candidate_info(repl, tt, max_vclock, failover_priority)
        local diffv, diff_sign = diff_vclock(max_vclock, tt:vclock())
        log.info("%s => dv:%s ds:%s", tt.masked_endpoint, json.encode(diffv), diff_sign)

        local max_achievable_vclock = repl:max_achievable_vclock(tt)
        local _, mdsign = diff_vclock(max_vclock, max_achievable_vclock)

        return {
                replica = tt,
                n_downstreams = #tt:followed_downstreams(),
                score = repl:stability_score(tt),
                vclock = tt:vclock(),
                dsign = diff_sign,
                dvclock = diffv,
                max_achievable_vclock = max_achievable_vclock,
                mdsign = mdsign,
                failover_priority = failover_priority,
                -- reload will freeze (0 - no, 1 - yes)
                rwf_num = tt:reload_will_freeze() and 1 or 0,
        }
end
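
-- dsign measures how far a replica currently lags behind the cluster-wide max
-- vclock; mdsign measures how much of that gap cannot be recovered from its
-- connected upstreams, i.e. roughly how many operations would be lost if this
-- replica were promoted (mdsign == 0 means no data loss). For example, with
-- max_vclock = {105, 42}, vclock = {101, 42} and max_achievable_vclock =
-- {105, 42}, the result is dsign = 4 and mdsign = 0 (illustrative values).

-- failover() runs under the ETCD mutex taken in M.run: it re-checks the mutex
-- value, fences the previous master with box.cfg{read_only=true} if one was
-- discovered, switches the master in ETCD, waits on the candidate until the
-- target vclock is reached, promotes it (read_only = false), tries to recover
-- it from the orphan state if needed, and finally calls package_reload().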
local function failover(args)
        ---@type SwitchoverReplicaset
        local repl = assert(args.replicaset, "replicaset is required")
        ---@type CandidateInfo
        local candidate_info = assert(args.candidate_info, "candidate is required")
        ---@type Cluster
        local etcd_shard = assert(args.etcd_shard, "shard is required")
        ---@type number
        local timeout = assert(args.timeout, "timeout is required")
        ---@type boolean
        local no_orphan_fix = args.no_orphan_fix
        ---@type Tarantool
        local previous_master = args.previous_master
        ---@type number
        local fence_timeout = args.fence_timeout or 3

        local mutex_value = G.etcd:get(args.mutex_key, { quorum = true }, { leader = true })
        local deadline = clock.time()+timeout

        if mutex_value ~= args.mutex_value then
                error("ETCD mutex value changed")
        end

        -- check loading of previous master:
                -- loading => box.cfg{read_only=true}
                -- not loading => rollback ETCD to etcd_master
        if previous_master then
                if previous_master:cfg({update=true}).read_only ~= true then
                        local f_elapsed = clock.time()
                        log.warn("[%s] Start to fence previous master: %s", etcd_shard.name, previous_master)

                        local ok, err = pcall(function()
                                previous_master.conn:eval([[ box.cfg{read_only=true} ]], {}, { timeout = fence_timeout })
                        end)
                        if not ok then
                                log.error("[%s] fence %s eval failed: %s", etcd_shard.name, previous_master.endpoint, err)
                        else
                                log.warn("[%s] fencing %s complete in %.3fs", etcd_shard.name, etcd_shard:master_name(), clock.time()-f_elapsed)
                        end
                end

                if previous_master:cfg({update=true}).read_only ~= true then
                        log.error("[%s] Failed to fence master: %s", etcd_shard.name, previous_master)
                        return true, "Previous master was discovered. auto failed to fence it"
                end
        end

        local candidate = candidate_info.replica
        log.warn("Changing etcd master to %s", candidate.endpoint)

        etcd_switch {
                etcd_shard = etcd_shard,
                etcd_master = etcd_shard:master(),
                etcd_master_name = etcd_shard:master_name(),
                candidate_uuid = candidate:uuid(),
        }

        local optimal_rcq
        local current_rcq = candidate:cfg().replication_connect_quorum or #candidate:replication()

        -- TODO: rework this part. RCQ=1 (because instance already bootstrapped)
        if current_rcq > #repl.replica_list and not no_orphan_fix then
                local n_upstreams = #candidate:followed_upstreams()
                optimal_rcq = #repl.replica_list

                if n_upstreams < optimal_rcq-1 then
                        log.warn("Can't fix RCQ: too few connected upstreams (got %d, required %d)",
                                n_upstreams+1, optimal_rcq -- we count self here
                        )
                        optimal_rcq = nil
                end
                if optimal_rcq then
                        log.warn("Will fix replication_connect_quorum: %d -> %d", current_rcq, optimal_rcq)
                end
        end
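
        -- The eval below runs on the candidate itself: it waits (up to `timeout`)
        -- until the candidate has applied everything up to target_vclock, then
        -- calls box.cfg{ read_only = false } to promote it. On success it returns
        -- box.info and the elapsed time; on failure it returns false plus a
        -- reason, leaving the node read-only so no data is lost.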
        local target_vclock = candidate_info.max_achievable_vclock
        local new_info, time_or_err = candidate.conn:eval([[
                local target_vclock, timeout, rcq = ...
                local log = require 'log'
                local clock = require 'clock'
                local j = require 'json'
                local f = require 'fiber'
                local s = f.time()
                local deadline = s+timeout

                local function vclock_reached()
                        for id, lsn in pairs(target_vclock) do
                                if (box.info.vclock[id] or 0) < lsn then
                                        return false
                                end
                        end
                        return true
                end

                while not vclock_reached() and clock.time() < deadline do f.sleep(0.001) end

                if not vclock_reached() then
                        log.error("switchover: (data safe) switchover failed. vclock %s wasn't reached",
                                j.encode(target_vclock))
                        return false, ("vclock was not reached in: %.4fs"):format(f.time()-s), box.info
                end

                if deadline < clock.time() then
                        log.warn("switchover: (data safe) Timeout reached. Node won't be promoted to master")
                        return false, ("timed out. no promote was done: %.4fs"):format(f.time()-s), box.info
                end

                log.warn("switchover: (success) vclock on candidate successfully reached. "
                        .."Calling box.cfg{ read_only = false } (node will become master)")

                box.cfg{ read_only = false, replication_connect_quorum = rcq }
                return box.info, f.time() - s
        ]], {target_vclock, timeout, optimal_rcq}, { timeout = 2*timeout })

        if deadline < clock.time() then
                return false, "Timeout was reached"
        end

        if not new_info then
                return true, time_or_err
        end
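
        -- If the candidate is still read-only here, it is most likely stuck in
        -- the orphan state (too few connected upstreams to satisfy
        -- replication_connect_quorum). Re-applying box.cfg{replication = ...}
        -- below forces it to reconnect and re-evaluate the quorum.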
        if candidate:ro({update=true}) then
                if candidate:cfg().read_only then
                        return true, "Autopromote was finished, but someone changed cfg.read_only to true"
                end

                if candidate:status() ~= 'orphan' then
                        return true, "Autopromote was finished, but replica stayed in ro: status:"..candidate:status()
                end

                log.warn("Replica is orphan. Cannot change replication_connect_quorum. Trying to restart replication")

                local ok, err = pcall(function()
                        candidate.conn:eval([[
                                repl = box.cfg.replication
                                box.cfg{ replication = {} }
                                box.cfg{ replication = repl }
                                repl = nil
                        ]], {}, { timeout = timeout })
                end)

                if not ok then
                        return true, ("Restart replication failed: %s"):format(err)
                end

                candidate:_get{update = true}

                if candidate:ro() then
                        return true, ("Candidate is still in RO state. Can't do anything about it")
                end

                log.info("Candidate was promoted successfully to master")
        end

        if not args.no_reload then
                candidate:package_reload()
        end

        return true
end
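
-- M.run drives the whole "switchover auto" command: resolve the shard from
-- ETCD, discover the replicaset, bail out if a master is already alive, RAFT
-- is enabled, or the ETCD master still has live downstreams, then build and
-- sort the candidate list and run failover() under an ETCD mutex. On success
-- it optionally package.reload()s the routers found in ETCD.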
function M.run(args)
        assert(args.command == "auto", "command must be auto")

        if not G.etcd then
                e.panic("ETCD is required for switchover auto")
        end

        local how, etcd_cluster, shard = switchover_resolve.generic({args.shard}, 'shard')
        if how ~= 'etcd' then
                e.panic("ETCD must be used")
                return
        end
        if shard == nil then
                e.panic("No shard discovered from %s", args.shard)
                return
        end

        assert(shard, "shard must be defined")

        if shard:is_proxy() then
                e.panic("Cannot failover proxy")
        end

        ---@type Cluster
        local etcd_shard = shard --[[@as Cluster]]

        log.prefix(("[%s]"):format(shard.name))

        local repl = switchover_discovery.discovery({
                endpoints = shard and shard:endpoints(),
                discovery_timeout = args.discovery_timeout,
        })

        if #repl:masters() > 1 then
                e.panic("Too many masters in replicaset: %d", #repl:masters())
        end

        if repl:has_enabled_raft() then
                e.panic("Refusing auto switch in RAFT shard")
        end

        if repl:master() then
                log.warn("Master is discovered: %s. Nothing to do", repl:master())
                repl:destroy()
                return 0
        end

        local etcd_master = shard:master()
        assert(etcd_master.instance_uuid, "instance_uuid is not set by etcd for etcd_master")

        if not args.no_orphan_fix then
                local rcq, is_set = shard:RCQ()
                if is_set then
                        log.warn("Replication connect quorum is set to %d in ETCD. Cannot change it", rcq)
                        args.no_orphan_fix = true
                else
                        log.warn("Replication Connect Quorum is not set in ETCD: def:%d", rcq)
                end
        end
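
        -- args.auto_with_promote means "promote the master configured in ETCD":
        -- it is rewritten into args.promote_master_uuid here, and an explicit
        -- promote_master_uuid takes precedence over it.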
        if args.promote_master_uuid then
                args.auto_with_promote = nil
        end
        if args.auto_with_promote then
                args.promote_master_uuid = etcd_master.instance_uuid
                args.auto_with_promote = nil
        end

        if args.promote_master_uuid and etcd_master.instance_uuid ~= args.promote_master_uuid then
                log.warn("ETCD master has been changed %s -> %s. Refusing auto promote",
                        args.promote_master_uuid, etcd_master.instance_uuid
                )
                repl:destroy()
                return 1
        end

        local previous_master = repl.replicas[etcd_master.instance_uuid]
        if previous_master then
                log.info("ETCD master is connected: %s (status: %s, up: %s)",
                        previous_master, previous_master:status(), previous_master:info().uptime)
        elseif args.promote_master_uuid then
                log.error("Required to promote %s leader, but it is down. Refusing auto promote", args.promote_master_uuid)
                repl:destroy()
                return 1
        end
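
        -- Count the replicas that still receive data from the ETCD master. If
        -- any are counted and no explicit promote was requested, the autoswitch
        -- is refused below.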
        local candidate
        local n_live_downstreams = 0
        for _, r in ipairs(repl.replica_list) do
                local mu = r:replicates_from(etcd_master.instance_uuid)
                if mu then
                        log.warn("Master is replicated by %s -> %s %s:%s %s",
                                etcd_master.endpoint, r.endpoint,
                                mu.upstream.status,
                                tonumber(mu.upstream.lag) and ("%.3fs"):format(tonumber(mu.upstream.lag)) or mu.upstream.lag,
                                json.encode((mu.downstream or {}).vclock)
                        )
                        n_live_downstreams = n_live_downstreams + 1
                end
        end

        -- Check that replication graph is complete with repl.replica_list

        if n_live_downstreams > 0 and not args.promote_master_uuid then
                log.warn("Master has %d live downstreams. Refusing autoswitch", n_live_downstreams)
                repl:destroy()
                return 1
        end

        local max_vclock = repl:max_vclock()
        if not args.promote_master_uuid then
                assert(n_live_downstreams == 0)
                log.warn("0 replicas replicate data from ETCD Master. Searching for best candidate to vclock %s",
                        json.encode(max_vclock))
        end

        ---Promote order contains the list of candidates (excluding the ETCD master) that are eligible to become the next leader.
        ---@type CandidateInfo[]
        local promote_order = {}
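
        -- Instances marked with the "witness" autofailover role in ETCD are
        -- never promoted, and neither is the previous master; only replicas in
        -- the "orphan" or "running" state are considered.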
        for _, r in ipairs(repl.replica_list) do
                local instance_autofailover_role = "candidate"
                local instance_autofailover_priority = 1
                local ei = etcd_shard:instance_by_uuid(r:uuid())
                if ei then
                        instance_autofailover_role = ei.autofailover.role
                        instance_autofailover_priority = ei.autofailover.priority or 1
                end

                if ({orphan=true,running=true})[r:status()] and r ~= previous_master and instance_autofailover_role ~= "witness" then
                        table.insert(promote_order, build_candidate_info(repl, r, max_vclock, instance_autofailover_priority))
                end
        end
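
        -- Effective sort order of promote_order: when stability scores differ,
        -- prefer the replica whose reload will not freeze (rwf_num == 0) and
        -- then the higher score; when scores are equal, prefer the smaller
        -- mdsign (less data at risk) and, on a tie, the higher
        -- failover_priority from ETCD.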
        -- if ndws > RF then fastest
        -- otherwise take max_achievable_vclock
        table.sort(promote_order, function(a, b)
                if a.score == b.score then
                        if a.mdsign == b.mdsign then
                                -- if diff signature is equal use failover priority
                                return a.failover_priority > b.failover_priority
                        end
                        -- choose with lowest diff signature
                        return a.mdsign < b.mdsign
                end

                -- if reload will freeze, choose the one where it will not
                -- if possible
                if a.rwf_num ~= b.rwf_num then
                        return a.rwf_num < b.rwf_num
                end
                -- choose with most downstreams
                return a.score > b.score
        end)

        candidate = promote_order[1]
        if args.promote_master_uuid then
                ---@type CandidateInfo[]
                -- promote_master_uuid must contain etcd_master.uuid
                -- and etcd_master_uuid must be previous_master
                -- previous_master can never be a candidate.
                -- so we check that our previous_master==etcd_master==promote_master_uuid is not in `promote_order`
                local cnds = fun.iter(promote_order)
                        ---@param cnd CandidateInfo
                        :grep(function(cnd) return cnd.replica:uuid() == args.promote_master_uuid end)
                        :totable()
                if #cnds ~= 0 then
                        log.error("promote_master found in candidate list (but it must not; maybe ETCD changed) promote_master_uuid = %s",
                                args.promote_master_uuid
                        )
                        repl:destroy()
                        return 1
                end

                local next_candidate = build_candidate_info(repl, previous_master, max_vclock, 1)
                candidate = next_candidate
                previous_master = nil
        end

        if not candidate then
                log.error("No candidate is ready to become master")
                repl:destroy()
                return 1
        end
        ---@cast candidate CandidateInfo
        log.warn("Choosing candidate: %s / %s",
                candidate.replica:name(),
                json.encode(candidate.replica:vclock())
        )

        if candidate.mdsign ~= 0 then
                log.error("At least %d operations will be lost during failover. Failing over to vclock %s",
                        candidate.mdsign, json.encode(candidate.max_achievable_vclock))
        else
                log.warn("vclock %s is achievable. No operations will be lost",
                        json.encode(max_vclock))
        end

        local mutex_key = shard:switchover_path()
        log.info("Taking mutex key: %s", mutex_key)

        local mutex_value = ('switchover:%s:%s'):format(repl.uuid, candidate.replica:uuid())
        local timeout = math.max(args.autofailover_timeout or 0, 2.5)
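
        -- The actual failover runs inside an ETCD mutex: the key is the shard's
        -- switchover path, the lock value (mutex_value) encodes the replicaset
        -- and candidate uuids, the TTL is twice the failover timeout, and the
        -- lock is released on success. failover() re-reads the key first, so a
        -- concurrent switchover aborts early.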
        local ok, err = Mutex:new(mutex_key)
                :atomic(
                        { -- key
                                key = mutex_value,
                                ttl = 2*timeout,
                                release_on_success = true,
                        },
                        failover, -- function
                        {
                                replicaset = repl,
                                candidate_info = candidate,
                                etcd_shard = shard,
                                mutex_key = mutex_key,
                                mutex_value = mutex_value,
                                timeout = timeout,
                                no_orphan_fix = args.no_orphan_fix,
                                previous_master = previous_master,
                        }
                )

        if err then
                if ok then
                        log.warn("Switchover failed but replicaset is consistent. Reason: %s", err)
                else
                        log.error("ALERT: Switchover ruined your replicaset. Restore it by yourself. Reason: %s", err)
                end

                switchover_discovery.run {
                        command = 'discovery',
                        endpoints = shard:endpoints(),
                        discovery_timeout = args.discovery_timeout,
                }
                repl:destroy()
                return 1
        else
                -- Everything is fine:
                log.info("Candidate %s/%s was auto promoted", candidate.replica:id(), candidate.replica.endpoint)
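
                -- After a successful switch, the routers registered in ETCD are
                -- rediscovered and, where they expose package.reload, reloaded
                -- so that they pick up the new master.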
                local routers = etcd_cluster and etcd_cluster:routers()
                if etcd_cluster and not args.no_reload_routers and routers then
                        log.info("Found routers in ETCD: %s", table.concat(routers:get_shard_names(), ','))

                        local proxy = switchover_discovery.cluster {
                                cluster = routers,
                                discovery_timeout = args.discovery_timeout,
                        }

                        for router_name, msg in pairs(proxy) do
                                local router = msg.replicaset:master()
                                if not router then
                                        log.warn('Router %s is not discovered', router_name)
                                elseif router.can_package_reload then
                                        log.info("Calling package.reload on router %s", router_name)
                                        router:package_reload()
                                else
                                        log.info("Router is discovered but package.reload() not found: %s", router)
                                end
                        end
                end

                repl:destroy()
                return 0
        end
end

return M