• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12235783303

09 Dec 2024 12:36PM UTC coverage: 82.037%. First build
12235783303

Pull #14362

github

web-flow
Merge 4819ded51 into 83154d24b
Pull Request #14362: refactor(resource): forbid changing resource state from `on_get_status` return

62 of 82 new or added lines in 27 files covered. (75.61%)

56457 of 68819 relevant lines covered (82.04%)

15149.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.95
/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4
-module(emqx_cluster_link_mqtt).
5

6
-include("emqx_cluster_link.hrl").
7

8
-include_lib("emqx/include/emqx.hrl").
9
-include_lib("emqx/include/emqx_mqtt.hrl").
10
-include_lib("emqx/include/logger.hrl").
11
-include_lib("snabbkaffe/include/trace.hrl").
12
-include_lib("emqx_resource/include/emqx_resource.hrl").
13

14
-behaviour(emqx_resource).
15
-behaviour(ecpool_worker).
16

17
%% ecpool
18
-export([connect/1]).
19

20
%% callbacks of behaviour emqx_resource
21
-export([
22
    callback_mode/0,
23
    resource_type/0,
24
    on_start/2,
25
    on_stop/2,
26
    on_query/3,
27
    on_query_async/4,
28
    on_get_status/2
29
]).
30

31
-export([
32
    resource_id/1,
33
    ensure_msg_fwd_resource/1,
34
    remove_msg_fwd_resource/1,
35
    decode_route_op/1,
36
    decode_forwarded_msg/1,
37
    decode_resp/1
38
]).
39

40
-export([
41
    publish_actor_init_sync/6,
42
    actor_init_ack_resp_msg/3,
43
    publish_route_sync/4,
44
    publish_heartbeat/3,
45
    encode_field/2
46
]).
47

48
-export([
49
    forward/2
50
]).
51

52
-export([
53
    get_all_resources_cluster/0,
54
    get_resource_cluster/1
55
]).
56
%% BpAPI / RPC Targets
57
-export([
58
    get_resource_local_v1/1,
59
    get_all_resources_local_v1/0
60
]).
61

62
-define(MSG_CLIENTID_SUFFIX, ":msg:").
63

64
-define(MQTT_HOST_OPTS, #{default_port => 1883}).
65

66
-define(MSG_POOL_PREFIX, "emqx_cluster_link_mqtt:msg:").
67
-define(RES_NAME(Prefix, ClusterName), <<Prefix, ClusterName/binary>>).
68
-define(ROUTE_POOL_NAME(ClusterName), ?RES_NAME(?ROUTE_POOL_PREFIX, ClusterName)).
69
-define(MSG_RES_ID(ClusterName), ?RES_NAME(?MSG_POOL_PREFIX, ClusterName)).
70
-define(HEALTH_CHECK_TIMEOUT, 1000).
71
-define(RES_GROUP, <<"emqx_cluster_link">>).
72

73
-define(PROTO_VER, 1).
74

75
-define(DECODE(Payload), erlang:binary_to_term(Payload, [safe])).
76
-define(ENCODE(Payload), erlang:term_to_binary(Payload)).
77

78
-define(F_OPERATION, '$op').
79
-define(OP_ROUTE, <<"route">>).
80
-define(OP_HEARTBEAT, <<"heartbeat">>).
81
-define(OP_ACTOR_INIT, <<"actor_init">>).
82
-define(OP_ACTOR_INIT_ACK, <<"actor_init_ack">>).
83

84
-define(F_ACTOR, 10).
85
-define(F_INCARNATION, 11).
86
-define(F_ROUTES, 12).
87
-define(F_TARGET_CLUSTER, 13).
88
-define(F_PROTO_VER, 14).
89
-define(F_RESULT, 15).
90
-define(F_NEED_BOOTSTRAP, 16).
91

92
-define(ROUTE_DELETE, 100).
93

94
-define(PUB_TIMEOUT, 10_000).
95

96
-define(AUTO_RECONNECT_INTERVAL_S, 2).
97

98
-type cluster_name() :: binary().
99

100
%% @doc Returns the ID of the message-forwarding resource for a cluster link:
%% the cluster name prefixed with `?MSG_POOL_PREFIX'.
-spec resource_id(cluster_name()) -> resource_id().
resource_id(ClusterName) ->
    ?RES_NAME(?MSG_POOL_PREFIX, ClusterName).
15✔
103

104
%% @doc Creates the local message-forwarding resource for the given link.
%% Whatever the link's `resource_opts' say, the resource is always created with
%% `query_mode => async' and `start_after_created => true'.
-spec ensure_msg_fwd_resource(emqx_cluster_link_schema:link()) ->
    {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}.
ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) ->
    Forced = #{query_mode => async, start_after_created => true},
    emqx_resource:create_local(
        ?MSG_RES_ID(Name),
        ?RES_GROUP,
        ?MODULE,
        ClusterConf,
        maps:merge(ResOpts, Forced)
    ).
85✔
112

113
%% @doc Removes the local message-forwarding resource of the named cluster link.
-spec remove_msg_fwd_resource(cluster_name()) -> ok | {error, Reason :: term()}.
remove_msg_fwd_resource(ClusterName) ->
    ResId = ?MSG_RES_ID(ClusterName),
    emqx_resource:remove_local(ResId).
53✔
116

117
%% @doc Asks every running node for its cluster-link resources and pairs each
%% node with its own result, in the order the nodes were listed.
-spec get_all_resources_cluster() ->
    [{node(), emqx_rpc:erpc(#{cluster_name() => emqx_resource:resource_data()})}].
get_all_resources_cluster() ->
    RunningNodes = emqx:running_nodes(),
    lists:zip(RunningNodes, emqx_cluster_link_proto_v1:get_all_resources(RunningNodes)).
8✔
123

124
%% @doc Asks every running node for the resource of one cluster link and pairs
%% each node with its own result, in the order the nodes were listed.
-spec get_resource_cluster(cluster_name()) ->
    [{node(), {ok, {ok, emqx_resource:resource_data()} | {error, not_found}} | _Error}].
get_resource_cluster(ClusterName) ->
    RunningNodes = emqx:running_nodes(),
    PerNode = emqx_cluster_link_proto_v1:get_resource(RunningNodes, ClusterName),
    lists:zip(RunningNodes, PerNode).
26✔
130

131
%% RPC Target in `emqx_cluster_link_proto_v1'.
%% Looks up this node's message-forwarding resource for the given link and
%% returns its data without the resource group.
-spec get_resource_local_v1(cluster_name()) ->
    {ok, emqx_resource:resource_data()} | {error, not_found}.
get_resource_local_v1(ClusterName) ->
    ResId = ?MSG_RES_ID(ClusterName),
    case emqx_resource:get_instance(ResId) of
        {error, not_found} = NotFound ->
            NotFound;
        {ok, _Group, Data} ->
            {ok, Data}
    end.
141

142
%% RPC Target in `emqx_cluster_link_proto_v1'.
%% Builds a map from cluster name to resource data for every instance in the
%% cluster-link resource group on this node.
-spec get_all_resources_local_v1() -> #{cluster_name() => emqx_resource:resource_data()}.
get_all_resources_local_v1() ->
    maps:from_list([
        {Name, ResourceData}
     || %% IDs that don't follow the naming pattern (manually crafted?) are skipped.
        ?MSG_RES_ID(Name) = Id <- emqx_resource:list_group_instances(?RES_GROUP),
        %% So are instances that cannot be fetched or belong to another group.
        {ok, ?RES_GROUP, ResourceData} <- [emqx_resource:get_instance(Id)]
    ]).
161

162
%%--------------------------------------------------------------------
163
%% emqx_resource callbacks (message forwarding)
164
%%--------------------------------------------------------------------
165

166
%% @doc `emqx_resource' callback: this resource is queried asynchronously
%% whenever possible.
callback_mode() ->
    async_if_possible.
85✔
167

168
%% @doc `emqx_resource' callback: the type tag of this resource.
-spec resource_type() -> atom().
resource_type() -> cluster_link_mqtt.
85✔
171

172
%% @doc `emqx_resource' callback: starts the pool of MQTT clients that forward
%% messages over the link.
%%
%% The pool is named after the resource ID. That name is recorded as an
%% allocated resource before the pool is started, so `on_stop/2' can find it.
%% On success the resource state holds the pool name and the forwarding topic.
on_start(ResourceId, #{pool_size := PoolSize} = ClusterConf) ->
    PoolName = ResourceId,
    ClientOpts = emqtt_client_opts(?MSG_CLIENTID_SUFFIX, ClusterConf),
    PoolOpts = [
        {name, PoolName},
        {pool_size, PoolSize},
        {pool_type, hash},
        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL_S},
        {client_opts, ClientOpts}
    ],
    ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName),
    StartResult = emqx_resource_pool:start(PoolName, ?MODULE, PoolOpts),
    case StartResult of
        {error, {start_pool_failed, _, Reason}} ->
            {error, Reason};
        ok ->
            {ok, #{pool_name => PoolName, topic => ?MSG_FWD_TOPIC}}
    end.
188

189
%% @doc `emqx_resource' callback: stops the client pool started by `on_start/2'.
%%
%% The pool name is read from the allocated resources, not from the state
%% argument. If nothing was allocated (e.g. `on_start/2' failed before
%% recording the pool name), there is no pool to stop and this is a no-op
%% instead of a `badmatch' crash.
on_stop(ResourceId, _State) ->
    case emqx_resource:get_allocated_resources(ResourceId) of
        #{pool_name := PoolName} ->
            emqx_resource_pool:stop(PoolName);
        #{} ->
            ok
    end.
44✔
192

193
%% @doc `emqx_resource' callback: synchronously publishes one message to the
%% link topic through a pool worker picked by the message topic.
%% Only `#message{}' records are accepted.
on_query(
    _ResourceId,
    #message{topic = Topic, qos = QoS} = FwdMsg,
    #{pool_name := PoolName, topic := LinkTopic} = _State
) ->
    Publish = fun(ConnPid) ->
        emqtt:publish(ConnPid, LinkTopic, ?ENCODE(FwdMsg), QoS)
    end,
    PubResult = ecpool:pick_and_do({PoolName, Topic}, Publish, no_handover),
    ?tp_ignore_side_effects_in_prod(clink_message_forwarded, #{
        pool => PoolName,
        message => FwdMsg,
        pub_result => PubResult
    }),
    handle_send_result(PubResult).
×
210

211
%% @doc `emqx_resource' callback: asynchronously publishes one message to the
%% link topic through a pool worker picked by the message topic.
%% The publish outcome reaches `CallbackIn' via `on_async_result/2'.
on_query_async(
    _ResourceId, FwdMsg, CallbackIn, #{pool_name := PoolName, topic := LinkTopic} = _State
) ->
    ReplyTo = {fun on_async_result/2, [CallbackIn]},
    #message{topic = Topic, qos = QoS} = FwdMsg,
    PublishAsync = fun(ConnPid) ->
        %% #delivery{} record has no valuable data for a remote link...
        Payload = ?ENCODE(FwdMsg),
        %% TODO: check override QOS requirements (if any)
        PubResult = emqtt:publish_async(ConnPid, LinkTopic, Payload, QoS, ReplyTo),
        ?tp_ignore_side_effects_in_prod(clink_message_forwarded, #{
            pool => PoolName,
            message => FwdMsg,
            pub_result => PubResult
        }),
        PubResult
    end,
    %% TODO check message ordering, pick by topic,client pair?
    PickResult = ecpool:pick_and_do({PoolName, Topic}, PublishAsync, no_handover),
    %% The result may be e.g. `{error, ecpool_empty}', which should be recoverable.
    %% Left unclassified here, it would be considered unrecoverable.
    handle_send_result(PickResult).
27✔
236

237
%% copied from emqx_bridge_mqtt_connector
238

239
%% Classifies the raw publish result, then hands it to the caller's callback.
on_async_result(Callback, Result) ->
    Classified = handle_send_result(Result),
    apply_callback_function(Callback, Classified).
27✔
241

242
%% Invokes a callback with `Result' appended as its last argument.
%% Accepted shapes: a fun, `{Fun, Args}' or `{Module, Function, Args}'.
apply_callback_function(Fun, Result) when is_function(Fun) ->
    Fun(Result);
apply_callback_function({Fun, Args}, Result) when is_function(Fun), is_list(Args) ->
    apply(Fun, Args ++ [Result]);
apply_callback_function({Mod, Fn, Args}, Result) when is_atom(Mod), is_atom(Fn), is_list(Args) ->
    apply(Mod, Fn, Args ++ [Result]).
×
248

249
%% Maps a publish result to what the resource layer expects.
%% `ok', and replies with reason code "success" or "no matching subscribers",
%% count as success. Any other reply or error is wrapped in `{error, _}' with a
%% recoverable/unrecoverable classification.
handle_send_result(ok) ->
    ok;
handle_send_result({ok, #{reason_code := RC}}) when
    RC =:= ?RC_SUCCESS; RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
->
    ok;
handle_send_result({ok, Reply}) ->
    {error, classify_reply(Reply)};
handle_send_result({error, Reason}) ->
    {error, classify_error(Reason)}.
×
259

260
%% A reply carrying a reason code that was not accepted as success is always
%% unrecoverable.
classify_reply(#{reason_code := _} = Reply) ->
    {unrecoverable_error, Reply}.
×
262

263
%% Sorts publish errors into recoverable ones (connection lost, client
%% shutting down, empty pool) and unrecoverable ones (everything else).
%% An empty pool is reported as `disconnected'.
classify_error(ecpool_empty) ->
    {recoverable_error, disconnected};
classify_error(Reason) when Reason =:= disconnected; Reason =:= shutdown ->
    {recoverable_error, Reason};
classify_error({disconnected, _RC, _Extra} = Reason) ->
    {recoverable_error, Reason};
classify_error({shutdown, _Details} = Reason) ->
    {recoverable_error, Reason};
classify_error(Reason) ->
    {unrecoverable_error, Reason}.
×
275

276
%% copied from emqx_bridge_mqtt_connector
277
%% @doc `emqx_resource' callback: probes every pool worker in parallel and
%% folds the individual statuses into one.
%% If the probes exceed `?HEALTH_CHECK_TIMEOUT', the resource is reported as
%% connecting.
on_get_status(_ResourceId, #{pool_name := PoolName} = _State) ->
    WorkerPids = [Pid || {_Name, Pid} <- ecpool:workers(PoolName)],
    %% Only the parallel probe is protected; combining the results is not.
    try emqx_utils:pmap(fun get_status/1, WorkerPids, ?HEALTH_CHECK_TIMEOUT) of
        WorkerStatuses ->
            combine_status(WorkerStatuses)
    catch
        exit:timeout ->
            ?status_connecting
    end.
286

287
%% Status of one pool worker: disconnected when the worker has no client,
%% otherwise whatever the client itself reports.
get_status(Worker) ->
    case ecpool_worker:client(Worker) of
        {error, _NoClient} ->
            disconnected;
        {ok, ClientPid} ->
            status(ClientPid)
    end.
292

293
%% Status of one emqtt client process: `connected' when its info has a socket,
%% `connecting' when it has none, `disconnected' when the process is gone.
status(Pid) ->
    try proplists:get_value(socket, emqtt:info(Pid)) of
        undefined ->
            connecting;
        _Socket ->
            connected
    catch
        exit:{noproc, _} ->
            disconnected
    end.
305

306
%% Folds per-worker statuses into one resource status.
%% NOTE
%% Natural order of statuses: [connected, connecting, disconnected]
%% * `disconnected` wins over any other status
%% * `connecting` wins over `connected`
%% Hence the combined status is the greatest element in term order.
%% With no workers at all the resource is disconnected.
combine_status([]) ->
    ?status_disconnected;
combine_status(Statuses) ->
    lists:max(Statuses).
317

318
%%--------------------------------------------------------------------
319
%% ecpool
320
%%--------------------------------------------------------------------
321

322
%% @doc `ecpool_worker' callback: starts one emqtt client and connects it.
%% The worker ID is appended to the configured client ID (separated by ":")
%% so each worker in the pool gets its own client ID.
connect(Options) ->
    WorkerSuffix = integer_to_binary(proplists:get_value(ecpool_worker_id, Options)),
    BaseOpts = proplists:get_value(client_opts, Options),
    #{clientid := BaseClientId} = BaseOpts,
    WorkerOpts = BaseOpts#{clientid => <<BaseClientId/binary, ":", WorkerSuffix/binary>>},
    case emqtt:start_link(WorkerOpts) of
        {error, Reason} = StartError ->
            ?SLOG(error, #{
                msg => "client_start_failed",
                config => emqx_utils:redact(BaseOpts),
                reason => Reason
            }),
            StartError;
        {ok, Pid} ->
            case emqtt:connect(Pid) of
                {ok, _Props} ->
                    {ok, Pid};
                ConnectError ->
                    ConnectError
            end
    end.
343

344
%%--------------------------------------------------------------------
345
%% Protocol
346
%%--------------------------------------------------------------------
347

348
%%% New leader-less Syncer/Actor implementation
349

350
%% @doc Publishes an actor-init request on the route topic at QoS 1.
%% The response topic and the request ID (as correlation data) go into the
%% publish properties.
publish_actor_init_sync(ClientPid, ReqId, RespTopic, TargetCluster, Actor, Incarnation) ->
    InitReq = ?ENCODE(#{
        ?F_OPERATION => ?OP_ACTOR_INIT,
        ?F_PROTO_VER => ?PROTO_VER,
        ?F_TARGET_CLUSTER => TargetCluster,
        ?F_ACTOR => Actor,
        ?F_INCARNATION => Incarnation
    }),
    PubProps = #{'Response-Topic' => RespTopic, 'Correlation-Data' => ReqId},
    emqtt:publish(ClientPid, ?ROUTE_TOPIC, PubProps, InitReq, [{qos, ?QOS_1}]).
87✔
363

364
%% @doc Builds the actor-init acknowledgement message for the request `MsgIn'.
%% It is a QoS 1 message addressed to the request's response topic and it
%% echoes the request's correlation data.
actor_init_ack_resp_msg(Actor, InitRes, MsgIn) ->
    AckPayload = with_res_and_bootstrap(
        #{
            ?F_OPERATION => ?OP_ACTOR_INIT_ACK,
            ?F_PROTO_VER => ?PROTO_VER,
            ?F_ACTOR => Actor
        },
        InitRes
    ),
    ReqProps = emqx_message:get_header(properties, MsgIn),
    #{'Response-Topic' := RespTopic, 'Correlation-Data' := ReqId} = ReqProps,
    RespHeaders = #{properties => #{'Correlation-Data' => ReqId}},
    emqx_message:make(undefined, ?QOS_1, RespTopic, ?ENCODE(AckPayload), #{}, RespHeaders).
383

384
%% Adds the init result and the "need bootstrap" flag to an ack payload.
%% After a successful init, bootstrap is needed unless the extrouter reports
%% the incarnation as present. After a failed init it is never requested.
with_res_and_bootstrap(Payload, {ok, ActorState}) ->
    IsPresent = emqx_cluster_link_extrouter:is_present_incarnation(ActorState),
    Payload#{?F_RESULT => ok, ?F_NEED_BOOTSTRAP => not IsPresent};
with_res_and_bootstrap(Payload, Error) ->
    Payload#{?F_RESULT => Error, ?F_NEED_BOOTSTRAP => false}.
394

395
%% @doc Publishes a batch of route updates for the given actor incarnation on
%% the route topic at QoS 1.
publish_route_sync(ClientPid, Actor, Incarnation, Updates) ->
    RouteOp = ?ENCODE(#{
        ?F_OPERATION => ?OP_ROUTE,
        ?F_ACTOR => Actor,
        ?F_INCARNATION => Incarnation,
        ?F_ROUTES => Updates
    }),
    emqtt:publish(ClientPid, ?ROUTE_TOPIC, RouteOp, ?QOS_1).
34✔
403

404
%% @doc Publishes a heartbeat for the given actor incarnation on the route
%% topic at QoS 0. The publish is asynchronous and its result is discarded.
publish_heartbeat(ClientPid, Actor, Incarnation) ->
    Heartbeat = ?ENCODE(#{
        ?F_OPERATION => ?OP_HEARTBEAT,
        ?F_ACTOR => Actor,
        ?F_INCARNATION => Incarnation
    }),
    IgnoreResult = {fun(_) -> ok end, []},
    emqtt:publish_async(ClientPid, ?ROUTE_TOPIC, Heartbeat, ?QOS_0, IgnoreResult).
170✔
411

412
%% @doc Decodes a binary payload received on the route topic into a route
%% operation (see `decode_route_op1/1' for the possible results).
decode_route_op(Payload) ->
    Decoded = ?DECODE(Payload),
    decode_route_op1(Decoded).
287✔
414

415
%% @doc Decodes a binary response payload (see `decode_resp1/1').
decode_resp(Payload) ->
    Decoded = ?DECODE(Payload),
    decode_resp1(Decoded).
82✔
417

418
%% Turns a decoded route-topic payload into a tagged operation:
%% * `{actor_init, ActorKey, Info}'
%% * `{route_updates, ActorKey, RouteOps}'
%% * `{heartbeat, ActorKey}'
%% * `{error, {unknown_payload, Payload}}' for anything else,
%% where `ActorKey' is `#{actor => _, incarnation => _}'.
decode_route_op1(#{
    ?F_OPERATION := ?OP_ACTOR_INIT,
    ?F_PROTO_VER := ProtoVer,
    ?F_TARGET_CLUSTER := TargetCluster,
    ?F_ACTOR := Actor,
    ?F_INCARNATION := Incarnation
}) ->
    ActorKey = #{actor => Actor, incarnation => Incarnation},
    {actor_init, ActorKey, #{target_cluster => TargetCluster, proto_ver => ProtoVer}};
decode_route_op1(#{
    ?F_OPERATION := ?OP_ROUTE,
    ?F_ACTOR := Actor,
    ?F_INCARNATION := Incarnation,
    ?F_ROUTES := EncodedOps
}) ->
    DecodedOps = [decode_field(route, Op) || Op <- EncodedOps],
    {route_updates, #{actor => Actor, incarnation => Incarnation}, DecodedOps};
decode_route_op1(#{
    ?F_OPERATION := ?OP_HEARTBEAT,
    ?F_ACTOR := Actor,
    ?F_INCARNATION := Incarnation
}) ->
    {heartbeat, #{actor => Actor, incarnation => Incarnation}};
decode_route_op1(Unknown) ->
    {error, {unknown_payload, Unknown}}.
×
446

447
%% Turns a decoded actor-init acknowledgement into
%% `{actor_init_ack, #{actor, result, proto_ver, need_bootstrap}}'.
%% No other payload shape is accepted.
decode_resp1(#{
    ?F_OPERATION := ?OP_ACTOR_INIT_ACK,
    ?F_ACTOR := Actor,
    ?F_PROTO_VER := ProtoVer,
    ?F_RESULT := InitResult,
    ?F_NEED_BOOTSTRAP := NeedBootstrap
}) ->
    Ack = #{
        actor => Actor,
        result => InitResult,
        proto_ver => ProtoVer,
        need_bootstrap => NeedBootstrap
    },
    {actor_init_ack, Ack}.
457

458
%% @doc Decodes a forwarded-message payload.
%%
%% Returns the `#message{}' record on success. Returns `{error, Payload}' and
%% logs a warning when the payload decodes to something else, or when it is not
%% a valid external term at all. `binary_to_term/2' with `[safe]' raises
%% `badarg' in that case, which would otherwise bypass the warning and crash
%% the caller.
decode_forwarded_msg(Payload) ->
    %% Only the decoding itself is protected by the `try'.
    try ?DECODE(Payload) of
        #message{} = Msg ->
            Msg;
        _Unexpected ->
            reject_forwarded_payload(Payload)
    catch
        error:badarg ->
            reject_forwarded_payload(Payload)
    end.

%% Logs the offending payload and returns the error value of
%% `decode_forwarded_msg/1'.
reject_forwarded_payload(Payload) ->
    ?SLOG(warning, #{
        msg => "unexpected_cluster_link_forwarded_msg_payload",
        payload => Payload
    }),
    {error, Payload}.
469

470
%% @doc Wire encoding of a route operation: an added route is sent as the bare
%% `{Topic, ID}' pair, a deleted one as `{?ROUTE_DELETE, Topic, ID}'.
encode_field(route, {add, {_Topic, _ID} = Route}) ->
    Route;
encode_field(route, {delete, {Topic, ID}}) ->
    {?ROUTE_DELETE, Topic, ID}.
11✔
474

475
%% Inverse of `encode_field/2': a triple tagged with `?ROUTE_DELETE' is a
%% deletion, a bare pair is an addition.
decode_field(route, {?ROUTE_DELETE, Topic, ID}) ->
    {delete, {Topic, ID}};
decode_field(route, {_Topic, _ID} = Route) ->
    {add, Route}.
25✔
479

480
%%--------------------------------------------------------------------
481
%% emqx_external_broker
482
%%--------------------------------------------------------------------
483

484
%% @doc Forwards the message of a delivery to the named cluster link by
%% querying its message-forwarding resource, with the message topic as the
%% pick key.
forward(ClusterName, #delivery{message = #message{topic = Topic} = Msg}) ->
    ResId = ?MSG_RES_ID(ClusterName),
    emqx_resource:query(ResId, Msg, #{pick_key => Topic}).
29✔
487

488
%%--------------------------------------------------------------------
489
%% Internal functions
490
%%--------------------------------------------------------------------
491

492
%% Builds the emqtt options for a link, replacing the configured client ID
%% with one derived from it and the given suffix.
emqtt_client_opts(ClientIdSuffix, ClusterConf) ->
    BaseOpts = emqx_cluster_link_config:mk_emqtt_options(ClusterConf),
    #{clientid := BaseClientId} = BaseOpts,
    DerivedId = emqx_bridge_mqtt_lib:clientid_base([BaseClientId, ClientIdSuffix]),
    BaseOpts#{clientid => DerivedId}.
87✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc