• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ochaton / switchover / #96

pending completion
#96

push

ochaton
fix: Fixes ETCD endpoint selection and ignore_tarantool_quorum

28 of 28 new or added lines in 6 files covered. (100.0%)

5210 of 8029 relevant lines covered (64.89%)

963.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.8
/switchover/etcd.lua
1
local M    = {}
89✔
2
local log  = require 'switchover._log'
89✔
3
local fio  = require 'fio'
89✔
4
local fun  = require 'fun'
89✔
5
local json = require 'json'
89✔
6
local yaml = require 'yaml'
89✔
7
local G    = require 'switchover._global'
89✔
8
local e    = require 'switchover._error'
89✔
9

10
---@type table<string, ObjectChecker>
11
local parsers = {
89✔
12
        ['etcd.cluster.master'] = require 'switchover.core.etcd_cluster_master',
89✔
13
        ['etcd.instance.single'] = require 'switchover.core.etcd_instance_single',
89✔
14
}
15

16
local formates = {
89✔
17
        json = json,
89✔
18
        yaml = yaml,
89✔
19
}
20

21
local function pp(t, fmt, opts)
22
        fmt = fmt or 'yaml'
88✔
23
        opts = opts or {}
88✔
24
        local formatter = formates[fmt] or yaml
88✔
25
        print(formatter.encode(t))
88✔
26
        if opts.exit == false then return end
88✔
27
        os.exit(0)
87✔
28
end
29

30
---comment
31
---@param tree any
32
---@param errs CheckError[]
33
---@param policy string
34
local function describe_error(tree, errs, policy)
35
        local es = table.concat(fun.map(tostring, errs):totable(), "\n\t")
×
36
        log.error("Validation failed (policy: %s):\n\t%s", policy, es)
×
37

38
        -- Fill shadown tree with error messages
39
        local shadow = {}
×
40
        for _, err in ipairs(errs) do
×
41
                local t = shadow
×
42
                for i = 1, #err.path_r - 1 do
×
43
                        local k = err.path_r[i]
×
44
                        t[k] = t[k] or {}
×
45
                        t = t[k]
×
46
                end
47
                if #err.path_r > 0 then
×
48
                        t[err.path_r[#err.path_r]] = err.message
×
49
                end
50
        end
51

52
        ---Tree minus Tree
53
        ---@param node table
54
        ---@param shadow_node table
55
        local function dfs(node, shadow_node)
56
                if type(node) ~= 'table' then return end
×
57
                for key, subtree in pairs(node) do
×
58
                        if not shadow_node[key] and type(subtree) == 'table' then
×
59
                                node[key] = '...'
×
60
                        else
61
                                dfs(subtree, shadow_node[key])
×
62
                        end
63
                end
64
        end
65

66
        dfs(tree, shadow)
×
67
        for _, err in ipairs(errs) do
×
68
                local s = tree
×
69
                for _, path in ipairs(err.path_r) do
×
70
                        s = s[path]
×
71
                end
72
                if type(s) == 'table' then
×
73
                        local msg = err.message
×
74
                        setmetatable(s, {
×
75
                                __serialize = function(v)
76
                                        local mt = getmetatable(v)
×
77
                                        if mt and mt.__serialize then mt.__serialize = nil end
×
78
                                        setmetatable(v, mt)
×
79
                                        v = yaml.encode(v)
×
80
                                                :gsub("%-%-%-", "")
×
81
                                                :gsub("%.%.%.\n", "")
×
82
                                                :gsub("%s+\n", "\n")
×
83
                                                :gsub("\n[ ]+'\n", "")
×
84
                                                :gsub("\n+", "\n")
×
85
                                        return ('%{red}// ' .. msg .. '%{white}') .. v
×
86
                                end
87
                        })
88
                end
89
        end
90
        print((log.ansicolors(yaml.encode(tree))
×
91
                :gsub("\\n", "\n")
×
92
                :gsub("\\ ", " ")
×
93
                :gsub("\n[ ]+'\n", "")
×
94
                :gsub("\n+", "\n")
×
95
                :gsub("'+", "'")
×
96
                ))
97

98
end
99

100
local function validate(args)
101
        assert(args.command_etcd, "must be command_etcd")
1✔
102
        assert(args.etcd_path, "etcd_path must be given")
1✔
103

104
        local tree = G.etcd:getr(args.etcd_path, { leader = true })
1✔
105

106
        local parser = parsers[args.etcd_policy]
1✔
107
        if not parser then
1✔
108
                e.panic("Policy not implemented yet")
×
109
        end
110

111
        local data, ctx = parser:verify_safe(tree)
1✔
112
        if data == nil then
1✔
113
                describe_error(tree, ctx.errors, args.etcd_policy)
×
114
                os.exit(1)
×
115
        end
116

117
        log.info("%s is valid %s", args.etcd_path, args.etcd_policy)
1✔
118
end
119

120
---prompts user
121
---@param question string
122
---@param valids string[]
123
---@param default? string
124
local function prompt(question, valids, default)
125
        local vals = table.concat(valids, "/")
×
126
        local vmap = fun.zip(valids, fun.ones()):tomap()
×
127
        if default then
×
128
                vals = vals:gsub(default, default:upper())
×
129
        end
130

131
        if vals ~= '' then
×
132
                question = question .. ('[%s] '):format(vals)
×
133
        end
134

135
        io.stdout:setvbuf("no")
×
136

137
        local r
138
        repeat
139
                r = ''
×
140
                while r == '' do
×
141
                        io.stdout:write(question)
×
142
                        r = io.stdin:read("*l")
×
143
                        if r == '' and default then r = default end
×
144
                end
145
                if not next(vmap) then break end
×
146
        until vmap[r]
×
147

148
        return r
×
149
end
150

151
---Creates template for application
152
---@param args table
153
local function create(args)
154
        local app_name = assert(args.etcd_create_name, "name must be specified")
×
155
        local template = {
×
156
                common = {},
157
                instances = {}
×
158
        }
159

160
        local cfg = {}
×
161

162
        io.stdout:setvbuf("no")
×
163
        -- 0) Cluster or Proxy
164
        cfg.statefull = prompt("Is application statefull? ", {'y', 'n'}, 'y') == 'y'
×
165

166
        -- 1) Cluster or Single Shard?
167
        if cfg.statefull then
×
168
                cfg.n_shards = tonumber(prompt("How many shards do you need [default: 2]: ", {}, "2"))
×
169
        else
170
                cfg.n_proxies = tonumber(prompt("How many instances do you need [default: 2]: ", {}, "2"))
×
171
        end
172

173
        -- 2) Replica Factor?
174
        if cfg.statefull then
×
175
                cfg.replica_factor = tonumber(prompt("How many replicas for each shard do you need [default: 2]: ", {}, "2"))
×
176
        end
177

178
        local mm = {
×
179
                [true] = '256m',
180
                [false] = '32m',
181
        }
182
        -- 3) Initial memtx_memory
183
        while true do
184
                local memtx_memory = prompt(
×
185
                        ("What memtx_memory do you need for each instance? [default: %s] "):format(mm[cfg.statefull]),
×
186
                {}, mm[cfg.statefull])
×
187
                local x = memtx_memory:lower()
×
188
                if x:match('^[0-9]+[mg]$') then
×
189
                        local num = tonumber(x:match('[0-9]+'))
×
190
                        cfg.memtx_memory = num * tonumber((x:sub(-1, -1):gsub('.', {m=2^20, g=2^30})))
×
191
                        if cfg.memtx_memory >= 32*2^20 then break end
×
192
                        io.stdout:write("Sorry, memtx_memory can't be less than 32m\n")
×
193
                else
194
                        io.stdout:write("Didn't catch that. Please type 100m or 4g\n")
×
195
                end
196
        end
197

198
        do
199
                local q = "How many servers do you need? "
×
200
                if cfg.replica_factor then
×
201
                        q = ("How many servers do you need? [default: %s] "):format(cfg.replica_factor)
×
202
                end
203

204
                local min = tonumber(cfg.replica_factor) or 1
×
205
                while true do -- luacheck:ignore
206
                        ::again::
×
207
                        local n = prompt(q, {}, tostring(min))
×
208
                        if not n:match('^[0-9]+$') then
×
209
                                io.stdout:write("Please use only positive numeric value")
×
210
                                goto again
×
211
                        end
212
                        if tonumber(n) < min then
×
213
                                io.stdout:write(("You should have at least %s number of servers\n"):format(min))
×
214
                                goto again
×
215
                        end
216
                        cfg.n_servers = tonumber(n)
×
217
                        break
218
                end
219
        end
220

221
        -- 4) IP of servers?
222
        local srvs = {}
×
223
        for i = 1, cfg.n_servers do
×
224
                while true do -- luacheck: ignore
225
                        ::again::
×
226
                        local srv = prompt(("Specify ip for server %d: "):format(i), {})
×
227
                        if not srv:match('^[0-9]+%.[0-9]+%.[0-9]+%.[0-9]+$') then
×
228
                                local is_valid = prompt(("%q is not ipv4. Is it valid? "):format(srv), {'y', 'n'}, 'n') == 'y'
×
229
                                if not is_valid then goto again end
×
230
                        end
231

232
                        if srvs[srv] then
×
233
                                io.stdout:write(("address %q already used as server %d\n"):format(srv, srvs[srv]))
×
234
                                goto again
×
235
                        end
236

237
                        local need_ping = prompt(
×
238
                                ("Do you want me to ping %q for you? "):format(srv), {'y', 'n'}, 'n') == 'y'
×
239

240
                        if not need_ping then
×
241
                                srvs[srv] = i
×
242
                                srvs[i] = srv
×
243
                                break
244
                        end
245

246
                        local ping_ok = os.execute(("ping -W 1 -c 1 %s"):format(srv)) == 0
×
247
                        local save
248
                        if not ping_ok then
×
249
                                save = prompt(
×
250
                                        ("%q does not respond to ping still use it? "):format(srv), {'y', 'n'}, 'n') == 'y'
×
251
                        else
252
                                save = prompt(
×
253
                                        ("ping to %q is received. Use it? "):format(srv), {'y', 'n'}, 'n') == 'y'
×
254
                        end
255

256
                        if not save then
×
257
                                goto again
×
258
                        end
259

260
                        srvs[srv] = i
×
261
                        srvs[i] = srv
×
262
                        break
263
                end
264
        end
265

266
        cfg.servers = fun.totable(srvs)
×
267

268
        -- 5) Port range of instances?
269
        do
270
                while true do -- luacheck: ignore
271
                        ::again::
×
272
                        local num = prompt("Please specify port_base for instance. (ex: 7100 allows 100 instances) ", {})
×
273
                        if not num:match('^[0-9]+$') or not tonumber(num) then
×
274
                                io.stdout:write("You response must be numeric\n")
×
275
                                goto again
×
276
                        end
277
                        local port = tonumber(num)
×
278
                        if port < 1024 or port > 32678 then
×
279
                                io.stdout:write("Please use base in range [1024-32768]\n")
×
280
                                goto again
×
281
                        end
282

283
                        if port % 100 ~= 0 then
×
284
                                io.stdout:write(("Cannot use port %q as base, it must end with 00\n"):format(port))
×
285
                                goto again
×
286
                        end
287
                        cfg.port_base = port
×
288
                        break
289
                end
290
        end
291
        -- 6) Instance uuid/Replicaset uuid prefixes? or auto?
292
        local l337 = function(s)
293
                return (s:lower():gsub('.', {
×
294
                        a = 'a', b = '8', c = 'c', d = 'd', e = 'e', f = 'f',
295
                        g = '6', h = '4', i = '9', j = '7', k = '0', l = '1',
296
                        m = '44', n = '11', o = '0', p = '17', q = '9', r = '12',
297
                        s = '5', t = '7', u = '4', v = '5', w = '22', x = '8', y = '7', z = '5',
298
                }))
299
        end
300

301
        local uuid_pref = ("%8s"):format(l337(app_name):sub(1, 8)):gsub(" ", '0')
×
302
        local uuid_template = ('%s-%04d-%04d-0000-000000000000')
×
303

304
        local function rcq(n)
305
                if n <= 2 then return n end
×
306
                return math.floor(n/2)+1
×
307
        end
308

309
        template.common.box = {
×
310
                replication_connect_quorum = cfg .statefull and rcq(cfg.replica_factor) or nil,
311
                memtx_memory = cfg.memtx_memory,
312
                log_level = 5,
313
        }
314

315
        if cfg.statefull then
×
316
                template.clusters = {}
×
317
                if cfg.n_shards == 1 then
×
318
                        template.clusters[app_name] = {
×
319
                                master = app_name .. '_01',
320
                                replicaset_uuid = uuid_template:format(uuid_pref, 1, 0),
321
                        }
322
                        for i = 1, cfg.replica_factor do
×
323
                                local inst_name = app_name .. ('_%02d'):format(i)
×
324
                                local listen_port = cfg.port_base + 10*0 + i
×
325
                                template.instances[inst_name] = {
×
326
                                        cluster = app_name,
327
                                        box = {
×
328
                                                instance_uuid = uuid_template:format(uuid_pref, 0, i),
329
                                                listen = cfg.servers[i]..':'..listen_port,
330
                                        }
331
                                }
332
                        end
333
                else
334
                        for n = 1, cfg.n_shards do
×
335
                                local shard_name = app_name .. ('_%03d'):format(n)
×
336
                                template.clusters[shard_name] = {
×
337
                                        master = shard_name .. '_01',
338
                                        replicaset_uuid = uuid_template:format(uuid_pref, n, 0),
339
                                }
340
                                for i = 1, cfg.replica_factor do
×
341
                                        local inst_name = shard_name .. ('_%02d'):format(i)
×
342
                                        local listen_port = cfg.port_base + 10*n + i
×
343
                                        template.instances[inst_name] = {
×
344
                                                cluster = shard_name,
345
                                                box = {
×
346
                                                        instance_uuid = uuid_template:format(uuid_pref, n, i),
347
                                                        listen = cfg.servers[i]..':'..listen_port,
348
                                                }
349
                                        }
350
                                end
351
                        end
352
                end
353
        else
354
                template.clusters = nil
×
355
                template.instances = {}
×
356

357
                for i = 1, cfg.n_proxies do
×
358
                        local inst_name = app_name .. ('_%03d'):format(i)
×
359
                        local listen_port = cfg.port_base + i
×
360
                        local listen_addr = cfg.servers[i%#cfg.servers+1] .. ':'..listen_port
×
361
                        template.instances[inst_name] = {
×
362
                                box = {
×
363
                                        instance_uuid = uuid_template:format(uuid_pref, 0, i),
364
                                        listen = listen_addr,
365
                                },
366
                        }
367
                end
368
        end
369

370
        pp({ [app_name] =  template }, args.etcd_output_format, {exit=false})
×
371

372
        if prompt("Save configuration to file? ", {'y', 'n'}, 'y') == 'y' then
×
373
                local fname = ("%s.etcd.yaml"):format(app_name)
×
374
                local fh = assert(io.open(fname, "w"))
×
375
                assert(fh:write(yaml.encode({ [app_name] = template })))
×
376
                assert(fh:close())
×
377
                print("Saved to file "..fname)
×
378
                os.exit(0)
×
379
        end
380
        os.exit(0)
×
381
end
382

383
--- ETCD API
384
function M.run(args)
89✔
385
        assert(args.command_etcd, "must be command_etcd")
89✔
386
        if not G.etcd and not args.etcd_create then
89✔
387
                e.panic("ETCD is required for command etcd")
×
388
        end
389

390
        if args.etcd_list then
89✔
391
                local listing
392
                if args.recursive then
16✔
393
                        listing = G.etcd:lsr(args.etcd_path, {}, { leader = true })
×
394
                else
395
                        listing = G.etcd:ls(args.etcd_path, {}, { leader = true })
32✔
396
                end
397
                table.sort(listing)
16✔
398
                pp(listing, args.etcd_output_format)
16✔
399
        elseif args.etcd_get then
73✔
400
                if args.recursive then
44✔
401
                        pp(G.etcd:getr(args.etcd_path, {}, { leader = true }), args.etcd_output_format)
70✔
402
                else
403
                        pp(G.etcd:get(args.etcd_path, {}, { leader = true }), args.etcd_output_format)
18✔
404
                end
405
        elseif args.etcd_set then
29✔
406
                pp(G.etcd:set(args.etcd_path, args.etcd_value, {}, { leader = true }), args.etcd_output_format)
2✔
407
        elseif args.etcd_load then
28✔
408
                local data = args.etcd_input:read("*all")
23✔
409

410
                if args.etcd_load_input_format == "json" then
23✔
411
                        data = json.decode(data)
×
412
                else
413
                        data = yaml.decode(data)
23✔
414
                end
415

416
                do
417
                        local etcd_data = G.etcd:getr(args.etcd_path, {}, { raw = true, leader = true })
23✔
418
                        local fname = fio.pathjoin(G.workdir,
46✔
419
                                G.etcd.last_headers["x-etcd-index"] .. "_" .. os.date("%Y%m%dT%H%M%S") ..
23✔
420
                                "_etcd_dump_" .. args.etcd_path:gsub("/", "_"))
23✔
421
                        local f = assert(io.open(fname, "wx"))
23✔
422
                        if etcd_data.node then
23✔
423
                                assert(f:write(yaml.encode(G.etcd:unpack(etcd_data.node, args.etcd_path))))
36✔
424
                                assert(f:close())
18✔
425
                        end
426
                        log.verbose("ETCD %s dumped => %s", args.etcd_path, fname)
23✔
427
                end
428

429
                pp(G.etcd:fill(args.etcd_path, data, {
69✔
430
                        leader = true,
431
                        dry_run = args.etcd_load_dry_run,
23✔
432
                        sync_delete = args.etcd_load_delete,
23✔
433
                        no_change = args.etcd_load_append_only,
23✔
434
                }), args.etcd_output_format)
23✔
435
        elseif args.etcd_rm then
5✔
436
                pp(G.etcd:rm(args.etcd_path, { recursive = args.etcd_recursive == true }, { leader = true }), args.etcd_output_format)
×
437
        elseif args.etcd_rmdir then
5✔
438
                pp(G.etcd:rmdir(args.etcd_path, { leader = true }), args.etcd_output_format)
×
439
        elseif args.etcd_wait then
5✔
440
                pp(G.etcd:wait(args.etcd_path,
6✔
441
                        { recursive = args.etcd_recursive == true, waitIndex = args.etcd_index },
2✔
442
                        { timeout = args.timeout }
2✔
443
                ), args.etcd_output_format)
2✔
444
        elseif args.etcd_validate then
3✔
445
                return validate(args)
1✔
446
        elseif args.etcd_create then
2✔
447
                return create(args)
×
448
        elseif args.etcd_version then
2✔
449
                pp(G.etcd:version())
2✔
450
        elseif args.etcd_health then
1✔
451
                local health = G.etcd:health()
1✔
452
                local is_healthy = true
1✔
453
                for _, v in pairs(health) do
4✔
454
                        if v ~= 'healthy' then
3✔
455
                                is_healthy = false
×
456
                        end
457
                end
458
                pp(health, 'yaml', {exit=false})
1✔
459
                if is_healthy then
1✔
460
                        os.exit(0)
1✔
461
                else
462
                        log.warn("ETCD is degraded")
×
463
                        os.exit(1)
×
464
                end
465
        else
466
                e.panic("Command is Not Implemented Yet")
×
467
        end
468
end
469

470
return M
89✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc