• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 14216172185

02 Apr 2025 09:25AM UTC coverage: 83.431%. First build
14216172185

Pull #14936

github

web-flow
Merge af5618faf into e7a476b1b
Pull Request #14936: fix(router): purge orphaned routing nodes during reconcile

24 of 25 new or added lines in 1 file covered. (96.0%)

62115 of 74451 relevant lines covered (83.43%)

16241.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.86
/apps/emqx/src/emqx_router_helper.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2018-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% Router helper process.
18
%%
19
%% Responsibility is twofold:
20
%% 1. Cleaning own portion of the global routing table when restarted.
21
%%    The assumption is that the node has crashed (worst-case), so the
22
%%    previous incarnation's routes are still present upon restart.
23
%% 2. Managing portions of global routing table belonging to dead / "left"
24
%%    cluster members, i.e. members that are not supposed to come back
25
%%    online again.
26
%%
27
%% Only core nodes are responsible for the latter task. Moreover, helper
28
%% adopts the following operational model:
29
%% 1. Core nodes are supposed to be explicitly evicted (or "left") from
30
%%    the cluster. Even if a core node is marked down for several hours,
31
%%    helper won't attempt to purge its portion of the global routing
32
%%    table.
33
%% 2. Replicant nodes are considered dead (or "left") once they are down
34
%%    for a specific timespan. Currently hardcoded as `?PURGE_DEAD_TIMEOUT`.
35
%%    Ideally it should reflect amount of time it takes for a connectivity
36
%%    failure between cores and replicants to heal worst-case.
37
%%
38
%% TODO
39
%% While cores purge unreachable replicants' routes after a timeout,
40
%% replicants _do nothing_ on connectivity loss, regardless of how long
41
%% it is. Coupled with the fact that replicants are not affected by
42
%% "autoheal" mechanism, this may still lead to routing inconsistencies.
43

44
-module(emqx_router_helper).

-behaviour(gen_server).

-include("emqx.hrl").
-include("emqx_router.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/ms_transform.hrl").

%% Mnesia bootstrap
-export([create_tables/0]).

%% API
-export([
    start_link/0,
    post_start/0,
    monitor/1,
    is_routable/1,
    schedule_purge/0,
    schedule_force_purge/0
]).

%% Internal export
-export([stats_fun/0]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

%% Row of the replicated `?ROUTING_NODE` table: one entry per node that
%% owns routes. `const` is an unused filler field.
-record(routing_node, {name, const = unused}).

%% Name of the local ETS table caching per-node status (see `#ns{}`).
-define(TAB_STATUS, ?MODULE).

%% Global lock resource used to serialize purges of the same node.
-define(LOCK(RESOURCE), {?MODULE, RESOURCE}).

%% How often to reconcile nodes state? (ms)
%% Introduce some jitter to avoid concerted firing on different nodes.
-define(RECONCILE_INTERVAL, {2 * 60_000, '±', 15_000}).
%% Delay before resuming reconciliation after a membership event (ms).
-define(RECONCILE_TURBULENCE_DELAY, 10_000).
%% How soon should a dead node be purged? (ms)
-define(PURGE_DEAD_TIMEOUT, 15 * 60_000).
%% How soon should a left node be purged? (ms)
%% This is a fallback, left node is expected to be purged right away.
-define(PURGE_LEFT_TIMEOUT, 15_000).

%% Much shorter timings under test, so suites need not wait for minutes.
-ifdef(TEST).
-undef(RECONCILE_INTERVAL).
-undef(RECONCILE_TURBULENCE_DELAY).
-undef(PURGE_DEAD_TIMEOUT).
-undef(PURGE_LEFT_TIMEOUT).
-define(RECONCILE_INTERVAL, {2_000, '±', 500}).
-define(RECONCILE_TURBULENCE_DELAY, 1_000).
-define(PURGE_DEAD_TIMEOUT, 3_000).
-define(PURGE_LEFT_TIMEOUT, 1_000).
-endif.
107

108
%%--------------------------------------------------------------------
109
%% Mnesia bootstrap
110
%%--------------------------------------------------------------------
111

112
%% @doc Create the replicated `?ROUTING_NODE` table.
%% Returns the list of tables callers should wait for (see `post_start/0`).
create_tables() ->
    ok = mria:create_table(?ROUTING_NODE, [
        {type, set},
        {rlog_shard, ?ROUTE_SHARD},
        {storage, ram_copies},
        {record_name, routing_node},
        {attributes, record_info(fields, routing_node)},
        %% Read-heavy table: membership is checked far more often than changed.
        {storage_properties, [{ets, [{read_concurrency, true}]}]}
    ]),
    [?ROUTING_NODE].
122

123
%%--------------------------------------------------------------------
124
%% API
125
%%--------------------------------------------------------------------
126

127
%% @doc Starts the router helper, registered locally as `?MODULE`.
-spec start_link() -> startlink_ret().
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
131

132
%% @doc Post-boot step, run once routing tables can be waited upon.
%% Cleanup any routes left by old incarnations of this node (if any).
%% Depending on the size of routing tables this can take a significant
%% amount of time.
-spec post_start() -> ignore.
post_start() ->
    _ = mria:wait_for_tables([?ROUTING_NODE]),
    _ = purge_dead_node(node()),
    ignore.
139

140
%% @doc Register a routing node, accepting either a bare node name or a
%% `{Group, Node}` pair (the group part is ignored).
-spec monitor(node() | {binary(), node()}) -> ok.
monitor(Node) when is_atom(Node) ->
    add_routing_node(Node);
monitor({_Group, Node}) ->
    monitor(Node).
146

147
%% @doc Is given node considered routable?
%% I.e. should the broker attempt to forward messages there, even if there are
%% routes to this node in the routing table?
-spec is_routable(node()) -> boolean().
is_routable(Node) when Node == node() ->
    %% The local node is always routable.
    true;
is_routable(Node) ->
    try
        lookup_node_reachable(Node)
    catch
        %% `badarg` presumably means the status ETS table does not exist yet
        %% (early boot) — default to routable. TODO confirm.
        error:badarg -> true
    end.
159

160
%% @doc Schedule dead node purges.
%% Uses the current monotonic time, so only nodes down/left longer than the
%% configured timeouts will actually be purged.
-spec schedule_purge() -> scheduled.
schedule_purge() ->
    TS = erlang:monotonic_time(millisecond),
    gen_server:call(?MODULE, {purge, TS}).
165

166
%% @doc Force dead node purges, regardless of for how long nodes are down.
%% Mostly for testing purposes.
%% Achieved by passing a timestamp far enough in the future that every
%% tracked node appears outdated.
-spec schedule_force_purge() -> scheduled.
schedule_force_purge() ->
    TS = erlang:monotonic_time(millisecond),
    gen_server:call(?MODULE, {purge, TS + ?PURGE_DEAD_TIMEOUT * 2}).
172

173
%%--------------------------------------------------------------------
174
%% gen_server callbacks
175
%%--------------------------------------------------------------------
176

177
%% Per-node status row cached in the local `?TAB_STATUS` ETS table.
-record(ns, {
    node :: node(),
    %% `down`: node went offline; `left`: node was evicted from the cluster.
    status :: down | left,
    %% Monotonic timestamp (ms) when this status was recorded.
    since :: _MonotonicTimestamp :: integer(),
    %% Read by `is_routable/1` via `lookup_node_reachable/1`.
    reachable = false :: boolean()
}).
183

184
%% @private gen_server init: set up the status cache, membership monitoring,
%% periodic stats reporting, and the first (delayed) reconcile timer.
init([]) ->
    %% Trap exits so `terminate/2` runs on shutdown.
    process_flag(trap_exit, true),
    %% Initialize a table to cache node status (`#ns{}` rows).
    Tab = ets:new(?TAB_STATUS, [
        protected,
        {keypos, #ns.node},
        named_table,
        set,
        {read_concurrency, true}
    ]),
    %% Monitor nodes lifecycle events.
    ok = ekka:monitor(membership),
    % {ok, _Node} = mnesia:subscribe({table, schema, detailed}),
    %% Setup periodic stats reporting.
    ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
    %% First reconcile is delayed, letting the cluster settle after boot.
    TRef = schedule_task(reconcile, ?RECONCILE_TURBULENCE_DELAY),
    State = #{
        %% Last known core membership; `[]` if `cores()` timed out.
        last_membership => emqx_maybe:define(cores(), []),
        tab_node_status => Tab,
        timer_reconcile => TRef
    },
    ?SLOG(notice, #{msg => router_helper_membership, initial => maps:get(last_membership, State)}),
    {ok, State, hibernate}.
207

208
%% @private Handle purge scheduling requests; log and ignore anything else.
handle_call({purge, TS}, _From, State0) ->
    State = schedule_purges(TS, State0),
    {reply, scheduled, State};
handle_call(Request, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", call => Request}),
    {reply, ignored, State}.
214

215
%% @private No casts are expected: log and drop.
handle_cast(Message, State) ->
    ?SLOG(error, #{msg => "unexpected_cast", cast => Message}),
    {noreply, State}.
218

219
%% @private Handle membership events and task timer expirations.
handle_info({membership, Event}, State) ->
    %% NOTE
    %% By the time we receive `leaving` membership event, a node has essentially just
    %% started leaving, and not finished yet. To avoid doing things too eagerly, let's
    %% just debounce _any_ membership event, resuming reconciliation once the cluster
    %% is stable.
    State1 = handle_membership_event(Event, State),
    State2 = update_known_membership(Event, State1),
    NState = debounce_reconcile(State2),
    {noreply, NState};
handle_info({timeout, _TRef, {start, Task}}, State) ->
    %% A timer armed via `schedule_task/2` fired: run the task now.
    NState = handle_task(Task, State),
    {noreply, NState};
handle_info(Info, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
    {noreply, State}.
235

236
%% @private Undo `init/1` side effects: stop stats reporting and
%% membership monitoring.
terminate(_Reason, _State) ->
    emqx_stats:cancel_update(route_stats),
    ekka:unmonitor(membership).
239

240
%% @private No state migration needed across code upgrades.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
242

243
%%--------------------------------------------------------------------
244
%% Internal functions
245
%%--------------------------------------------------------------------
246

247
%% Record node status changes implied by a membership event.
%% Events other than down / leaving / up are ignored; `State` itself is
%% never modified here (only the status ETS table is).
handle_membership_event(Event, State) ->
    _ =
        case Event of
            {node, down, Node} -> record_node_down(Node);
            {node, leaving, Node} -> record_node_left(Node);
            {node, up, Node} -> record_node_alive(Node);
            _Other -> ok
        end,
    State.
258

259
%% Maintain the `last_membership` set in server state:
%% a node coming up via mnesia is removed from the set; a node that starts
%% leaving is added (kept sorted and deduplicated). Other events are no-ops.
update_known_membership({mnesia, up, Node}, #{last_membership := Known} = State) ->
    State#{last_membership := lists:delete(Node, Known)};
update_known_membership({node, leaving, Node}, #{last_membership := Known} = State) ->
    State#{last_membership := lists:usort([Node | Known])};
update_known_membership(_Event, State) ->
    State.
265

266
%% Record that `Node` went down, stamping the current monotonic time.
%% A node already marked `left` keeps that (stronger) status; in any other
%% case the row is (re)inserted. Returns `false` when the `left` status is
%% preserved, otherwise the `ets:insert/2` result (`true`).
record_node_down(Node) ->
    NS = #ns{
        node = Node,
        status = down,
        since = erlang:monotonic_time(millisecond)
    },
    case ets:lookup(?TAB_STATUS, Node) of
        [#ns{status = left}] ->
            %% Node is still marked as left, keep it that way.
            false;
        _DownOrAbsent ->
            %% Fresh entry, or a duplicate `down` event — either way record
            %% the status with a refreshed timestamp. (Previously these two
            %% cases were separate, byte-identical branches.)
            ?SLOG(warning, #{msg => record_node_down, node => NS}),
            ets:insert(?TAB_STATUS, NS)
    end.
284

285
%% Record that `Node` has left (or is leaving) the cluster,
%% unconditionally overwriting any previous status.
record_node_left(Node) ->
    Now = erlang:monotonic_time(millisecond),
    NS = #ns{node = Node, status = left, since = Now},
    ?SLOG(warning, #{msg => record_node_left, node => NS}),
    ets:insert(?TAB_STATUS, NS).
293

294
%% A node is (back) online: drop any cached down/left status for it.
%% The local node never tracks itself.
record_node_alive(Node) when Node == node() ->
    ok;
record_node_alive(Node) ->
    forget_node(Node).
298

299
%% Drop the cached status row for `Node` (it is alive again, or was purged).
forget_node(Node) ->
    ?SLOG(warning, #{msg => forget_node, node => Node}),
    ets:delete(?TAB_STATUS, Node).
302

303
%% Cached status of `Node`: `down` | `left` | `notfound`.
%% NOTE: raises `badarg` if the status table does not exist (yet).
lookup_node_status(Node) ->
    ets:lookup_element(?TAB_STATUS, Node, #ns.status, _Default = notfound).
305

306
%% Cached reachability of `Node`; unknown nodes default to reachable.
%% NOTE: raises `badarg` if the status table does not exist (yet).
lookup_node_reachable(Node) ->
    ets:lookup_element(?TAB_STATUS, Node, #ns.reachable, _Default = true).
308

309
%% Periodic task: re-arm the (jittered) reconcile timer, reconcile cached
%% statuses with cluster state, then schedule purges for outdated nodes.
handle_reconcile(State) ->
    TRef = schedule_task(reconcile, ?RECONCILE_INTERVAL),
    NState = State#{timer_reconcile := TRef},
    TS = erlang:monotonic_time(millisecond),
    schedule_purges(TS, reconcile(NState)).
314

315
%% Push the pending reconcile further into the future: cancel the current
%% timer and arm a fresh one with the turbulence delay. Called on every
%% membership event so reconciliation resumes only once the cluster settles.
debounce_reconcile(#{timer_reconcile := Pending} = State) ->
    ok = emqx_utils:cancel_timer(Pending),
    Replacement = schedule_task(reconcile, ?RECONCILE_TURBULENCE_DELAY),
    State#{timer_reconcile := Replacement}.
319

320
%% Reconcile cached node statuses against actual cluster state.
%% Ordering matters: left-marking, then orphan-marking, then alive-marking,
%% so that running nodes always end up cleared last.
reconcile(State = #{last_membership := MembersLast}) ->
    ?SLOG(notice, #{msg => router_helper_reconcile, start => []}),
    %% 1. Find if there are discrepancies in membership.
    %% Missing core nodes must have been "force-left" from the cluster.
    %% On replicants, `cores()` may return `undefined` under severe connectivity loss.
    Members = emqx_maybe:define(cores(), MembersLast),
    ok = lists:foreach(fun(Node) -> record_node_left(Node) end, MembersLast -- Members),
    %% 2. Find out if there are (possibly) orphaned routes in the routing table.
    %% Mark them as down: they are most likely replicants.
    RunningNodes = mria:running_nodes(),
    RoutingNodes = list_routing_nodes(),
    OrphanNodes = [N || N <- RoutingNodes -- RunningNodes, lookup_node_status(N) == notfound],
    ok = lists:foreach(fun(Node) -> record_node_down(Node) end, OrphanNodes),
    %% 3. Avoid purging live nodes, if missed lifecycle events for whatever reason.
    %% This is a fallback mechanism. It's also possible a node "leave" that triggered
    %% this reconcile has not yet finished "leaving". Since it's extremely unlikely,
    %% do nothing special about it: once "leave" finally finishes, the node will either
    %% go down or be marked as left on the next reconcile.
    ok = lists:foreach(fun record_node_alive/1, RunningNodes),
    ?SLOG(notice, #{msg => router_helper_reconcile, stop => []}),
    State#{last_membership := Members}.
341

342
%% List nodes currently recorded with the given status (`down` | `left`).
select(Status) ->
    MS = ets:fun2ms(fun(#ns{node = Node, status = S}) when S == Status -> Node end),
    ets:select(?TAB_STATUS, MS).
345

346
%% List nodes with the given status recorded strictly before `Since0`
%% (monotonic ms), i.e. nodes that have been in that status long enough.
select_outdated(Status, Since0) ->
    MS = ets:fun2ms(
        fun(#ns{node = Node, status = S, since = Since}) when S == Status andalso Since < Since0 ->
            Node
        end
    ),
    ets:select(?TAB_STATUS, MS).
353

354
%% Keep only nodes that are NOT part of the last known core membership —
%% by elimination, those are assumed to be replicants.
filter_replicants(Nodes, #{last_membership := Members}) ->
    lists:filter(fun(Node) -> not lists:member(Node, Members) end, Nodes).
356

357
%% Schedule purge tasks for nodes that are overdue as of monotonic time `TS`.
schedule_purges(TS, State) ->
    %% Safety measure: purge only dead replicants.
    %% Assuming a dead / offline core node should:
    %% 1. Either come back online, and potentially reboot itself when Mria autoheal
    %%    will kick in.
    %% 2. ...Or leave the cluster (be "force-left" to be precise), in which case
    %%    `reconcile/1` should notice a discrepancy and schedule a purge.
    ok = lists:foreach(
        fun(Node) -> schedule_purge(Node, {replicant_down_for, ?PURGE_DEAD_TIMEOUT}) end,
        filter_replicants(select_outdated(down, TS - ?PURGE_DEAD_TIMEOUT), State)
    ),
    %% Trigger purges for "force-left" nodes found during reconcile, if responsible.
    ok = lists:foreach(
        fun schedule_purge_left/1,
        select(left)
    ),
    %% Otherwise, purge nodes marked left for a while.
    ok = lists:foreach(
        fun(Node) -> schedule_purge(Node, left) end,
        select_outdated(left, TS - ?PURGE_LEFT_TIMEOUT)
    ),
    State.
379

380
%% Schedule an immediate purge task for `Node`, citing `Why` — but only on
%% core nodes; on replicants this is a no-op returning `false`.
schedule_purge(Node, Why) ->
    am_core() andalso schedule_task({purge, Node, Why}, 0).
387

388
%% Schedule an immediate purge for a left node, but only on the single core
%% node picked as responsible for this purge across the cluster.
schedule_purge_left(Node) ->
    case am_core() andalso pick_responsible({purge, Node}) of
        Responsible when Responsible == node() ->
            %% Schedule purge on responsible node first, to avoid racing for a global lock.
            schedule_task({purge, Node, left}, 0);
        _ ->
            %% Replicant (`false`) / not responsible.
            %% In the latter case try to purge on the next reconcile, as a fallback.
            false
    end.
398

399
%% Execute a purge task for `Node`, citing `Why` in the trace event.
%% The node is forgotten on success and on no-op; an aborted transaction
%% keeps the status row so a later reconcile can retry.
handle_purge(Node, Why, State) ->
    try purge_dead_node_trans(Node) of
        true ->
            ?tp(warning, router_node_routing_table_purged, #{
                node => Node,
                reason => Why,
                hint => "Ignore if the node in question went offline due to cluster maintenance"
            }),
            forget_node(Node);
        false ->
            %% Node no longer tracked, or it had no routes: nothing to purge.
            ?tp(debug, router_node_purge_skipped, #{node => Node}),
            forget_node(Node);
        aborted ->
            %% Global lock could not be taken; leave the status for retry.
            ?tp(notice, router_node_purge_aborted, #{node => Node})
    catch
        Kind:Error ->
            ?tp(warning, router_node_purge_error, #{
                node => Node,
                kind => Kind,
                error => Error
            })
    end,
    State.
422

423
%% Clean up routes owned by `Node`, if it has any.
%% Returns whether a cleanup actually took place.
purge_dead_node(Node) ->
    node_has_routes(Node) andalso
        begin
            ok = cleanup_routes(Node),
            true
        end.
431

432
%% Purge `Node`'s routes under a global lock, serialized across core nodes.
%% Returns `true` if routes were cleaned, `false` if the node is no longer
%% tracked or has no routes, `aborted` if the lock could not be acquired
%% within the retry budget.
purge_dead_node_trans(Node) ->
    %% Skip if the node was forgotten (e.g. came back) since scheduling.
    StillKnown = lookup_node_status(Node) =/= notfound,
    case StillKnown andalso node_has_routes(Node) of
        true ->
            case cores() of
                Nodes = [_ | _] ->
                    global:trans(
                        {?LOCK(Node), self()},
                        fun() ->
                            ok = cleanup_routes(Node),
                            true
                        end,
                        Nodes,
                        _Retries = 3
                    );
                undefined ->
                    %% `cores()` timed out: no safe set of lock holders.
                    error(no_core_nodes)
            end;
        false ->
            false
    end.
453

454
%% Remove all of `Node`'s routes, then its row in the `?ROUTING_NODE` table.
cleanup_routes(Node) ->
    emqx_router:cleanup_routes(Node),
    remove_routing_node(Node).
457

458
%%
459

460
%% Add `Node` to the replicated routing-node table.
%% Membership is checked locally first to avoid redundant replicated writes.
add_routing_node(Node) ->
    case ets:member(?ROUTING_NODE, Node) of
        false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node});
        true -> ok
    end.
465

466
%% Remove `Node`'s row from the replicated routing-node table.
remove_routing_node(Node) ->
    mria:dirty_delete(?ROUTING_NODE, Node).
468

469
%% All node names currently present in the replicated routing-node table.
list_routing_nodes() ->
    ets:select(?ROUTING_NODE, ets:fun2ms(fun(#routing_node{name = N}) -> N end)).
471

472
%% Does `Node` have an entry in the routing-node table (i.e. own routes)?
node_has_routes(Node) ->
    ets:member(?ROUTING_NODE, Node).
474

475
%%
476

477
%% Arm a timer delivering `{timeout, TRef, {start, Task}}` to this process.
%% `Timeout` is milliseconds, or a `{Base, '±', Jitter}` tuple (see
%% `choose_timeout/1`). Returns the timer reference.
schedule_task(Task, Timeout) ->
    emqx_utils:start_timer(choose_timeout(Timeout), {start, Task}).
479

480
%% Dispatch an expired timer task to its handler.
handle_task(reconcile, State) ->
    handle_reconcile(State);
handle_task({purge, Node, Why}, State) ->
    handle_purge(Node, Why, State).
484

485
%% Resolve a timeout spec into milliseconds: a `{Base, '±', Jitter}` tuple
%% yields a value uniformly drawn from (Base - Jitter, Base + Jitter];
%% a plain number is returned as-is.
choose_timeout({Baseline, '±', Jitter}) ->
    Offset = rand:uniform(2 * Jitter) - Jitter,
    Baseline + Offset;
choose_timeout(Timeout) ->
    Timeout.
489

490
%%
491

492
%% Is this node a core node (i.e. not a replicant)?
-spec am_core() -> boolean().
am_core() ->
    mria_config:whoami() =/= replicant.
495

496
%% All core cluster members, or `undefined` if mria cannot answer in time
%% (observed on replicants under connectivity loss — see `reconcile/1`).
-spec cores() -> [node()] | undefined.
cores() ->
    %% Include stopped nodes as well.
    try
        mria:cluster_nodes(cores)
    catch
        error:_Timeout ->
            undefined
    end.
505

506
%% Pick a responsible core node for `Task` by hashing it over the sorted
%% set of running nodes. We expect the same node to be picked as
%% responsible across the cluster (unless the cluster is highly turbulent).
%% Falls back to the local node when no running nodes are reported.
-spec pick_responsible(_Task) -> node().
pick_responsible(Task) ->
    case lists:sort(mria_mnesia:running_nodes()) of
        [] ->
            node();
        Sorted ->
            Index = 1 + erlang:phash2(Task, length(Sorted)),
            lists:nth(Index, Sorted)
    end.
516

517
%%
518

519
%% @doc Report total topic count (persistent + in-memory routes) to stats.
%% Installed as a periodic hook via `emqx_stats:update_interval/2` in `init/1`.
stats_fun() ->
    PSRouteCount = persistent_route_count(),
    NonPSRouteCount = emqx_router:stats(n_routes),
    emqx_stats:setstat('topics.count', 'topics.max', PSRouteCount + NonPSRouteCount).
523

524
%% Number of persistent-session routes; zero when persistence is disabled.
persistent_route_count() ->
    case emqx_persistent_message:is_persistence_enabled() of
        false ->
            0;
        true ->
            emqx_persistent_session_ds_router:stats(n_routes)
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc