• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moonlibs / config / 10834992504

12 Sep 2024 04:34PM UTC coverage: 68.242% (-0.2%) from 68.467%
10834992504

push

github

ochaton
ci: fixes

	* etcd image was updated to v3.5.16
	* setup-tarantool was switched to @master
	* strategy/fail-fast was disabled for matrix

621 of 910 relevant lines covered (68.24%)

109.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.64
/config.lua
1
---@diagnostic disable: inject-field
2
local log = require 'log'
38✔
3
if log.new then
38✔
4
        log = log.new('moonlibs.config')
×
5
end
6
local fio = require 'fio'
38✔
7
local json = require 'json'.new()
38✔
8
local yaml = require 'yaml'.new()
38✔
9
local digest = require 'digest'
38✔
10
local fiber  = require 'fiber'
38✔
11
local clock  = require 'clock'
38✔
12
json.cfg{ encode_invalid_as_nil = true }
38✔
13
yaml.cfg{ encode_use_tostring = true }
38✔
14

15
---Collects the upvalues captured by `fun` into a name -> value map.
---
---Upvalues are enumerated with `debug.getupvalue` from index 1 upwards until
---it stops returning a name.
---@param fun fun()
---@return table<string,any> variables
local function lookaround(fun)
	local upvalues = {}
	local idx = 0
	repeat
		idx = idx + 1
		local name, value = debug.getupvalue(fun, idx)
		if name ~= nil then
			upvalues[name] = value
		end
	until name == nil
	return upvalues
end
30

31
---@private
---@class moonlibs.config.reflect_internals
---@field dynamic_cfg table<string,boolean>
---@field default_cfg table<string,any>
---@field upgrade_cfg? fun(cfg: table<string,any>, translate_cfg: table): table<string,any>
---@field template_cfg? table
---@field translate_cfg? table
---@field log? table


---Fills the still-unresolved (`true`) placeholder fields of `peek` from `vars`.
---
---Fields whose same-named upvalue is absent from `vars` stay `true`, so that a
---later unwrap step may still resolve them.
---@param peek table<string,any>
---@param vars table<string,any>
local function resolve_placeholders(peek, vars)
	for k in pairs(peek) do
		if peek[k] == true and vars[k] ~= nil then
			peek[k] = vars[k]
		end
	end
end

---Unwraps box.cfg and retrieves dynamic_cfg, default_cfg tables
---
---Starting at `box.cfg`, follows known wrappers (metatable `__call`,
---tarantoolctl/tt `orig_cfg`/`origin_cfg`, `lock`+`f` wrappers, `old_call`,
---`orig_cfg_call`, `load_cfg_apply_dynamic`, `reload_cfg`, `reconfig_modules`,
---queue `orig_call`) by inspecting upvalues, until a function capturing
---`dynamic_cfg` or `dynamic_cfg_modules` is reached. `default_cfg` entries are
---merged from every visited function. Optional fields (`upgrade_cfg`,
---`translate_cfg`, `template_cfg`, `log`) are taken from same-named upvalues and
---are `nil` in the result when no such upvalue was found.
---
---Raises an error on an unknown wrapper shape or when unwrapping stalls.
---@return moonlibs.config.reflect_internals
local function reflect_internals()
	local peek = {
		dynamic_cfg   = {};
		default_cfg   = {};
		-- `true` means "not resolved yet" (see resolve_placeholders)
		upgrade_cfg   = true;
		translate_cfg = true;
		template_cfg  = true;
		log           = true;
	}

	local steps = {}
	local peekf = box.cfg
	local allow_unwrap = true
	while true do
		local prevf = peekf
		local mt = debug.getmetatable(peekf)
		if type(peekf) == 'function' then
			-- pass
			table.insert(steps,"func")
		elseif mt and mt.__call then
			peekf = mt.__call
			table.insert(steps,"mt_call")
		else
			error(string.format("Neither function nor callable argument %s after steps: %s", tostring(peekf), table.concat(steps, ", ")))
		end

		local vars = lookaround(peekf)
		if type(vars.default_cfg) == 'table' then
			for k in pairs(vars.default_cfg) do
				peek.default_cfg[k] = vars.default_cfg[k]
			end
		end
		if allow_unwrap and (vars.orig_cfg or vars.origin_cfg) then
			-- It's a wrap of tarantoolctl/tt, unwrap and repeat
			peekf = (vars.orig_cfg or vars.origin_cfg)
			allow_unwrap = false
			table.insert(steps,"ctl-orig")
		elseif vars.dynamic_cfg then
			log.info("Found config by steps: %s", table.concat(steps, ", "))
			for k in pairs(vars.dynamic_cfg) do
				peek.dynamic_cfg[k] = true
			end
			resolve_placeholders(peek, vars)
			break
		elseif vars.lock and vars.f and type(vars.f) == 'function' then
			peekf = vars.f
			table.insert(steps,"lock-unwrap")
		elseif vars.old_call and type(vars.old_call) == 'function' then
			peekf = vars.old_call
			table.insert(steps,"ctl-oldcall")
		elseif vars.orig_cfg_call and type(vars.orig_cfg_call) == 'function' then
			peekf = vars.orig_cfg_call
			table.insert(steps,"ctl-orig_cfg_call")
		elseif vars.load_cfg_apply_dynamic then
			table.insert(steps,"load_cfg_apply_dynamic")
			resolve_placeholders(peek, vars)
			peekf = vars.load_cfg_apply_dynamic
		elseif vars.dynamic_cfg_modules then
			log.info("Found config by steps: %s", table.concat(steps, ", "))
			for k, v in pairs(vars.dynamic_cfg_modules) do
				peek.dynamic_cfg[k] = true
				-- a module descriptor without `options` contributes only its name
				for op in pairs(v.options or {}) do
					peek.dynamic_cfg[op] = true
				end
			end
			resolve_placeholders(peek, vars)
			break
		elseif vars.reload_cfg then
			table.insert(steps,"reload_cfg")
			peekf = vars.reload_cfg
		elseif vars.reconfig_modules then
			table.insert(steps,"reconfig_modules")
			resolve_placeholders(peek, vars)
			peekf = vars.reconfig_modules
		elseif vars.orig_call and vars.wrapper_impl and type(vars.orig_call) == 'function' and type(vars.wrapper_impl) == 'function' then
			peekf = vars.orig_call
			table.insert(steps,"queue-cfg_call")
		else
			for k,v in pairs(vars) do log.info("var %s=%s",k,v) end
			error(string.format("Bad vars for %s after steps: %s", tostring(peekf), table.concat(steps, ", ")))
		end

		if prevf == peekf then
			error(string.format("Recursion for %s after steps: %s", tostring(peekf), table.concat(steps, ", ")))
		end
	end

	-- Placeholders that were never resolved must not leak out as `true`:
	-- callers test e.g. `if load_cfg.upgrade_cfg then` and then call it.
	for k in pairs(peek) do
		if peek[k] == true then
			peek[k] = nil
		end
	end
	return peek
end
151

152
-- Internals of Tarantool's box.cfg (dynamic/default option tables and the
-- optional upgrade/translate/template helpers), captured once at module load.
local load_cfg
do
	load_cfg = reflect_internals()
end
38✔
153

154
---Filters only valid keys from given cfg
---
---First passes `cfg` through the discovered `upgrade_cfg` helper (if any).
---Then, when `box.cfg` is no longer a plain function, drops every option that
---is not listed in `dynamic_cfg` and whose value differs from the current
---`box.cfg` value. Each dropped option is reported via `log.warn` and `print`.
---`cfg` may be modified in place.
---@param cfg table<string,any>
---@return table<string,any>
local function prepare_box_cfg(cfg)
	local upgrade = load_cfg.upgrade_cfg
	if upgrade then
		cfg = upgrade(cfg, load_cfg.translate_cfg)
	end

	-- box.cfg is still a plain function: nothing to compare against yet
	if type(box.cfg) == 'function' then
		return cfg
	end

	local running = box.cfg
	local dynamic = load_cfg.dynamic_cfg
	for option, value in pairs(cfg) do
		if dynamic[option] == nil and running[option] ~= value then
			local message = string.format(
				"Can't change option '%s' dynamically from '%s' to '%s'",
				option, running[option], value
			)
			log.warn("%s", message)
			print(message)
			cfg[option] = nil
		end
	end
	return cfg
end
182

183
---Metatable attached to flattened configs.
---
--- * reading a missing key yields nil;
--- * assigning a key that is not yet present raises
---   "Modification of readonly key <k>" at the caller's level;
--- * `__serialize` returns a plain shallow copy for YAML/console output.
---NOTE: `__newindex` only fires for absent keys, so keys that already exist in
---the table can still be overwritten.
local readonly_mt
do
	local function index(tbl, key)
		return rawget(tbl, key)
	end

	local function newindex(_, key)
		error("Modification of readonly key " .. tostring(key), 2)
	end

	local function serialize(tbl)
		local copy = {}
		for key, value in pairs(tbl) do
			copy[key] = value
		end
		return copy
	end

	readonly_mt = {
		__index = index,
		__newindex = newindex,
		__serialize = serialize,
	}
end
196

197
---Flattens nested tables into a single map with dot-joined keys.
---
---Every key is stored as `prefix .. key`. Table values are recursed into with
---`prefix .. key .. '.'`, and they are also kept under their own key. Only the
---outermost call (the one without `result`) attaches `readonly_mt` to the
---returned table.
---@param t table
---@param prefix? string
---@param result? table accumulator used by recursive calls
---@return table
local function flatten (t,prefix,result)
	local is_root = not result
	local acc = result or {}
	local base = prefix or ''
	for key, value in pairs(t) do
		local path = base .. key
		if type(value) == 'table' then
			flatten(value, path .. '.', acc)
		end
		acc[path] = value
	end
	if not is_root then
		return acc
	end
	return setmetatable(acc, readonly_mt)
end
212

213
---Finds the config file path passed on the command line.
---
---Recognised forms are `--config=PATH`, `--config PATH`, `-c PATH` and `-cPATH`.
---Other options are skipped. An option written without `=` consumes the next
---argument as its value, unless that argument itself looks like an option; in
---that case the previous option is treated as a flag without a value.
---@param argv? string[] arguments to scan (defaults to the global `arg`)
---@return string? path config path, or nil when none was given
local function get_opt(argv)
	argv = argv or arg or {}
	local take = false
	local key
	for _, v in ipairs(argv) do
		-- a lone "-" is treated as a value, not an option
		local is_option = #v > 1 and string.sub(v, 1, 1) == "-"
		if take and not is_option then
			-- `v` is the value of the option seen just before it
			take = false
			if key == 'config' or key == 'c' then
				return v
			end
		else
			-- any pending option turned out to be a value-less flag
			take = false
			if string.sub(v, 1, 2) == "--" then
				local x = string.find(v, "=", 1, true)
				if x then
					key = string.sub(v, 3, x - 1)
					if key == 'config' then
						return string.sub(v, x + 1)
					end
				else
					key = string.sub(v, 3)
					take = true
				end
			elseif is_option then
				key = string.sub(v, 2, 2)
				if #v == 2 then
					take = true
				elseif key == 'c' then
					return string.sub(v, 3)
				end
			end
		end
	end
end
249

250
---Recursively merges `src` into `dst` (in place).
---
---Scalar values of `src` overwrite those of `dst` unless `keep` is set, in which
---case only keys missing from `dst` are filled. Table values are merged into
---the matching table of `dst`. If `dst` holds a non-table value there, it is
---replaced by a fresh table, or left untouched when `keep` is set. Nested
---tables of `src` are never stored by reference.
---TODO: cyclic `src` tables recurse forever.
---@param dst table
---@param src table
---@param keep? boolean do not overwrite values already present in `dst`
local function deep_merge(dst,src,keep)
	if not src or not dst then error("Call to deepmerge with bad args",2) end
	for k,v in pairs(src) do
		if type(v) == 'table' then
			local cur = dst[k]
			if type(cur) == 'table' then
				deep_merge(cur, v, keep)
			elseif cur == nil or not keep then
				-- missing or non-table (scalar, false, NULL) value: start fresh
				local fresh = {}
				dst[k] = fresh
				deep_merge(fresh, v, keep)
			end
		elseif dst[k] == nil or not keep then
			dst[k] = v
		end
	end
end
264

265
---Returns a recursive copy of `src`.
---
---Implemented by deep-merging `src` into a brand new empty table.
---@param src table
---@return table
local function deep_copy(src)
	local clone = {}
	deep_merge(clone, src)
	return clone
end
270

271
---Checks whether `a` looks like a sequence.
---
---Returns false on the first non-number key. Otherwise returns whether the
---number of keys equals `#a` (so an empty table counts as an array).
---@param a table
---@return boolean
local function is_array(a)
	local count = 0
	for key in pairs(a) do
		if type(key) ~= 'number' then
			return false
		end
		count = count + 1
	end
	return count == #a
end
281

282
---Computes the difference between two config values.
---
--- * values of different types: `new` is returned;
--- * scalars: `new` when it differs from `old`;
--- * two sequences: compared element-wise, and the whole `new` array is returned
---   on any difference (arrays are replaced completely);
--- * other tables: a map diff. Keys removed in `new` map to `box.NULL`, and
---   changed/added keys map to their (recursive) diff;
--- * `nil` means "no difference" and is never stored inside a diff table.
---@param old any
---@param new any
---@return any diff
local function value_diff(old,new)
	if type(old) ~= type(new) then
		return new
	end

	if type(old) ~= 'table' then
		if old ~= new then
			return new
		end
		return nil
	end

	if new == old then return nil end

	-- Use the array comparison only when BOTH sides are sequences; otherwise
	-- e.g. `{}` -> `{a = 1}` would be reported as "no diff".
	if is_array(old) and is_array(new) then
		if #new ~= #old then return new end
		for i = 1, #old do
			if value_diff(old[i], new[i]) ~= nil then
				return new
			end
		end
		return nil
	end

	local diff = {}
	for k in pairs(old) do
		if new[k] == nil then
			diff[k] = box.NULL
		else
			local vdiff = value_diff(old[k], new[k])
			if vdiff ~= nil then
				diff[k] = vdiff
			end
		end
	end
	for k in pairs(new) do
		if old[k] == nil then
			diff[k] = new[k]
		end
	end
	if next(diff) ~= nil then
		return diff
	end
	-- no diff
end
331

332
---Interprets a config value as a boolean.
---
---nil/false give false. A boolean is returned as is. Anything else is turned into
---a lower-cased string. Numeric strings are true when non-zero, and otherwise
---only "true" and "yes" count as true.
---@param v any
---@return boolean
local function toboolean(v)
	if not v then
		return false
	end
	if type(v) == 'boolean' then
		return v
	end
	local text = tostring(v):lower()
	local num = tonumber(text)
	if num then
		return num ~= 0
	end
	return text == 'true' or text == 'yes'
end
344

345
---Master-selection policies, keyed by the name given in `M.master_selection_policy`.
---
---Each policy deep-merges `common_cfg` and `instance_cfg` into a fresh table,
---applies cluster data where relevant, merges `local_cfg` last and decides
---`box.read_only`. The merged table is returned.
---@type table<string, fun(M: moonlibs.config, instance_name: string, common_cfg: table, instance_cfg: table, cluster_cfg: table, local_cfg: table):table >
local master_selection_policies;
master_selection_policies = {
	['etcd.instance.single'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
		local cfg = {}
		deep_merge(cfg, common_cfg)
		deep_merge(cfg, instance_cfg)

		if cluster_cfg then
			error("Cluster config should not exist for single instance config")
		end

		deep_merge(cfg, local_cfg)

		if cfg.box.read_only == nil then
			log.info("Instance have no read_only option, set read_only=false")
			cfg.box.read_only = false
		end

		-- without an explicit replicaset_uuid, reuse the instance_uuid
		if cfg.box.instance_uuid and not cfg.box.replicaset_uuid then
			cfg.box.replicaset_uuid = cfg.box.instance_uuid
		end

		log.info("Using policy etcd.instance.single, read_only=%s",cfg.box.read_only)
		return cfg
	end;
	['etcd.instance.read_only'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
		local cfg = {}
		deep_merge(cfg, common_cfg)
		deep_merge(cfg, instance_cfg)

		if cluster_cfg then
			log.info("cluster=%s",json.encode(cluster_cfg))
			assert(cluster_cfg.replicaset_uuid,"Need cluster uuid")
			cfg.box.replicaset_uuid = cluster_cfg.replicaset_uuid
		end

		deep_merge(cfg, local_cfg)

		if M.default_read_only and cfg.box.read_only == nil then
			log.info("Instance have no read_only option, set read_only=true")
			cfg.box.read_only = true
		end

		log.info("Using policy etcd.instance.read_only, read_only=%s",cfg.box.read_only)
		return cfg
	end;
	['etcd.cluster.master'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
		log.info("Using policy etcd.cluster.master")
		local cfg = {}
		deep_merge(cfg, common_cfg)
		deep_merge(cfg, instance_cfg)

		-- also guards against a missing cluster section (would index nil)
		assert(cluster_cfg and cluster_cfg.replicaset_uuid,"Need cluster uuid")
		cfg.box.replicaset_uuid = cluster_cfg.replicaset_uuid

		if cfg.box.read_only ~= nil then
			log.info("Ignore box.read_only=%s value due to config policy",cfg.box.read_only)
		end
		if cluster_cfg.master then
			if cluster_cfg.master == instance_name then
				log.info("Instance is declared as cluster master, set read_only=false")
				cfg.box.read_only = false
				if cfg.box.bootstrap_strategy ~= 'auto' then
					cfg.box.replication_connect_quorum = 1
					cfg.box.replication_connect_timeout = 1
				end
			else
				log.info("Cluster has another master %s, not me %s, set read_only=true", cluster_cfg.master, instance_name)
				cfg.box.read_only = true
			end
		else
			log.info("Cluster have no declared master, set read_only=true")
			cfg.box.read_only = true
		end

		deep_merge(cfg, local_cfg)

		return cfg
	end;
	['etcd.cluster.vshard'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
		log.info("Using policy etcd.cluster.vshard")
		-- etcd_load accepts the cluster name from either instance_cfg or
		-- local_cfg, so storages configured either way use the master policy
		if instance_cfg.cluster or (local_cfg and local_cfg.cluster) then
			return master_selection_policies['etcd.cluster.master'](M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
		else
			return master_selection_policies['etcd.instance.single'](M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
		end
	end;
	['etcd.cluster.raft'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
		log.info("Using policy etcd.cluster.raft")
		local cfg = {}
		deep_merge(cfg, common_cfg)
		deep_merge(cfg, instance_cfg)

		assert(cluster_cfg and cluster_cfg.replicaset_uuid,"Need cluster uuid")
		cfg.box.replicaset_uuid = cluster_cfg.replicaset_uuid

		if not cfg.box.election_mode then
			cfg.box.election_mode = M.default_election_mode
		end

		-- TODO: anonymous replica
		if cfg.box.election_mode == 'off' then
			log.info("Force box.read_only=true for election_mode=off")
			cfg.box.read_only = true
		end

		if not cfg.box.replication_synchro_quorum then
			cfg.box.replication_synchro_quorum = M.default_synchro_quorum
		end

		if cfg.box.election_mode == "candidate" then
			cfg.box.read_only = false
		end

		deep_merge(cfg, local_cfg)

		return cfg
	end;
}
38✔
465

466
---Converts string values of boolean options to real booleans, in place.
---
---An option is converted when Tarantool's `template_cfg` declares it as
---'boolean' and its value is a string: "true" becomes true and any other string
---becomes false. Does nothing when `c` is nil or when no `template_cfg` was
---discovered (previously this indexed nil and crashed).
---@param c? table<string,any>
local function cast_types(c)
	local template = load_cfg.template_cfg
	if not c or type(template) ~= 'table' then
		return
	end
	for k, v in pairs(c) do
		if template[k] == 'boolean' and type(v) == 'string' then
			c[k] = v == 'true'
		end
	end
end
475

476
---Derives a deterministic instance UUID from the instance name.
---
---`<name>_<A>_<B>` gives `<sha1(name.."_instance")[1..8]>-<A>-<B>-0000-000000000000`.
---`<name>_<N>` gives `<sha1(name)[1..8]>-0000-0000-0000-<N padded to 12>`.
---Numbers are printed as zero-padded decimals. Any other name raises an error
---reported at the caller's level.
---@param instance_name string
---@return string
local function gen_instance_uuid(instance_name)
	local base, shard_no, replica_no = instance_name:match("^([A-Za-z_]+)_(%d+)_(%d+)$")
	if base ~= nil then
		local head = digest.sha1_hex(base .. "_instance"):sub(1, 8)
		return ("%08s-%04d-%04d-%04d-%012x"):format(head, shard_no, replica_no, 0, 0)
	end

	local name, seq = instance_name:match("^([A-Za-z_]+)_(%d+)$")
	if name ~= nil then
		local head = digest.sha1_hex(name):sub(1, 8)
		return ("%08s-%04d-%04d-%04d-%012d"):format(head, 0, 0, 0, seq)
	end

	error("Can't generate uuid for instance "..instance_name, 2)
end
496

497
---Derives a deterministic replicaset UUID from a `<name>_<N>` cluster name.
---
---The result is `<sha1(name.."_shard")[1..8]>-<N padded to 4>-0000-0000-000000000000`.
---Any other name raises an error reported at the caller's level.
---@param cluster_name string
---@return string
local function gen_cluster_uuid(cluster_name)
	local base, number = cluster_name:match("^([A-Za-z_]+)_(%d+)$")
	if not base then
		error("Can't generate uuid for cluster "..cluster_name, 2)
	end
	local head = digest.sha1_hex(base .. "_shard"):sub(1, 8)
	return ("%08s-%04d-%04d-%04d-%012d"):format(head, number, 0, 0, 0)
end
508

509
---@class moonlibs.config.opts.etcd:moonlibs.config.etcd.opts
---@field instance_name string Mandatory name of the instance
---@field prefix string Mandatory prefix inside etcd tree
---@field uuid? 'auto' When auto config generates replicaset_uuid and instance_uuid for nodes
---@field fixed? table Optional ETCD tree

---Builds an in-memory stand-in for the etcd client that serves a fixed tree.
---
---`list(key)` strips a leading `prefix` from `key` and walks `tree` along the
---`/`-separated components. It returns the node found, or nil once the path
---leaves the tree. `discovery()` does nothing.
---@param tree table
---@param prefix string
---@return table
local function fixed_etcd_client(tree, prefix)
	return setmetatable({ data = tree }, { __index = {
		discovery = function() end;
		list = function(e, k)
			if k:sub(1, #prefix) == prefix then
				k = k:sub(#prefix + 1)
			end
			local v = e.data
			for key in k:gmatch("([^/]+)") do
				if type(v) ~= "table" then return end
				v = v[key]
			end
			return v
		end;
	}})
end

---Builds the sort key of one replication address.
---
---Returns, in comparison order:
--- * group: 0 = `host:port` with numeric port, 1 = `host:port` otherwise, 2 = other;
--- * the numeric port (nil unless group 0);
--- * the raw port string;
--- * the host;
--- * the whole address as a string.
---@param addr any
---@return integer, number?, string, string, string
local function replication_peer_key(addr)
	local raw = tostring(addr)
	local host, port = raw:match('^([^:]+):(.+)')
	if not host then
		return 2, nil, '', '', raw
	end
	local num = tonumber(port)
	if num and num == num then -- NaN would make the ordering inconsistent
		return 0, num, port, host, raw
	end
	return 1, nil, port, host, raw
end

---Strict "less than" for replication addresses: port first, then host, then
---the full string. Both sides go through the same key, so the result is a
---consistent order (as `table.sort` requires) even for mixed address formats.
---@param a any
---@param b any
---@return boolean
local function replication_peer_less(a, b)
	local ga, na, pa, ha, ra = replication_peer_key(a)
	local gb, nb, pb, hb, rb = replication_peer_key(b)
	if ga ~= gb then return ga < gb end
	if na ~= nb then return na < nb end
	if pa ~= pb then return pa < pb end
	if ha ~= hb then return ha < hb end
	return ra < rb
end

---Renders a `box.replication` value (array or single URI) for printing.
---@param v any
---@return string
local function replication_to_string(v)
	if type(v) == 'table' then
		return table.concat(v, ", ")
	end
	return tostring(v)
end

---Loads configuration from etcd and evaluates the master_selection_policy.
---
---Reads the whole `prefix` tree (or `etcd_conf.fixed` instead of a live etcd).
---It casts boolean options and, with `uuid = 'auto'`, generates missing UUIDs.
---It then picks a policy (`M.master_selection_policy` or a default based on
---whether the instance belongs to a cluster) and builds the final config. For
---clustered instances it also fills `box.replication` from the enabled members
---of the same cluster, sorted by port, then host. The client is exposed as
---`M.etcd`, with `get_common`, `get_instances`, `get_clusters` and `get_all`
---helpers attached.
---@param M moonlibs.config
---@param etcd_conf moonlibs.config.opts.etcd
---@param local_cfg table<string,any>
---@return table<string, any>
local function etcd_load( M, etcd_conf, local_cfg )

	local etcd
	local instance_name = assert(etcd_conf.instance_name,"etcd.instance_name is required")
	local prefix = assert(etcd_conf.prefix,"etcd.prefix is required")

	if etcd_conf.fixed then
		etcd = fixed_etcd_client(etcd_conf.fixed, prefix)
	else
		etcd = require 'config.etcd' (etcd_conf)
	end
	M.etcd = etcd

	-- Casts string booleans of each instance's box section and, when
	-- uuid = 'auto', fills a missing instance_uuid. Tolerates nil.
	local function normalize_instances(instances)
		for inst_name, inst_cfg in pairs(instances or {}) do
			cast_types(inst_cfg.box)
			if etcd_conf.uuid == 'auto' and not inst_cfg.box.instance_uuid then
				inst_cfg.box.instance_uuid = gen_instance_uuid(inst_name)
			end
		end
		return instances
	end

	-- Casts string booleans of each cluster and, when uuid = 'auto', fills a
	-- missing replicaset_uuid. Tolerates nil.
	local function normalize_clusters(clusters)
		for cluster_name, cluster_cfg in pairs(clusters or {}) do
			cast_types(cluster_cfg)
			if etcd_conf.uuid == 'auto' and not cluster_cfg.replicaset_uuid then
				cluster_cfg.replicaset_uuid = gen_cluster_uuid(cluster_name)
			end
		end
		return clusters
	end

	function M.etcd.get_common(e)
		local common_cfg = e:list(prefix .. "/common")
		assert(common_cfg and common_cfg.box,"no box config in etcd common tree")
		cast_types(common_cfg.box)
		return common_cfg
	end

	function M.etcd.get_instances(e)
		return normalize_instances(e:list(prefix .. "/instances"))
	end

	function M.etcd.get_clusters(e)
		-- use the same client `e` for both lookups
		return normalize_clusters(e:list(prefix .. "/clusters") or e:list(prefix .. "/shards"))
	end

	function M.etcd.get_all(e)
		local all_cfg = e:list(prefix)
		cast_types(all_cfg.common.box)
		normalize_instances(all_cfg.instances)
		normalize_clusters(all_cfg.clusters or all_cfg.shards)
		return all_cfg
	end

	etcd:discovery()

	local all_cfg = etcd:get_all()
	if etcd_conf.print_config then
		print("Loaded config from etcd",yaml.encode(all_cfg))
	end
	local common_cfg = all_cfg.common
	local all_instances_cfg = all_cfg.instances or {}

	local instance_cfg = all_instances_cfg[instance_name]
	assert(instance_cfg,"Instance name "..instance_name.." is not known to etcd")

	local all_clusters_cfg = all_cfg.clusters or all_cfg.shards or {}

	local master_selection_policy
	local cluster_cfg
	local cluster_name = instance_cfg.cluster or local_cfg.cluster
	if cluster_name then
		cluster_cfg = all_clusters_cfg[cluster_name]
		assert(cluster_cfg,"Cluster section required");
		assert(cluster_cfg.replicaset_uuid,"Need cluster uuid")
		master_selection_policy = M.master_selection_policy or 'etcd.instance.read_only'
	elseif instance_cfg.router then
		-- TODO
		master_selection_policy = M.master_selection_policy or 'etcd.instance.single'
	else
		master_selection_policy = M.master_selection_policy or 'etcd.instance.single'
	end

	local master_policy = master_selection_policies[ master_selection_policy ]
	if not master_policy then
		error(string.format("Unknown master_selection_policy: %s",master_selection_policy),0)
	end

	local cfg = master_policy(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)

	-- enabled instances of the same cluster (including this one)
	local members = {}
	for member_name, v in pairs(all_instances_cfg) do
		if v.cluster == cfg.cluster then
			if not toboolean(v.disabled) then
				table.insert(members,v)
			else
				log.warn("Member '%s' from cluster '%s' listening on %s is disabled", member_name, v.cluster, v.box.listen)
			end
		end
	end

	if cfg.cluster then
		local repl = {}
		for _,member in ipairs(members) do
			if member.box.remote_addr then
				table.insert(repl, member.box.remote_addr)
			else
				table.insert(repl, member.box.listen)
			end
		end
		table.sort(repl, replication_peer_less)
		if cfg.box.replication then
			print(
				"Start instance ",cfg.box.listen,
				" with locally overriden replication:",replication_to_string(cfg.box.replication),
				" instead of etcd's:", table.concat(repl,", ")
			)
		else
			cfg.box.replication = repl
			print(
				"Start instance "..tostring(cfg.box.listen),
				" with replication:"..table.concat(cfg.box.replication,", "),
				string.format("timeout: %s, quorum: %s, lag: %s",
					cfg.box.replication_connect_timeout
						or ('def:%s'):format(load_cfg.default_cfg.replication_connect_timeout or 30),
					cfg.box.replication_connect_quorum or 'def:full',
					cfg.box.replication_sync_lag
						or ('def:%s'):format(load_cfg.default_cfg.replication_sync_lag or 10)
				)
			)
		end
	end

	return cfg
end
684

685
---Converts a replication option value into a set (uri -> true).
---A single URI string is treated as a one-element list.
---@param conf any value of box.cfg.replication / replication_source
---@return table<any,boolean>? set nil when conf is neither a string nor a table
local function replication_to_set(conf)
	if type(conf) == 'string' then
		return { [conf] = true }
	end
	if type(conf) ~= 'table' then
		return nil
	end
	local set = {}
	for _, replica in pairs(conf) do
		set[replica] = true
	end
	return set
end

---Tells whether two replication option values name different sets of upstreams.
---
---Order and duplicates are ignored: {'a','b'} equals {'b','a','a'}.
---A single URI string is equivalent to a one-element table holding that URI.
---Any other combination (nil, numbers, ...) falls back to a plain `~=` comparison.
---@param old_conf any previous replication value
---@param new_conf any new replication value
---@return boolean changed true when the upstream set differs (reconnect required)
local function is_replication_changed (old_conf, new_conf)
	local old_set = replication_to_set(old_conf)
	local new_set = replication_to_set(new_conf)
	if not (old_set and new_set) then
		return old_conf ~= new_conf
	end

	-- an upstream was added
	for replica in pairs(new_set) do
		if not old_set[replica] then
			return true
		end
	end
	-- an upstream was removed
	for replica in pairs(old_set) do
		if not new_set[replica] then
			return true
		end
	end
	return false
end
706

707
---Computes a majority replication_connect_quorum (floor(N/2)+1) for the given upstreams.
---@param upstreams? string|string[] replication value; a single URI string counts as one upstream
---@return integer rcq 0 when there are no upstreams, otherwise floor(N/2)+1
local function optimal_rcq(upstreams)
	local n_ups
	if type(upstreams) == 'table' then
		n_ups = #upstreams
	elseif type(upstreams) == 'string' and upstreams ~= '' then
		-- `#` on a string is its byte length, not a number of upstreams
		n_ups = 1
	else
		n_ups = 0
	end

	if n_ups == 0 then
		return 0
	end
	return 1 + math.floor(n_ups / 2)
end
717

718
---Applies `cfg` through `boxcfg` after filtering out options it cannot accept.
---
---1. Options absent from `load_cfg.template_cfg` (unknown to box.cfg) are dropped.
---2. When box is already configured (box.cfg is no longer a function), options that
---   are not listed in `load_cfg.dynamic_cfg` and whose value differs from the
---   running `box.cfg[key]` are dropped as well.
---Every dropped option is reported via log.warn and print.
---NOTE: `cfg` is modified in place (dropped keys are removed); `boxcfg` receives a deep copy.
---@param boxcfg fun(cfg: table<string,any>) box.cfg or a user-supplied wrapper
---@param cfg table<string,any> box.* options to apply
local function do_cfg(boxcfg, cfg)
	local template_cfg = load_cfg.template_cfg
	-- template_cfg is optional in reflect_internals; without it the set of
	-- accepted options is unknown, so nothing is filtered at this stage.
	if template_cfg then
		for key, val in pairs(cfg) do
			if template_cfg[key] == nil then
				local warn = string.format("Dropping non-boxcfg option '%s' given '%s'",key,val)
				log.warn("%s",warn)
				print(warn)
				-- clearing an existing field during pairs() traversal is allowed
				cfg[key] = nil
			end
		end
	end

	if type(box.cfg) ~= 'function' then
		-- box is already configured: changed non-dynamic options cannot be applied
		for key, val in pairs(cfg) do
			if load_cfg.dynamic_cfg[key] == nil and box.cfg[key] ~= val then
				local warn = string.format(
					"Dropping non-dynamic option '%s' previous value '%s' new value '%s'",
					key,box.cfg[key],val
				)
				log.warn("%s",warn)
				print(warn)
				cfg[key] = nil
			end
		end
	end

	log.info("Just before first box.cfg %s", yaml.encode(cfg))
	-- Some boxcfg loves to rewrite passing table. We pass a copy of configuration
	boxcfg(deep_copy(cfg))
end
746

747

748
---@class moonlibs.config.opts
749
---@field bypass_non_dynamic? boolean (default: true) drops every changed non-dynamic option on reconfiguration
750
---@field tidy_load? boolean (default: true) recovers tarantool with read_only=true
751
---@field mkdir? boolean (default: false) should moonlibs/config create memtx_dir and wal_dir
752
---@field etcd? moonlibs.config.opts.etcd [legacy] configuration of etcd
753
---@field default_replication_connect_timeout? number (default: 1.1) default RCT in seconds
754
---@field default_election_mode? election_mode (default: candidate) option is respected only when etcd.cluster.raft is used
755
---@field default_synchro_quorum? string|number (default: 'N/2+1') option is respected only when etcd.cluster.raft is used
756
---@field default_read_only? boolean (default: false) option is respected only when etcd.instance.read_only is used (deprecated)
757
---@field master_selection_policy? 'etcd.cluster.master'|'etcd.cluster.vshard'|'etcd.cluster.raft'|'etcd.instance.single' master selection policy
758
---@field strict_mode? boolean (default: false) makes config retrievals strict: if a key is not found, config.get raises an error
759
---@field strict? boolean (default: false) makes config retrievals strict: if a key is not found, config.get raises an error
760
---@field default? table<string,any> (default: nil) globally default options for config.get
761
---@field on_load? fun(conf: moonlibs.config, cfg: table<string,any>) callback which is called every time config is loaded from file and ETCD
762
---@field load? fun(conf: moonlibs.config, cfg: table<string,any>): table<string,any> do not use this callback
763
---@field on_before_cfg? fun(conf: moonlibs.config, cfg: table<string,any>) callback is called right before running box.cfg (but after on_load)
764
---@field boxcfg? fun(cfg: table<string,any>) [legacy] when provided, this function is called instead of box.cfg. tidy_load and everything else will not be used.
765
---@field wrap_box_cfg? fun(cfg: table<string,any>) callback that is called instead of box.cfg, while tidy_load is still respected. Use this if you need to proxy every option to box.cfg on the application side
766
---@field on_after_cfg? fun(conf: moonlibs.config, cfg: table<string,any>) callback which is called after full tarantool configuration
767

768
---@class moonlibs.config: moonlibs.config.opts
769
---@field etcd moonlibs.config.etcd
770
---@field public _load_cfg table
771
---@field public _flat table
772
---@field public _fencing_f? Fiber
773
---@field public _enforced_ro? boolean
774
---@operator call(moonlibs.config.opts): moonlibs.config
775

776
---@type moonlibs.config
777
local M
778
        M = setmetatable({
76✔
779
                _VERSION = '0.7.1',
780
                console = {};
38✔
781
                ---Retrieves value from config
782
                ---@overload fun(k: string, def: any?): any?
783
                ---@param self moonlibs.config
784
                ---@param k string path inside config
785
                ---@param def? any optional default value
786
                ---@return any?
787
                get = function(self,k,def)
788
                        if self ~= M then
288✔
789
                                def = k
288✔
790
                                k = self
288✔
791
                        end
792
                        if M._flat[k] ~= nil then
416✔
793
                                return M._flat[k]
160✔
794
                        elseif def ~= nil then
128✔
795
                                return def
60✔
796
                        else
797
                                if M.strict_mode then
68✔
798
                                        error(string.format("no %s found in config", k))
×
799
                                else
800
                                        return
68✔
801
                                end
802
                        end
803
                end,
804
                enforce_ro = function()
805
                        if not M._ro_enforcable then
×
806
                                return false, 'cannot enforce readonly'
×
807
                        end
808
                        M._enforced_ro = true
×
809
                        return true, {
×
810
                                info_ro = box.info.ro,
811
                                cfg_ro = box.cfg.read_only,
812
                                enforce_ro = M._enforced_ro,
813
                        }
814
                end,
815
                _load_cfg = load_cfg,
38✔
816
        },{
38✔
817
                ---Reinitiates moonlibs.config
818
                ---@param args moonlibs.config.opts
819
                ---@return moonlibs.config
820
                __call = function(_, args)
821
                        -- args MUST belong to us, because of modification
822
                        local file
823
                        if type(args) == 'string' then
38✔
824
                                file = args
×
825
                                args = {}
×
826
                        elseif type(args) == 'table' then
38✔
827
                                args = deep_copy(args)
76✔
828
                                file = args.file
38✔
829
                        else
830
                                args = {}
×
831
                        end
832
                        if args.bypass_non_dynamic == nil then
38✔
833
                                args.bypass_non_dynamic = true
38✔
834
                        end
835
                        if args.tidy_load == nil then
38✔
836
                                args.tidy_load = true
38✔
837
                        end
838
                        M.default_replication_connect_timeout = args.default_replication_connect_timeout or 1.1
38✔
839
                        M.default_election_mode = args.default_election_mode or 'candidate'
38✔
840
                        M.default_synchro_quorum = args.default_synchro_quorum or 'N/2+1'
38✔
841
                        M.default_read_only = args.default_read_only or false
38✔
842
                        M.master_selection_policy = args.master_selection_policy
38✔
843
                        M.default = args.default
38✔
844
                        M.strict_mode = args.strict_mode or args.strict or false
38✔
845
                        -- print("config", "loading ",file, json.encode(args))
846
                        if not file then
38✔
847
                                file = get_opt()
×
848
                                -- todo: maybe etcd?
849
                                if not file then error("Neither config call option given not -c|--config option passed",2) end
×
850
                        end
851

852
                        print(string.format("Loading config %s %s", file, json.encode(args)))
38✔
853

854
                        local function load_config()
855

856
                                local methods = {}
70✔
857
                                function methods.merge(dst, src, keep)
70✔
858
                                        if src ~= nil then
×
859
                                                deep_merge( dst, src, keep )
×
860
                                        end
861
                                        return dst
×
862
                                end
863

864
                                function methods.include(path, opts)
70✔
865
                                        path = fio.pathjoin(fio.dirname(file), path)
×
866
                                        opts = opts or { if_exists = false }
×
867
                                        if not fio.path.exists(path) then
×
868
                                                if opts.if_exists then
×
869
                                                        return
×
870
                                                end
871
                                                error("Not found include file `"..path.."'", 2)
×
872
                                        end
873
                                        local f,e = loadfile(path)
×
874
                                        if not f then error(e,2) end
×
875
                                        setfenv(f, getfenv(2))
×
876
                                        local ret = f()
×
877
                                        if ret ~= nil then
×
878
                                                print("Return value from "..path.." is ignored")
×
879
                                        end
880
                                end
881

882
                                function methods.print(...)
70✔
883
                                        local p = {...}
×
884
                                        for i = 1, select('#', ...) do
×
885
                                                if type(p[i]) == 'table'
×
886
                                                        and not debug.getmetatable(p[i])
×
887
                                                then
888
                                                        p[i] = json.encode(p[i])
×
889
                                                end
890
                                        end
891
                                        print(unpack(p))
×
892
                                end
893

894
                                local f,e = loadfile(file)
70✔
895
                                if not f then error(e,2) end
70✔
896
                                local cfg = setmetatable({}, {
140✔
897
                                        __index = setmetatable(methods, {
140✔
898
                                                __index = setmetatable(args,{ __index = _G })
70✔
899
                                        })
70✔
900
                                })
901
                                setfenv(f, cfg)
70✔
902
                                local ret = f()
70✔
903
                                if ret ~= nil then
70✔
904
                                        print("Return value from "..file.." is ignored")
×
905
                                end
906
                                setmetatable(cfg,nil)
70✔
907
                                setmetatable(args,nil)
70✔
908
                                deep_merge(cfg,args.default or {},'keep')
70✔
909

910
                                -- subject to change, just a PoC
911
                                local etcd_conf = args.etcd or cfg.etcd
70✔
912
                                -- we can enforce ro during recovery only if we have etcd config
913
                                M._ro_enforcable = M._ro_enforcable and etcd_conf ~= nil
70✔
914
                                if etcd_conf then
70✔
915
                                        local s = clock.time()
70✔
916
                                        cfg = etcd_load(M, etcd_conf, cfg)
140✔
917
                                        log.info("etcd_load took %.3fs", clock.time()-s)
70✔
918
                                end
919

920
                                if args.load then
70✔
921
                                        cfg = args.load(M, cfg)
×
922
                                end
923

924
                                if not cfg.box then
70✔
925
                                        error("No box.* config given", 2)
×
926
                                end
927

928
                                if cfg.box.remote_addr then
70✔
929
                                        cfg.box.remote_addr = nil
×
930
                                end
931

932
                                if args.bypass_non_dynamic then
70✔
933
                                        cfg.box = prepare_box_cfg(cfg.box)
140✔
934
                                end
935

936
                                deep_merge(cfg,{
140✔
937
                                        sys = deep_copy(args)
140✔
938
                                })
939
                                cfg.sys.boxcfg = nil
70✔
940
                                cfg.sys.on_load = nil
70✔
941

942
                                -- latest modifications and fixups
943
                                if args.on_load then
70✔
944
                                        args.on_load(M,cfg)
×
945
                                end
946
                                return cfg
70✔
947
                        end
948

949
                        -- We cannot enforce ro if any of theese conditions not satisfied
950
                        -- Tarantool must be bootstraping with tidy_load and do not overwraps personal boxcfg
951
                        M._ro_enforcable = args.boxcfg == nil and args.tidy_load and type(box.cfg) == 'function'
38✔
952
                        local cfg = load_config() --[[@as table]]
38✔
953

954
                        M._flat = flatten(cfg)
76✔
955

956
                        if args.on_before_cfg then
38✔
957
                                args.on_before_cfg(M,cfg)
×
958
                        end
959

960
                        if args.mkdir then
38✔
961
                                if not ( fio.path and fio.mkdir ) then
38✔
962
                                        error(string.format("Tarantool version %s is too old for mkdir: fio.path is not supported", _TARANTOOL),2)
×
963
                                end
964
                                for _,key in pairs({"work_dir", "wal_dir", "snap_dir", "memtx_dir", "vinyl_dir"}) do
228✔
965
                                        local v = cfg.box[key]
190✔
966
                                        if v and not fio.path.exists(v) then
266✔
967
                                                local r,e = fio.mktree(v)
22✔
968
                                                if not r then error(string.format("Failed to create path '%s' for %s: %s",v,key,e),2) end
22✔
969
                                        end
970
                                end
971
                                local v = cfg.box.pid_file
38✔
972
                                if v then
38✔
973
                                        v = fio.dirname(v);
974
                                        if v and not fio.path.exists(v) then
×
975
                                                local r,e = fio.mktree(v)
×
976
                                                if not r then error(string.format("Failed to create path '%s' for pid_file: %s",v,e),2) end
×
977
                                        end
978
                                end
979
                        end
980

981
                        -- The code below is very hard to understand and quite hard to fix when any bugs occurs.
982
                        -- First, you must remember that this part of code is executed several times in very different environments:
983
                        -- 1) Tarantool may be started with tarantool <script-name>.lua and this part is required from the script
984
                        -- 2) Tarantool may be started under tarantoolctl (such as tarantoolctl start <script-name>.lua) then box.cfg will be wrapped
985
                        -- by tarantoolctl itself, and it be returned back to table box.cfg after first successfull execution
986
                        -- 3) Tarantool may be started inside docker container and default docker-entrypoint.lua also rewraps box.cfg
987
                        -- 4) User might want to overwrite box.cfg with his function via args.boxcfg. Though, this method is not recommended
988
                        -- it is possible in some environments
989
                        -- 5) User might want to "wrap" box.cfg with his own middleware via (args.wrap_box_cfg). It is more recommended, because
990
                        -- full algorithm of tidy_load and ro-enforcing is preserved for the user.
991

992
                        -- Moreover, first run of box.cfg in the life of the process allows to specify static box.cfg options, such as pid_file, log
993
                        -- and many others.
994
                        -- But, second reconfiguration of box.cfg (due to reload, or reconfiguration in fencing must never touch static options)
995
                        -- Part of this is fixed in `do_cfg` method of this codebase.
996

997
                        -- Because many wrappers in docker-entrypoint.lua and tarantoolctl LOVES to perform non-redoable actions inside box.cfg and
998
                        -- switch box.cfg back to builtin tarantool box.cfg, following code MUST NEVER cache value of box.cfg
999

1000
                        if args.boxcfg then
38✔
1001
                                do_cfg(args.boxcfg, cfg.box)
×
1002
                        else
1003
                                local boxcfg
1004
                                if args.wrap_box_cfg then
38✔
1005
                                        boxcfg = args.wrap_box_cfg
×
1006
                                end
1007
                                if type(box.cfg) == 'function' then
38✔
1008
                                        if M.etcd then
32✔
1009
                                                if args.tidy_load then
32✔
1010
                                                        local snap_dir = cfg.box.snap_dir or cfg.box.memtx_dir
32✔
1011
                                                        if not snap_dir then
32✔
1012
                                                                if cfg.box.work_dir then
×
1013
                                                                        snap_dir = cfg.box.work_dir
×
1014
                                                                else
1015
                                                                        snap_dir = "."
×
1016
                                                                end
1017
                                                        end
1018
                                                        local bootstrapped
1019
                                                        for _,v in pairs(fio.glob(snap_dir..'/*.snap')) do
42✔
1020
                                                                bootstrapped = v
10✔
1021
                                                        end
1022

1023
                                                        if bootstrapped then
32✔
1024
                                                                print("Have etcd, use tidy load")
10✔
1025
                                                                local ro = cfg.box.read_only
10✔
1026
                                                                cfg.box.read_only = true
10✔
1027
                                                                if cfg.box.bootstrap_strategy ~= 'auto' then
10✔
1028
                                                                        if not ro then
10✔
1029
                                                                                -- Only if node should be master
1030
                                                                                cfg.box.replication_connect_quorum = 1
7✔
1031
                                                                                cfg.box.replication_connect_timeout = M.default_replication_connect_timeout
7✔
1032
                                                                        elseif not cfg.box.replication_connect_quorum then
3✔
1033
                                                                                -- For replica tune up to N/2+1
1034
                                                                                cfg.box.replication_connect_quorum = optimal_rcq(cfg.box.replication)
6✔
1035
                                                                        end
1036
                                                                end
1037
                                                                log.info("Start tidy loading with ro=true%s rcq=%s rct=%s (snap=%s)",
20✔
1038
                                                                        ro ~= true and string.format(' (would be %s)',ro) or '',
10✔
1039
                                                                        cfg.box.replication_connect_quorum, cfg.box.replication_connect_timeout,
10✔
1040
                                                                        bootstrapped
1041
                                                                )
20✔
1042
                                                        else
1043
                                                                -- not bootstraped yet cluster
1044

1045
                                                                -- if cfg.box.bootstrap_strategy == 'auto' then -- ≥ Tarantool 2.11
1046
                                                                -- local ro = cfg.box.read_only
1047
                                                                -- local is_candidate = cfg.box.election_mode == 'candidate'
1048
                                                                --         if not ro and not is_candidate then
1049
                                                                --                 -- master but not Raft/candidate
1050
                                                                --                 -- we decrease replication for master,
1051
                                                                --                 -- to allow him bootstrap himself
1052
                                                                --                 cfg.box.replication = {cfg.box.remote_addr or cfg.box.listen}
1053
                                                                --         end
1054
                                                                if cfg.box.bootstrap_strategy ~= 'auto' then -- < Tarantool 2.11
22✔
1055
                                                                        if cfg.box.replication_connect_quorum == nil then
22✔
1056
                                                                                cfg.box.replication_connect_quorum = optimal_rcq(cfg.box.replication)
26✔
1057
                                                                        end
1058
                                                                end
1059

1060
                                                                log.info("Start non-bootstrapped tidy loading with ro=%s rcq=%s rct=%s (dir=%s)",
44✔
1061
                                                                        cfg.box.read_only, cfg.box.replication_connect_quorum,
22✔
1062
                                                                        cfg.box.replication_connect_timeout, snap_dir
22✔
1063
                                                                )
22✔
1064
                                                        end
1065
                                                end
1066

1067
                                                do_cfg(boxcfg or box.cfg, cfg.box)
32✔
1068

1069
                                                log.info("Reloading config after start")
32✔
1070

1071
                                                local new_cfg = load_config()
32✔
1072
                                                if M._enforced_ro then
32✔
1073
                                                        log.info("Enforcing RO (should be ro=%s) because told to", new_cfg.box.read_only)
×
1074
                                                        new_cfg.box.read_only = true
×
1075
                                                end
1076
                                                M._enforced_ro = nil
32✔
1077
                                                M._ro_enforcable = false
32✔
1078
                                                local diff_box = value_diff(cfg.box, new_cfg.box)
32✔
1079

1080
                                                -- since load_config loads config also for reloading it removes non-dynamic options
1081
                                                -- therefore, they would be absent, but should not be passed. remove them
1082
                                                if diff_box then
32✔
1083
                                                        for key in pairs(diff_box) do
57✔
1084
                                                                if load_cfg.dynamic_cfg[key] == nil then
34✔
1085
                                                                        diff_box[key] = nil
×
1086
                                                                end
1087
                                                        end
1088
                                                        if not next(diff_box) then
23✔
1089
                                                                diff_box = nil
×
1090
                                                        end
1091
                                                end
1092

1093
                                                if diff_box then
32✔
1094
                                                        log.info("Reconfigure after load with %s",require'json'.encode(diff_box))
23✔
1095
                                                        do_cfg(boxcfg or box.cfg, diff_box)
46✔
1096
                                                else
1097
                                                        log.info("Config is actual after load")
9✔
1098
                                                end
1099

1100
                                                M._flat = flatten(new_cfg)
64✔
1101
                                        else
1102
                                                do_cfg(boxcfg or box.cfg, cfg.box)
×
1103
                                        end
1104
                                else
1105
                                        local replication     = cfg.box.replication_source or cfg.box.replication
6✔
1106
                                        local box_replication = box.cfg.replication_source or box.cfg.replication
6✔
1107

1108
                                        if not is_replication_changed(replication, box_replication) then
12✔
1109
                                                local r  = cfg.box.replication
6✔
1110
                                                local rs = cfg.box.replication_source
6✔
1111
                                                cfg.box.replication        = nil
6✔
1112
                                                cfg.box.replication_source = nil
6✔
1113

1114
                                                do_cfg(boxcfg or box.cfg, cfg.box)
6✔
1115

1116
                                                cfg.box.replication        = r
6✔
1117
                                                cfg.box.replication_source = rs
6✔
1118
                                        else
1119
                                                do_cfg(boxcfg or box.cfg, cfg.box)
×
1120
                                        end
1121
                                end
1122
                        end
1123

1124
                        if args.on_after_cfg then
38✔
1125
                                args.on_after_cfg(M,cfg)
38✔
1126
                        end
1127
                        -- print(string.format("Box configured"))
1128

1129
                        local msp = config.get('sys.master_selection_policy')
38✔
1130
                        if type(cfg.etcd) == 'table'
38✔
1131
                                and config.get('etcd.fencing_enabled')
38✔
1132
                                and (msp == 'etcd.cluster.master' or msp == 'etcd.cluster.vshard')
38✔
1133
                                and type(cfg.cluster) == 'string' and cfg.cluster ~= ''
30✔
1134
                                and config.get('etcd.reduce_listing_quorum') ~= true
60✔
1135
                        then
1136
                                M._fencing_f = fiber.create(function()
60✔
1137
                                        -- Give the fiber a recognizable name.
                                        fiber.name('config/fencing')
                                        -- fiber.create() starts running the new fiber immediately, i.e. before the
                                        -- creator has stored the result into `_fencing_f`. Yield once so that the
                                        -- assignment happens before in_my_gen() compares against it.
                                        fiber.yield()

                                        ---Returns true while this fiber is still the registered fencing fiber
                                        ---(`config._fencing_f`). fiber.testcancel() makes every call a cancellation point.
                                        local function in_my_gen()
                                                fiber.testcancel()
                                                return config._fencing_f == fiber.self()
                                        end

                                        assert(cfg.cluster, "cfg.cluster must be defined")

                                        -- ETCD directory of this cluster: <etcd.prefix>/clusters/<cfg.cluster>
                                        local etcd_prefix = config.get('etcd.prefix')
                                        local watch_path = fio.pathjoin(etcd_prefix, 'clusters', cfg.cluster)

                                        local my_name = assert(config.get('sys.instance_name'), "instance_name is not defined")

                                        -- Lease length and pause between ETCD checks (both compared with fiber.time()
                                        -- arithmetic, i.e. seconds). The pause must be strictly shorter than the lease.
                                        local fencing_timeout = config.get('etcd.fencing_timeout', 10)
                                        local fencing_pause = config.get('etcd.fencing_pause', fencing_timeout/2)
                                        assert(fencing_pause < fencing_timeout, "fencing_pause must be < fencing_timeout")

                                        -- Enabled only by boolean `true` or the string 'true'; any other value disables it.
                                        local check_replication_raw = config.get('etcd.fencing_check_replication')
                                        local fencing_check_replication = check_replication_raw == true
                                                or check_replication_raw == 'true'

                                        -- Shared state: refresh_list stores the last listed cluster and both
                                        -- refresh_list and fencing_check advance watch_index.
                                        local etcd_cluster, watch_index
1160

1161
                                        ---Lists the cluster subtree `watch_path` in ETCD and refreshes the cached view.
                                        ---
                                        ---On HTTP 200 stores the listed subtree into `etcd_cluster` and advances
                                        ---`watch_index` to one past the `X-Etcd-Index` response header, never moving
                                        ---it backwards. On any other status the cached `etcd_cluster` is kept, but it
                                        ---is NOT returned, so callers cannot mistake a failed listing for a fresh one.
                                        ---@param opts? table options forwarded to `config.etcd:list` (e.g. `{deadline = ...}`)
                                        ---@return table? cluster freshly listed subtree, or nil if the listing did not return 200
                                        ---@return number? watch_index next ETCD index to watch from
                                        local function refresh_list(opts)
                                                local started = fiber.time()
                                                local result, resp = config.etcd:list(watch_path, opts)
                                                local elapsed = fiber.time() - started

                                                log.verbose("[fencing] list(%s) => %s in %.3fs %s",
                                                        watch_path, resp.status, elapsed, json.encode(resp.body))

                                                if resp.status ~= 200 then
                                                        -- Returning the previously cached etcd_cluster here would let
                                                        -- fencing_check prolong leadership based on stale data.
                                                        return nil, watch_index
                                                end

                                                etcd_cluster = result
                                                local etcd_index = type(resp.headers) == 'table'
                                                        and tonumber(resp.headers['x-etcd-index'])
                                                if etcd_index and etcd_index >= (tonumber(watch_index) or 0) then
                                                        watch_index = etcd_index + 1
                                                end
                                                return etcd_cluster, watch_index
                                        end
1180

1181
                                        ---Decides whether this instance may keep (prolong) its leadership lease.
                                        ---
                                        ---First long-polls ETCD (`config.etcd:wait`) for a change under `watch_path`
                                        ---starting at `watch_index`. Unless that yields a node with a `master` field,
                                        ---falls back to re-listing the cluster with `refresh_list` until `deadline`.
                                        ---@param deadline number fiber.time() moment at which the current lease expires
                                        ---@return boolean? verdict
                                        ---  `true`  - keep leadership: ETCD names us as master, or ETCD is unreachable while
                                        ---            `etcd.fencing_check_replication` is enabled and every upstream is in `follow`;
                                        ---  `false` - ETCD names another master: step down immediately;
                                        ---  `nil`   - undecided (ETCD unreachable, switchover in progress, or this fiber
                                        ---            is no longer the current fencing generation).
                                        local function fencing_check(deadline)
                                                -- we can only allow half of the time till deadline
                                                local timeout = math.min((deadline-fiber.time())*0.5, fencing_pause)
                                                log.verbose("[wait] timeout:%.3fs FP:%.3fs", timeout, fencing_pause)

                                                local check_started = fiber.time()
                                                -- Resolves to 'timeout' when the long-poll expired, to ('changed', node|nil)
                                                -- when a change was observed, or to nothing when the response had no node.
                                                local pcall_ok, err_or_resolution, new_cluster = pcall(function()
                                                        local started = fiber.time()
                                                        local n_endpoints = #config.etcd.endpoints
                                                        -- NOTE(review): the timeout is divided by the endpoint count, presumably
                                                        -- because etcd:wait may try endpoints one after another — confirm.
                                                        local not_timed_out, response = config.etcd:wait(watch_path, {
                                                                index = watch_index,
                                                                timeout = timeout/n_endpoints,
                                                        })
                                                        local logger
                                                        if not_timed_out then
                                                                if tonumber(response.status) and tonumber(response.status) >= 400 then
                                                                        logger = log.error
                                                                else
                                                                        logger = log.info
                                                                end
                                                        else
                                                                logger = log.verbose
                                                        end
                                                        logger("[fencing] wait(%s,index=%s,timeout=%.3fs) => %s (ind:%s) %s took %.3fs",
                                                                watch_path, watch_index, timeout,
                                                                response.status, (response.headers or {})['x-etcd-index'],
                                                                json.encode(response.body), fiber.time()-started)

                                                        -- http timed out / or network drop - we'll never know
                                                        if not not_timed_out then return 'timeout' end
                                                        local res = json.decode(response.body)

                                                        -- Advance watch_index past the index reported by ETCD (never backwards).
                                                        -- watch_index is nil until some response has carried X-Etcd-Index;
                                                        -- treat that as 0 instead of raising on a nil comparison.
                                                        local etcd_index = type(response.headers) == 'table'
                                                                and tonumber(response.headers['x-etcd-index'])
                                                        if etcd_index and etcd_index >= (tonumber(watch_index) or 0) then
                                                                watch_index = etcd_index + 1
                                                        end

                                                        if res.node then
                                                                local node = {}
                                                                config.etcd:recursive_extract(watch_path, res.node, node)
                                                                log.info("[fencing] watch index changed: %s =>  %s", watch_path, json.encode(node))
                                                                -- a change without `master` is not enough to judge leadership;
                                                                -- returning nil forces a full listing below
                                                                if not node.master then node = nil end
                                                                return 'changed', node
                                                        end
                                                end)

                                                log.verbose("[wait] took:%.3fs exp:%.3fs", fiber.time()-check_started, timeout)
                                                if not in_my_gen() then return end

                                                if not pcall_ok then
                                                        log.warn("ETCD watch failed: %s", err_or_resolution)
                                                end

                                                -- Only a successful 'changed' resolution provides a usable cluster view.
                                                if err_or_resolution ~= 'changed' then
                                                        new_cluster = nil
                                                end

                                                if not new_cluster then
                                                        local list_started = fiber.time()
                                                        log.verbose("[listing] left:%.3fs", deadline-fiber.time())
                                                        repeat
                                                                local ok, e_cluster = pcall(refresh_list, {deadline = deadline})
                                                                if ok and e_cluster then
                                                                        new_cluster = e_cluster
                                                                        break
                                                                end
                                                                if not ok then
                                                                        -- keep the reason visible instead of silently retrying
                                                                        log.warn("[fencing] list(%s) failed: %s", watch_path, e_cluster)
                                                                end

                                                                if not in_my_gen() then return end
                                                                -- we can only sleep 50% till deadline will be reached
                                                                local sleep = math.min(fencing_pause, 0.5*(deadline - fiber.time()))
                                                                fiber.sleep(sleep)
                                                        until fiber.time() > deadline
                                                        log.verbose("[list] took:%.3fs left:%.3fs",
                                                                fiber.time()-list_started, deadline-fiber.time())
                                                end

                                                if not in_my_gen() then return end

                                                if type(new_cluster) ~= 'table' then -- ETCD is down
                                                        log.warn('[fencing] ETCD %s is not discovered in etcd during %.2fs %s',
                                                                watch_path, fiber.time()-check_started, new_cluster)

                                                        if not fencing_check_replication then
                                                                -- ETCD is down, we do not know what is happening
                                                                return nil
                                                        end

                                                        -- In proper fencing we must step down immediately as soon as we discover
                                                        -- that coordinator is down. But in real world there are some circumstances
                                                        -- when coordinator can be down for several seconds if someone crashes network
                                                        -- or ETCD itself.
                                                        -- We propose that it is safe to not step down as soon as we are connected to all
                                                        -- replicas in replicaset (etcd.cluster.master is fullmesh topology).
                                                        -- We do not check downstreams here, because downstreams cannot lead to collisions.
                                                        -- If at least 1 upstream is not in status follow
                                                        -- (Tarantool replication checks with tcp-healthchecks once in box.cfg.replication_timeout)
                                                        -- We immediately stepdown.
                                                        for _, ru in pairs(box.info.replication) do
                                                                if ru.id ~= box.info.id and ru.upstream then
                                                                        if ru.upstream.status ~= "follow" then
                                                                                log.warn("[fencing] upstream %s is not followed by me %s:%s (idle: %s, lag:%s)",
                                                                                        ru.upstream.peer, ru.upstream.status, ru.upstream.message,
                                                                                        ru.upstream.idle, ru.upstream.lag
                                                                                )
                                                                                return nil
                                                                        end
                                                                end
                                                        end

                                                        log.warn('[fencing] ETCD is down but all upstreams are followed by me. Continuing leadership')
                                                        return true
                                                elseif new_cluster.master == my_name then
                                                        -- The most common branch. We are registered as the leader.
                                                        return true
                                                elseif new_cluster.switchover then -- new_cluster.master ~= my_name
                                                        -- Another instance is the leader in ETCD. But we could be the one
                                                        -- who is going to be the next (cluster is under switching right now).
                                                        -- It is almost impossible to get this path in production. But the only
                                                        -- protection we have is `fencing_pause` and `fencing_timeout`.
                                                        -- So, we will do nothing while the ETCD switchover mutex is present.
                                                        log.warn('[fencing] It seems that cluster is under switchover right now %s', json.encode(new_cluster))
                                                        -- Note: this node was rw (otherwise we would not execute fencing_check at all)
                                                        -- During normal switch registered leader is RO (because we are RW, and we are not the leader)
                                                        -- And in the next step coordinator will update leader info in ETCD.
                                                        -- so this condition seems to be unreachable for every node
                                                        return nil
                                                else
                                                        log.warn('[fencing] ETCD %s/master is %s not us. Stepping down', watch_path, new_cluster.master)
                                                        -- ETCD is up, master is not us => we must step down immediately
                                                        return false
                                                end
                                        end
1315

1316
                                        -- Main fencing loop.
                                        -- It is executed on every replica in the shard:
                                        -- while the instance is read-only it just waits to become rw; once rw it holds
                                        -- a leadership lease (`deadline`) that is prolonged only by positive verdicts
                                        -- of fencing_check, and self-fences (box.cfg{read_only=true}) when the lease
                                        -- runs out or ETCD names another master.
                                        while in_my_gen() do
                                                -- Wait until instance became rw loop
                                                while box.info.ro and in_my_gen() do
                                                        -- this is just fancy sleep.
                                                        -- if node became rw in less than 3 seconds we will check it immediately
                                                        pcall(box.ctl.wait_rw, 3)
                                                end

                                                -- after waiting to be rw we will step into fencing-loop
                                                -- we must check that we are still in our code generation
                                                -- to proceed
                                                if not in_my_gen() then return end

                                                --- Initial Load of etcd_cluster and watch_index
                                                local attempt = 0
                                                while in_my_gen() do
                                                        local ok, err = pcall(refresh_list)
                                                        if not in_my_gen() then return end

                                                        if ok then break end
                                                        attempt = attempt + 1
                                                        log.warn("[fencing] initial list failed: %s (attempts: %s)", err, attempt)

                                                        -- Jittered back-off, uniform in [max(0.5, fencing_pause-0.5), fencing_pause+0.5].
                                                        -- math.random(m, n) is integer-oriented: in LuaJIT it returns m + k for an
                                                        -- integer k, so fractional bounds give only a few distinct values and can
                                                        -- even exceed n (e.g. fencing_pause=0.5 gives 0.5 or 1.5).
                                                        local backoff_min = math.max(0.5, fencing_pause-0.5)
                                                        local backoff_max = fencing_pause+0.5
                                                        fiber.sleep(backoff_min + math.random()*(backoff_max-backoff_min))
                                                end

                                                -- we yield to get next ev_run before get fiber.time()
                                                fiber.sleep(0)
                                                if not in_my_gen() then return end
                                                log.info("etcd_cluster is %s (index: %s)", json.encode(etcd_cluster), watch_index)


                                                -- we will not step down until deadline.
                                                local deadline = fiber.time()+fencing_timeout
                                                repeat
                                                        -- Before ETCD check we better pause
                                                        -- we do a little bit randomized sleep to not spam ETCD
                                                        local hard_limit = deadline-fiber.time()
                                                        local soft_limit = fencing_timeout-fencing_pause
                                                        local rand_sleep = math.random()*0.1*math.min(hard_limit, soft_limit)
                                                        log.verbose("[sleep] hard:%.3fs soft:%.3fs sleep:%.3fs", hard_limit, soft_limit, rand_sleep)
                                                        fiber.sleep(rand_sleep)
                                                        -- After each yield we have to check that we are still in our generation
                                                        if not in_my_gen() then return end

                                                        -- Someone made us read-only. There is no need to check ETCD,
                                                        -- we break from this loop immediately
                                                        if box.info.ro then break end

                                                        -- fencing_check(deadline): if it returns true,
                                                        -- then we prolong the leadership lease
                                                        local verdict = fencing_check(deadline)
                                                        log.verbose("[verdict:%s] Leasing ft:%.3fs up:%.3fs left:%.3fs",
                                                                verdict == true and "ok"
                                                                        or verdict == false and "step"
                                                                        or "unknown",
                                                                fencing_timeout,
                                                                verdict and (fiber.time()+fencing_timeout-deadline) or 0,
                                                                deadline - fiber.time()
                                                        )
                                                        if verdict == false then
                                                                -- immediate stepdown
                                                                break
                                                        elseif verdict then
                                                                -- update deadline.
                                                                if deadline <= fiber.time() then
                                                                        log.warn("[fencing] deadline was overflowed deadline:%s, now:%s",
                                                                                deadline, fiber.time()
                                                                        )
                                                                end
                                                                deadline = fiber.time()+fencing_timeout
                                                        end
                                                        if not in_my_gen() then return end

                                                        if deadline <= fiber.time() then
                                                                log.warn("[fencing] deadline has not been upgraded deadline:%s, now:%s",
                                                                        deadline, fiber.time()
                                                                )
                                                        end
                                                until box.info.ro or fiber.time() > deadline

                                                -- We have left the deadline loop. Unless we are already read-only,
                                                -- fencing is required.
                                                if not box.info.ro then
                                                        log.warn('[fencing] Performing self fencing (box.cfg{read_only=true})')
                                                        box.cfg{read_only=true}
                                                end
                                        end
1406
                                end)
1407
                        end
1408

1409
                        return M
38✔
1410
                end
1411
        })
38✔
1412
        rawset(_G,'config',M)
38✔
1413

1414
return M
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc