• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moonlibs / config / 6171223143

13 Sep 2023 10:27AM UTC coverage: 69.066%. First build
6171223143

Pull #29

github

Vladislav Grubov
ci: adds publish to coveralls.io
Pull Request #29: draft: fixes fencing and hot-reload in 2.11+

121 of 121 new or added lines in 2 files covered. (100.0%)

614 of 889 relevant lines covered (69.07%)

109.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.53
/config.lua
1
---@diagnostic disable: inject-field
2
local log = require 'log'
38✔
3
if log.new then
38✔
4
        log = log.new('moonlibs.config')
×
5
end
6
local fio = require 'fio'
38✔
7
local json = require 'json'.new()
38✔
8
local yaml = require 'yaml'.new()
38✔
9
local digest = require 'digest'
38✔
10
local fiber  = require 'fiber'
38✔
11
local clock  = require 'clock'
38✔
12
json.cfg{ encode_invalid_as_nil = true }
38✔
13
yaml.cfg{ encode_use_tostring = true }
38✔
14

15
---Collects every upvalue of the given function into a name -> value map
---
---Upvalues are read one by one with debug.getupvalue until it reports
---that there are no more; later upvalues with the same name overwrite earlier ones.
---@param fun fun()
---@return table<string,any> variables
local function lookaround(fun)
        local upvalues = {}
        local idx = 1
        local name, value = debug.getupvalue(fun, idx)
        while name do
                upvalues[name] = value
                idx = idx + 1
                name, value = debug.getupvalue(fun, idx)
        end
        return upvalues
end
30

31
---@private
---@class moonlibs.config.reflect_internals
---@field dynamic_cfg table<string,boolean>
---@field default_cfg table<string,any>
---@field upgrade_cfg? fun(cfg: table<string,any>, translate_cfg: table): table<string,any>
---@field template_cfg? table
---@field translate_cfg? table
---@field log? table


---Unwraps box.cfg and retrieves dynamic_cfg, default_cfg tables
---
---Starting from box.cfg, repeatedly inspects the upvalues of the current
---callable and follows known wrapper upvalues until one that exposes
---`dynamic_cfg` or `dynamic_cfg_modules` is reached.
---Optional internals (upgrade_cfg, translate_cfg, template_cfg, log) are
---picked up along the way; the ones never seen stay nil in the result.
---Raises an error when the chain cannot be followed.
---@return moonlibs.config.reflect_internals
local function reflect_internals()
        local peek = {
                dynamic_cfg = {};
                default_cfg = {};
        }

        -- Optional internals that have not been found yet.
        -- Kept apart from `peek` so that a missing one is simply absent (nil)
        -- in the result on every exit path, never a boolean placeholder.
        local wanted = {
                upgrade_cfg   = true;
                translate_cfg = true;
                template_cfg  = true;
                log           = true;
        }

        ---Copies still-wanted internals present among `vars` into `peek`
        local function capture(vars)
                for name in pairs(wanted) do
                        if vars[name] ~= nil then
                                peek[name] = vars[name]
                                wanted[name] = nil
                        end
                end
        end

        local steps = {}
        local peekf = box.cfg
        local allow_unwrap = true
        while true do
                local prevf = peekf
                local mt = debug.getmetatable(peekf)
                if type(peekf) == 'function' then
                        -- pass
                        table.insert(steps,"func")
                elseif mt and mt.__call then
                        peekf = mt.__call
                        table.insert(steps,"mt_call")
                else
                        error(string.format(
                                "Neither function nor callable argument %s after steps: %s",
                                tostring(peekf), table.concat(steps, ", ")
                        ))
                end

                local vars = lookaround(peekf)
                if type(vars.default_cfg) == 'table' then
                        for k in pairs(vars.default_cfg) do
                                peek.default_cfg[k] = vars.default_cfg[k]
                        end
                end
                if allow_unwrap and (vars.orig_cfg or vars.origin_cfg) then
                        -- It's a wrap of tarantoolctl/tt, unwrap and repeat
                        peekf = (vars.orig_cfg or vars.origin_cfg)
                        -- this kind of unwrap is followed only once
                        allow_unwrap = false
                        table.insert(steps,"ctl-orig")
                elseif vars.dynamic_cfg then
                        log.info("Found config by steps: %s", table.concat(steps, ", "))
                        for k in pairs(vars.dynamic_cfg) do
                                peek.dynamic_cfg[k] = true
                        end
                        capture(vars)
                        break
                elseif vars.lock and vars.f and type(vars.f) == 'function' then
                        peekf = vars.f
                        table.insert(steps,"lock-unwrap")
                elseif vars.old_call and type(vars.old_call) == 'function' then
                        peekf = vars.old_call
                        table.insert(steps,"ctl-oldcall")
                elseif vars.orig_cfg_call and type(vars.orig_cfg_call) == 'function' then
                        peekf = vars.orig_cfg_call
                        table.insert(steps,"ctl-orig_cfg_call")
                elseif vars.load_cfg_apply_dynamic then
                        table.insert(steps,"load_cfg_apply_dynamic")
                        capture(vars)
                        peekf = vars.load_cfg_apply_dynamic
                elseif vars.dynamic_cfg_modules then
                        log.info("Found config by steps: %s", table.concat(steps, ", "))
                        -- both the module name and each of its options count as dynamic
                        for k, v in pairs(vars.dynamic_cfg_modules) do
                                peek.dynamic_cfg[k] = true
                                for op in pairs(v.options) do
                                        peek.dynamic_cfg[op] = true
                                end
                        end
                        break
                elseif vars.reload_cfg then
                        table.insert(steps,"reload_cfg")
                        peekf = vars.reload_cfg
                elseif vars.reconfig_modules then
                        table.insert(steps,"reconfig_modules")
                        capture(vars)
                        peekf = vars.reconfig_modules
                else
                        for k,v in pairs(vars) do log.info("var %s=%s",k,v) end
                        error(string.format(
                                "Bad vars for %s after steps: %s",
                                tostring(peekf), table.concat(steps, ", ")
                        ))
                end

                -- every non-terminal step must move to a different callable
                if prevf == peekf then
                        error(string.format(
                                "Recursion for %s after steps: %s",
                                tostring(peekf), table.concat(steps, ", ")
                        ))
                end
        end
        return peek
end
148

149
local load_cfg = reflect_internals()
38✔
150

151
---Prepares a config table for passing to box.cfg
---
---Runs the reflected upgrade_cfg (when there is one) and, once box.cfg is
---no longer a plain function, removes from the config every option that
---is not listed as dynamic and differs from the current box.cfg value.
---Each removed option is reported through log.warn and print.
---The given table itself may be edited.
---@param cfg table<string,any>
---@return table<string,any>
local function prepare_box_cfg(cfg)
        -- 1. let the reflected upgrade routine rewrite the config first
        local upgrade = load_cfg.upgrade_cfg
        if upgrade then
                cfg = upgrade(cfg, load_cfg.translate_cfg)
        end

        -- 2. box.cfg is still a function: nothing to compare against
        --    (presumably box is not configured yet)
        if type(box.cfg) == 'function' then
                return cfg
        end

        -- 3. wipe out changed options which cannot be applied dynamically
        for key, val in pairs(cfg) do
                local is_static = load_cfg.dynamic_cfg[key] == nil
                if is_static and box.cfg[key] ~= val then
                        local warn = ("Can't change option '%s' dynamically from '%s' to '%s'"):format(
                                key, box.cfg[key], val
                        )
                        log.warn("%s",warn)
                        print(warn)
                        cfg[key] = nil
                end
        end

        return cfg
end
179

180
-- Metatable for tables that must not receive new keys
local readonly_mt = {}

-- Missing keys are looked up raw, so they simply read as nil
function readonly_mt.__index(self, key)
        return rawget(self, key)
end

-- Any assignment to an absent key is rejected, blaming the caller
function readonly_mt.__newindex(_, key)
        error("Modification of readonly key "..tostring(key),2)
end

-- Serializers receive a plain shallow copy without this metatable
function readonly_mt.__serialize(self)
        local plain = {}
        for key, value in pairs(self) do
                plain[key] = value
        end
        return plain
end
193

194
---Builds a flat view of a nested table
---
---Every value is stored under its dotted path ("a.b.c"); nested tables are
---stored under their own path as well as being expanded.
---The outermost call (the one made without `result`) returns the flat table
---with readonly_mt attached; recursive calls return the shared accumulator.
---@param t table nested table to walk
---@param prefix? string path prefix of the current level
---@param result? table accumulator shared between recursive calls
---@return table
local function flatten (t,prefix,result)
        local is_root = not result
        local path = prefix or ''
        local flat = result or {}

        for key, value in pairs(t) do
                local full_key = path..key
                if type(value) == 'table' then
                        flatten(value, full_key..'.', flat)
                end
                flat[full_key] = value
        end

        if is_root then
                return setmetatable(flat,readonly_mt)
        end
        return flat
end
209

210
---Looks up the config option in the global command-line `arg` table
---
---Recognized forms: `--config=VALUE`, `--config VALUE`, `-c VALUE`, `-cVALUE`.
---Other options are skipped together with their value; a token starting
---with "-" that follows an unrelated option is treated as the next option
---rather than as that option's value.
---@return string? value of the config option, nil when it is not given
local function get_opt()
        local take = false
        local key
        -- `arg` may be absent when the code is not started from a script
        for _,v in ipairs(arg or {}) do
                local is_value = false
                if take then
                        -- the pending option is resolved by this token either way;
                        -- without the reset every later option would be ignored
                        take = false
                        if key == 'config' or key == 'c' then
                                return v
                        end
                        is_value = string.sub( v, 1, 1 ) ~= "-"
                end

                if not is_value then
                        if string.sub( v, 1, 2) == "--" then
                                local x = string.find( v, "=", 1, true )
                                if x then
                                        -- --key=value
                                        key = string.sub( v, 3, x-1 )
                                        if key == 'config' then
                                                return string.sub( v, x+1 )
                                        end
                                else
                                        -- --key value
                                        key = string.sub( v, 3 )
                                        take = true
                                end
                        elseif string.sub( v, 1, 1 ) == "-" then
                                key = string.sub(v,2,2)
                                if string.len(v) == 2 then
                                        -- -k value
                                        take = true
                                elseif key == 'c' then
                                        -- -cvalue
                                        return string.sub( v, 3 )
                                end
                        end
                end
        end
end
246

247
---Recursively merges `src` into `dst` in place
---
---Tables from `src` are merged key by key, other values are assigned.
---When `src` holds a table where `dst` holds a non-table value, the
---destination value is replaced by a fresh table (or left as is in keep mode).
---Raises an error (attributed to the caller) when either table is missing.
---@param dst table destination, modified in place
---@param src table source, left untouched
---@param keep? boolean when truthy, values already present in dst are not overwritten
local function deep_merge(dst,src,keep)
        -- TODO: think of cyclic
        if not src or not dst then error("Call to deepmerge with bad args",2) end
        for k,v in pairs(src) do
                if type(v) == 'table' then
                        local target = dst[k]
                        if type(target) ~= 'table' then
                                if target == nil or not keep then
                                        -- absent or overridable scalar: start from an empty table
                                        target = {}
                                        dst[k] = target
                                else
                                        -- keep mode: an existing non-table value wins
                                        target = nil
                                end
                        end
                        if target then
                                deep_merge(target,v,keep)
                        end
                elseif dst[k] == nil or not keep then
                        dst[k] = v
                end
        end
end
261

262
---Returns a new table filled by deep-merging `src` into an empty one
---@param src table
---@return table
local function deep_copy(src)
        local copy = {}
        deep_merge(copy, src)
        return copy
end
267

268
---Tells whether a table looks like an array:
---all keys are numbers and their count equals the table length
---@param a table
---@return boolean
local function is_array(a)
        local count = 0
        for key in pairs(a) do
                count = count + 1
                if type(key) ~= 'number' then
                        return false
                end
        end
        return count == #a
end
278

279
---Returns the difference between two config values
---
--- 1. keys deleted from a map are reported as box.NULL
--- 2. arrays are replaced completely
--- 3. nil means no diff (and is not stored in tables)
---@param old any
---@param new any
---@return any diff
local function value_diff(old,new)
        local kind = type(old)
        if kind ~= type(new) then
                return new
        end

        if kind ~= 'table' then
                if old ~= new then
                        return new
                end
                return
        end

        -- the very same table cannot differ from itself
        if new == old then return end

        if is_array(old) then
                if #new ~= #old then return new end
                for i = 1,#old do
                        if value_diff(old[i], new[i]) ~= nil then
                                return new
                        end
                end
                return
        end

        local diff = {}
        -- removed and changed keys
        for k in pairs(old) do
                if new[ k ] == nil then
                        diff[k] = box.NULL
                else
                        local vdiff = value_diff(old[k], new[k])
                        if vdiff ~= nil then
                                diff[k] = vdiff
                        end
                end
        end
        -- added keys
        for k in pairs(new) do
                if old[ k ] == nil then
                        diff[k] = new[k]
                end
        end
        if next(diff) then
                return diff
        end
        -- no diff
end
328

329
---Converts a loosely typed flag to a boolean
---
---nil and false give false; true gives true; anything else is stringified
---and lowercased: numeric text is true unless it equals zero, and the
---words 'true' and 'yes' are true. Everything else is false.
---@param v any
---@return boolean
local function toboolean(v)
        if not v then
                return false
        end
        if type(v) == 'boolean' then
                return v
        end

        local text = tostring(v):lower()
        local num = tonumber(text)
        if num then
                return num ~= 0
        end
        return text == 'true' or text == 'yes'
end
341

342
---Policies that build the final instance config and decide box.read_only.
---Each policy merges common and instance config, applies its own rules
---and merges local_cfg on top.
---@type table<string, fun(M: moonlibs.config, instance_name: string, common_cfg: table, instance_cfg: table, cluster_cfg: table, local_cfg: table):table >
local master_selection_policies;
master_selection_policies = {
        -- Standalone instance: no cluster section allowed, writable unless told otherwise
        ['etcd.instance.single'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
                local result = {}
                deep_merge(result, common_cfg)
                deep_merge(result, instance_cfg)

                if cluster_cfg then
                        error("Cluster config should not exist for single instance config")
                end

                deep_merge(result, local_cfg)

                local box_cfg = result.box
                if box_cfg.read_only == nil then
                        log.info("Instance have no read_only option, set read_only=false")
                        box_cfg.read_only = false
                end

                -- a lone instance forms its own replicaset
                if box_cfg.instance_uuid and not box_cfg.replicaset_uuid then
                        box_cfg.replicaset_uuid = box_cfg.instance_uuid
                end

                log.info("Using policy etcd.instance.single, read_only=%s",box_cfg.read_only)
                return result
        end;

        -- read_only comes from the instance config; M.default_read_only fills the gap
        ['etcd.instance.read_only'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
                local result = {}
                deep_merge(result, common_cfg)
                deep_merge(result, instance_cfg)

                if cluster_cfg then
                        log.info("cluster=%s",json.encode(cluster_cfg))
                        assert(cluster_cfg.replicaset_uuid,"Need cluster uuid")
                        result.box.replicaset_uuid = cluster_cfg.replicaset_uuid
                end

                deep_merge(result, local_cfg)

                local box_cfg = result.box
                if M.default_read_only and box_cfg.read_only == nil then
                        log.info("Instance have no read_only option, set read_only=true")
                        box_cfg.read_only = true
                end

                log.info("Using policy etcd.instance.read_only, read_only=%s",box_cfg.read_only)
                return result
        end;

        -- read_only is derived from cluster_cfg.master, the configured value is ignored
        ['etcd.cluster.master'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
                log.info("Using policy etcd.cluster.master")
                local result = {}
                deep_merge(result, common_cfg)
                deep_merge(result, instance_cfg)

                assert(cluster_cfg.replicaset_uuid,"Need cluster uuid")
                local box_cfg = result.box
                box_cfg.replicaset_uuid = cluster_cfg.replicaset_uuid

                if box_cfg.read_only ~= nil then
                        log.info("Ignore box.read_only=%s value due to config policy",box_cfg.read_only)
                end

                local master = cluster_cfg.master
                if not master then
                        log.info("Cluster have no declared master, set read_only=true")
                        box_cfg.read_only = true
                elseif master == instance_name then
                        log.info("Instance is declared as cluster master, set read_only=false")
                        box_cfg.read_only = false
                        if box_cfg.bootstrap_strategy ~= 'auto' then
                                box_cfg.replication_connect_quorum = 1
                                box_cfg.replication_connect_timeout = 1
                        end
                else
                        log.info("Cluster has another master %s, not me %s, set read_only=true", master, instance_name)
                        box_cfg.read_only = true
                end

                deep_merge(result, local_cfg)

                return result
        end;

        -- Delegates to the master policy for clustered instances and to single otherwise
        ['etcd.cluster.vshard'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
                log.info("Using policy etcd.cluster.vshard")
                local delegate = instance_cfg.cluster and 'etcd.cluster.master' or 'etcd.instance.single'
                return master_selection_policies[delegate](M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
        end;

        -- read_only follows box.election_mode, missing options are taken from M defaults
        ['etcd.cluster.raft'] = function(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)
                log.info("Using policy etcd.cluster.raft")
                local result = {}
                deep_merge(result, common_cfg)
                deep_merge(result, instance_cfg)

                assert(cluster_cfg.replicaset_uuid,"Need cluster uuid")
                local box_cfg = result.box
                box_cfg.replicaset_uuid = cluster_cfg.replicaset_uuid

                box_cfg.election_mode = box_cfg.election_mode or M.default_election_mode

                -- TODO: anonymous replica
                if box_cfg.election_mode == 'off' then
                        log.info("Force box.read_only=true for election_mode=off")
                        box_cfg.read_only = true
                end

                box_cfg.replication_synchro_quorum = box_cfg.replication_synchro_quorum or M.default_synchro_quorum

                if box_cfg.election_mode == "candidate" then
                        box_cfg.read_only = false
                end

                deep_merge(result, local_cfg)

                return result
        end;
}
38✔
462

463
---Casts string values of boolean options to real booleans, in place
---
---An option is treated as boolean when the reflected template_cfg maps its
---name to 'boolean'; the string 'true' becomes true, any other string false.
---Does nothing when `c` is missing or when no template table was reflected
---(template_cfg is optional).
---@param c? table<string,any> box-like config section
local function cast_types(c)
        local template = load_cfg.template_cfg
        if not c or type(template) ~= 'table' then
                return
        end
        for k,v in pairs(c) do
                if template[k] == 'boolean' and type(v) == 'string' then
                        c[k] = v == 'true'
                end
        end
end
472

473
---Derives an instance uuid string from an instance name
---
---Accepts names shaped as `<letters/underscores>_<digits>_<digits>` or
---`<letters/underscores>_<digits>`. The first uuid group is taken from the
---sha1 hex of the textual part, the digits fill the numeric groups.
---Raises an error (attributed to the caller) for any other name.
---@param instance_name string
---@return string
local function gen_instance_uuid(instance_name)
        local name, major, minor = instance_name:match("^([A-Za-z_]+)_(%d+)_(%d+)$")
        if name then
                local head = digest.sha1_hex(name .. "_instance"):sub(1,8)
                return ("%08s-%04d-%04d-%04d-%012x"):format(head, major, minor, 0, 0)
        end

        local short_name, index = instance_name:match("^([A-Za-z_]+)_(%d+)$")
        if short_name then
                local head = digest.sha1_hex(short_name):sub(1,8)
                return ("%08s-%04d-%04d-%04d-%012d"):format(head, 0, 0, 0, index)
        end

        error("Can't generate uuid for instance "..instance_name, 2)
end
493

494
---Derives a replicaset uuid string from a cluster name
---
---Accepts names shaped as `<letters/underscores>_<digits>`; the first uuid
---group is taken from the sha1 hex of the textual part plus "_shard",
---the digits fill the second group.
---Raises an error (attributed to the caller) for any other name.
---@param cluster_name string
---@return string
local function gen_cluster_uuid(cluster_name)
        local name, index = cluster_name:match("^([A-Za-z_]+)_(%d+)$")
        if not name then
                error("Can't generate uuid for cluster "..cluster_name, 2)
        end
        local head = digest.sha1_hex(name .. "_shard"):sub(1,8)
        return ("%08s-%04d-%04d-%04d-%012d"):format(head, index, 0, 0, 0)
end
505

506
---@class moonlibs.config.opts.etcd:moonlibs.config.etcd.opts
---@field instance_name string Mandatory name of the instance
---@field prefix string Mandatory prefix inside etcd tree
---@field uuid? 'auto' When auto config generates replicaset_uuid and instance_uuid for nodes
---@field fixed? table Optional ETCD tree

---Loads configuration from etcd and evaluate master_selection_policy
---
---Stores the etcd client (or an in-memory stand-in built from
---`etcd_conf.fixed`) in `M.etcd`, attaches the get_common / get_instances /
---get_clusters / get_all helpers to it, loads the whole tree, selects and
---runs the master selection policy and, for clustered instances, fills
---`box.replication` from the enabled members of the same cluster unless
---it is already set.
---@param M moonlibs.config
---@param etcd_conf moonlibs.config.opts.etcd
---@param local_cfg table<string,any>
---@return table<string, any>
local function etcd_load( M, etcd_conf, local_cfg )

        local etcd
        local instance_name = assert(etcd_conf.instance_name,"etcd.instance_name is required")
        local prefix = assert(etcd_conf.prefix,"etcd.prefix is required")

        if etcd_conf.fixed then
                -- Serve the tree given in the config instead of asking etcd
                etcd = setmetatable({ data = etcd_conf.fixed },{__index = {
                        discovery = function() end;
                        list = function(e,k)
                                if k:sub(1,#prefix) == prefix then
                                        k = k:sub(#prefix + 1)
                                end
                                -- walk the tree along the "/"-separated path
                                local v = e.data
                                for key in k:gmatch("([^/]+)") do
                                        if type(v) ~= "table" then return end
                                        v = v[key]
                                end
                                return v
                        end;
                }})
        else
                etcd = require 'config.etcd' (etcd_conf)
        end
        M.etcd = etcd

        ---Loads the common subtree and casts types of its box section
        function M.etcd.get_common(e)
                local common_cfg = e:list(prefix .. "/common")
                assert(common_cfg.box,"no box config in etcd common tree")
                cast_types(common_cfg.box)
                return common_cfg
        end

        ---Loads all instances, casting types and generating uuids when uuid == 'auto'
        function M.etcd.get_instances(e)
                local all_instances_cfg = e:list(prefix .. "/instances")
                for inst_name,inst_cfg in pairs(all_instances_cfg) do
                        cast_types(inst_cfg.box)
                        if etcd_conf.uuid == 'auto' and not inst_cfg.box.instance_uuid then
                                inst_cfg.box.instance_uuid = gen_instance_uuid(inst_name)
                        end
                end
                return all_instances_cfg
        end

        ---Loads all clusters (or shards), casting types and generating uuids when uuid == 'auto'
        function M.etcd.get_clusters(e)
                -- both lookups go through the receiver, like in the other getters
                local all_clusters_cfg = e:list(prefix .. "/clusters") or e:list(prefix .. "/shards")
                for cluster_name,cluster_cfg in pairs(all_clusters_cfg) do
                        cast_types(cluster_cfg)
                        if etcd_conf.uuid == 'auto' and not cluster_cfg.replicaset_uuid then
                                cluster_cfg.replicaset_uuid = gen_cluster_uuid(cluster_name)
                        end
                end
                return all_clusters_cfg
        end

        ---Loads the whole tree under prefix in one request and normalizes it
        function M.etcd.get_all(e)
                local all_cfg = e:list(prefix)
                cast_types(all_cfg.common.box)
                for inst_name,inst_cfg in pairs(all_cfg.instances) do
                        cast_types(inst_cfg.box)
                        if etcd_conf.uuid == 'auto' and not inst_cfg.box.instance_uuid then
                                inst_cfg.box.instance_uuid = gen_instance_uuid(inst_name)
                        end
                end
                for cluster_name,cluster_cfg in pairs(all_cfg.clusters or all_cfg.shards or {}) do
                        cast_types(cluster_cfg)
                        if etcd_conf.uuid == 'auto' and not cluster_cfg.replicaset_uuid then
                                cluster_cfg.replicaset_uuid = gen_cluster_uuid(cluster_name)
                        end
                end
                return all_cfg
        end

        etcd:discovery()

        local all_cfg = etcd:get_all()
        if etcd_conf.print_config then
                print("Loaded config from etcd",yaml.encode(all_cfg))
        end
        local common_cfg = all_cfg.common
        local all_instances_cfg = all_cfg.instances

        local instance_cfg = all_instances_cfg[instance_name]
        assert(instance_cfg,"Instance name "..instance_name.." is not known to etcd")

        local all_clusters_cfg = all_cfg.clusters or all_cfg.shards

        local master_selection_policy
        local cluster_cfg
        if instance_cfg.cluster or local_cfg.cluster then
                -- a missing clusters tree must end up in the assert below,
                -- not in an "attempt to index nil" error
                cluster_cfg = all_clusters_cfg and all_clusters_cfg[ (instance_cfg.cluster or local_cfg.cluster) ]
                assert(cluster_cfg,"Cluster section required");
                assert(cluster_cfg.replicaset_uuid,"Need cluster uuid")
                master_selection_policy = M.master_selection_policy or 'etcd.instance.read_only'
        elseif instance_cfg.router then
                -- TODO
                master_selection_policy = M.master_selection_policy or 'etcd.instance.single'
        else
                master_selection_policy = M.master_selection_policy or 'etcd.instance.single'
        end

        local master_policy = master_selection_policies[ master_selection_policy ]
        if not master_policy then
                error(string.format("Unknown master_selection_policy: %s",M.master_selection_policy),0)
        end

        local cfg = master_policy(M, instance_name, common_cfg, instance_cfg, cluster_cfg, local_cfg)

        -- Enabled instances which belong to the same cluster as this one (itself included)
        local members = {}
        for member_name,v in pairs(all_instances_cfg) do
                if v.cluster == cfg.cluster then
                        if not toboolean(v.disabled) then
                                table.insert(members,v)
                        else
                                -- report the disabled member itself, not the current instance
                                log.warn("Member '%s' from cluster '%s' listening on %s is disabled", member_name, v.cluster, v.box.listen)
                        end
                end
        end

        if cfg.cluster then
                local repl = {}
                for _,member in pairs(members) do
                        if member.box.remote_addr then
                                table.insert(repl, member.box.remote_addr)
                        else
                                table.insert(repl, member.box.listen)
                        end
                end
                -- Plain ascending order. The previous comparator parsed the same
                -- address twice, so its host/port branch never fired and it always
                -- fell back to `a < b`; the effective order is unchanged.
                table.sort(repl)

                if cfg.box.replication then
                        local override = cfg.box.replication
                        -- the override is not necessarily a list
                        local override_str = type(override) == 'table'
                                and table.concat(override,", ")
                                or tostring(override)
                        print(
                                "Start instance ",cfg.box.listen,
                                " with locally overriden replication:",override_str,
                                " instead of etcd's:", table.concat(repl,", ")
                        )
                else
                        cfg.box.replication = repl
                        print(
                                "Start instance "..cfg.box.listen,
                                " with replication:"..table.concat(cfg.box.replication,", "),
                                string.format("timeout: %s, quorum: %s, lag: %s",
                                        cfg.box.replication_connect_timeout
                                                or ('def:%s'):format(load_cfg.default_cfg.replication_connect_timeout or 30),
                                        cfg.box.replication_connect_quorum or 'def:full',
                                        cfg.box.replication_sync_lag
                                                or ('def:%s'):format(load_cfg.default_cfg.replication_sync_lag or 10)
                                )
                        )
                end
        end

        return cfg
end
681

682
---Tells whether two replication settings differ.
---Two tables are compared by their values, ignoring order; anything else is
---compared with plain inequality.
---@param old_conf any previous replication setting
---@param new_conf any new replication setting
---@return boolean changed
local function is_replication_changed (old_conf, new_conf)
        if type(old_conf) ~= 'table' or type(new_conf) ~= 'table' then
                return old_conf ~= new_conf
        end

        -- every old upstream starts as "not seen yet"
        local unmatched = {}
        for _, uri in pairs(old_conf) do
                unmatched[uri] = true
        end

        -- tick off each new upstream; an unknown one means the set changed
        for _, uri in pairs(new_conf) do
                if not unmatched[uri] then
                        return true
                end
                unmatched[uri] = nil
        end

        -- leftovers are upstreams which disappeared from the new setting
        return not not next(unmatched)
end
703

704
---Computes a replication_connect_quorum value for the given upstreams:
---0 when there are none, otherwise floor(N/2)+1.
---@param upstreams? any[] list of upstreams (nil is treated as empty)
---@return number rcq
local function optimal_rcq(upstreams)
        local count = #(upstreams or {})
        if count == 0 then
                return 0
        end
        return 1+math.floor(count/2)
end
714

715
---Removes from `cfg` every key known neither to load_cfg.default_cfg nor to
---load_cfg.dynamic_cfg (warning about each one), logs the result and passes
---it to `boxcfg`. Note that `cfg` is modified in place.
---@param boxcfg fun(cfg: table<string,any>) function which applies the options
---@param cfg table<string,any> box options
local function do_cfg(boxcfg, cfg)
        for option, value in pairs(cfg) do
                local is_known = load_cfg.default_cfg[option] ~= nil
                        or load_cfg.dynamic_cfg[option] ~= nil
                if not is_known then
                        local message = string.format("Dropping non-boxcfg option '%s' given '%s'",option,value)
                        log.warn("%s",message)
                        print(message)
                        -- clearing an existing key during pairs() traversal is allowed
                        cfg[option] = nil
                end
        end
        log.info("Just before box.cfg %s", yaml.encode(cfg))
        boxcfg(cfg)
end
727

728

729
---@class moonlibs.config.opts
730
---@field bypass_non_dynamic? boolean (default: true) drops every changed non-dynamic option on reconfiguration
731
---@field tidy_load? boolean (default: true) recovers Tarantool with read_only=true
732
---@field mkdir? boolean (default: false) whether moonlibs/config should create missing work_dir, wal_dir, snap_dir, memtx_dir, vinyl_dir and the directory of pid_file
733
---@field etcd? moonlibs.config.opts.etcd [legacy] configuration of etcd
734
---@field default_replication_connect_timeout? number (default: 1.1) default RCT in seconds
735
---@field default_election_mode? election_mode (default: candidate) option is respected only when etcd.cluster.raft is used
736
---@field default_synchro_quorum? string|number (default: 'N/2+1') option is respected only when etcd.cluster.raft is used
737
---@field default_read_only? boolean (default: false) option is respected only when etcd.instance.read_only is used (deprecated)
738
---@field master_selection_policy? 'etcd.cluster.master'|'etcd.cluster.vshard'|'etcd.cluster.raft'|'etcd.instance.single' master selection policy
739
---@field strict_mode? boolean (default: false) makes config retrieval strict: config.get raises an error when the key is not found and no default is given
740
---@field strict? boolean (default: false) alias for strict_mode: config.get raises an error when the key is not found and no default is given
741
---@field default? table<string,any> (default: nil) globally default options for config.get
742
---@field on_load? fun(conf: moonlibs.config, cfg: table<string,any>) callback which is called every time config is loaded from file and ETCD
743
---@field load? fun(conf: moonlibs.config, cfg: table<string,any>): table<string,any> do not use this callback
744
---@field on_before_cfg? fun(conf: moonlibs.config, cfg: table<string,any>) callback is called right before running box.cfg (but after on_load)
745
---@field boxcfg? fun(cfg: table<string,any>) [legacy] when provided, this function is called instead of box.cfg. tidy_load and everything else will not be used.
746
---@field wrap_box_cfg? fun(cfg: table<string,any>) callback which is called instead of box.cfg, while tidy_load is still respected. Use this if you need to proxy every option to box.cfg on the application side
747
---@field on_after_cfg? fun(conf: moonlibs.config, cfg: table<string,any>) callback which is called after full tarantool configuration
748

749
---@class moonlibs.config: moonlibs.config.opts
750
---@field etcd moonlibs.config.etcd
751
---@field public _load_cfg table
752
---@field public _flat table
753
---@field public _fencing_f? Fiber
754
---@operator call(moonlibs.config.opts): moonlibs.config
755

756
---@type moonlibs.config
757
local M
758
        M = setmetatable({
76✔
759
                console = {};
38✔
760
                ---Retrieves value from config
761
                ---@overload fun(k: string, def: any?): any?
762
                ---@param self moonlibs.config
763
                ---@param k string path inside config
764
                ---@param def? any optional default value
765
                ---@return any?
766
                get = function(self,k,def)
                        -- allow both M.get(k, def) and M:get(k, def):
                        -- without the receiver all arguments are shifted by one
                        if self ~= M then
                                k, def = self, k
                        end

                        local found = M._flat[k]
                        if found ~= nil then
                                return found
                        end
                        if def ~= nil then
                                return def
                        end
                        -- nothing found and no default given
                        if M.strict_mode then
                                error(string.format("no %s found in config", k))
                        end
                        return
                end
783
        },{
38✔
784
                ---Reinitiates moonlibs.config
785
                ---@param args moonlibs.config.opts
786
                ---@return moonlibs.config
787
                __call = function(_, args)
788
                        -- args MUST belong to us, because of modification
789
                        local file
790
                        if type(args) == 'string' then
38✔
791
                                file = args
×
792
                                args = {}
×
793
                        elseif type(args) == 'table' then
38✔
794
                                args = deep_copy(args)
76✔
795
                                file = args.file
38✔
796
                        else
797
                                args = {}
×
798
                        end
799
                        if args.bypass_non_dynamic == nil then
38✔
800
                                args.bypass_non_dynamic = true
38✔
801
                        end
802
                        if args.tidy_load == nil then
38✔
803
                                args.tidy_load = true
38✔
804
                        end
805
                        M.default_replication_connect_timeout = args.default_replication_connect_timeout or 1.1
38✔
806
                        M.default_election_mode = args.default_election_mode or 'candidate'
38✔
807
                        M.default_synchro_quorum = args.default_synchro_quorum or 'N/2+1'
38✔
808
                        M.default_read_only = args.default_read_only or false
38✔
809
                        M.master_selection_policy = args.master_selection_policy
38✔
810
                        M.default = args.default
38✔
811
                        M.strict_mode = args.strict_mode or args.strict or false
38✔
812
                        M._load_cfg = load_cfg
38✔
813
                        -- print("config", "loading ",file, json.encode(args))
814
                        if not file then
38✔
815
                                file = get_opt()
×
816
                                -- todo: maybe etcd?
817
                                if not file then error("Neither config call option given not -c|--config option passed",2) end
×
818
                        end
819

820
                        print(string.format("Loading config %s %s", file, json.encode(args)))
38✔
821

822
                        ---Loads the configuration from `file` (and from ETCD when etcd options are given).
                        ---
                        ---The file is executed as a Lua chunk whose environment is a fresh table;
                        ---globals assigned by the file become config keys. Reads of undefined names
                        ---fall through to `methods` (merge/include/print), then to `args`, then to `_G`.
                        ---It is called once before box.cfg and once more after it for the reload.
                        ---Raises if the file cannot be loaded or if no `box` section is present.
                        ---@return table<string,any> cfg
                        local function load_config()

                                -- helpers made available to the config file
                                local methods = {}

                                ---Merges `src` into `dst` using deep_merge (no-op when src is nil)
                                ---@return table dst
                                function methods.merge(dst, src, keep)
                                        if src ~= nil then
                                                deep_merge( dst, src, keep )
                                        end
                                        return dst
                                end

                                ---Executes another file (path is relative to the directory of the main
                                ---config file) in the environment of the caller.
                                ---With opts.if_exists a missing file is silently skipped.
                                function methods.include(path, opts)
                                        path = fio.pathjoin(fio.dirname(file), path)
                                        opts = opts or { if_exists = false }
                                        if not fio.path.exists(path) then
                                                if opts.if_exists then
                                                        return
                                                end
                                                error("Not found include file `"..path.."'", 2)
                                        end
                                        local f,e = loadfile(path)
                                        if not f then error(e,2) end
                                        -- level 2 is the function which called include()
                                        setfenv(f, getfenv(2))
                                        local ret = f()
                                        if ret ~= nil then
                                                print("Return value from "..path.." is ignored")
                                        end
                                end

                                ---print() replacement which json-encodes tables without metatables
                                function methods.print(...)
                                        local p = {...}
                                        -- use the real argument count: #p is unreliable when some argument is nil
                                        local n = select('#', ...)
                                        for i = 1, n do
                                                if type(p[i]) == 'table'
                                                        and not debug.getmetatable(p[i])
                                                then
                                                        p[i] = json.encode(p[i])
                                                end
                                        end
                                        print(unpack(p, 1, n))
                                end

                                local f,e = loadfile(file)
                                if not f then error(e,2) end
                                -- lookup chain for the config chunk: cfg -> methods -> args -> _G
                                local cfg = setmetatable({}, {
                                        __index = setmetatable(methods, {
                                                __index = setmetatable(args,{ __index = _G })
                                        })
                                })
                                setfenv(f, cfg)
                                local ret = f()
                                if ret ~= nil then
                                        print("Return value from "..file.." is ignored")
                                end
                                -- detach the lookup chain: from now on cfg and args are plain tables
                                setmetatable(cfg,nil)
                                setmetatable(args,nil)
                                deep_merge(cfg,args.default or {},'keep')

                                -- subject to change, just a PoC
                                local etcd_conf = args.etcd or cfg.etcd
                                if etcd_conf then
                                        local s = clock.time()
                                        cfg = etcd_load(M, etcd_conf, cfg)
                                        log.info("etcd_load took %.3fs", clock.time()-s)
                                end

                                if args.load then
                                        cfg = args.load(M, cfg)
                                end

                                if not cfg.box then
                                        error("No box.* config given", 2)
                                end

                                -- remote_addr is never kept in the box section
                                if cfg.box.remote_addr then
                                        cfg.box.remote_addr = nil
                                end

                                if args.bypass_non_dynamic then
                                        cfg.box = prepare_box_cfg(cfg.box)
                                end

                                -- expose call options as cfg.sys.*, except the callbacks cleared below
                                deep_merge(cfg,{
                                        sys = deep_copy(args)
                                })
                                cfg.sys.boxcfg = nil
                                cfg.sys.on_load = nil

                                -- latest modifications and fixups
                                if args.on_load then
                                        args.on_load(M,cfg)
                                end
                                return cfg
                        end
914

915
                        local cfg = load_config() --[[@as table]]
38✔
916

917
                        M._flat = flatten(cfg)
76✔
918

919
                        if args.on_before_cfg then
38✔
920
                                args.on_before_cfg(M,cfg)
×
921
                        end
922

923
                        if args.mkdir then
38✔
924
                                if not ( fio.path and fio.mkdir ) then
38✔
925
                                        error(string.format("Tarantool version %s is too old for mkdir: fio.path is not supported", _TARANTOOL),2)
×
926
                                end
927
                                for _,key in pairs({"work_dir", "wal_dir", "snap_dir", "memtx_dir", "vinyl_dir"}) do
228✔
928
                                        local v = cfg.box[key]
190✔
929
                                        if v and not fio.path.exists(v) then
266✔
930
                                                local r,e = fio.mktree(v)
22✔
931
                                                if not r then error(string.format("Failed to create path '%s' for %s: %s",v,key,e),2) end
22✔
932
                                        end
933
                                end
934
                                local v = cfg.box.pid_file
38✔
935
                                if v then
38✔
936
                                        v = fio.dirname(v);
937
                                        if v and not fio.path.exists(v) then
×
938
                                                local r,e = fio.mktree(v)
×
939
                                                if not r then error(string.format("Failed to create path '%s' for pid_file: %s",v,e),2) end
×
940
                                        end
941
                                end
942
                        end
943

944
                        local boxcfg = box.cfg
38✔
945

946
                        if args.boxcfg then
38✔
947
                                do_cfg(args.boxcfg, cfg.box)
×
948
                        else
949
                                if args.wrap_box_cfg then
38✔
950
                                        boxcfg = args.wrap_box_cfg
×
951
                                end
952
                                if type(box.cfg) == 'function' then
38✔
953
                                        if M.etcd then
32✔
954
                                                if args.tidy_load then
32✔
955
                                                        local snap_dir = cfg.box.snap_dir or cfg.box.memtx_dir
32✔
956
                                                        if not snap_dir then
32✔
957
                                                                if cfg.box.work_dir then
×
958
                                                                        snap_dir = cfg.box.work_dir
×
959
                                                                else
960
                                                                        snap_dir = "."
×
961
                                                                end
962
                                                        end
963
                                                        local bootstrapped
964
                                                        for _,v in pairs(fio.glob(snap_dir..'/*.snap')) do
42✔
965
                                                                bootstrapped = v
10✔
966
                                                        end
967

968
                                                        if bootstrapped then
32✔
969
                                                                print("Have etcd, use tidy load")
10✔
970
                                                                local ro = cfg.box.read_only
10✔
971
                                                                cfg.box.read_only = true
10✔
972
                                                                if cfg.box.bootstrap_strategy ~= 'auto' then
10✔
973
                                                                        if not ro then
10✔
974
                                                                                -- Only if node should be master
975
                                                                                cfg.box.replication_connect_quorum = 1
7✔
976
                                                                                cfg.box.replication_connect_timeout = M.default_replication_connect_timeout
7✔
977
                                                                        elseif not cfg.box.replication_connect_quorum then
3✔
978
                                                                                -- For replica tune up to N/2+1
979
                                                                                cfg.box.replication_connect_quorum = optimal_rcq(cfg.box.replication)
6✔
980
                                                                        end
981
                                                                end
982
                                                                log.info("Start tidy loading with ro=true%s rcq=%s rct=%s (snap=%s)",
20✔
983
                                                                        ro ~= true and string.format(' (would be %s)',ro) or '',
10✔
984
                                                                        cfg.box.replication_connect_quorum, cfg.box.replication_connect_timeout,
10✔
985
                                                                        bootstrapped
986
                                                                )
20✔
987
                                                        else
988
                                                                -- not bootstraped yet cluster
989

990
                                                                -- if cfg.box.bootstrap_strategy == 'auto' then -- ≥ Tarantool 2.11
991
                                                                -- local ro = cfg.box.read_only
992
                                                                -- local is_candidate = cfg.box.election_mode == 'candidate'
993
                                                                --         if not ro and not is_candidate then
994
                                                                --                 -- master but not Raft/candidate
995
                                                                --                 -- we decrease replication for master,
996
                                                                --                 -- to allow him bootstrap himself
997
                                                                --                 cfg.box.replication = {cfg.box.remote_addr or cfg.box.listen}
998
                                                                --         end
999
                                                                if cfg.box.bootstrap_strategy ~= 'auto' then -- < Tarantool 2.11
22✔
1000
                                                                        if cfg.box.replication_connect_quorum == nil then
22✔
1001
                                                                                cfg.box.replication_connect_quorum = optimal_rcq(cfg.box.replication)
26✔
1002
                                                                        end
1003
                                                                end
1004

1005
                                                                log.info("Start non-bootstrapped tidy loading with ro=%s rcq=%s rct=%s (dir=%s)",
44✔
1006
                                                                        cfg.box.read_only, cfg.box.replication_connect_quorum,
22✔
1007
                                                                        cfg.box.replication_connect_timeout, snap_dir
22✔
1008
                                                                )
22✔
1009
                                                        end
1010
                                                end
1011

1012
                                                do_cfg(boxcfg, cfg.box)
32✔
1013

1014
                                                log.info("Reloading config after start")
32✔
1015

1016
                                                local new_cfg = load_config()
32✔
1017
                                                local diff_box = value_diff(cfg.box, new_cfg.box)
32✔
1018

1019
                                                -- since load_config loads config also for reloading it removes non-dynamic options
1020
                                                -- therefore, they would be absent, but should not be passed. remove them
1021
                                                if diff_box then
32✔
1022
                                                        for key in pairs(diff_box) do
57✔
1023
                                                                if load_cfg.dynamic_cfg[key] == nil then
34✔
1024
                                                                        diff_box[key] = nil
×
1025
                                                                end
1026
                                                        end
1027
                                                        if not next(diff_box) then
23✔
1028
                                                                diff_box = nil
×
1029
                                                        end
1030
                                                end
1031

1032
                                                if diff_box then
32✔
1033
                                                        log.info("Reconfigure after load with %s",require'json'.encode(diff_box))
23✔
1034
                                                        do_cfg(boxcfg, diff_box)
46✔
1035
                                                else
1036
                                                        log.info("Config is actual after load")
9✔
1037
                                                end
1038

1039
                                                M._flat = flatten(new_cfg)
64✔
1040
                                        else
1041
                                                do_cfg(boxcfg, cfg.box)
×
1042
                                        end
1043
                                else
1044
                                        local replication     = cfg.box.replication_source or cfg.box.replication
6✔
1045
                                        local box_replication = box.cfg.replication_source or box.cfg.replication
6✔
1046

1047
                                        if not is_replication_changed(replication, box_replication) then
12✔
1048
                                                local r  = cfg.box.replication
6✔
1049
                                                local rs = cfg.box.replication_source
6✔
1050
                                                cfg.box.replication        = nil
6✔
1051
                                                cfg.box.replication_source = nil
6✔
1052

1053
                                                do_cfg(boxcfg, cfg.box)
6✔
1054

1055
                                                cfg.box.replication        = r
6✔
1056
                                                cfg.box.replication_source = rs
6✔
1057
                                        else
1058
                                                do_cfg(boxcfg, cfg.box)
×
1059
                                        end
1060
                                end
1061
                        end
1062

1063
                        if args.on_after_cfg then
38✔
1064
                                args.on_after_cfg(M,cfg)
38✔
1065
                        end
1066
                        -- print(string.format("Box configured"))
1067

1068
                        local msp = config.get('sys.master_selection_policy')
38✔
1069
                        if type(cfg.etcd) == 'table'
38✔
1070
                                and config.get('etcd.fencing_enabled')
38✔
1071
                                and (msp == 'etcd.cluster.master' or msp == 'etcd.cluster.vshard')
38✔
1072
                                and type(cfg.cluster) == 'string' and cfg.cluster ~= ''
30✔
1073
                                and config.get('etcd.reduce_listing_quorum') ~= true
60✔
1074
                        then
1075
                                M._fencing_f = fiber.create(function()
60✔
1076
                                        fiber.name('config/fencing')
30✔
1077
                                        fiber.yield() -- yield execution
30✔
1078
                                        local function in_my_gen() fiber.testcancel() return config._fencing_f == fiber.self() end
217✔
1079
                                        assert(cfg.cluster, "cfg.cluster must be defined")
30✔
1080

1081
                                        local watch_path = fio.pathjoin(
60✔
1082
                                                config.get('etcd.prefix'),
30✔
1083
                                                'clusters',
30✔
1084
                                                cfg.cluster
30✔
1085
                                        )
30✔
1086

1087
                                        local my_name = assert(config.get('sys.instance_name'), "instance_name is not defined")
60✔
1088
                                        local fencing_timeout = config.get('etcd.fencing_timeout', 10)
30✔
1089
                                        local fencing_pause = config.get('etcd.fencing_pause', fencing_timeout/2)
30✔
1090
                                        assert(fencing_pause < fencing_timeout, "fencing_pause must be < fencing_timeout")
30✔
1091
                                        local fencing_check_replication = config.get('etcd.fencing_check_replication')
30✔
1092
                                        if type(fencing_check_replication) == 'string' then
30✔
1093
                                                fencing_check_replication = fencing_check_replication == 'true'
×
1094
                                        else
1095
                                                fencing_check_replication = fencing_check_replication == true
30✔
1096
                                        end
1097

1098
                                        local etcd_cluster, watch_index
1099

1100
                                        ---Lists `watch_path` in ETCD. On HTTP 200 the cached `etcd_cluster` is
                                        ---replaced with the result, and `watch_index` is advanced to
                                        ---x-etcd-index + 1 unless the header is missing, not a number, or
                                        ---behind the current index.
                                        ---@return any etcd_cluster, any watch_index
                                        local function refresh_list(opts)
                                                local started = fiber.time()
                                                local result, resp = config.etcd:list(watch_path, opts)
                                                local elapsed = fiber.time()-started

                                                log.verbose("[fencing] list(%s) => %s in %.3fs %s",
                                                        watch_path, resp.status, elapsed, json.encode(resp.body))

                                                -- any non-200 answer leaves the cached state untouched
                                                if resp.status ~= 200 then
                                                        return etcd_cluster, watch_index
                                                end

                                                etcd_cluster = result
                                                local etcd_index = type(resp.headers) == 'table'
                                                        and tonumber(resp.headers['x-etcd-index'])
                                                if etcd_index and etcd_index >= (tonumber(watch_index) or 0) then
                                                        watch_index = etcd_index + 1
                                                end
                                                return etcd_cluster, watch_index
                                        end
1119

1120
                                        ---Runs one fencing round and reports whether this instance may keep leadership.
                                        ---
                                        ---First waits for a change of `watch_path` via `config.etcd:wait`. If that does not
                                        ---produce a cluster description containing `master`, it falls back to retrying
                                        ---`refresh_list` until `deadline`. The function yields (network calls, `fiber.sleep`),
                                        ---so `in_my_gen()` is re-checked after every yield.
                                        ---@param deadline number absolute `fiber.time()` value used as the time budget of the check
                                        ---@return boolean? verdict `true` - keep leadership, `false` - step down immediately,
                                        ---`nil` - state is unknown (or code generation has changed), lease must not be prolonged
                                        local function fencing_check(deadline)
                                                -- we can only allow half of the time till deadline
                                                local timeout = math.min((deadline-fiber.time())*0.5, fencing_pause)
                                                log.verbose("[wait] timeout:%.3fs FP:%.3fs", timeout, fencing_pause)

                                                local check_started = fiber.time()
                                                local pcall_ok, err_or_resolution, new_cluster = pcall(function()
                                                        local started = fiber.time()
                                                        local n_endpoints = #config.etcd.endpoints
                                                        local not_timed_out, response = config.etcd:wait(watch_path, {
                                                                index = watch_index,
                                                                -- timeout is divided by the number of endpoints
                                                                -- (presumably each of them may be tried in turn)
                                                                timeout = timeout/n_endpoints,
                                                        })

                                                        -- choose log level: timeouts are routine, HTTP errors are not
                                                        local status = tonumber(response.status)
                                                        local logger
                                                        if not not_timed_out then
                                                                logger = log.verbose
                                                        elseif status and status >= 400 then
                                                                logger = log.error
                                                        else
                                                                logger = log.info
                                                        end
                                                        logger("[fencing] wait(%s,index=%s,timeout=%.3fs) => %s (ind:%s) %s took %.3fs",
                                                                watch_path, watch_index, timeout,
                                                                response.status, (response.headers or {})['x-etcd-index'],
                                                                json.encode(response.body), fiber.time()-started)

                                                        -- http timed out / or network drop - we'll never know
                                                        if not not_timed_out then return 'timeout' end
                                                        local res = json.decode(response.body)

                                                        -- parse the index header once and move watch_index past it
                                                        local etcd_index
                                                        if type(response.headers) == 'table' then
                                                                etcd_index = tonumber(response.headers['x-etcd-index'])
                                                        end
                                                        if etcd_index and etcd_index >= watch_index then
                                                                watch_index = etcd_index + 1
                                                        end

                                                        if res.node then
                                                                local node = {}
                                                                config.etcd:recursive_extract(watch_path, res.node, node)
                                                                log.info("[fencing] watch index changed: %s =>  %s", watch_path, json.encode(node))
                                                                -- a node without master is useless for the verdict, force re-listing
                                                                if not node.master then node = nil end
                                                                return 'changed', node
                                                        end
                                                end)

                                                log.verbose("[wait] took:%.3fs exp:%.3fs", fiber.time()-check_started, timeout)
                                                if not in_my_gen() then return end

                                                if not pcall_ok then
                                                        log.warn("ETCD watch failed: %s", err_or_resolution)
                                                end

                                                -- only a successful 'changed' resolution carries a usable cluster
                                                if err_or_resolution ~= 'changed' then
                                                        new_cluster = nil
                                                end

                                                if not new_cluster then
                                                        local list_started = fiber.time()
                                                        log.verbose("[listing] left:%.3fs", deadline-fiber.time())
                                                        repeat
                                                                local ok, e_cluster = pcall(refresh_list, {deadline = deadline})
                                                                if ok and e_cluster then
                                                                        new_cluster = e_cluster
                                                                        break
                                                                end

                                                                if not in_my_gen() then return end
                                                                if not ok then
                                                                        -- do not swallow the reason of the failed listing
                                                                        log.warn("[fencing] list failed: %s", e_cluster)
                                                                end
                                                                -- we can only sleep 50% till deadline will be reached
                                                                -- (never negative: the deadline may already be behind us)
                                                                local sleep = math.min(fencing_pause, 0.5*(deadline - fiber.time()))
                                                                fiber.sleep(math.max(0, sleep))
                                                        until fiber.time() > deadline
                                                        log.verbose("[list] took:%.3fs left:%.3fs",
                                                                fiber.time()-list_started, deadline-fiber.time())
                                                end

                                                if not in_my_gen() then return end

                                                if type(new_cluster) ~= 'table' then -- ETCD is down
                                                        log.warn('[fencing] ETCD %s is not discovered in etcd during %.2fs %s',
                                                                watch_path, fiber.time()-check_started, new_cluster)

                                                        if not fencing_check_replication then
                                                                -- ETCD is down, we do not know what is happening
                                                                return nil
                                                        end

                                                        -- In proper fencing we must step down immediately as soon as we discover
                                                        -- that coordinator is down. But in real world there are some circumstances
                                                        -- when coordinator can be down for several seconds if someone crashes network
                                                        -- or ETCD itself.
                                                        -- We propose that it is safe to not step down as soon as we are connected to all
                                                        -- replicas in replicaset (etcd.cluster.master is fullmesh topology).
                                                        -- We do not check downstreams here, because downstreams cannot lead to collisions.
                                                        -- If at least 1 upstream is not in status follow
                                                        -- (Tarantool replication checks with tcp-healthchecks once in box.cfg.replication_timeout)
                                                        -- We immediately stepdown.
                                                        for _, ru in pairs(box.info.replication) do
                                                                if ru.id ~= box.info.id and ru.upstream then
                                                                        if ru.upstream.status ~= "follow" then
                                                                                log.warn("[fencing] upstream %s is not followed by me %s:%s (idle: %s, lag:%s)",
                                                                                        ru.upstream.peer, ru.upstream.status, ru.upstream.message,
                                                                                        ru.upstream.idle, ru.upstream.lag
                                                                                )
                                                                                return nil
                                                                        end
                                                                end
                                                        end

                                                        log.warn('[fencing] ETCD is down but all upstreams are followed by me. Continuing leadership')
                                                        return true
                                                elseif new_cluster.master == my_name then
                                                        -- The most common branch. We are registered as the leader.
                                                        return true
                                                elseif new_cluster.switchover then -- new_cluster.master ~= my_name
                                                        -- Another instance is the leader in ETCD. But we could be the one
                                                        -- who is going to be the next (cluster is under switching right now).
                                                        -- It is almost impossible to get this path in production. But the only one
                                                        -- protection we have is `fencing_pause` and `fencing_timeout`.
                                                        -- So, we will do nothing until ETCD mutex is present
                                                        log.warn('[fencing] It seems that cluster is under switchover right now %s', json.encode(new_cluster))
                                                        -- Note: this node was rw (otherwise we would not execute fencing_check at all)
                                                        -- During normal switch registered leader is RO (because we are RW, and we are not the leader)
                                                        -- And in the next step coordinator will update leader info in ETCD.
                                                        -- so this condition seems to be unreachable for every node
                                                        return nil
                                                else
                                                        log.warn('[fencing] ETCD %s/master is %s not us. Stepping down', watch_path, new_cluster.master)
                                                        -- ETCD is up, master is not us => we must step down immediately
                                                        return false
                                                end
                                        end
1254

1255
                                        -- Main fencing loop
                                        -- It is executed on every replica in the shard
                                        -- if instance is ro then it will wait until instance became rw.
                                        -- Every iteration: wait for rw -> load etcd_cluster/watch_index ->
                                        -- keep prolonging the lease while fencing_check() approves ->
                                        -- switch to read_only when the lease is lost.
                                        while in_my_gen() do
                                                -- Wait until instance became rw loop
                                                while box.info.ro and in_my_gen() do
                                                        -- this is just fancy sleep.
                                                        -- if node became rw in less than 3 seconds we will check it immediately
                                                        pcall(box.ctl.wait_rw, 3)
                                                end

                                                -- after waiting to be rw we will step into fencing-loop
                                                -- we must check that we are still in our code generation
                                                -- to proceed
                                                if not in_my_gen() then return end

                                                --- Initial Load of etcd_cluster and watch_index
                                                local attempt = 0
                                                while in_my_gen() do
                                                        local ok, err = pcall(refresh_list)
                                                        if not in_my_gen() then return end

                                                        if ok then break end
                                                        attempt = attempt + 1
                                                        log.warn("[fencing] initial list failed: %s (attempts: %s)", err, attempt)

                                                        -- randomized pause around fencing_pause (+-0.5s, but not less than 0.5s).
                                                        -- math.random(m, n) is an integer-range API, so the jitter is built
                                                        -- from math.random() to get a uniform value inside [min, max]
                                                        local min_sleep = math.max(0.5, fencing_pause-0.5)
                                                        local max_sleep = math.max(min_sleep, fencing_pause+0.5)
                                                        fiber.sleep(min_sleep + math.random()*(max_sleep-min_sleep))
                                                end

                                                -- we yield to get next ev_run before get fiber.time()
                                                fiber.sleep(0)
                                                if not in_my_gen() then return end
                                                log.info("etcd_cluster is %s (index: %s)", json.encode(etcd_cluster), watch_index)


                                                -- we will not step down until deadline.
                                                local deadline = fiber.time()+fencing_timeout
                                                repeat
                                                        -- Before ETCD check we better pause
                                                        -- we do a little bit randomized sleep to not spam ETCD
                                                        local hard_limit = deadline-fiber.time()
                                                        local soft_limit = fencing_timeout-fencing_pause
                                                        -- clamp: both limits may be non-positive (deadline already reached
                                                        -- or fencing_pause configured greater than fencing_timeout)
                                                        local rand_sleep = math.max(0, math.random()*0.1*math.min(hard_limit, soft_limit))
                                                        log.verbose("[sleep] hard:%.3fs soft:%.3fs sleep:%.3fs", hard_limit, soft_limit, rand_sleep)
                                                        fiber.sleep(rand_sleep)
                                                        -- After each yield we have to check that we are still in our generation
                                                        if not in_my_gen() then return end

                                                        -- someone has made us read-only. There is no need to check ETCD,
                                                        -- we break from this loop immediately
                                                        if box.info.ro then break end

                                                        -- fencing_check(deadline) if it returns true,
                                                        -- then we update leadership leasing
                                                        local verdict = fencing_check(deadline)
                                                        log.verbose("[verdict:%s] Leasing ft:%.3fs up:%.3fs left:%.3fs",
                                                                verdict == true and "ok"
                                                                        or verdict == false and "step"
                                                                        or "unknown",
                                                                fencing_timeout,
                                                                verdict and (fiber.time()+fencing_timeout-deadline) or 0,
                                                                deadline - fiber.time()
                                                        )
                                                        if verdict == false then
                                                                -- immediate stepdown
                                                                break
                                                        elseif verdict then
                                                                -- update deadline.
                                                                if deadline <= fiber.time() then
                                                                        log.warn("[fencing] deadline was overflowed deadline:%s, now:%s",
                                                                                deadline, fiber.time()
                                                                        )
                                                                end
                                                                deadline = fiber.time()+fencing_timeout
                                                        end
                                                        if not in_my_gen() then return end

                                                        if deadline <= fiber.time() then
                                                                log.warn("[fencing] deadline has not been upgraded deadline:%s, now:%s",
                                                                        deadline, fiber.time()
                                                                )
                                                        end
                                                until box.info.ro or fiber.time() > deadline

                                                -- We have left deadline-loop. It means that fencing is required
                                                if not box.info.ro then
                                                        log.warn('[fencing] Performing self fencing (box.cfg{read_only=true})')
                                                        box.cfg{read_only=true}
                                                end
                                        end
1345
                                end)
1346
                        end
1347

1348
                        return M
38✔
1349
                end
1350
        })
38✔
1351
        rawset(_G,'config',M)
38✔
1352

1353
return M
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc