• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 13326843742

14 Feb 2025 10:00AM UTC coverage: 82.402%. First build
13326843742

Pull #14696

github

web-flow
Merge a3f73ba84 into 43916f96a
Pull Request #14696: fix(dsraft): avoid contacting lost servers during membership changes

191 of 205 new or added lines in 4 files covered. (93.17%)

57504 of 69785 relevant lines covered (82.4%)

15306.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.44
/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4

5
%% @doc Metadata storage for the builtin sharded database.
6
%%
7
%% Currently metadata is stored in mria; that's not ideal, but
8
%% eventually we'll replace it, so it's important not to leak
9
%% implementation details from this module.
10
-module(emqx_ds_replication_layer_meta).

-feature(maybe_expr, enable).
-compile(inline).

-behaviour(gen_server).

%% Public API: introspection of shards and sites.
-export([
    shards/1,
    my_shards/1,
    shard_info/2,
    allocate_shards/1,
    replica_set/2,
    sites/0,
    node/1,
    this_site/0,
    forget_site/1,
    print_status/0
]).

%% Public API: DB lifecycle and configuration.
-export([
    open_db/2,
    db_config/1,
    update_db_config/2,
    drop_db/1,
    dbs/0
]).

%% Public API: assignment of sites to DBs and replica set transitions.
-export([
    join_db_site/2,
    leave_db_site/2,
    assign_db_sites/2,
    replica_set_transitions/2,
    claim_transition/3,
    update_replica_set/3,
    db_sites/1,
    target_set/2
]).

%% Public API: subscriptions to metadata changes.
-export([
    subscribe/2,
    unsubscribe/1
]).

%% gen_server callbacks:
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% Exported only so that they can be called as `fun ?MODULE:F/A' from
%% transactions (and by other modules of the application):
-export([
    open_db_trans/2,
    allocate_shards_trans/1,
    assign_db_sites_trans/2,
    modify_db_sites_trans/2,
    claim_transition_trans/3,
    update_replica_set_trans/3,
    update_db_config_trans/2,
    drop_db_trans/1,
    claim_site_trans/2,
    forget_site_trans/1,
    n_shards/1
]).

%% Table migrations:
-export([
    migrate_node_table/0,
    migrate_shard_table/0,
    migrate_node_table_trans/1,
    migrate_shard_table_trans/1
]).

-export_type([
    site/0,
    transition/0,
    subscription_event/0,
    update_cluster_result/0
]).
90

91
-include_lib("stdlib/include/qlc.hrl").
92
-include_lib("stdlib/include/ms_transform.hrl").
93

94
%%================================================================================
95
%% Type declarations
96
%%================================================================================
97

98
-define(SERVER, ?MODULE).

%% Mria RLOG shard that all the tables below belong to:
-define(RLOG_SHARD, emqx_ds_builtin_metadata_shard).
%% Per-DB metadata (DB options):
-define(META_TAB, emqx_ds_builtin_metadata_tab).
%% Site -> Erlang node mapping (current table and its legacy predecessor):
-define(NODE_TAB, emqx_ds_builtin_node_tab2).
-define(NODE_TAB_LEGACY, emqx_ds_builtin_node_tab).
%% Per-shard replica / target sets (current table and its legacy predecessor):
-define(SHARD_TAB, emqx_ds_builtin_shard_tab2).
-define(SHARD_TAB_LEGACY, emqx_ds_builtin_shard_tab).
%% Claimed membership transitions, keyed by `{DB, Shard}':
-define(TRANSITION_TAB, emqx_ds_builtin_trans_tab).

%% NOTE: field order of the records below defines the on-disk tuple layout;
%% it must not be changed without a migration.

-record(?META_TAB, {
    db :: emqx_ds:db(),
    db_props :: emqx_ds_replication_layer:builtin_db_opts()
}).

-record(?NODE_TAB, {
    site :: site(),
    node :: node(),
    misc = #{} :: map()
}).

-record(?SHARD_TAB, {
    shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
    %% Sites holding the shard's data right now:
    replica_set :: [site()],
    %% Sites that should hold the data once the cluster is stable (no sites
    %% joining or leaving). `undefined' means "same as `replica_set'":
    target_set :: [site()] | undefined,
    misc = #{} :: map()
}).

-record(?TRANSITION_TAB, {
    shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
    transition :: transition(),
    misc = #{} :: map()
}).

%% Persistent site ID, independent of the node's IP address / FQDN:
-type site() :: binary().

%% A single step changing a shard's replica set:
-type transition() :: {add | del, site()}.

-type update_cluster_result() ::
    {ok, unchanged | [site()]}
    | {error, {nonexistent_db, emqx_ds:db()}}
    | {error, {nonexistent_sites, [site()]}}
    | {error, {too_few_sites, [site()]}}
    | {error, _}.

%% What a subscriber subscribes to:
-type subject() :: emqx_ds:db().

%% What a subscriber receives:
-type subscription_event() ::
    {changed, {shard, emqx_ds:db(), emqx_ds_replication_layer:shard_id()}}.

%% Persistent term key:
-define(emqx_ds_builtin_site, emqx_ds_builtin_site).

%% Match patterns are built with `erlang:make_tuple/2,3' instead of
%% `#Rec{_ = '_'}' syntax to keep Dialyzer quiet.
-define(NODE_PAT(),
    %% Same as `#?NODE_TAB{_ = '_'}`:
    erlang:make_tuple(record_info(size, ?NODE_TAB), '_')
).

-define(NODE_PAT(NODE),
    %% Same as `#?NODE_TAB{node = NODE, _ = '_'}`:
    erlang:make_tuple(record_info(size, ?NODE_TAB), '_', [{#?NODE_TAB.node, NODE}])
).

-define(SHARD_PAT(SHARD),
    %% Same as `#?SHARD_TAB{shard = SHARD, _ = '_'}`:
    erlang:make_tuple(record_info(size, ?SHARD_TAB), '_', [{#?SHARD_TAB.shard, SHARD}])
).

-define(TRANSITION_PAT(SHARD),
    %% Same as `#?TRANSITION_TAB{shard = SHARD, _ = '_'}`:
    erlang:make_tuple(record_info(size, ?TRANSITION_TAB), '_', [{#?TRANSITION_TAB.shard, SHARD}])
).
182

183
%%================================================================================
184
%% API functions
185
%%================================================================================
186

187
%% @doc ID of the local site. Raises `badarg' when the site has not been
%% claimed (the persistent term is absent).
-spec this_site() -> site().
this_site() ->
    persistent_term:get(?emqx_ds_builtin_site).

%% @doc Configured number of shards of `DB'. Crashes when `DB' is unknown.
-spec n_shards(emqx_ds:db()) -> pos_integer().
n_shards(DB) ->
    [#?META_TAB{db_props = Props}] = mnesia:dirty_read(?META_TAB, DB),
    #{n_shards := NShards} = Props,
    NShards.

%% @doc Start the metadata server, registered locally as `?SERVER'.
-spec start_link() -> {ok, pid()}.
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts = []).
199

200
%% @doc IDs of all shards allocated for `DB' (dirty read).
-spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
shards(DB) ->
    ShardRecords = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
    [ShardId || #?SHARD_TAB{shard = {_DB, ShardId}} <- ShardRecords].
204

205
%% @doc Replica sites of a shard together with their reachability status.
%% A replica is `up' when mria reports its node as `running', and `down'
%% otherwise. Returns `undefined' for an unknown shard.
-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    #{replica_set := #{site() => #{status => up | down}}}
    | undefined.
shard_info(DB, Shard) ->
    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
        [] ->
            undefined;
        [#?SHARD_TAB{replica_set = Sites}] ->
            ReplicaSet = lists:foldl(
                fun(Site, Acc) ->
                    Status =
                        case mria:cluster_status(?MODULE:node(Site)) of
                            running -> up;
                            stopped -> down;
                            false -> down
                        end,
                    Acc#{Site => #{status => Status}}
                end,
                #{},
                Sites
            ),
            #{replica_set => ReplicaSet}
    end.
228

229
%% @doc IDs of the shards of `DB' whose current replica set includes the
%% local site.
-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
my_shards(DB) ->
    ThisSite = this_site(),
    ShardRecords = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
    lists:filtermap(
        fun(#?SHARD_TAB{shard = {_DB, ShardId}, replica_set = Replicas}) ->
            lists:member(ThisSite, Replicas) andalso {true, ShardId}
        end,
        ShardRecords
    ).
234

235
%% @doc Allocate the shards of `DB' across the known sites, unless that has
%% already been done. In that case the already allocated shard IDs are
%% returned as well.
-spec allocate_shards(emqx_ds:db()) ->
    {ok, [emqx_ds_replication_layer:shard_id()]} | {error, map()}.
allocate_shards(DB) ->
    Outcome = mria:transaction(?RLOG_SHARD, fun ?MODULE:allocate_shards_trans/1, [DB]),
    case Outcome of
        {atomic, NewShards} ->
            {ok, NewShards};
        {aborted, {shards_already_allocated, ExistingShards}} ->
            %% Someone (possibly another node) has allocated them already.
            {ok, ExistingShards};
        {aborted, {insufficient_sites_online, Needed, Sites}} ->
            {error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}}
    end.
244

245
%% @doc All sites known to the cluster (dirty read).
-spec sites() -> [site()].
sites() ->
    NodeRecords = mnesia:dirty_match_object(?NODE_TAB, ?NODE_PAT()),
    [Site || #?NODE_TAB{site = Site} <- NodeRecords].

%% @doc Erlang node that claimed `Site', or `undefined' when the site is
%% unknown.
-spec node(site()) -> node() | undefined.
node(Site) ->
    case mnesia:dirty_read(?NODE_TAB, Site) of
        [] ->
            undefined;
        [#?NODE_TAB{node = Node}] ->
            Node
    end.
257

258
%% @doc Remove a site whose node has been lost from the cluster.
%% A site whose node is running, or only stopped, is refused; see also the
%% safeguards in `forget_site_trans/1'.
-spec forget_site(site()) -> ok | {error, _Reason}.
forget_site(Site) ->
    case mnesia:dirty_read(?NODE_TAB, Site) of
        [] ->
            {error, nonexistent_site};
        [Record = #?NODE_TAB{node = Node}] ->
            case node_status(Node) of
                false ->
                    %% The node is lost: go ahead.
                    transaction(fun ?MODULE:forget_site_trans/1, [Record]);
                running ->
                    {error, site_online};
                stopped ->
                    %% The node is merely stopped. If it is gone for good, it
                    %% should leave the cluster first.
                    {error, site_temporarily_offline}
            end
    end.
275

276
%%===============================================================================
277

278
%% @doc Print a human-readable report on sites, shards and pending
%% membership transitions to standard output.
-spec print_status() -> ok.
print_status() ->
    %% TODO: Consistent view of state.
    %% The three tables are read one after another with dirty reads, so the
    %% report is not an atomic snapshot.
    NodeRecords = all_nodes(),
    ShardRecords = all_shards(),
    TransitionRecords = all_transitions(),
    print_status(NodeRecords, ShardRecords, TransitionRecords).

%% Render the report from the given table contents. In addition to the
%% tables, it warns about lost sites and about shards where at least half of
%% the replicas are lost.
print_status(Nodes, Shards, Transitions) ->
    %% Pending transitions, only for shards that have any:
    PendingByShard = maps:from_list(
        lists:filtermap(
            fun(ShardRecord = #?SHARD_TAB{shard = DBShard}) ->
                Claimed = [T || T = #?TRANSITION_TAB{shard = S} <- Transitions, S == DBShard],
                case compute_transitions(ShardRecord, Claimed) of
                    [] -> false;
                    Pending -> {true, {DBShard, Pending}}
                end
            end,
            Shards
        )
    ),
    %% The local site:
    io:format("THIS SITE:~n"),
    try this_site() of
        ThisSite -> io:format("~s~n", [ThisSite])
    catch
        error:badarg ->
            io:format(
                "(!) UNCLAIMED~n"
                "(!) Likely this node's name is already known as another site in the cluster.~n"
                "(!) Please resolve conflicts manually.~n"
            )
    end,
    %% All known sites:
    io:format("~nSITES:~n", []),
    SiteRows = [
        [Site, Node, format_node_status(node_status(Node))]
     || #?NODE_TAB{site = Site, node = Node} <- Nodes
    ],
    print_table(["Site", "Node", "Status"], SiteRows),
    case [Node || #?NODE_TAB{node = Node} <- Nodes, node_status(Node) == false] of
        [] ->
            ok;
        _LostNodes ->
            io:format(
                "(!) ATTENTION~n"
                "(!) One or more sites are lost, replicas under their ownership are gone.~n"
                "(!) Availability may be compromised.~n"
                "(!) Please take actions to bring the cluster back to healthy state.~n"
            )
    end,
    %% All shards:
    io:format("~nSHARDS:~n"),
    ShardRows = [
        [
            format_shard(DBShard),
            {group, [
                {subcolumns, [
                    format_replicas(RS, Nodes),
                    format_transitions(maps:get(DBShard, PendingByShard, []))
                ]}
            ]}
        ]
     || #?SHARD_TAB{shard = DBShard, replica_set = RS} <- Shards
    ],
    print_table(["DB/Shard", "Replicas", "Transitions"], ShardRows),
    %% A shard is "stuck" when at least half of its replicas are lost.
    IsStuck = fun(#?SHARD_TAB{replica_set = Replicas}) ->
        Lost = [Site || Site <- Replicas, site_status(Site, Nodes) == false],
        length(Lost) * 2 >= length(Replicas)
    end,
    case lists:filter(IsStuck, Shards) of
        [] ->
            ok;
        _StuckShards ->
            io:format(
                "(!) ATTENTION~n"
                "(!) One or more shards have replica set where majority of replicas are gone.~n"
                "(!) Membership changes are compromised, pending transitions may never finish.~n"
                "(!) Please take necessary steps to deal with lost sites.~n"
                "(!) Prepare for the possibility of data loss.~n"
            )
    end,
    ok.
358

359
%% Render a `{DB, Shard}' key as `DB/Shard' (DB printed with ~p, the shard
%% ID with ~s). Returns chardata.
format_shard({DBName, ShardId}) ->
    Args = [DBName, ShardId],
    io_lib:format("~p/~s", Args).
361

362
%% Render every replica site of a shard, each followed by its status marker.
format_replicas(RS, Nodes) ->
    lists:map(fun(Site) -> format_replica(Site, Nodes) end, RS).

%% A single replica: the site ID followed by the marker of its node's status.
format_replica(Site, Nodes) ->
    Status = site_status(Site, Nodes),
    [Site, format_node_marker(Status)].
367

368
%% Cluster status of the node owning `Site', looked up in `Nodes' (a
%% snapshot of the node table).
%%
%% A site that is absent from the snapshot is reported as lost (`false'),
%% the same status `node_status/1' reports for a node unknown to the cluster.
%% The previous `[Node] = ...' match crashed the whole status report in that
%% case. This can happen because shard records and the node table are read
%% non-atomically.
site_status(Site, Nodes) ->
    case lists:keyfind(Site, #?NODE_TAB.site, Nodes) of
        #?NODE_TAB{node = Node} ->
            node_status(Node);
        false ->
            false
    end.
371

372
%% Render a list of transitions, one entry per transition.
format_transitions(Transitions) ->
    lists:map(fun format_transition/1, Transitions).

%% `{add, Site}' is rendered as "+ Site", `{del, Site}' as "- Site".
format_transition({add, Site}) -> ["+ ", Site];
format_transition({del, Site}) -> ["- ", Site].

%% Status column of the SITES table.
format_node_status(running) -> "    up";
format_node_status(stopped) -> "(x) down";
format_node_status(false) -> "(!) LOST".

%% Suffix appended to a replica site in the SHARDS table; empty when up.
format_node_marker(running) -> "";
format_node_marker(stopped) -> " (x)";
format_node_marker(false) -> " (!)".
393

394
%% Cluster status of `Node' as reported by mria. This module handles
%% `running', `stopped' and `false' (the latter is shown as "lost").
node_status(Node) ->
    mria:cluster_status(Node).

%% Print a table to standard output, formatted by `emqx_utils_fmt'.
print_table(Header, Rows) ->
    Rendered = emqx_utils_fmt:table(Header, Rows),
    io:put_chars(Rendered).
399

400
%%===============================================================================
401
%% DB API
402
%%===============================================================================
403

404
%% @doc Stored options of `DB', or an empty map when `DB' is unknown.
-spec db_config(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts() | #{}.
db_config(DB) ->
    case mnesia:dirty_read(?META_TAB, DB) of
        [] ->
            #{};
        [#?META_TAB{db_props = Opts}] ->
            Opts
    end.

%% @doc Create `DB' with `DefaultOpts' if it does not exist yet, and return
%% its effective options (see `open_db_trans/2').
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    emqx_ds_replication_layer:builtin_db_opts().
open_db(DB, DefaultOpts) ->
    Args = [DB, DefaultOpts],
    transaction(fun ?MODULE:open_db_trans/2, Args).

%% @doc Update the changeable options of an existing `DB'
%% (see `update_db_config_trans/2').
-spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    emqx_ds_replication_layer:builtin_db_opts() | {error, nonexistent_db}.
update_db_config(DB, UpdateOpts) ->
    Args = [DB, UpdateOpts],
    transaction(fun ?MODULE:update_db_config_trans/2, Args).

%% @doc Remove the metadata of `DB'.
-spec drop_db(emqx_ds:db()) -> ok.
drop_db(DB) ->
    transaction(fun ?MODULE:drop_db_trans/1, [DB]).

%% @doc Names of all DBs with stored metadata (dirty read).
-spec dbs() -> [emqx_ds:db()].
dbs() ->
    mnesia:dirty_all_keys(?META_TAB).
430

431
%%===============================================================================
432
%% Site / shard allocation API
433
%%===============================================================================
434

435
%% @doc Add `Site' to the set of sites `DB' is replicated across.
%% This updates the shards' target sets only; replica sets follow through
%% `claim_transition/3' and `update_replica_set/3'.
-spec join_db_site(emqx_ds:db(), site()) -> update_cluster_result().
join_db_site(DB, Site) ->
    Modifications = [{add, Site}],
    transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, Modifications]).

%% @doc Remove `Site' from the set of sites `DB' is replicated across.
%% Like `join_db_site/2', this only updates the shards' target sets.
-spec leave_db_site(emqx_ds:db(), site()) -> update_cluster_result().
leave_db_site(DB, Site) ->
    Modifications = [{del, Site}],
    transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, Modifications]).

%% @doc Replace the set of sites `DB' is replicated across with `Sites'.
-spec assign_db_sites(emqx_ds:db(), [site()]) -> update_cluster_result().
assign_db_sites(DB, Sites) ->
    Args = [DB, Sites],
    transaction(fun ?MODULE:assign_db_sites_trans/2, Args).

%% @doc Sites that currently hold replicas of any shard of `DB' (dirty read).
-spec db_sites(emqx_ds:db()) -> [site()].
db_sites(DB) ->
    list_db_sites(mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}))).
455

456
%% @doc Transitions still needed to bring the replica set of a shard in line
%% with its target set, taking already claimed transitions into account.
%% Returns `undefined' for an unknown shard.
-spec replica_set_transitions(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    [transition()] | undefined.
replica_set_transitions(DB, Shard) ->
    Key = {DB, Shard},
    case mnesia:dirty_read(?SHARD_TAB, Key) of
        [] ->
            undefined;
        [ShardRecord] ->
            Claimed = mnesia:dirty_read(?TRANSITION_TAB, Key),
            compute_transitions(ShardRecord, Claimed)
    end.

%% @doc Record the intention to carry out a replica set transition for a
%% shard. Call it before acting on the transition so that this fact is
%% persisted, and call `update_replica_set/3' once the transition is done.
-spec claim_transition(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) ->
    ok | {error, {conflict, transition()} | {outdated, _Expected :: [transition()]}}.
claim_transition(DB, Shard, Trans) ->
    Args = [DB, Shard, Trans],
    transaction(fun ?MODULE:claim_transition_trans/3, Args).

%% @doc Apply a successfully completed transition to the shard's replica set.
-spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> ok.
update_replica_set(DB, Shard, Trans) ->
    Args = [DB, Shard, Trans],
    transaction(fun ?MODULE:update_replica_set_trans/3, Args).

%% @doc Sites currently holding the shard's data, or `undefined' for an
%% unknown shard.
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    [site()] | undefined.
replica_set(DB, Shard) ->
    Key = {DB, Shard},
    case mnesia:dirty_read(?SHARD_TAB, Key) of
        [] ->
            undefined;
        [#?SHARD_TAB{replica_set = ReplicaSet}] ->
            ReplicaSet
    end.

%% @doc Sites that should hold the shard's data. This is updated whenever the
%% DB's set of sites changes (see `join_db_site/2', `leave_db_site/2',
%% `assign_db_sites/2'). Returns `undefined' for an unknown shard; the stored
%% value itself may also be `undefined'.
-spec target_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    [site()] | undefined.
target_set(DB, Shard) ->
    Key = {DB, Shard},
    case mnesia:dirty_read(?SHARD_TAB, Key) of
        [] ->
            undefined;
        [#?SHARD_TAB{target_set = TargetSet}] ->
            TargetSet
    end.
506

507
%%================================================================================
508

509
%% @doc Subscribe `Pid' to change events concerning `Subject' (a DB).
subscribe(Pid, Subject) ->
    Request = {subscribe, Pid, Subject},
    gen_server:call(?SERVER, Request, infinity).

%% @doc Cancel the subscription of `Pid'.
unsubscribe(Pid) ->
    Request = {unsubscribe, Pid},
    gen_server:call(?SERVER, Request, infinity).
514

515
%%================================================================================
516
%% behavior callbacks
517
%%================================================================================
518

519
%% Server state.
-record(s, {
    %% Subscribers: pid => {subject, monitor reference}.
    subs = #{} :: #{pid() => {subject(), _Monitor :: reference()}}
}).

init([]) ->
    process_flag(trap_exit, true),
    logger:set_process_metadata(#{domain => [ds, meta]}),
    %% Membership events are handled in `handle_info/2'.
    ok = ekka:monitor(membership),
    %% Tables are created and migrated before the local site is claimed in
    %% them.
    ensure_tables(),
    run_migrations(),
    ensure_site(),
    %% Writes to the shard table are turned into subscriber notifications.
    {ok, _Node} = mnesia:subscribe({table, ?SHARD_TAB, simple}),
    {ok, #s{}}.

handle_call({subscribe, Pid, Subject}, _From, State) ->
    {reply, ok, handle_subscribe(Pid, Subject, State)};
handle_call({unsubscribe, Pid}, _From, State) ->
    {reply, ok, handle_unsubscribe(Pid, State)};
handle_call(_Unknown, _From, State) ->
    {reply, {error, unknown_call}, State}.

handle_cast(_Cast, State) ->
    {noreply, State}.

handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}}, State) ->
    %% A shard record was written: notify subscribers of its DB.
    ok = notify_subscribers(DB, {shard, DB, Shard}, State),
    {noreply, State};
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
    %% A subscriber died: drop its subscription.
    {noreply, handle_unsubscribe(Pid, State)};
handle_info({membership, {node, leaving, Node}}, State) ->
    forget_node(Node),
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.

terminate(_Reason, #s{}) ->
    %% The site ID is published only while the server is alive.
    persistent_term:erase(?emqx_ds_builtin_site),
    ok.
558

559
%%================================================================================
560
%% Internal exports
561
%%================================================================================
562

563
%% Create the metadata record of `DB' if absent; return the effective options.
%%
%% For a new DB, `CreateOpts' is stored and returned as is. For an existing
%% DB, the stored options take precedence over `CreateOpts'. Keys present only
%% in `CreateOpts' are merged in and written back; this most likely happens
%% because the stored record was written by an earlier EMQX version.
%%
%% The merged map is returned, so the caller sees exactly what is stored.
%% Previously the stale pre-merge options were returned even though the
%% merged ones had just been written.
-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    emqx_ds_replication_layer:builtin_db_opts().
open_db_trans(DB, CreateOpts) ->
    case mnesia:wread({?META_TAB, DB}) of
        [] ->
            mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
            CreateOpts;
        [#?META_TAB{db_props = Opts}] ->
            case maps:merge(CreateOpts, Opts) of
                Opts ->
                    %% Nothing new: stored options are complete.
                    Opts;
                UpdatedOpts ->
                    mnesia:write(#?META_TAB{db = DB, db_props = UpdatedOpts}),
                    UpdatedOpts
            end
    end.
583

584
%% Allocate all shards of `DB' across the known sites, inside a transaction.
%%
%% Aborts with:
%% - `{nonexistent_db, DB}' when `DB' has no metadata;
%% - `{shards_already_allocated, ShardIds}' when shards already exist;
%% - `{insufficient_sites_online, NSites, Sites}' when fewer than `n_sites'
%%   sites are known. `Sites' is the list of known site IDs. Previously the
%%   raw node-table records were passed, leaking this module's storage
%%   format into the `sites' field of the error returned by
%%   `allocate_shards/1'.
%% Returns the list of allocated shard IDs.
-spec allocate_shards_trans(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
allocate_shards_trans(DB) ->
    Opts = #{n_shards := NShards, n_sites := NSites} = db_config_trans(DB),
    case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of
        [] ->
            ok;
        Records ->
            ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
            mnesia:abort({shards_already_allocated, ShardsAllocated})
    end,
    Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
    Sites = [S || #?NODE_TAB{site = S} <- Nodes],
    case length(Sites) of
        N when N >= NSites ->
            ok;
        _ ->
            mnesia:abort({insufficient_sites_online, NSites, Sites})
    end,
    Shards = gen_shards(NShards),
    Allocation = compute_allocation(Shards, Sites, Opts),
    lists:map(
        fun({Shard, ReplicaSet}) ->
            Record = #?SHARD_TAB{
                shard = {DB, Shard},
                replica_set = ReplicaSet
            },
            ok = mnesia:write(Record),
            Shard
        end,
        Allocation
    ).
615

616
%% Set `Sites' as the new set of sites for `DB' by recomputing the target set
%% of every shard. Replica sets are left untouched. Aborts with
%% `{too_few_sites, []}' for an empty list and with
%% `{nonexistent_sites, Unknown}' when some sites are not in the node table.
-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> {ok, [site()]}.
assign_db_sites_trans(DB, Sites) ->
    Opts = db_config_trans(DB),
    Unknown = [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []],
    if
        Sites =:= [] ->
            mnesia:abort({too_few_sites, Sites});
        Unknown =/= [] ->
            mnesia:abort({nonexistent_sites, Unknown});
        true ->
            ok
    end,
    %% TODO
    %% Optimize reallocation. The goals are:
    %% 1. Minimize the number of membership transitions.
    %% 2. Ensure that sites are responsible for roughly the same number of shards.
    ShardRecords = db_shards_trans(DB),
    Reallocation = compute_allocation(ShardRecords, Sites, Opts),
    lists:foreach(
        fun({ShardRecord, TargetSet}) ->
            ok = mnesia:write(ShardRecord#?SHARD_TAB{target_set = TargetSet})
        end,
        Reallocation
    ),
    {ok, Sites}.
640

641
%% Apply `Modifications' to the current set of target sites of `DB'. The new
%% set is assigned only if it differs from the current one.
-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> {ok, unchanged | [site()]}.
modify_db_sites_trans(DB, Modifications) ->
    CurrentSites = list_db_target_sites(db_shards_trans(DB)),
    NewSites = lists:foldl(fun apply_transition/2, CurrentSites, Modifications),
    case NewSites =:= CurrentSites of
        true ->
            {ok, unchanged};
        false ->
            assign_db_sites_trans(DB, NewSites)
    end.
652

653
%% Claim `Trans' for a shard inside a transaction.
%% - Re-claiming the already claimed transition is a no-op.
%% - A different claimed transition aborts with `{conflict, Claimed}'.
%% - Otherwise `Trans' must be the next expected transition; if it is not,
%%   the transaction aborts with `{outdated, Expected}'.
claim_transition_trans(DB, Shard, Trans) ->
    DBShard = {DB, Shard},
    ShardRecord =
        case mnesia:read(?SHARD_TAB, DBShard, read) of
            [Record] -> Record;
            [] -> mnesia:abort({nonexistent_shard, DBShard})
        end,
    case shard_transition_trans(ShardRecord) of
        [] ->
            case compute_transitions(ShardRecord) of
                [Trans | _] ->
                    mnesia:write(#?TRANSITION_TAB{shard = DBShard, transition = Trans});
                Expected ->
                    mnesia:abort({outdated, Expected})
            end;
        [#?TRANSITION_TAB{transition = Trans}] ->
            ok;
        [#?TRANSITION_TAB{transition = Conflict}] ->
            mnesia:abort({conflict, Conflict})
    end.
674

675
%% Apply a completed transition to a shard's replica set and drop the
%% corresponding claim. The target set is reset to `undefined' once the
%% replica set matches it.
update_replica_set_trans(DB, Shard, Trans) ->
    DBShard = {DB, Shard},
    case mnesia:read(?SHARD_TAB, DBShard, write) of
        [] ->
            mnesia:abort({nonexistent_shard, DBShard});
        [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] ->
            %% NOTE
            %% A transition that is no longer planned may still complete, so
            %% the resulting replica set may move _away_ from the target set.
            EffectiveTarget = emqx_maybe:define(TargetSet0, ReplicaSet0),
            ReplicaSet = apply_transition(Trans, ReplicaSet0),
            TargetSet =
                case lists:usort(EffectiveTarget) of
                    ReplicaSet -> undefined;
                    SortedTarget -> SortedTarget
                end,
            %% NOTE: The claim is not required to exist here.
            mnesia:delete_object(#?TRANSITION_TAB{shard = DBShard, transition = Trans}),
            mnesia:write(Record#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet})
    end.
695

696
%% Merge `UpdateOpts' into the stored options of `DB' and return the result.
%% Options that would require a different shard layout (`n_shards',
%% `n_sites', `replication_factor') are ignored: this is an update, not a
%% reopen, so no new shard servers should be created.
-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    emqx_ds_replication_layer:builtin_db_opts().
update_db_config_trans(DB, UpdateOpts) ->
    StoredOpts = db_config_trans(DB, write),
    Frozen = [n_shards, n_sites, replication_factor],
    EffectiveOpts = maps:merge(StoredOpts, maps:without(Frozen, UpdateOpts)),
    ok = mnesia:write(#?META_TAB{db = DB, db_props = EffectiveOpts}),
    EffectiveOpts.

%% Stored options of `DB', read under a read lock.
-spec db_config_trans(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
db_config_trans(DB) ->
    db_config_trans(DB, read).

%% Stored options of `DB', read under `LockType'. Aborts with
%% `{nonexistent_db, DB}' when `DB' is unknown.
db_config_trans(DB, LockType) ->
    case mnesia:read(?META_TAB, DB, LockType) of
        [] ->
            mnesia:abort({nonexistent_db, DB});
        [#?META_TAB{db_props = Config}] ->
            Config
    end.
722

723
%% All shard records of `DB', read under a write lock.
db_shards_trans(DB) ->
    Pattern = ?SHARD_PAT({DB, '_'}),
    mnesia:match_object(?SHARD_TAB, Pattern, write).

%% Transitions claimed for the given shard, read under a write lock.
shard_transition_trans(#?SHARD_TAB{shard = Key}) ->
    mnesia:read(?TRANSITION_TAB, Key, write).
728

729
%% Remove the metadata of `DB' together with all of its shard records and
%% any transitions claimed for them, inside the enclosing transaction.
%%
%% Shard and transition records are keyed by `{DB, Shard}', so the keys are
%% taken from the shard records themselves. The previous version deleted by
%% the bare shard IDs returned by `shards/1', which matched no record. It also
%% read them dirtily, bypassing the transaction. Stale claims are removed too;
%% otherwise they would conflict with transitions of a DB re-created under
%% the same name.
-spec drop_db_trans(emqx_ds:db()) -> ok.
drop_db_trans(DB) ->
    mnesia:delete({?META_TAB, DB}),
    lists:foreach(
        fun(#?SHARD_TAB{shard = DBShard}) ->
            mnesia:delete({?SHARD_TAB, DBShard}),
            mnesia:delete({?TRANSITION_TAB, DBShard})
        end,
        db_shards_trans(DB)
    ).
734

735
%% Associate `Site' with `Node', inside a transaction.
%% - No site is registered for `Node' yet: register `Site'.
%% - `Site' is already registered for `Node': no-op.
%% - A different site is registered for `Node': abort with
%%   `{conflicting_node_site, ExistingSites}'.
%%
%% The lookup is done transactionally under a write lock, so concurrent claims
%% for the same node are serialized. The previous dirty read (`node_sites/1')
%% did not participate in the transaction.
-spec claim_site_trans(site(), node()) -> ok.
claim_site_trans(Site, Node) ->
    case mnesia:match_object(?NODE_TAB, ?NODE_PAT(Node), write) of
        [] ->
            mnesia:write(#?NODE_TAB{site = Site, node = Node});
        [#?NODE_TAB{site = Site}] ->
            ok;
        Records ->
            ExistingSites = [S || #?NODE_TAB{site = S} <- Records],
            mnesia:abort({conflicting_node_site, ExistingSites})
    end.
746

747
%% Delete a site from the node table, inside a transaction. As a safeguard,
%% the transaction aborts with:
%% - `{member_of_replica_sets, DBs}' when the site still holds replicas of
%%   some shard of these DBs;
%% - `{member_of_target_sets, DBs}' when the site takes part in a pending
%%   membership transition of some shard of these DBs.
-spec forget_site_trans(_Record :: tuple()) -> ok.
forget_site_trans(Record = #?NODE_TAB{site = Site}) ->
    DBs = mnesia:all_keys(?META_TAB),
    %% DBs where the site is a replica of any shard:
    ReplicaDBs = lists:usort([
        DB
     || DB <- DBs,
        S <- list_db_sites(db_shards_trans(DB)),
        S == Site
    ]),
    %% DBs where the site is involved in a membership transition of any shard:
    TransitionDBs = lists:usort([
        DB
     || DB <- DBs,
        ShardRecord <- db_shards_trans(DB),
        {_, S} <- compute_transitions(ShardRecord, shard_transition_trans(ShardRecord)),
        S == Site
    ]),
    case {ReplicaDBs, TransitionDBs} of
        {[], []} ->
            mnesia:delete_object(?NODE_TAB, Record, write);
        {[_ | _], _} ->
            mnesia:abort({member_of_replica_sets, ReplicaDBs});
        {[], [_ | _]} ->
            mnesia:abort({member_of_target_sets, TransitionDBs})
    end.
775

776
%% Node table records registered for `Node' (dirty read).
node_sites(Node) ->
    Pattern = ?NODE_PAT(Node),
    mnesia:dirty_match_object(?NODE_TAB, Pattern).

%% All node table records (dirty read).
all_nodes() ->
    Pattern = ?NODE_PAT(),
    mnesia:dirty_match_object(?NODE_TAB, Pattern).

%% All shard records of all DBs (dirty read).
all_shards() ->
    Pattern = ?SHARD_PAT('_'),
    mnesia:dirty_match_object(?SHARD_TAB, Pattern).

%% All claimed transitions of all shards (dirty read).
all_transitions() ->
    Pattern = ?TRANSITION_PAT('_'),
    mnesia:dirty_match_object(?TRANSITION_TAB, Pattern).
787

788
%%================================================================================
789
%% Internal functions
790
%%================================================================================
791

792
%% Create the metadata tables (all in the same RLOG shard, disc copies) and
%% wait until they are available.
%%
%% `?TRANSITION_TAB' is now waited for as well. It was created but left out
%% of `mria:wait_for_tables/1', so the first transition claim or read after
%% startup could hit a table that was not loaded yet.
ensure_tables() ->
    ok = mria:create_table(?META_TAB, [
        {rlog_shard, ?RLOG_SHARD},
        {type, ordered_set},
        {storage, disc_copies},
        {record_name, ?META_TAB},
        {attributes, record_info(fields, ?META_TAB)}
    ]),
    ok = mria:create_table(?NODE_TAB, [
        {rlog_shard, ?RLOG_SHARD},
        {type, ordered_set},
        {storage, disc_copies},
        {record_name, ?NODE_TAB},
        {attributes, record_info(fields, ?NODE_TAB)}
    ]),
    ok = mria:create_table(?SHARD_TAB, [
        {rlog_shard, ?RLOG_SHARD},
        {type, ordered_set},
        {storage, disc_copies},
        {record_name, ?SHARD_TAB},
        {attributes, record_info(fields, ?SHARD_TAB)}
    ]),
    %% A bag: keyed by `{DB, Shard}'.
    ok = mria:create_table(?TRANSITION_TAB, [
        {rlog_shard, ?RLOG_SHARD},
        {type, bag},
        {storage, disc_copies},
        {record_name, ?TRANSITION_TAB},
        {attributes, record_info(fields, ?TRANSITION_TAB)}
    ]),
    ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB, ?TRANSITION_TAB]).
822

823
%% Run the metadata migrations that apply to the running release.
run_migrations() ->
    Release = emqx_release:version(),
    run_migrations(Release).

%% Only 5.8.x releases are handled here.
%% NOTE(review): any other version string fails with `function_clause';
%% confirm this is intended for releases beyond 5.8.
run_migrations("5.8." ++ _Patch) ->
    run_migrations_e58().
855✔
828

829
%% Determine this node's site ID and register it.
%%
%% The ID is read from `emqx_ds_builtin_site.eterm' in the storage base dir.
%% On first start (file missing or empty), a random 8-byte hex ID is generated
%% and persisted. The ID is then claimed for `node()' in a transaction and, on
%% success, cached under the ?emqx_ds_builtin_site persistent term. A failed
%% claim is logged, not raised.
ensure_site() ->
    Filename = filename:join(emqx_ds_storage_layer:base_dir(), "emqx_ds_builtin_site.eterm"),
    case file_read_term(Filename) of
        {ok, Entry} ->
            Site = migrate_site_id(Entry);
        {error, Error} when Error =:= enoent; Error =:= empty ->
            Site = binary:encode_hex(crypto:strong_rand_bytes(8)),
            logger:notice("Creating a new site with ID=~s", [Site]),
            ok = filelib:ensure_dir(Filename),
            %% Persisting the ID must not fail silently: if it did, the next start
            %% would generate a different site ID for the same node. The content is
            %% the same `~p.' rendering the file has always contained.
            ok = file:write_file(Filename, io_lib:format("~p.", [Site]))
    end,
    case transaction(fun ?MODULE:claim_site_trans/2, [Site, node()]) of
        ok ->
            persistent_term:put(?emqx_ds_builtin_site, Site);
        {error, Reason} ->
            logger:error("Attempt to claim site with ID=~s failed: ~p", [Site, Reason])
    end.
848

849
%% @doc Read the first Erlang term stored in `Filename'.
%%
%% Returns `{ok, Term}' on success, `{error, empty}' when the file contains no
%% term, or `{error, Reason}' when the file cannot be opened or parsed. The file
%% is closed on every path once it has been opened.
file_read_term(Filename) ->
    %% NOTE
    %% This mess is needed because `file:consult/1` trips over binaries encoded as
    %% latin1, which 5.4.0 code could have produced with `io:format(FD, "~p.", [Site])`.
    maybe
        {ok, FD} ?= file:open(Filename, [read, {encoding, latin1}]),
        %% `after' guarantees the file (a process linked to the caller) is closed
        %% even when reading hits EOF or a parse error, so it cannot leak.
        try io:read(FD, '', _Line = 1) of
            {ok, Term, _EndLine} ->
                {ok, Term};
            {eof, _EndLine} ->
                {error, empty};
            {error, Reason, _ErrorLine} ->
                {error, Reason};
            {error, Reason} ->
                {error, Reason}
        after
            ok = file:close(FD)
        end
    end.
866

867
%% Normalize a site ID to the printable upper-case hex form.
%% IDs that already consist only of [0-9A-F] are kept as-is; anything else
%% (e.g. raw bytes written by 5.4.0) is hex-encoded.
migrate_site_id(Site) ->
    case re:run(Site, "^[0-9A-F]+$", [{capture, none}]) of
        match -> Site;
        nomatch -> binary:encode_hex(Site)
    end.
874

875
%% Forget every site registered for `Node' within a single transaction that
%% applies forget_site_trans/1 to each of them. A failed transaction is
%% logged, not raised.
forget_node(Node) ->
    Sites = node_sites(Node),
    ForgetEach = fun lists:map/2,
    case transaction(ForgetEach, [fun ?MODULE:forget_site_trans/1, Sites]) of
        {error, Reason} ->
            logger:error("Failed to forget leaving node ~p: ~p", [Node, Reason]);
        Forgotten when is_list(Forgotten) ->
            ok
    end.
884

885
%% @doc Returns the sorted, duplicate-free union of the replica sets of `Shards'.
-spec list_db_sites([_Shard]) -> [site()].
list_db_sites(Shards) ->
    ReplicaSitesOf = fun get_shard_sites/1,
    flatmap_sorted_set(ReplicaSitesOf, Shards).

%% @doc Returns the sorted, duplicate-free union of the target sites of `Shards'.
%% A shard without a target set contributes its current replica set.
-spec list_db_target_sites([_Shard]) -> [site()].
list_db_target_sites(Shards) ->
    TargetSitesOf = fun get_shard_target_sites/1,
    flatmap_sorted_set(TargetSitesOf, Shards).
46✔
893

894
-spec get_shard_sites(_Shard) -> [site()].
%% The shard's current replica set.
get_shard_sites(Shard) ->
    Shard#?SHARD_TAB.replica_set.

-spec get_shard_target_sites(_Shard) -> [site()].
%% The shard's target set, or its current replica set when no target is set.
get_shard_target_sites(Shard = #?SHARD_TAB{target_set = TargetSet}) ->
    case TargetSet of
        undefined -> get_shard_sites(Shard);
        Sites when is_list(Sites) -> Sites
    end.
425✔
903

904
-spec compute_allocation([Shard], [Site], emqx_ds_replication_layer:builtin_db_opts()) ->
    [{Shard, [Site, ...]}].
%% Assign each shard (in sorted order) a replica set of
%% min(length(Sites), replication_factor) sites taken from an endlessly
%% repeated stream of the sorted sites. After each shard the stream start
%% advances by one site, so consecutive shards get rotated replica sets.
%% (Assumes emqx_utils_stream:consume/2 returns {Taken, RestOfStream}.)
compute_allocation(Shards, Sites, Opts) ->
    NSites = length(Sites),
    NReplicas = min(NSites, maps:get(replication_factor, Opts)),
    OrderedShards = lists:sort(Shards),
    SiteRing = emqx_utils_stream:repeat(emqx_utils_stream:list(lists:sort(Sites))),
    AllocateNext = fun(Shard, Ring) ->
        {ReplicaSet, _} = emqx_utils_stream:consume(NReplicas, Ring),
        {_, ShiftedRing} = emqx_utils_stream:consume(1, Ring),
        {{Shard, ReplicaSet}, ShiftedRing}
    end,
    {Allocation, _FinalRing} = lists:mapfoldl(AllocateNext, SiteRing, OrderedShards),
    Allocation.
108✔
922

923
%% Pending transitions of a shard, given its ?TRANSITION_TAB records (zero or
%% one). A recorded transition is put first and removed from the rest of the
%% computed list, so it does not appear twice.
compute_transitions(Shard, []) ->
    compute_transitions(Shard);
compute_transitions(Shard, [#?TRANSITION_TAB{transition = Recorded}]) ->
    Remaining = lists:delete(Recorded, compute_transitions(Shard)),
    [Recorded | Remaining].

%% Transitions needed to move the shard's replica set to its target set.
compute_transitions(#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet}) ->
    do_compute_transitions(TargetSet, ReplicaSet).
10,461✔
930

931
%% No target set means there is nothing to transition to.
do_compute_transitions(undefined, _ReplicaSet) ->
    [];
%% Sites present only in the target set become `add' steps and sites present
%% only in the replica set become `del' steps. The two lists are interleaved,
%% starting with an addition.
do_compute_transitions(TargetSet, ReplicaSet) ->
    ToAdd = lists:map(fun(Site) -> {add, Site} end, TargetSet -- ReplicaSet),
    ToDel = lists:map(fun(Site) -> {del, Site} end, ReplicaSet -- TargetSet),
    intersperse(ToAdd, ToDel).
4,637✔
937

938
%% @doc Apply a single `{add, Site}' or `{del, Site}' step to a list of sites.
%% Adding re-sorts and deduplicates the list; deleting removes the first
%% occurrence of the site, if any, keeping the remaining order.
-spec apply_transition(transition(), [site()]) -> [site()].
apply_transition(Transition, Sites) ->
    case Transition of
        {add, Site} -> lists:usort([Site | Sites]);
        {del, Site} -> lists:delete(Site, Sites)
    end.
359✔
944

945
%% Shard IDs `<<"0">>' .. `<<"NShards-1">>' as decimal binaries.
gen_shards(NShards) ->
    lists:map(fun erlang:integer_to_binary/1, lists:seq(0, NShards - 1)).
71✔
947

948
%% Apply `Fun' to `Args' inside an mria transaction on ?RLOG_SHARD.
%% Returns the function's result, or `{error, Reason}' if the transaction aborted.
transaction(Fun, Args) ->
    Outcome = mria:transaction(?RLOG_SHARD, Fun, Args),
    case Outcome of
        {aborted, Reason} -> {error, Reason};
        {atomic, Result} -> Result
    end.
955

956
%%====================================================================
957

958
%% Subscribe `Pid' to `Subject' and monitor it. A `Pid' that already has a
%% subscription is left untouched, keeping its existing subject.
handle_subscribe(Pid, _Subject, S = #s{subs = Subs}) when is_map_key(Pid, Subs) ->
    S;
handle_subscribe(Pid, Subject, S = #s{subs = Subs}) ->
    MRef = erlang:monitor(process, Pid),
    S#s{subs = Subs#{Pid => {Subject, MRef}}}.
967

968
%% Drop the subscription of `Pid' (if any) and remove its monitor, flushing
%% any pending 'DOWN' message for it.
handle_unsubscribe(Pid, S = #s{subs = Subs}) ->
    case maps:take(Pid, Subs) of
        error ->
            S;
        {{_Subject, MRef}, Remaining} ->
            _ = erlang:demonitor(MRef, [flush]),
            S#s{subs = Remaining}
    end.
976

977
%% Send `{changed, Event}' to every subscriber whose subject equals
%% `EventSubject' (compared with `=='). Other subscribers are skipped.
notify_subscribers(EventSubject, Event, #s{subs = Subs}) ->
    Notify = fun
        (Pid, {Subject, _MRef}) when Subject == EventSubject ->
            Pid ! {changed, Event};
        (_Pid, {_Subject, _MRef}) ->
            false
    end,
    maps:foreach(Notify, Subs).
985

986
%%====================================================================
987
%% Migrations / 5.8 Release
988
%%====================================================================
989

990
%% Migrate the legacy node table, then the legacy shard table. Individual
%% results are ignored.
run_migrations_e58() ->
    lists:foreach(
        fun(Migrate) -> Migrate() end,
        [fun migrate_node_table/0, fun migrate_shard_table/0]
    ).
855✔
994

995
%% Migrate ?NODE_TAB_LEGACY, based on what mnesia reports about it (if it exists).
migrate_node_table() ->
    LegacyTab = ?NODE_TAB_LEGACY,
    Info = table_info_safe(LegacyTab),
    migrate_node_table(LegacyTab, Info).
855✔
998

999
%% Legacy table exists with the expected 3-attribute layout: copy its records
%% into ?NODE_TAB and, if anything was found, clear the legacy table.
migrate_node_table(Tab, #{attributes := [_Site, _Node, _Misc]}) ->
    ok = mria:wait_for_tables([Tab]),
    Outcome = transaction(fun ?MODULE:migrate_node_table_trans/1, [Tab]),
    case Outcome of
        {migrated, [], []} ->
            ok;
        {migrated, Migrated, Dups} ->
            logger:notice("Table '~p' migrated ~p entries", [Tab, length(Migrated)]),
            case Dups of
                [] ->
                    false;
                _ ->
                    logger:warning("Table '~p' duplicated entries, skipped: ~p", [Tab, Dups])
            end,
            {atomic, ok} = mria:clear_table(Tab);
        {error, Reason} ->
            logger:warning("Table '~p' unusable, migration skipped: ~p", [Tab, Reason])
    end;
%% Nothing to migrate: no legacy table.
migrate_node_table(_Tab, undefined) ->
    ok.
855✔
1016

1017
migrate_node_table_trans(Tab) ->
    %% NOTE
    %% The legacy table may have been filled by the 5.4.0 release, whose site ID
    %% representation differs from later versions. Each legacy site ID goes
    %% through `migrate_site_id/1' so that the rest of the code can keep assuming
    %% "printable" IDs. On releases newer than 5.4.0 this should be a no-op.
    Migstamp = mk_migstamp(),
    LegacyRecs = mnesia:match_object(Tab, {Tab, '_', '_', '_'}, read),
    Converted = lists:map(fun migrate_node_rec/1, LegacyRecs),
    {Migrate, Dups} = unique_node_recs(Converted),
    WriteStamped = fun(Rec) ->
        mnesia:write(?NODE_TAB, attach_migstamp(Migstamp, Rec), write)
    end,
    lists:foreach(WriteStamped, Migrate),
    {migrated, Migrate, Dups}.
×
1032

1033
%% Convert a legacy node tuple into a ?NODE_TAB record with a normalized site ID.
migrate_node_rec({?NODE_TAB_LEGACY, LegacySite, Node, Misc}) ->
    Site = migrate_site_id(LegacySite),
    #?NODE_TAB{site = Site, node = Node, misc = Misc}.
×
1035

1036
%% Split node records into `{Unambiguous, Dropped}'. Every record of a node that
%% appears more than once goes to `Dropped'.
unique_node_recs(Records) ->
    %% NOTE
    %% A 5.4.0 node could, rarely, assign itself more than one site ID, because it
    %% sometimes failed to read its site ID back with `file:consult/1`. There is no
    %% reliable way to tell which of those IDs is the most recent, so all records of
    %% such a node are dropped. Once upgraded and restarted, the node re-inserts the
    %% correct record on its own.
    FirstPerNode = lists:ukeysort(#?NODE_TAB.node, Records),
    Extra = Records -- FirstPerNode,
    AmbiguousNodes = [Node || #?NODE_TAB{node = Node} <- Extra],
    IsUnambiguous = fun(Rec) -> not lists:member(Rec#?NODE_TAB.node, AmbiguousNodes) end,
    lists:partition(IsUnambiguous, Records).
×
1047

1048
%% Migrate ?SHARD_TAB_LEGACY, based on what mnesia reports about it (if it exists).
migrate_shard_table() ->
    LegacyTab = ?SHARD_TAB_LEGACY,
    Info = table_info_safe(LegacyTab),
    migrate_shard_table(LegacyTab, Info).
855✔
1051

1052
%% Legacy table exists with the expected 4-attribute layout: copy its records
%% into ?SHARD_TAB and, if anything was copied, clear the legacy table.
migrate_shard_table(Tab, #{attributes := [_Shard, _ReplicaSet, _TargetSet, _Misc]}) ->
    ok = mria:wait_for_tables([Tab]),
    case transaction(fun ?MODULE:migrate_shard_table_trans/1, [Tab]) of
        {migrated, []} ->
            ok;
        {migrated, Migrated} ->
            logger:notice("Table '~p' migrated ~p entries", [Tab, length(Migrated)]),
            {atomic, ok} = mria:clear_table(Tab);
        {error, Reason} ->
            logger:warning("Table '~p' unusable, migration skipped: ~p", [Tab, Reason])
    end;
%% Legacy table exists with an incompatible layout: its entries are abandoned.
migrate_shard_table(Tab, #{attributes := _Incompatible}) ->
    ok = mria:wait_for_tables([Tab]),
    case mnesia:table_info(Tab, size) of
        0 ->
            ok;
        Size ->
            %% Arguments follow the format string: table name first, then count.
            logger:warning("Table '~p' has ~p legacy entries to be abandoned", [Tab, Size]),
            {atomic, ok} = mria:clear_table(Tab)
    end;
%% Nothing to migrate: no legacy table.
migrate_shard_table(_Tab, undefined) ->
    ok.
855✔
1077

1078
migrate_shard_table_trans(Tab) ->
    %% NOTE
    %% Under 5.4.0 this table may have been created with a different schema, but it
    %% was most likely never filled, so abandoning it is acceptable. Under 5.7.0 it
    %% may have been created and filled with the same schema, so its records are
    %% copied over verbatim.
    Migstamp = mk_migstamp(),
    LegacyRecs = mnesia:match_object(Tab, {Tab, '_', '_', '_', '_'}, read),
    Migrated = lists:map(fun migrate_shard_rec/1, LegacyRecs),
    WriteStamped = fun(Rec) ->
        mnesia:write(?SHARD_TAB, attach_migstamp(Migstamp, Rec), write)
    end,
    lists:foreach(WriteStamped, Migrated),
    {migrated, Migrated}.
×
1092

1093
%% Convert a legacy shard tuple into a ?SHARD_TAB record, field by field.
migrate_shard_rec({?SHARD_TAB_LEGACY, Shard, Replicas, Targets, Misc}) ->
    #?SHARD_TAB{
        shard = Shard,
        replica_set = Replicas,
        target_set = Targets,
        misc = Misc
    }.
×
1095

1096
%% Describe when (system time, ms) and on which release records were migrated.
mk_migstamp() ->
    Now = erlang:system_time(millisecond),
    Release = emqx_release:version(),
    #{at => Now, on => Release}.
1102

1103
%% Record the migration stamp under the `migrated' key of the record's misc map.
attach_migstamp(Migstamp, Node = #?NODE_TAB{misc = Misc0}) ->
    Misc = maps:put(migrated, Migstamp, Misc0),
    Node#?NODE_TAB{misc = Misc};
attach_migstamp(Migstamp, Shard = #?SHARD_TAB{misc = Misc0}) ->
    Misc = maps:put(migrated, Migstamp, Misc0),
    Shard#?SHARD_TAB{misc = Misc}.
×
1107

1108
%% Return all mnesia table info for `Tab' as a map, or `undefined' when mnesia
%% reports that the table does not exist.
table_info_safe(Tab) ->
    Lookup =
        try
            {ok, mnesia:table_info(Tab, all)}
        catch
            exit:{aborted, {no_exists, Tab, _}} -> no_table
        end,
    case Lookup of
        {ok, Props} -> maps:from_list(Props);
        no_table -> undefined
    end.
1116

1117
%%====================================================================
1118

1119
%% @doc Alternate elements of two lists, starting with the first list; once
%% either list runs out, the rest of the other is appended.
%% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5].
-spec intersperse([X], [Y]) -> [X | Y].
intersperse(Xs, Ys) ->
    intersperse_acc(Xs, Ys, []).

%% Tail-recursive worker: `Acc' holds the already emitted elements in reverse.
intersperse_acc(Xs, [], Acc) ->
    lists:reverse(Acc, Xs);
intersperse_acc([], Ys, Acc) ->
    lists:reverse(Acc, Ys);
intersperse_acc([X | Xs], Ys, Acc) ->
    intersperse_acc(Ys, Xs, [X | Acc]).
1128

1129
%% @doc Apply `Fun' to each element of `L' (left to right) and return the union
%% of all resulting lists as a sorted list without duplicates.
-spec flatmap_sorted_set(fun((X) -> [Y]), [X]) -> [Y].
flatmap_sorted_set(Fun, L) ->
    Sets = [ordsets:from_list(Fun(X)) || X <- L],
    ordsets:to_list(ordsets:union(Sets)).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc